跳到主要内容
极客日志极客日志面向AI+效率的开发者社区
首页博客GitHub 精选镜像工具UI配色美学隐私政策关于联系
搜索内容 / 工具 / 仓库 / 镜像...⌘K搜索
注册
博客列表
Javajava算法

ForkJoinPool 基本使用及原理解读

综述由AI生成ForkJoinPool 是 Java 7 引入的并行执行任务框架,核心采用分治算法与工作窃取策略。文章详解其基本原理,包括将大任务拆解为小任务并行计算,空闲线程从其他队列窃取任务以减少等待。通过 RecursiveAction 和 RecursiveTask 子类实现无返回值和有返回值的并行计算案例。同时梳理了 ForkJoinPool 源码中的关键注释,涵盖工作队列管理、线程控制、任务合并及公共池机制,帮助开发者深入理解底层实现与性能优化。

神经兮兮发布于 2025/2/23更新于 2026/6/321 浏览
ForkJoinPool 基本使用及原理解读

一、简介

ForkJoinPool 是 Java 7 开始提供的一个用于并行执行任务的框架。广泛用在 java8 的 parallelStream 和 CompletableFuture 中。其主旨是将大任务分成若干小任务,之后再并行对这些小任务进行计算,最终汇总这些任务的结果,得到最终的结果。这个描述实际上比较接近于单机版的 map-reduce,都是采用了分治算法。区别就在于 ForkJoin 机制可能只能在单个 jvm 上运行,而 map-reduce 则是在集群上执行。

此外,ForkJoinPool 采取工作窃取算法,以避免工作线程由于拆分了任务之后的 join 等待过程。这样处于空闲的工作线程将从其他工作线程的队列中主动去窃取任务来执行。这里涉及到的两个基本知识点是分治法和工作窃取。

此外本文也介绍了 ForkJoinPool 的一些实现原理,这将在后续源码介绍中,逐步详细说明。ForkJoin 不仅在 java8 之后的 stream 中广泛使用。

golang 等其他语言的协程机制,也是采用类似的原理来实现的。我们需要重点掌握 Fork-Join 的本质。

1.1 分治法

分治法的基本思想是将一个规模为 N 的问题分解为 K 个规模较小的子问题,这些子问题的相互独立且与原问题的性质相同,求出子问题的解之后,将这些解合并,就可以得到原有问题的解。是一种分目标完成的程序算法。简单的问题,可以用二分法来完成。

二分法,就是我们之前在检索的时候经常用到的 Binary Search。这样可以迅速将时间复杂度从 O(n) 降低到 O(log n)。那么对应到 ForkJoinPool 对问题的处理也如此。基本原理如下:

这只是一个简化版本的 Fork-Join,实际上我们在日常工作中的应用可能比这要复杂很多。但是基本原理类似。这样就将一个大的任务,通过 fork 方法不断拆解,直到能够计算为止,之后,再将这些结果用 join 合并。这样逐次递归,就得到了我们想要的结果。这就是在 ForkJoinPool 中的分治法。

1.2 工作窃取 (work-stealing)

工作窃取是指当某个线程的任务队列中没有可执行任务的时候,从其他线程的任务队列中窃取任务来执行,以充分利用工作线程的计算能力,减少线程由于获取不到任务而造成的空闲浪费。在 ForkJoinPool 中,工作任务的队列都采用双端队列 Deque 容器。

在通常使用队列的过程中,我们都在队尾插入,而在队头消费以实现 FIFO。而为了实现工作窃取。一般我们会改成工作线程在工作队列上 LIFO,而窃取其他线程的任务的时候,从队列头部取获取。示意图如下:

工作线程 worker1、worker2 以及 worker3 都从 taskQueue 的尾部 popping 获取 task,而任务也从尾部 Pushing,当 worker3 队列中没有任务的时候,就会从其他线程的队列中取 stealing,这样就使得 worker3 不会由于没有任务而空闲。这就是工作窃取算法的基本原理。

二、案例

在 JUC 中,实现 Fork-join 框架有两个类,分别是 ForkJoinPool 以及提交的任务抽象类 ForkJoinTask。

通常情况下我们都是直接继承 ForkJoinTask 的子类,Fork/Join 框架提供了两个子类:

  • RecursiveAction:一个递归无结果的 ForkJoinTask(没有返回值)任务
  • RecursiveTask:一个递归有结果的 (有返回值)任务
ForkJoinTask

2.1 不带返回值的计算

RecursiveAction 可以实现不带返回值的 fork-join 计算。实现如下:

import java.util.concurrent.RecursiveAction;

public class PrintTask extends RecursiveAction {

    private static final long serialVersionUID = 1L;
    private static final int THRESHOLD = 9;
    private int start;
    private int end;

    public PrintTask(int start, int end) {
        super();
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        if(end - start  < THRESHOLD) {
            for(int i = start; i <= end; i++) {
                System.out.println(Thread.currentThread().getName() + ",i= " + i);
            }
        } else {
            int middle = (start + end) / 2;
            PrintTask firstTask = new PrintTask(start, middle);
            PrintTask secondTask = new PrintTask(middle + 1, end);
            invokeAll(firstTask, secondTask);
        }
    }
}

