前言

创建线程的方式只有两种:继承Thread或者实现Runnable接口。 但是这两种方法都存在一个缺陷,没有返回值

Java 1.5 以后,可以通过向线程池提交一个Callable来获取一个包含返回值的Future对象

Future接口的局限性

当Future的线程进行了一个非常耗时的操作,那我们的主线程也就阻塞了。

当我们在简单业务上,可以使用Future的另一个重载方法get(long,TimeUnit)来设置超时时间,避免我们的主线程被无穷尽地阻塞。

单纯使用Future接口或者FutureTask类并不能很好地完成以下我们所需的业务

  • 将两个异步计算合并为一个,这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果
  • 等待Future集合中的所有任务都完成。
  • 仅等待Future集合种最快结束的任务完成,并返回它的结果。
  • 通过编程方式完成一个Future任务的执行
  • 当Future的完成时间完成时会收到通知,并能使用Future的计算结果进行下一步的的操作,不只是简单地阻塞等待操作的结果

什么是CompletableFuture

在Java 8中, 新增类: CompletableFuture,结合了Future的优点,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果

CompletableFuture被设计在Java中进行异步编程。主线程不用为了任务的完成而阻塞/等待,你可以用主线程去并行执行其他的任务。 使用这种并行方式,极大地提升了程序的表现。

  • CompletableFuture实现了Future接口,因此有异步执行返回结果的能力。
  • CompletableFuture实现了CompletionStage接口,该接口是Java8新增得一个接口,用于异步执行中的阶段处理,其大量用在Lambda表达式计算过程中,目前只有CompletableFuture一个实现类。
public class CompletableFuture implements Future, CompletionStage {

方法命名规则

  • 带有Async后缀方法都是异步另外线程执行,没有就是复用之前任务的线程

  • 带有Apply标识方法都是可以获取返回值+有返回值的

  • 带有Accept标识方法都是可以获取返回值

  • 带有run标识的方法不可以获取返回值和无返回值,只是运行

get方法和join方法

  • join:阻塞获取结果或抛出非受检异常。
  • get: 阻塞获取结果或抛出受检测异常,需要显示进行try…catch处理

不同线程池使用

默认线程池执行

/** * 默认线程池 * 运行结果: * main.................start..... * main.................end...... * 当前线程:ForkJoinPool.commonPool-worker-9 * 运行结果:5 */@Testpublic void defaultThread() {    System.out.println("main.................start.....");    CompletableFuture.runAsync(() -> {        System.out.println("当前线程:" + Thread.currentThread().getName());        int i = 10 / 2;        System.out.println("运行结果:" + i);    });    System.out.println("main.................end......");}

默认使用 ForkJoinPool.commonPool(),commonPool是一个会被很多任务共享的线程池,commonPool 设计时的目标场景是运行 非阻塞的 CPU 密集型任务,为最大化利用 CPU,其线程数默认为 CPU 数量- 1

  • 哪些地方使用了commonPool
    • CompletableFuture
    • Parallel Streams
  • 为什么要引入commonPool
    • 为了避免任何并行操作都引入一个线程池,最坏情况会导致在单个JVM上创建了太多的池线程,降低效率。
  • commonPool线程池是怎么创建和使用的
    • ForkJoinTask一定会运行在一个ForkJoinPool中,如果没有显式地交它提交到ForkJoinPool,会使用一个common池(全进程共享)来执行任务。

自定义线程池执行

自定义一个线程池

private ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10,        0L, TimeUnit.MILLISECONDS,        new LinkedBlockingQueue());

使用定义的线程池

/** * 自定义线程池 * 运行结果: * main.................start..... * main.................end...... * 当前线程:pool-1-thread-1 * 运行结果:5 */@Testpublic void myThread() {    System.out.println("main.................start.....");    CompletableFuture.runAsync(() -> {        System.out.println("当前线程:" + Thread.currentThread().getName());        int i = 10 / 2;        System.out.println("运行结果:" + i);    },executor);    System.out.println("main.................end......");}

