CompletableFuture是java8开始提供的API,解决了在多线程场景下繁琐的代码逻辑,日常开发中,我们都会用到线程池,一般会用execute()和submit()方法提交任务。但是当你用过CompletableFuture之后,就会发现以前的线程池处理任务有多难用,功能有多简陋,CompletableFuture又是多么简洁优雅。

要知道CompletableFuture已经随着Java8发布7年了,使用CompletableFuture 更便利使用多线程。

1. 使用线程池处理任务

@Testpublic void Test27(){// 1. 创建线程池ExecutorService executorService = Executors.newFixedThreadPool(3);List list = Arrays.asList(1, 2, 3);// 2. 提交任务,并调用join()阻塞等待所有任务执行完成CompletableFuture.allOf(list.stream().map(key ->CompletableFuture.runAsync(() -> {// 睡眠一秒,模仿处理过程try {Thread.sleep(1000L);} catch (InterruptedException e) {}System.out.println("关注微信公众号Java编程Code,获取更多学习资料" + key);}, executorService)).toArray(CompletableFuture[]::new)).join();executorService.shutdown();}

这样写法需要关注在于

  1. 获取结果时,调用的future.get()方法,会阻塞当前线程,直到返回结果,大大降低性能

  2. 有一半的代码在写怎么使用线程,其实我们不应该关心怎么使用线程,更应该关注任务的处理

2. 使用CompletableFuture重构任务处理

@Testpublic void Test28(){// 1. 创建线程池ExecutorService executorService = Executors.newFixedThreadPool(3);List list = Arrays.asList(1, 2, 3);for (Integer key : list) {// 2. 提交任务CompletableFuture.supplyAsync(() -> {// 睡眠一秒,模仿处理过程try {Thread.sleep(1000L);} catch (InterruptedException e) {}return "关注微信公众号Java编程Code,获取更多学习资料 " + key;}, executorService).whenCompleteAsync((result, exception) -> {// 3. 获取结果System.out.println(result);});;}executorService.shutdown();// 由于whenCompleteAsync获取结果的方法是异步的,所以要阻塞当前线程才能输出结果try {Thread.sleep(2000L);} catch (InterruptedException e) {e.printStackTrace();}}

输出结果:

代码中使用了CompletableFuture的两个方法,
supplyAsync()方法作用是提交异步任务,有两个传参,任务和自定义线程池。
whenCompleteAsync()方法作用是异步获取结果,也有两个传参,结果和异常信息。

代码经过CompletableFuture改造后,是多么的简洁优雅。
提交任务也不用再关心线程池是怎么使用了,获取结果也不用再阻塞当前线程了

还想同步获取异步多线程返回结果,可以使用whenComplete()方法,或者单独调用join()方法。
join()方法配合lamba Stream流是这样用的:

@Testpublic void Test29(){// 1. 创建线程池ExecutorService executorService = Executors.newFixedThreadPool(3);List list = Arrays.asList(1, 2, 3);// 2. 提交任务List results = list.stream().map(key ->CompletableFuture.supplyAsync(() -> {// 睡眠一秒,模仿处理过程try {Thread.sleep(1000L);} catch (InterruptedException e) {}return "关注微信公众号Java编程Code,获取更多学习资料" + key;}, executorService)).map(CompletableFuture::join).collect(Collectors.toList());executorService.shutdown();// 3. 获取结果System.out.println(results);}

输出结果:

多么的简洁优雅啊!原来executorService.submit()这种使用线程池的方式,可以彻底丢掉了

3. CompletableFuture更多妙用

3.1 等待所有任务执行完成

如果让你实现等待所有任务线程执行完成,再进行下一步操作,你会怎么做?
我猜你一定会使用 线程池+CountDownLatch,像下面这样:

 @Testpublic void Test30(){// 1. 创建线程池ExecutorService executorService = Executors.newFixedThreadPool(3);List list = Arrays.asList(1, 2, 3);CountDownLatch countDownLatch = new CountDownLatch(list.size());for (Integer key : list) {// 2. 提交任务executorService.execute(() -> {// 睡眠一秒,模仿处理过程try {Thread.sleep(1000L);} catch (InterruptedException e) {}System.out.println("关注微信公众号Java编程Code,获取更多学习资料" + key);countDownLatch.countDown();});}executorService.shutdown();// 3. 阻塞等待所有任务执行完成try {countDownLatch.await();} catch (InterruptedException e) {}}

输出结果:

采用Java8的写法,看一下使用CompletableFuture是怎么重构的:

@Testpublic void Test31(){// 1. 创建线程池ExecutorService executorService = Executors.newFixedThreadPool(3);List list = Arrays.asList(1, 2, 3);// 2. 提交任务,并调用join()阻塞等待所有任务执行完成CompletableFuture.allOf(list.stream().map(key ->CompletableFuture.runAsync(() -> {// 睡眠一秒,模仿处理过程try {Thread.sleep(1000L);} catch (InterruptedException e) {}System.out.println("关注微信公众号Java编程Code,获取更多学习资料" + key);}, executorService)).toArray(CompletableFuture[]::new)).join();executorService.shutdown();}

输出结果:

代码看着有点乱,其实逻辑很清晰。

  1. 遍历list集合,提交CompletableFuture任务,把结果转换成数组

  2. 再把数组放到CompletableFuture的allOf()方法里面

  3. 最后调用join()方法阻塞等待所有任务执行完成

CompletableFuture的allOf()方法的作用就是,等待所有任务处理完成。
这样写是不是简洁优雅了许多?

3.2 任何一个任务处理完成就返回

如果要实现这样一个需求,往线程池提交一批任务,只要有其中一个任务处理完成就返回。
该怎么做?如果你手动实现这个逻辑的话,代码肯定复杂且低效,有了CompletableFuture就非常简单了,只需调用anyOf()方法就行了

@Testpublic void Test32(){// 1. 创建线程池ExecutorService executorService = Executors.newFixedThreadPool(3);List list = Arrays.asList(1, 2, 3,4,5);long start = System.currentTimeMillis();// 2. 提交任务CompletableFuture completableFuture = CompletableFuture.anyOf(list.stream().map(key ->CompletableFuture.supplyAsync(() -> {// 睡眠一秒,模仿处理过程try {Thread.sleep(2000L);} catch (InterruptedException e) {}return "关注微信公众号Java编程Code,获取更多学习资料" + key;}, executorService)).toArray(CompletableFuture[]::new));executorService.shutdown();// 3. 获取结果System.out.println(completableFuture.join());}

输出结果:

3.3 一个线程执行完成,交给另一个线程接着执行

有这么一个需求:
一个线程处理完成,把处理的结果交给另一个线程继续处理,怎么实现?

你是不是想到了一堆工具,线程池、CountDownLatch、Semaphore、ReentrantLock、Synchronized,该怎么进行组合使用呢?AB组合还是BC组合?

别瞎想了,来看一下CompletableFuture是怎么用的

@Testpublic void Test33(){// 1. 创建线程池ExecutorService executorService = Executors.newFixedThreadPool(3);// 2. 提交任务,并调用join()阻塞等待任务执行完成String result2 = CompletableFuture.supplyAsync(() -> {// 睡眠一秒,模仿处理过程try {Thread.sleep(1000L);} catch (InterruptedException e) {}return "关注微信公众号Java编程Code,获取更多学习资料结果1";}, executorService).thenApplyAsync(result1 -> {// 睡眠一秒,模仿处理过程try {Thread.sleep(1000L);} catch (InterruptedException e) {}return result1 + "关注微信公众号Java编程Code,获取更多学习资料结果2";}, executorService).join();executorService.shutdown();// 3. 获取结果System.out.println(result2);}

输出结果:

代码主要用到了CompletableFuture的thenApplyAsync()方法,作用就是异步处理上一个线程的结果。

CompletableFuture真是太方便了

这么好用的CompletableFuture还有很多其他API。

4. CompletableFuture常用API

4.1 CompletableFuture常用API说明

  1. 提交任务
    supplyAsync
    runAsync

  2. 接力处理

    thenRun thenRunAsync
    thenAccept thenAcceptAsync
    thenApply thenApplyAsync
    handle handleAsync
    applyToEither applyToEitherAsync
    acceptEither acceptEitherAsync
    runAfterEither runAfterEitherAsync
    thenCombine thenCombineAsync
    thenAcceptBoth thenAcceptBothAsync

API太多,有点眼花缭乱,很容易分类。
带run的方法,无入参,无返回值。
带accept的方法,有入参,无返回值。
带supply的方法,无入参,有返回值。
带apply的方法,有入参,有返回值。
带handle的方法,有入参,有返回值,并且带异常处理。
以Async结尾的方法,都是异步的,否则是同步的。
以Either结尾的方法,只需完成任意一个。
以Both/Combine结尾的方法,必须所有都完成。

  1. 获取结果
    join 阻塞等待,不会抛异常
    get 阻塞等待,会抛异常
    complete(T value) 不阻塞,如果任务已完成,返回处理结果。如果没完成,则返回传参value。
    completeExceptionally(Throwable ex) 不阻塞,如果任务已完成,返回处理结果。如果没完成,抛异常。

CompletableFuture 多线程巧妙使用分享就到此结束了,多谢阅览。

Copyright © maxssl.com 版权所有 浙ICP备2022011180号