再执行如下 main 方法:

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;

public class ForkJoinPoolTest {

    public static void main(String[] args) throws Exception {
        testNoResultTask();
    }

    private static void testNoResultTask() throws InterruptedException {
        ForkJoinPool pool = new ForkJoinPool();
        pool.submit(new PrintTask(1, 50));
        pool.awaitTermination(2, TimeUnit.SECONDS);
        pool.shutdown();
    }
}

上述代码执行:

ForkJoinPool-1-worker-1, i=1
ForkJoinPool-1-worker-1, i=2
ForkJoinPool-1-worker-1, i=3
ForkJoinPool-1-worker-1, i=4
ForkJoinPool-1-worker-1, i=5
ForkJoinPool-1-worker-1, i=6
ForkJoinPool-1-worker-2, i=26
ForkJoinPool-1-worker-1, i=7
ForkJoinPool-1-worker-2, i=27
ForkJoinPool-1-worker-3, i=14
ForkJoinPool-1-worker-1, i=8
ForkJoinPool-1-worker-3, i=15
ForkJoinPool-1-worker-2, i=28
ForkJoinPool-1-worker-3, i=16
ForkJoinPool-1-worker-3, i=17
ForkJoinPool-1-worker-3, i=18
ForkJoinPool-1-worker-3, i=19
ForkJoinPool-1-worker-3, i=20
ForkJoinPool-1-worker-1, i=9
ForkJoinPool-1-worker-5, i=45
ForkJoinPool-1-worker-3, i=21
ForkJoinPool-1-worker-4, i=33
ForkJoinPool-1-worker-7, i=39
ForkJoinPool-1-worker-2, i=29
ForkJoinPool-1-worker-7, i=40
ForkJoinPool-1-worker-4, i=34
ForkJoinPool-1-worker-4, i=35
ForkJoinPool-1-worker-4, i=36
ForkJoinPool-1-worker-4, i=37
ForkJoinPool-1-worker-4, i=38
ForkJoinPool-1-worker-3, i=22
ForkJoinPool-1-worker-3, i=23
ForkJoinPool-1-worker-5, i=46
ForkJoinPool-1-worker-5, i=47
ForkJoinPool-1-worker-5, i=48
ForkJoinPool-1-worker-1, i=10
ForkJoinPool-1-worker-1, i=11
ForkJoinPool-1-worker-1, i=12
ForkJoinPool-1-worker-1, i=13
ForkJoinPool-1-worker-5, i=49
ForkJoinPool-1-worker-3, i=24
ForkJoinPool-1-worker-7, i=41
ForkJoinPool-1-worker-7, i=42
ForkJoinPool-1-worker-7, i=43
ForkJoinPool-1-worker-2, i=30
ForkJoinPool-1-worker-7, i=44
ForkJoinPool-1-worker-3, i=25
ForkJoinPool-1-worker-5, i=50
ForkJoinPool-1-worker-2, i=31
ForkJoinPool-1-worker-2, i=32

可以看到上述线程将 1-50 并行的 print 出来。

2.2 带返回值的计算

通过实现 RecursiveTask 来进行带有返回值的计算。如我们需要计算 1-1000000 的累加结果。实现如下:

import java.util.concurrent.RecursiveTask;

public class CalculateTask extends RecursiveTask<Integer> {

    private static final long serialVersionUID = 1L;
    private static final int THRESHOLD = 49;
    private int start;
    private int end;

    public CalculateTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        if (end - start <= THRESHOLD) {
            int result = 0;
            for (int i = start; i <= end; i++) {
                result += i;
            }
            return result;
        } else {
            int middle = (start + end) / 2;
            CalculateTask firstTask = new CalculateTask(start, middle);
            CalculateTask secondTask = new CalculateTask(middle + 1, end);
            invokeAll(firstTask, secondTask);
            return firstTask.join() + secondTask.join();
        }
    }
}

主函数如下:

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;

public class ForkJoinPoolTest {

    public static void main(String[] args) throws Exception{
        testHasResultTask();
    }
    
    public static void testHasResultTask() throws Exception {
        int result1 = 0;
        for (int i = 1; i <= 1000000; i++) {
            result1 += i;
        }
        System.out.println("循环计算 1-1000000 累加值:" + result1);

        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinTask<Integer> task = pool.submit(new CalculateTask(1, 1000000));
        int result2 = task.get();
        System.out.println("并行计算 1-1000000 累加值:" + result2);
        pool.awaitTermination(2, TimeUnit.SECONDS);
        pool.shutdown();
    }
}

上述代码执行结果:

循环计算 1-1000000 累加值:1784293664
并行计算 1-1000000 累加值:1784293664

这样就非常容易的实现了一个基于并行的计算。

三、ForkJoin 源码注释

在我们看源码的过程中,先看 Doug Lea 的论文。在论文中作者详细阐述了 java 中 Fork/join 的基本原理。而且这些原理在实现的过程中,还在源码中写了大段的注释。