开启一个异步

runAsync-无返回值

使用runAsync开启一个异步任务线程,该方法无结果返回,适合一些不需要结果的异步任务

/*** * 无返回值 *  runAsync *  结果: * main.................start..... * main.................end...... * 当前线程:33 * 运行结果:5 */@Testpublic void runAsync() {    System.out.println("main.................start.....");    CompletableFuture.runAsync(() -> {        System.out.println("当前线程:" + Thread.currentThread().getId());        int i = 10 / 2;        System.out.println("运行结果:" + i);    }, executor);    System.out.println("main.................end......");}

supplyAsync-有返回值

使用completableFuture.get()方法获取结果,这时程序会阻塞到这里直到结果返回。

/** * 有返回值 * supplyAsync * 结果: * main.................start..... * 当前线程:33 * 运行结果:5 * main.................end.....5 */@Testpublic void supplyAsync() throws ExecutionException, InterruptedException {    System.out.println("main.................start.....");    CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {        System.out.println("当前线程:" + Thread.currentThread().getId());        int i = 10 / 2;        System.out.println("运行结果:" + i);        return i;    }, executor);    System.out.println("main.................end....." + completableFuture.get());}

如果要超时就得往下执行,请使用completableFuture.get(long timeout, TimeUnit unit)方法。

线程串行化方法

带有Async后缀方法都是异步另外线程执行,没有就是复用之前任务的线程

thenApply-上面任务执行完执行+获取返回值+有返回值

/** * 上面任务执行完执行+可以拿到结果+可以返回值 * 结果: * thenApplyAsync当前线程:33 * thenApplyAsync运行结果:5 * thenApplyAsync任务2启动了。。。。。上步结果:5 * main.................end.....hello10 *  * @throws ExecutionException * @throws InterruptedException */@Testpublic void thenApplyAsync() throws ExecutionException, InterruptedException {    CompletableFuture thenApplyAsync = CompletableFuture.supplyAsync(() -> {        System.out.println("thenApplyAsync当前线程:" + Thread.currentThread().getId());        int i = 10 / 2;        System.out.println("thenApplyAsync运行结果:" + i);        return i;    }, executor).thenApplyAsync(result -> {        System.out.println("thenApplyAsync任务2启动了。。。。。上步结果:" + result);        return "hello" + result * 2;    }, executor);    System.out.println("main.................end....." + thenApplyAsync.get());}

thenAccept-上面任务执行完执行+获取返回值

/** * 上面任务执行完执行+可以拿到结果 * 结果: * thenAcceptAsync当前线程:33 * thenAcceptAsync运行结果:5 * thenAcceptAsync任务2启动了。。。。。上步结果:5 * @throws ExecutionException * @throws InterruptedException */@Testpublic void thenAcceptAsync() throws ExecutionException, InterruptedException {    CompletableFuture thenAcceptAsync = CompletableFuture.supplyAsync(() -> {        System.out.println("thenAcceptAsync当前线程:" + Thread.currentThread().getId());        int i = 10 / 2;        System.out.println("thenAcceptAsync运行结果:" + i);        return i;    }, executor).thenAcceptAsync(result -> {        System.out.println("thenAcceptAsync任务2启动了。。。。。上步结果:" + result);    }, executor);}

thenRun-上面任务执行完执行

/** * 上面任务执行完执行 * 结果 * main.................start..... * 当前线程:33 * 运行结果:5 * 任务2启动了。。。。。 */@Testpublic void thenRunAsync() throws ExecutionException, InterruptedException {    System.out.println("main.................start.....");    CompletableFuture voidCompletableFuture = CompletableFuture.supplyAsync(() -> {        System.out.println("当前线程:" + Thread.currentThread().getId());        int i = 10 / 2;        System.out.println("运行结果:" + i);        return i;    }, executor).thenRunAsync(() -> {        System.out.println("任务2启动了。。。。。");    }, executor);}

