Java为我们提供了许多启动线程和管理线程的方法。在本文中,我们将介绍一些在Java中进行并发编程的选项。我们将介绍结构化并发的概念,然后讨论Java 21中一组预览类——它使将任务拆分为子任务、收集结果并对其进行操作变得非常容易,而且不会不小心留下任何挂起的任务。

1 基础方法

通过Lambda表达式启动平台线程的这种创建线程的方法最简单,适用于简单情况。

// Lambda表达式启动平台线程的一种方法。Thread.ofPlatform().start(() -> {    // 在这里执行在独立线程上运行的操作});

问题

  • 创建平台线程是昂贵的
  • 若应用程序用户量很大,平台线程数量可能增长到超出JVM支持的限制

显然,大多数应用程序服务器不鼓励这种行为。因此,继续下一种方法——Java Futures。

2 Java Future类

JDK 5引入,开发者需要改变思考方式。不再考虑启动新线程,而考虑将“任务”提交到线程池以供执行。JDK 5还引入ExecutorService,任务将提交到该服务。ExecutorService是一个定义了提交任务并返回Java Future的机制的接口。提交的任务需实现Runnable或Callable接口。

任务提交给表示单线程线程池

// 将Callable任务提交给表示单线程线程池的ExecutorServiceExecutorService service = Executors.newSingleThreadExecutor();Future future = service.submit(() -> {    // 进行一些工作并返回数据    return "Done";});// 在这里执行其他任务// 阻塞直到提交的任务完成String output = future.get();// 打印 "Done"System.out.println(output);// 继续执行后续任务

多个任务提交到ExecutorService