3.1 类注释

ForkJoinPool 是一个用于运行 ForkJoinTask 的 ExecutorService。提供了一个非 ForkJoinTask 客户端的提交点,以及执行管理和监控操作。

ForkJoinPool 不同于其他种类的 ExecutorService,其主要是通过工作窃取来进行:ForkJoinPool 中的所有线程都会尝试查找并执行提交到线程池中由其他活动创建的任务,如果不存在这些任务,则进行阻塞。当大多数任务派生其他子任务的时候,以及从外部客户端向 pool 中提交诸多小任务的时候,这可以实现高效处理,尤其是在构造函数中将 asyncMode 设置为 true 的时候。ForkJoinPool 也可能适用于从未加入的事件样式任务。

静态的 commonPool 方法可以适用于大多数应用程序,任何未显示提交到指定 pool 的 ForkJoinTask 都将使用公共的 pool,这样会减少资源的使用,在不使用期间缓慢回收线程,并在后续使用时将其恢复。

对于需要单独或者自定义的 pool,可以使用给定目标并行度来创建 ForkJoinPool。默认情况下,并行度等于处理器的数量,pool 尝试通过动态添加,暂停或恢复内部工作线程来维护足够的活动(或可用)线程,即使某些任务因等待加入其他任务而停滞不前。但是,面对阻塞的 I/O 或其他非托管同步,无法保证此类调整。嵌套的 ManagedBlocker 接口可扩展可容纳的同步类型。

除了执行线程和生命周期的控制方法之外,此类还提供了状态检查方法,getStealCount。旨在帮助开发、调整和监控 fork / join 应用程序。同样,方法 toString 以方便的形式返回池的状态指示,以进行非正式监控。

与其他 ExecutorService 一样,下表总结了三种主要的任务执行方法。这些被设计为主要供尚未在当前池中进行派生/联接计算的客户端使用。这些方法的主要形式接受 ForkJoinTask 的实例,但是重载形式还允许混合执行基于普通 Runnable 和 Callable 的活动。但是,已经在池中执行的任务通常应改为使用表中列出的内部计算形式,除非使用通常不联接的异步事件样式任务,在这种情况下,方法选择之间几乎没有区别。