thenCompose-接收返回值并生成新的任务

当原任务完成后接收返回值,返回一个新的任务

  • thenApply()转换的是泛型中的类型,相当于将CompletableFuture 转换生成新的CompletableFuture
  • thenCompose()用来连接两个CompletableFuture,是生成一个新的CompletableFuture。
/** * 当原任务完成后接收返回值,返回一个新的任务 * 结果: * hello: thenCompose */@Testpublic void thenCompose() {    CompletableFuture cf = CompletableFuture.completedFuture("hello")            .thenCompose(str -> CompletableFuture.supplyAsync(() -> {                return str + ": thenCompose";            },executor));    System.out.println(cf.join());}

任务组合

thenCombine-消费两个结果+返回结果

/** * 两任务组合 都要完成 * completableFuture.thenCombine()获取两个future返回结果,有返回值 * 结果: * 任务1线程:33 * 任务1运行结果:5 * 任务2线程:34 * 任务2运行结果: * 任务5启动。。。结果1:5。。。结果2:hello * 任务5结果hello-->5 */@Testpublic void thenCombine() throws ExecutionException, InterruptedException {    CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {        System.out.println("任务1线程:" + Thread.currentThread().getId());        int i = 10 / 2;        System.out.println("任务1运行结果:" + i);        return i;    }, executor);    CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {        System.out.println("任务2线程:" + Thread.currentThread().getId());        System.out.println("任务2运行结果:");        return "hello";    }, executor);    CompletableFuture thenCombineAsync = future1.thenCombineAsync(future2, (result1, result2) -> {        System.out.println("任务5启动。。。结果1:" + result1 + "。。。结果2:" + result2);        return result2 + "-->" + result1;    }, executor);    System.out.println("任务5结果" + thenCombineAsync.get());}

thenAcceptBoth-消费两个结果+无返回

/** * 两任务组合 都要完成 * completableFuture.thenAcceptBoth() 获取两个future返回结果,无返回值 * 结果: * 任务1线程:33 * 任务1运行结果:5 * 任务2线程:34 * 任务2运行结果: * 任务4启动。。。结果1:5。。。结果2:hello */@Testpublic void thenAcceptBothAsync() throws ExecutionException, InterruptedException {    CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {        System.out.println("任务1线程:" + Thread.currentThread().getId());        int i = 10 / 2;        System.out.println("任务1运行结果:" + i);        return i;    }, executor);    CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {        System.out.println("任务2线程:" + Thread.currentThread().getId());        System.out.println("任务2运行结果:");        return "hello";    }, executor);    CompletableFuture thenAcceptBothAsync = future1.thenAcceptBothAsync(future2, (result1, result2) -> {        System.out.println("任务4启动。。。结果1:" + result1 + "。。。结果2:" + result2);    }, executor);}

runAfterBoth-两个任务完成接着运行

/** * 两任务组合 都要完成 * completableFuture.runAfterBoth() 组合两个future * 结果: * 任务1线程:33 * 任务1运行结果:5 * 任务2线程:34 * 任务2运行结果: * 任务3启动。。。 */@Testpublic void runAfterBothAsync() {    CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {        System.out.println("任务1线程:" + Thread.currentThread().getId());        int i = 10 / 2;        System.out.println("任务1运行结果:" + i);        return i;    }, executor);    CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {        System.out.println("任务2线程:" + Thread.currentThread().getId());        System.out.println("任务2运行结果:");        return "hello";    }, executor);    CompletableFuture runAfterBothAsync = future1.runAfterBothAsync(future2, () -> {        System.out.println("任务3启动。。。");    }, executor);}

两任务完成一个就执行

applyToEither-其中一个执行完执行+获取返回值+有返回值

