结构化并发 ForkJoinPool & StructuredTaskScope

2023-12-13 10:47:05

Java 通过引入结构化并发 API 简化并发编程。结构化并发将在不同线程中运行的相关任务组视为单一工作单元,从而简化错误处理和取消操作、提高可靠性并增强可见性

结构化并发由 JEP 428 提出,并在 JDK 19 作为孵化API。它由 JEP 437 在 JDK 20 中重新孵化,并带有一个继承Scoped Values的版本更新 (JEP 429)

非结构化并发 ExecutorService

java.util.concurrent.ExecutorService API 是在 Java 5 中引入的,可帮助开发人员同时执行子任务

在下列代码中,对于两个无关联性的IO操作(如RPC、MySQL调用),通常使用线程池来并发执行findUser()fetchOrder()两个子任务

Response handle() throws ExecutionException, InterruptedException {
    Future<String>  user  = esvc.submit(() -> findUser());
    Future<Integer> order = esvc.submit(() -> fetchOrder());
    String theUser  = user.get();   // Join findUser
    int    theOrder = order.get();  // Join fetchOrder
    return new Response(theUser, theOrder);
}
  • 如果findUser()抛出异常,那么handle()在调用user.get()时会抛出异常,但是fetchOrder()将继续在自己的线程中运行。这称为线程泄漏
  • 如果handle()被中断,findUser()fetchOrder()将继续执行,会导致线程泄露
  • 如果findUser()需要很长时间执行,但fetchOrder()失败了,handle()主线程需要block在user.get()后才能感知异常

这不仅会导致产生错误的可能性更大,而且会使诊断和排除此类错误变得更加困难。例如,线程转储等可视化工具无法对任务之间的堆栈进行关联和追踪

ExecutorService 和 Future 允许这种非结构化使用,所以它们不会强制执行甚至跟踪任务和子任务之间的关系,即使这种关系很常见并且有用

结构化并发

结构化并发(Structured Concurrency)是一种并发编程模型,旨在简化和规范并发代码的编写和管理。它提供了一种结构化的方式来组织和控制并发任务的执行,以减少常见的并发编程问题,如资源泄漏、死锁和竞态条件等

结构化并发的核心思想是将并发任务组织为层次结构,并通过一些基本原则来管理它们的执行:

  1. 嵌套关系:并发任务可以嵌套在其他任务中,形成层次结构。父任务可以控制子任务的执行,并等待子任务完成。
  2. 继承关系:子任务的生命周期受父任务的控制。当父任务完成或取消时,它会自动取消所有的子任务。
  3. 顺序执行:任务按照顺序执行,一个任务的完成是下一个任务的前提条件。这样可以确保任务之间的依赖关系和顺序执行的一致性。
  4. 取消机制:任务可以被取消,取消操作会向下传递,取消所有嵌套的子任务。

结构化并发模型的一个重要特性是异常传播。当一个任务抛出异常时,它会被传播到父任务,并取消整个任务层次结构的执行。这有助于避免未处理的异常导致的问题,并提供了一种可靠的错误处理机制

StructuredTaskScopeForkJoinPool不同,ForkJoinPool是为计算密集型任务设置的,StructuredTaskScope默认使用虚线程,主要面向I/O密集型

计算密集型 ForkJoinPool

ForkJoinPool提供了一种结构化并发的机制,用于高效地执行并行任务,并在Java中广泛用于处理递归和分治算法等需要并行处理的场景。

ForkJoinPool适用于计算密集型任务

ForkJoinPool 是 Java 标准库中提供的一个用于并行执行任务的线程池实现。它基于工作窃取(work-stealing)算法,其中线程可以从其他线程的任务队列中窃取任务来执行

在 ForkJoinPool 中,任务被划分为更小的子任务,然后递归地执行这些子任务,直到达到某个终止条件。具体流程可见 parallelStream/ForkJoinPool 详解

IO密集型 StructuredTaskScope

StructuredTaskScope 允许开发人员将任务构建为一系列并发子任务,并将它们作为一个整体进行协调。子任务的成功的结果或异常由父任务聚合和处理

StructuredTaskScope适用于IO密集型任务

这是之前使用线程池的handle()示例,改为使用StructuredTaskScope而编写的

Response handle() throws ExecutionException, InterruptedException {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        Supplier<String>  user  = scope.fork(() -> findUser());
        Supplier<Integer> order = scope.fork(() -> fetchOrder());

        scope.join()            // Join both subtasks
             .throwIfFailed();  // ... and propagate errors

        // Here, both subtasks have succeeded, so compose their results
        return new Response(user.get(), order.get());
    }
}

与原始示例相比,理解涉及的线程的生命周期很容易:在所有情况下,它们的生命周期都限定在一个词法范围内,即try-with-resources语句的主体部分。此外,使用StructuredTaskScope确保了许多有价值的属性:

  1. 错误处理与短路: 如果任务中的任一子任务失败,如果另一任务尚未完成,则将其取消。(由ShutdownOnFailure实现的关闭策略来管理)
  2. 取消传播: 如果运行handle()的线程在调用join()之前或期间被中断,当线程退出作用域时,两个子任务将自动取消
  3. 清晰度: 上述代码具有清晰的结构:设置子任务,等待它们完成或被取消,然后决定是成功还是失败
  4. 可见性: 线程转储(thread dump)清楚地显示了任务层次结构,其中运行子任务的线程显示为作用域的子级

StructuredTaskScope 是一个预览版 API,默认禁用。要启用需指定JVM参数--enable-preview


参考资料:

  1. JEP 453: Structured Concurrency (Preview)
  2. parallelStream/ForkJoinPool 详解

文章来源:https://blog.csdn.net/why_still_confused/article/details/134866793
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。