Call from non-fork/join clientsCall from within fork/join computations
Arrange async execution
Await and obtain result
Arrange exec and obtain Future{@link ForkJoinTask#fork} (ForkJoinTasks are Futures)

默认情况下,共的池是使用默认构造函数构造的,但是可以通过三个参数来设置和控制这些参数。(System#getProperty)。

参数说明
java.util.concurrent.ForkJoinPool.common.parallelism并行度级别,非负整数
java.util.concurrent.ForkJoinPool.common.threadFactory线程产生的工厂类
java.util.concurrent.ForkJoinPool.common.exceptionHandler拒绝策略

如果存在 SecurityManager 且没有指定工厂类,那么默认的池将使用一个工厂类提供的线程。该线程没有启动权限。系统的类加载器用于加载这些类。在建立这些设置的时候出现任何错误,将使用默认参数,通过将 parallelism 属性设置为零和/或使用可能返回 {@code null} 的工厂,可以禁用或限制公共池中线程的使用。但是这样做可能会导致未连接的任务永远无法执行。

实现注意:ForkJoinPool 将运行线程的最大数量限制为 32767。尝试创建大于最大数目的池会导致 {@code IllegalArgumentException}。这个实现只在池关闭或者内部资源耗尽的时候才拒绝提交任务。通过抛出 RejectedExecutionException 异常。

3.2 关于原理的注释

3.2.1 ForkJoinPool 实现概述

此类及其内部的嵌套类为一组工作线程提供了主要的功能和控制,来自非 ForkJoin 线程的提交进入提交队列,之后通常将这些提交拆分为子任务,这些子任务可能会被其他线程窃取。优先规则优先处理其自身队列按照 LIFO 或者 FIFO(取决于模式定义)的任务。然后处理其他队列中的随机 FIFO 窃取任务。该框架最初是实现工作窃取来支持树形并行性工具的。随着时间的流逝,其可伸缩性优势导致了扩展和更改。以更好的支持更多不同使用的上下文。由于大部分内部方法和嵌套类是相互关联的,因此此处介绍他们的主要原理和描述。单个方法和嵌套类仅仅包含有关详细信息的简短注释。

3.2.2 WorkQueues

大多数操作发生在工作窃取队列中,(在内嵌的 workQueue 中)。

这个队列是 Deques 的特殊形式。仅支持四种可能的最终操作中的三种,push、pop 和 poll(也称为窃取)。

在进一步的约束下,push 和 pop 仅从其所有线程处或者扩展线程调用,此时处于锁定状态,而 poll 可以从其他线程调用,如果你不熟悉他们,则在继续阅读之前 Herlihy 和 Shavit 的著作《The Art of Multiprocessor programming》第十六章对此进行了更加详细的描述。

主要的工作窃取队列的设计大致与 Chase 和 Lev 的论文和 Michael,Saraswat 和 Vechev 撰写的。

与之最大的不同在于由于 GC 的要求,即使在生成大量的任务的程序中,我们也需要尽快清空已占用的空间。为了实现这一点,我们将 CAS 与 pull 窃取操作从索引的 base 和 top 移动到 slots 本身。

添加任务则采用经典的数组 push(task) 操作:

q.array[q.top] = task; ++q.top;

实际的代码需要对数组进行非空检查和大小检查。需要正确的屏蔽访问,并可能的等待 worker 开始扫描的信号。见下文,成功的 pop 和 poll 都需要通过 cas 操作实现从非 null 到 null。
pop 操作的过程为 (始终由任务所有者执行) :

if ((base != top) and
     (the task at top slot is not null) and
        (CAS slot to null))
       decrement top and return task;

而 poll 操作 (通常由窃取者执行) 是:

if ((base != top) and
    (the task at base slot is not null) and
    (base has not changed) and
       (CAS slot to null))
      increment base and return task;

由于我们依赖于引用的情况,因此不需要在 base 和 top 上做标记,他们是在任何基于循环数组队列中使用的简单整数。请参见 ArrayDeque。对索引的更新保证 top==base 意味着队列是空的。如果没有完全提交 push、pop 或者 poll 则可能会出错,使队列看起来不空,方法 isEmpty()用来检查最后一个元素不空的情况。因此,单独考虑的轮询操作不是无等待的,一个窃取线程无法成功的继续直到另外一个正在进行的窃取线程完成。(或者如果先前是空的则这是一次 push 操作。)然而,总的来说,我们至少保证了概率的非阻塞性,如果一次窃取微能成功,窃取线程总是会随机选择下一个不同的队列进行下一次尝试。因此,为了让一次窃取继续,任何正在进行的轮询或者对任何空队列的新推送都可以完成。这就是为什么我们通常使用方法 pollAt 及其变量,在 base 索引处尝试一次,否则就考虑其他操作,而不是执行方法 poll,后者会重试。

这种方法还支持用户模式,在这种模式下,本地任务采用 FIFO 而不是 LIFO,只需要使用 poll 而不是 pop。这在从不连接任务的消息传递框架中非常有用。然而,这两种模式都不考虑亲和力、负载、缓存位置等。因此很少在给定的机器上提供最佳性能,但通过平均这些因素,可移植地提供良好的吞吐量。此外,即使我们试图使用这些信息,我们通常也没有利用这些信息的基础,例如,某些任务集从缓存亲和力中获利,但其他任务集则受到缓存污染效应的损害。因此,即使他需要扫描,长期吞吐量通常最好使用随机选择策略,而不是定向选择策略,因此只要适用,就可以使用足够质量的廉价随机化。各种 Marsaglia xorshift(有些具有不同的移位常数)在使用点内联。

工作队列也可以用类似的方法处理提交到 pool 的任务。我们不能将这些任务混合在 worker 使用的同一个队列中。相反我们使用散列的形式将提交队列与提交线程随机关联起来,ThreadLocalRandom probe 的值用作选择现有队列的 hashcode。并且可以在与其他提交者发生竞争的时候随机重新定位。从本质上讲,提交者的行为类似于 worker,只不过他们被限制在执行他们被提交的本地任务或者在 countedcompleter 的情况下与其他具有相同根任务的任务。在共享模式下插入任务需要一个锁,主要是为了在调整大小的情况下进行保护,但是我们只使用了一个简单的 spinlock,使用字段 qlock。因为遇到繁忙队列的提交者会继续尝试或者创建其他队列,他们仅在创建和注册新队列的时候才会阻塞。另外,qlock 在关闭时饱和到不可锁定值 -1,在成功的情况下,解锁任然可以并且通过更便宜的顺序写入 qlock 来执行,但是在不成功的情况下使用 cas。

3.2.3 管理

工作窃取机制的主要吞吐量的优势来自于分散控制。worker 大多从自己或者彼此手中接任务,速度可以超过每秒 10 亿次,pool 自身创建、激活、阻塞、停用和终止线程。所有的这些都只需要最少的中心信息。只有少数的属性可以全局跟踪和维护,因为我们将它们打包到少数变量中。通常在不阻塞或者锁定的情况下保持原子性。几乎所有的基本的原子控制状态都保存在两个 volatile 变量中。这两个变量最常被读取,而不是写入,做为状态和一致性检查。另外,字段 config 保持不变性的配置状态。

字段 ctl 包含 64 位信息,这些信息用于原子的添加,停用、入队(在事件队列上)。出队或重新激活 worker 所需要的信息,为了实现将这些内容都打包到一个字段上。我们将最大并行度设置为 (1<<15)-1,这个值已经远远超出了现实中的工作范围。以允许对这个值进行 id、计数、以及取反操作。适用于 16 位的子字段。

字段 runState 保存可锁定状态位,STARTED、STOP 等。还保护对 workQueues 的更新。当用作锁的时候,它通常仅保留几条指令。唯一的例外是一次性数组的初始化和不常见的调整大小。因此几乎总是在经过短暂的自旋之后才可用。自旋之后,方法 awaitRunStateLock(仅在初始化 cas 失败的时候才调用)在内置 Monitor 上的使用 wait/notify 的机制来进行阻塞。对于高度竞争的锁,这将是一个很糟糕的主意。但是大多数 pool 在自旋之后没有竞争的情况下运行,因此,做为更保守的替代方法,它可以很好地工作,因为我们没有内部对象的 monitor,因此可以使用 stealCounter(这个方法是原子的,但是它也必须延迟初始化,请参阅 externalSubmit)。

runState 和 ctl 仅在一种情况下交互,决定添加一个工作线程(请参阅 tryAddWorker)在这种情况下,ctl 的 CAS 是在持有锁的情况下进行的。

记录工作队列

工作队列记录在工作队列数组中。该数组在首次使用的时候创建(请参阅 externalSubmit)。并在必要的时候进行扩展。在记录新工作线程和取消记录终止工作线程时对数组的更新受到 runState 锁的保护,但彼此并发可读,并且可以直接访问该数组。我们还确保对数组引用本身的读取永远不会过时。为了简化基于索引的操作,数组大小始终为 2 的幂,并且所有读取器都必须允许空槽位。worker 队列处于奇数的索引处,共享的(提交)队列处于偶数索引。最多 64 个槽位。以限制增长。即便队列需要扩展以增加更多的 worker 也如此。以这种方式将它们分组在一起可简化并加速对任务的 scan 操作。

所有工作线程的创建都是按需创建的。由任务提交,替换、终止的 worker 或补偿被阻止的 worker 触发。但是,所有其他的支持代码已设置为可与其他策略一起使用。为确保不保留会阻止 GC 的工作程序的引用。对 workerQueue 的所有访问均通过对 workerQueues 数组的索引(这是此处某些混乱代码的来源之一)进行。本质上,workQueues 数组用作弱引用机制。因此,例如 ctl 的 stack top 子字段存储索引,而不是引用。

排队空闲的 worker 与 HPC 工作窃取框架不同,我们不能让 worker 无限制的轮询发现任何任务。并且除非有任务可用。否则我们不能 start/resume 这些 workers。另外一方面,在提交或者新生成的任务的时候,我们必须迅速使它们生效。在许多情况下,激活工作人员的加速时间是整体性能的主要限制因素。在程序启动的时候,通过 JIT 编译和分配会更加复杂。因为,我们尽可能的简化这个过程。

ctl 字段

原子的维护活动和 worker 的数量。以及用于放置等待线程的队列,以便可以定位它们发出的信号。主动计数也起着静态的指标作用。因此当工作人员认为没有更多的要执行的任务的时候,主动计数就会减少。队列实际上就是 Treiber 堆栈的一种形式。堆栈是按最近使用的顺序激活线程的理想选择。这改善了性能和局部性。克服了易于争用和无法释放工作程序的缺点。(除非其在堆栈上位于顶部)。当 worker 找不到任务的时候,将他们推入闲置的 worker 堆栈,由 ctl 较低的 32 位字段表示。我们将 worker 停放。最高堆栈状态保存工作程序的 scanState 字段值。其索引和状态,以及一个版本计数器。该计数器除了 count 字段之外,还用作版本标记,用以提供对 Treiber 堆栈 ABA 问题的保护。

工作程序和 pool 都使用 scanState 来管理和跟踪工作程序是不活动的。(可能处于阻塞,等待信号),这是对任务进行扫描(当两个都不持有它的线程正在忙于运行任务时)。取消激活工作程序之后,将设置其 scanState 字段。并阻止其执行任务。即使它必须对其进行扫描一次,也可以避免排队。请注意,scanState 更新延迟队列 CAS 释放,因此使用时需要注意。排队时,scanState 的低 16 位必须保持其池的索引。因此我们在初始化的时候将索引放置在此处。请参见(registerWorker)否则将其保留在索引处,或在必要时将其还原。

内存排序,有关分析请参阅 Le, Pop, Cohen, and Nardelli 撰写的类似于本文中使用的工作窃取算法中的内存排序要求。我们通常需要比最小顺序更强的命令,因为有时我们必须向 worker 发出信号。要求像 Dekker 一样的全屏障以防止信号丢失。要安排足够的排序而又不花费过多的费用,则需要在表示访问限制的受支持方法之间进行权衡。最重要的操作是从队列中获取并更新 ctl 状态,这需要完整的 CAS。使用 Unsafe 提供的 volatile 的模拟读取 Array 的槽位。从其他线程访问 WorkQueue 的 base,top 和 array 需要对这些读取中的任何一个进行 volatile 加载。我们申明 base 索引为 volatile 的约定,并始终在其他字段之前读取,或者线程必须确保有序的更新,因此写操作将使用有序的内部函数。除非他们可以负担其他写操作的内容。其他 WorkQueue 字段(例如 currentSteal)也具有类似的约定和原理,这些字段仅由所有者写入但被其他人观察到。

创建 worker

要创建一个工作程序,我们预增加总数,用作保留,并尝试通过其工厂构建一个 ForkJoinWorkerThread。新线程将调用 registerWorker,在此构造一个 WorkQueue。并在 workQueues 数组中分配一个索引。必要时扩展该数组。然后启动线程。如果这些步骤有任何异常。或者 worker 返回空值,则 deregisterWorker 会调整计数并进行相应的记录,如果返回空值。则 pool 将继续以少于目标数的 worker 状态运行。如果出现异常,则通常将异常传播到某些外部调用的地方。辅助索引的分配避免了在 workQueues 数组的开头开始依次进行打包时发生的扫描偏差。我们将数组视为简单的 2 的幂的哈希表。并根据需要进行扩展。seedIndex 增量可确保在需要调整大小或注销并替换 worker 之前不会发生冲突。此后将发生冲突的可能性保持在较低水平,我们不能在这里将 ThreadLocalRandom 的 getProbe() 用于类似的目的。因为线程尚未启动。但是这样做是为了实现外部线程创建提交队列。

停用并等待,排队遇到了一些固有的种类,最值得注意的是,产生任务的线程可能会错过看到(和发信号)另一个寻找 worker 但是尚未进入等待队列的线程。当 worker 找不到要进行窃取的任务的时候,它会停用并排队,通常,由于 GC 或者 OS 调度。缺少任务是短暂的,为了减少错误警报的停用,扫描程序会在扫描期间计算队列的状态和校验和。此处和其他地方使用的稳定性检查是快照技术的概率变体,请参阅 Herlihy&Shavit。工作者放弃并尝试仅在两次扫描的总和稳定之后才停用它。此外,为避免丢失信号,它们在成功入队后重复此扫描过程,直到再次稳定。在这种状态下,工作程序无法执行/运行它看到的任务,直到将其从队列中释放为止,因此工作程序本身最终会尝试释放其自身或任何后续任务(请参见 tryRelease)。否则,在进行空扫描时,停用的工作人员会在阻塞(通过停车)之前使用自适应本地自旋构造(请参阅 awaitWork)。请注意有关 Thread.interrupts 围绕停放和其他阻塞的不寻常约定:由于中断仅用于提醒线程检查终止(无论如何在阻塞时进行检查),因此我们在调用任何 Park 之前清除状态(使用 Thread.interrupted),以便由于通过其他一些不相关的调用来设置状态以中断用户代码,因此 Park 不会立即返回。

信号和激活

当且仅当至少可以找到并执行一个线程的时候,才创建或者激活工作程序。在由 worker 或外部提交者将其推送到之前(可能是)的空队列的时候,会在空闲状态向 worker 发出信号。或者工作量小于给定的并行度,则会创建工作线程。每当其他线程从工作中删除任务并注意到那里还有其他任务时,这些主要信号就会被其他人支持。在大多数平台上,发信号(释放)的开销时间非常长,发信号的线程与线程实际取得进展之间的时间可能非常长,因此值得从关键路径上分担这些延迟。另外,由于不活动的工作人员通常是在重新扫描或旋转而不是阻塞,所以我们设置并清除了 WorkQueues 的' parker'字段,以减少不必要的 unpark 的调用。这需要进行二次重新检查,以避免信号遗漏。

Trimming workers.需要在不使用的一段时间之后释放资源,如果 pool 在 IDLE_TIMEOUT 期间保持静止,则在处于静止状态时开始等待的工作程序将超时并终止,(请参阅 awaitWork)随着线程数的减少,该时间段将增加,最终遣散所有 worker。同样,当存在两个以上的备用线程时,多余的线程会在下一个静态点立即终止。(两次填充可避免滞后现象。)

Shutdown and Termination,调用 shutdownNow 会使用 tryTerminate 以原子的方式设置 runState 位。调用线程以及此后终止的所有其他工作线程,通过设置其(qlock)状态。取消其未处理的任务并唤醒它们,反复执行直到稳定为止,以帮助终止其他线程(但循环受工作线程数量限制)。调用 non-abrupt shutdown() 通过检查是否应该终止终止来开始此操作。这主要取决于维持共识的'ctl'的活动计数位 - 每当静态时,trywaiter 从 awaitWork 调用。但是,外部提交者不参与该共识。因此,tryTerminate 会扫描队列(直到稳定),以确保缺少正在进行中的提交,并且工作人员将在触发终止的'停止'阶段之前对其进行处理。(注意:启用关闭功能后,如果调用 helpQuiescePool 会发生内在冲突。两者都等待静止,但是 tryTerminate 偏向于在 helpQuiescePool 完成之前不会触发。)

3.2.4 Joining Tasks

当一个 worker 正在等待 join 另外一个被窃取的任务的时候,可以采取任何行动。因为我们将许多任务复制到一个工作的 pool 上,所以我们不能仅仅让他阻塞,如同 Thread.join 中一样。我们也不能仅仅将 joiner 的运行时堆栈重新分配给另一个,然后在以后替换它,这将是' continuation'的一种形式,即使可能,也不一定是一个好主意,因为我们既需要无阻塞的任务,又需要继续执行进展。相反,我们结合了两种策略:

  • Helping:安排加入者执行一些如果没有发生窃取将要运行的任务。
  • Compensating:除非已有足够的活动线程,否则方法 tryCompensate()可能会创建或重新激活备用线程以补偿阻塞的连接器,直到它们解除阻塞为止。
  • 第三种形式(在 tryRemoveAndExec 中实现)相当于帮助了一个假设的补偿器:如果我们可以很容易地看出补偿器的可能动作是窃取并执行要加入的任务,则加入线程可以直接这样做,而无需执行任何操作。补偿线程(尽管以较大的运行时堆栈为代价,但通常值得进行权衡)。

ManagedBlocker 扩展 API 不能使用 Helping,因此仅依赖于方法 awaitBlocker 中的补偿。

helpStealer 中的算法需要一种'线性帮助'形式。每个 worker(在 currentSteal 字段中)记录它从其他 worker(或提交)中窃取的最新任务。它还记录(在 currentJoin 字段中)它当前正在主动加入的任务。helpStealer 方法使用这些标记来尝试寻找可以帮助完成主动加入的任务的工作人员(即从中窃取任务并执行该任务)。因此,如果要加入的任务没有被窃取,则连接器执行的任务将由其自己的本地双端队列执行。这是 Wagner&Calder 的它的不同之处在于:

  1. 我们仅在窃取时在工人之间维护依赖关系链接,而不使用按任务记帐。有时这需要对 workQueues 数组进行线性扫描以找到盗窃者,但是通常不需要这样做,因为盗窃者会留下提示(可能会变得陈旧/错误),以查找盗窃者的位置。这只是一个提示,因为一个工人可能曾多次偷窃,而提示仅记录了其中一个(通常是最新的)。提示将成本隔离到需要的时间,而不是增加每个任务的开销。
  2. 它是'浅'的,忽略了嵌套和潜在的周期性相互窃取。
  3. 这是故意的:字段 currentJoin 仅在主动加入时更新,这意味着我们在长期任务,GC 停顿等过程中会错过链中的链接(这是正常的,因为在这种情况下进行阻塞通常是个好主意)。
  4. 我们使用校验和来限制找到工作的尝试的次数,然后回退到暂停该工作程序,并在必要时将其替换为另一个。

CountedCompleters 的 helping 操作不需要跟踪 currentJoins:方法 helpComplete 可以执行和执行与正在等待的任务具有相同根目录的任何任务(优先于本地 polls 而不是非本地轮询)。但是,这仍然需要完成程序链的遍历,因此效率不如使用没有显式联接的 CountedCompleters。

补偿的目的并不是要确保在任何给定时间都运行无阻塞线程的目标并行度。此类的某些先前版本对任何阻塞的连接立即采用补偿。但是,实际上,绝大多数阻塞是 GC 和其他 JVM 或 OS 活动的暂时性副产品,这些副产品由于更换而变得更糟。当前,仅在通过检查字段 WorkQueue.scanState 确认所有据称活动的线程正在处理任务之后才尝试进行补偿,从而消除了大多数误报。同样,在最不常见的情况下,绕过补偿(允许更少的线程)是很少有好处的:当队列为空的工人(因此没有继续任务)在联接上阻塞时,仍然有足够的线程来确保活动。

补偿机制可能是有界的。commonPool 的界限(请参阅 commonMaxSpares)可以使 JVM 在耗尽资源之前更好地应对编程错误和滥用。在其他情况下,用户可能会提供限制线程构造的工厂。与其他所有池一样,此池中的边界影响不精确。当线程注销时,总的工作人员计数会减少,而不是在线程退出并且 JVM 和 OS 回收资源时减少。因此,同时处于活动状态的线程数可能会暂时超出限制。

3.2.5 Common Pool

静态的公共的 pool 在静态初始化之后始终存在。由于不需要使用它,或者任何其他创建的 pool,因此我们将初始构造开销和占用空间最小化到大约十二个字段的设置,而没有嵌套分配。在第一次提交到 pool 期间,大多数引导发生在 externalSubmit 方法中。

当外部线程提交到公共的 pool 的时候,他们可以在 join 的时候执行子任务的处理,(请参阅 externalHelpComplete 和相关方法)通过此呼叫者帮助策略,可以明智地将公共池并行度级别设置为比可用核心总数少一个(或多个),对于纯呼叫者运行,甚至可以设置为零。我们不需要记录是否将外部的提交提交到公共 pool 中 - 否则,外部 helping 方法会迅速返回。否则,这些提交者将被阻止等待完成,因此在不适用的情况下,额外的工作量(使用大量的任务状态检查)在限制 ForkJoinTask.join 之前是有限的轮换等待的一种奇怪形式。

作为托管环境中更合适的默认值,除非存在系统属性覆盖,否则当存在 SecurityManager 时,我们将使用子类 InnocuousForkJoinWorkerThread 的工作程序。这些工作程序没有权限设置,不属于任何用户定义的 ThreadGroup,并且在执行任何顶级任务后请擦除所有 ThreadLocals(请参阅 WorkQueue.runTask)。关联的机制(主要在 ForkJoinWorkerThread 中)可能取决于 JVM,并且必须访问特定的 Thread 类字段才能实现此效果。

3.2.6 Style notes

内存排序主要依赖于 Unsafe 内部函数,这些内部函数承担进一步的责任,即明确执行空检查和边界检查,否则将由 JVM 隐式执行。这样写代码可能会非常丑陋,但也反映了需要控制非常活跃的代码中很少有不变量的异常情况下的结果。因此,这些显式检查无论如何都会以某种形式存在。使用之前,所有字段都被读入本地,如果它们是引用,则进行空检查。通常以'C'类样式在方法或块的开头列出声明,并在首次遇到时使用内联分配来完成。数组边界检查通常是通过用 array.length-1 进行掩码来执行的,array.length-1 依赖于不变的条件,即这些数组是用正长度创建的,而该长度正好是自相矛盾地检查的。几乎所有显式检查都会导致绕过/返回,而不是引发异常,因为它们可能由于关机期间的取消/吊销而合法地出现。

在类 ForkJoinPool,ForkJoinWorkerThread 和 ForkJoinTask 之间,有很多表示层级的耦合。WorkQueue 的字段维护由 ForkJoinPool 管理的数据结构,因此可以直接访问。减少这种情况几乎没有意义,因为无论如何表示形式的任何相关的将来更改都将需要伴随算法更改。几种方法本质上无处不在,因为它们必须累积对局部变量中保存的字段的一致读取集。还有其他编码异常(包括一些看上去不必要的悬挂式空检查),即使在解释(未编译)时也可以帮助某些方法合理地执行。
该文件中的声明顺序为(除少数例外):

  1. 静态实用程序函数
  2. 嵌套(静态)类
  3. 静态字段
  4. 字段以及在解压缩某些字段时使用的常量
  5. 内部控制方法
  6. 对 ForkJoinTask 方法的回调和其他支持
  7. 导出的方法
  8. 以最小相关顺序进行初始化的静态代码块

目录

  1. 一、简介
  2. 1.1 分治法
  3. 1.2 工作窃取 (work-stealing)
  4. 二、案例
  5. 2.1 不带返回值的计算
  6. 2.2 带返回值的计算
  7. 三、ForkJoin 源码注释
  8. 3.1 类注释
  9. 3.2 关于原理的注释
  10. 3.2.1 ForkJoinPool 实现概述
  11. 3.2.2 WorkQueues
  12. 3.2.3 管理
  13. 3.2.4 Joining Tasks
  14. 3.2.5 Common Pool
  15. 3.2.6 Style notes
  • 💰 8折买阿里云服务器限时8折了解详情
  • Magick API 一键接入全球大模型注册送1000万token查看
  • 🤖 一键搭建Deepseek满血版了解详情
  • 一键打造专属AI 智能体了解详情
极客日志微信公众号二维码

微信扫一扫,关注极客日志

微信公众号「极客日志V2」,在微信中扫描左侧二维码关注。展示文案:极客日志V2 zeeklog

更多推荐文章

查看全部
  • Graph-RAG:知识图谱与大模型融合
  • AI 大模型入门教程:从零基础到精通
  • LLM 中 Attention 机制的实现原理与优化策略
  • 项目经理指南:嵌入、Copilot 与 AI Agent 模式场景解析及 LLM 策略选择
  • AI 大模型发展趋势:技术演进、应用场景与商业模式深度解析
  • LangChain Agent 核心概念与实战开发指南
  • Python 核心技能体系:800 案例与 34 章实战指南
  • ARIS 开源:基于 Claude Code 的全自动科研与论文工作流
  • Flutter 组件 tavily_dart 在鸿蒙系统的适配与进阶应用
  • 大型语言模型(LLM)技术综述:架构、训练与应用详解
  • 基于 LangChain 与 Ollama 构建本地 LLM 应用实战
  • Web-Check 本地部署与公网远程访问实战
  • 大模型行业三大核心竞争力:资金、人才与数据
  • 大模型原理基础:从感知机到神经网络
  • GitHub Copilot Pro 学生认证免费获取详细指南
  • 中国人工智能大模型技术发展白皮书深度解析
  • 10 个实用且优秀的 Python 开发库推荐
  • OpenClaw 与 Ollama 本地部署指南
  • 大语言模型 (LLM) 快速理解
  • Git Clone 命令行临时设置代理的两种方法

相关免费在线工具

  • Keycode 信息

    查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online

  • Escape 与 Native 编解码

    JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online

  • JavaScript / HTML 格式化

    使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online

  • JavaScript 压缩与混淆

    Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online

  • 加密/解密文本

    使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online

  • Gemini 图片去水印

    基于开源反向 Alpha 混合算法去除 Gemini/Nano Banana 图片水印,支持批量处理与下载。 在线工具,Gemini 图片去水印在线工具,online