/** * 两任务组合,一个任务完成就执行 * objectCompletableFuture.applyToEither() 其中一个执行完执行+获取返回值+有返回值 * 结果: * 任务1线程:33 * 任务2线程:34 * 任务2运行结果: * 任务5开始执行。。。结果:hello * 任务5结果:hello world * 

* Process finished with exit code 0 */@Testpublic void applyToEither() throws ExecutionException, InterruptedException { CompletableFuture future1 = CompletableFuture.supplyAsync(() -> { System.out.println("任务1线程:" + Thread.currentThread().getId()); int i = 10 / 2; try { Thread.sleep(3000); System.out.println("任务1运行结果:" + i); } catch (InterruptedException e) { e.printStackTrace(); } return i; }, executor); CompletableFuture future2 = CompletableFuture.supplyAsync(() -> { System.out.println("任务2线程:" + Thread.currentThread().getId()); System.out.println("任务2运行结果:"); return "hello"; }, executor); CompletableFuture applyToEitherAsync = future1.applyToEitherAsync(future2, result -> { System.out.println("任务5开始执行。。。结果:" + result); return result.toString() + " world"; }, executor); System.out.println("任务5结果:" + applyToEitherAsync.get());}

acceptEither-其中一个执行完执行+获取返回值

/** * 两任务组合,一个任务完成就执行 * objectCompletableFuture.acceptEither() 其中一个执行完执行+获取返回值 * 结果: * 任务1线程:33 * 任务2线程:34 * 任务2运行结果: * 任务4开始执行。。。结果:hello */@Testpublic void acceptEither() {    CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {        System.out.println("任务1线程:" + Thread.currentThread().getId());        int i = 10 / 2;        try {            Thread.sleep(3000);            System.out.println("任务1运行结果:" + i);        } catch (InterruptedException e) {            e.printStackTrace();        }        return i;    }, executor);    CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {        System.out.println("任务2线程:" + Thread.currentThread().getId());        System.out.println("任务2运行结果:");        return "hello";    }, executor);    CompletableFuture acceptEitherAsync = future1.acceptEitherAsync(future2, result -> {        System.out.println("任务4开始执行。。。结果:" + result);    }, executor);}

runAfterEither-有一任务完成就执行

/** * 两任务组合,一个任务完成就执行 * 

* objectCompletableFuture.runAfterEither() 其中一个执行完执行 * 结果: * 任务1线程:33 * 任务2线程:34 * 任务2运行结果: * 任务3开始执行。。。 */@Testpublic void runAfterEither() { CompletableFuture future1 = CompletableFuture.supplyAsync(() -> { System.out.println("任务1线程:" + Thread.currentThread().getId()); int i = 10 / 2; try { Thread.sleep(3000); System.out.println("任务1运行结果:" + i); } catch (InterruptedException e) { e.printStackTrace(); } return i; }, executor); CompletableFuture future2 = CompletableFuture.supplyAsync(() -> { System.out.println("任务2线程:" + Thread.currentThread().getId()); System.out.println("任务2运行结果:"); return "hello"; }, executor); CompletableFuture runAfterEitherAsync = future1.runAfterEitherAsync(future2, () -> { System.out.println("任务3开始执行。。。"); }, executor);}

多任务组合

allOf-等待全部完成后才执行

/** * 多任务组合 * allOf 等待所有任务完成 * 结果: * 任务1 * 任务3 * 任务2 * allOf任务1-------任务2-------任务3 * * @throws ExecutionException * @throws InterruptedException */@Testpublic void allOf() throws ExecutionException, InterruptedException {    CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {        System.out.println("任务1");        return "任务1";    }, executor);    CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {        try {            Thread.sleep(2000);            System.out.println("任务2");        } catch (InterruptedException e) {            e.printStackTrace();        }        return "任务2";    }, executor);    CompletableFuture future3 = CompletableFuture.supplyAsync(() -> {        System.out.println("任务3");        return "任务3";    }, executor);    CompletableFuture allOf = CompletableFuture.allOf(future1, future2, future3);    //等待所有任务完成    //allOf.get();    allOf.join();    System.out.println("allOf" + future1.get() + "-------" + future2.get() + "-------" + future3.get());}