try (ExecutorService service = Executors.newFixedThreadPool(3)) {    Future future1 = service.submit(() -> {           // 执行任务1并返回TaskResult     });    Future future2 = service.submit(() -> {           // 执行任务2并返回TaskResult     });        Future future3 = service.submit(() -> {           // 执行任务3并返回TaskResult     });    /* 所有异常上抛 */    // get()将阻塞直到任务1完成    TaskResult result1 = future1.get();    // get()将阻塞直到任务2完成    TaskResult result2 = future2.get();    // get()将阻塞直到任务3完成    TaskResult result3 = future3.get();    // 处理result1、result2、result3    handleResults(result1, result2, result3);}

所有这些任务将并行运行,然后父线程可用future.get()方法检索每个任务的结果。

3 上述实现的问题

如在上面代码中用Platform线程,则存在一个问题。获取TaskResult的get()方法将阻塞线程,由于与阻塞Platform线程相关的可扩展性问题,这代价可能很昂贵。然而,使用Java 21——如用Virtual Threads,则在get()期间,底层的平台线程不会被阻塞。

若task2、task3在task1前完成,须等到task1完成,然后处理task2和task3结果。

若task2或task3执行过程失败,则问题更糟。假设整个用例应在任何任务失败时就失败,代码将等到task1完成,然后抛异常。这不是我们的期望,它将为最终用户创建一个非常迟钝的体验。

3.1 基本问题

ExecutorService类对提交给它的各种任务之间关系一无所知。因此,它不知道若一个任务失败,该发生点啥。即示例中提交的三个任务被视为独立任务,而非用例的一部分。这并不是ExecutorService类的失败,因为它没有设计为处理提交的任务之间的任何关系。

3.2 另一个问题

ExecutorService周围使用try-with-resources块,确保在try块退出时调用ExecutorService的close方法。close方法确保所有提交给执行器服务的任务在继续执行之前终止。

若用例要求在任何任务失败时立即失败,那我们运气不好。close方法将等待所有提交的任务完成。

但若不用try-with-resources块,则不能保证在块退出前三个任务都结束。将保留未清理终止的“未明确终止的线程”。任何其他自定义实现都须确保在失败时立即取消其他任务。

因此,尽管用Java Future是处理可拆分为子任务的任务的一种不错方法,但还不够完美。开发须将用例的“感知”编码到逻辑中,但这很难!

注意,对Platform线程存在于Java Futures的问题之一即阻塞问题——Java 21使用Virtual线程时,这问题不再存在。因为使用Virtual Threads时,使用future.get()方法阻塞线程将释放底层的Platform线程。

使用CompletableFuture Pipelines也可解决阻塞问题,但这里不深入探讨。有更简单的方法来解决Java 21阻塞问题,没错就是Virtual Threads!但我们需要找到一种更好解决方案,以处理可拆分为多个子任务且“知道”用例的任务。这就引出结构化并发的基本思想。

4 结构化并发

想象,从方法内部向ExecutorService提交的任务,然后方法退出。现在更难推断代码,因为不知道此提交的任务可能的副作用,且这可能导致难以调试的问题。该问题的图解:

结构化并发基本思想是从一个块(方法或块)内启动的所有任务应在该块结束前终止。即:

  • 代码的结构边界(块)

  • 和该块内提交的任务的运行时边界

重合。这使应用程序代码更容易理解,因为一个块内提交的所有任务的执行效果都被限制在该块内。块外查看代码时,不必担心任务是否仍在运行。

ExecutorService的try-with-resources块是对结构化并发的一次良好尝试,其中从块内提交的所有任务在块退出时完成。但它还不够,因为它可能导致父线程等待时间超过必要时间。其改进版——StructuredTaskScope

5 StructuredTaskScope

Java 21 Virtual Thread作为一项功能被引入,它在大多情况下实际上消除了阻塞问题。但即使使用Virtual线程和Futures,仍存在“不干净终止任务”和“等待时间比必要时间长”的问题。

StructuredTaskScope类在Java 21中作为预览功能提供,旨在解决这问题。它试图提供比Executor Service的try-with-resources块更干净的结构化并发模型。StructuredTaskScope类知道提交的任务之间的关系,因此它可对它们进行更智能假设。

使用StructuredTaskScope的示例

在任一任务失败时,立即返回用例。

StructuredTaskScope.ShutdownOnFailure()返回一个StructuredTaskScope的引用,该引用知道若一个任务失败,那其他任务也须终止,因为它“知道”提交的任务之间的关系。

 try(var scope = new StructuredTaskScope.ShutdownOnFailure()) {                                                                                        // 想象一下LongRunningTask实现Supplier     var dataTask = new LongRunningTask("dataTask", ...);       var restTask = new LongRunningTask("restTask", ...);                                                                               // 并行运行任务     Subtask dataSubTask = scope.fork(dataTask);                Subtask restSubTask = scope.fork(restTask);                                                                                         // 等待所有任务成功完成或第一个子任务失败。      // 如果一个失败,向所有其他子任务发送取消请求     // 在范围上调用join方法,等待两个任务都完成或如果一个任务失败     scope.join();                                                            scope.throwIfFailed();                                                                                                                            // 处理成功的子任务结果                                     System.out.println(dataSubTask.get());                                   System.out.println(restSubTask.get());                               }                                                                       

企业用例

其中两个任务可并行运行:

  • 一个DB任务
  • 一个Rest API任务

目标是并行运行这些任务,然后将结果合并到单个对象中并返回。

调用ShutdownOnFailure()静态方法创建一个StructuredTaskScope类。然后使用StructuredTaskScope对象fork方法(将fork方法考虑为submit方法)并行运行两个任务。幕后,StructuredTaskScope类默认使用Virtual线程来运行任务。每次fork一个任务,都创建一个新Virtual线程(Virtual线程永不会被池化)并运行任务。

然后在范围上调用join方法,等待两个任务都完成或如果一个任务失败。更重要的——若一个任务失败,join()方法将自动向其他任务(剩余运行任务)发送取消请求并等待其终止。这很重要,因为取消请求将确保在块退出时没有不必要的悬挂任务。

若其他线程向父线程发取消请求,也是如此。在最后,若块内部任何位置抛异常——StructuredTaskScope的close方法将确保向子任务发送取消请求并终止任务。StructuredTaskScope美妙在于——若子线程创建自己的StructuredTaskScope(子任务本身有自己的子任务),取消时它们都会得到干净处理。

开发在这里的一个职责是确保它们编写的任务须正确处理在取消期间设置在线程上的中断标志。任务有责任读取此中断标志并干净终止自己。若任务未正确处理中断标志,那用例的响应性将受影响。

6 使用StructuredTaskScope

当一个用例需要将任务分解为子任务,可能还需将子任务进一步分解为更多子任务时,使用StructuredTaskScope是合适的。本文看到的示例是用例需在任一子任务失败时立即返回。但StructuredTaskScope远不止如此。

  • 在第一个任务成功时返回
  • 在所有任务完成时返回(成功或失败)
  • 制作自己的StructuredTaskScope版本

6.1 StructuredTaskScope优点

  • 代码易阅读,因为无论哪种用例,代码看着都一样
  • 子线程失败时会在适当时被干净终止。没有不必要的悬挂线程
  • 使用StructuredTaskScope与Virtual Threads一起,意味与阻塞相关可扩展性问题不存在。这也难怪,默认情况下,StructuredTaskScope在底层使用Virtual Threads

7 总结

总的来说,StructuredTaskScope类是Java中处理将任务拆分为多个子任务的用例的良好补充。子线程在失败时自动取消,不同用例的代码一致性以及更好地理解代码的能力,使其成为在Java中实现Structured Concurrency的理想选择。

Virtual Threads和StructuredTaskScope类共同组成了一个完美的组合。Virtual Threads使我们能够在JVM中创建数十万个线程,而StructuredTaskScope类使我们能够有效地管理这些线程。

让我们等待它退出预览并成为一个正式特性!

本文由博客一文多发平台 OpenWrite 发布!