anyOf-等待其中之一完成后就执行

/** * 多任务组合 * anyOf 只要一个任务完成 * 结果: * 任务1 * anyOf--最先完成的是任务1 * 任务3 * 等等任务2 * 任务2 * * @throws ExecutionException * @throws InterruptedException */@Testpublic void anyOf() throws ExecutionException, InterruptedException {    CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {        System.out.println("任务1");        return "任务1";    }, executor);    CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {        try {            Thread.sleep(2000);            System.out.println("任务2");        } catch (InterruptedException e) {            e.printStackTrace();        }        return "任务2";    }, executor);    CompletableFuture future3 = CompletableFuture.supplyAsync(() -> {        System.out.println("任务3");        return "任务3";    }, executor);    CompletableFuture anyOf = CompletableFuture.anyOf(future1, future2, future3);    System.out.println("anyOf--最先完成的是" + anyOf.get());    //等待future2打印    System.out.println("等等任务2");    Thread.sleep(3000);}

感知异常

handle-捕获结果或异常并返回新结果

入参为结果或者异常,返回新结果

/** * 入参为结果或者异常,返回新结果 * 结果: * main.................start..... * 当前线程:33 * main.................end.....报错返回 */@Testpublic void handle() throws ExecutionException, InterruptedException {    System.out.println("main.................start.....");    final CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {        System.out.println("当前线程:" + Thread.currentThread().getId());        int i = 10 / 0;        System.out.println("运行结果:" + i);        return i;    }, executor).handleAsync((in, throwable) -> {        if (throwable != null) {            return "报错返回";        }        return "正确了";    });    System.out.println("main.................end....." + completableFuture.get());}

whenComplete-感知结果或异常并返回相应信息

whenComplete虽然得到异常信息,但是不能修改返回信息

/** * 有返回值并且有后续操作 whenComplete * 

* 结果: * main.................start..... * 当前线程:33 * 异步完成。。。。结果是:null...异常是:java.util.concurrent.CompletionException: java.lang.ArithmeticException: 除以零 * 报错了2 * * @throws ExecutionException * @throws InterruptedException */@Testpublic void whenComplete() { System.out.println("main.................start....."); final CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println("当前线程:" + Thread.currentThread().getId()); int i = 10 / 0; System.out.println("运行结果:" + i); return i; }, executor).whenComplete((result, throwable) -> { //whenComplete虽然得到异常信息,但是不能修改返回信息 System.out.println("异步完成。。。。结果是:" + result + "...异常是:" + throwable); }); try { System.out.println("main.................end..T..." + completableFuture.get()); } catch (InterruptedException e) { System.out.println("报错了1"); } catch (ExecutionException e) { System.out.println("报错了2"); }}

exceptionally-捕获异常并返回指定值

/** * 方法完成后的感知 * 感知错误并返回指定值 exceptionally * 结果: * main.................start..... * 当前线程:33 * 执行了exceptionally * main.................end.....0 * @throws ExecutionException * @throws InterruptedException */@Testpublic void exceptionally() throws ExecutionException, InterruptedException {    System.out.println("main.................start.....");    CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {        System.out.println("当前线程:" + Thread.currentThread().getId());        int i = 10 / 0;        System.out.println("运行结果:" + i);        return i;    }, executor).exceptionally(throwable -> {        //R apply(T t);        //exceptionally可以感知错误并返回指定值        System.out.println("执行了exceptionally");        return 0;    });    System.out.println("main.................end....." + completableFuture.get());}

写在最后

  • 有收获的,点赞鼓励!
  • 收藏文章,方便回看!
  • 评论交流,互相进步!

Copyright © maxssl.com 版权所有 浙ICP备2022011180号