Java ForkJoin 框架详解:分治法与并行编程
ForkJoin 框架基于分治法和工作窃取算法,适用于可分解的计算密集型任务。了 ForkJoinPool、ForkJoinTask 等核心组件,通过数组求和、斐波那契数列及文件遍历案例展示了 RecursiveTask 和 RecursiveAction 的用法。此外还涵盖了阈值选择、常见陷阱、性能优化策略,并对比了 Parallel Stream 与 CompletableFuture 在现代 Java 并发中的应用。

ForkJoin 框架基于分治法和工作窃取算法,适用于可分解的计算密集型任务。了 ForkJoinPool、ForkJoinTask 等核心组件,通过数组求和、斐波那契数列及文件遍历案例展示了 RecursiveTask 和 RecursiveAction 的用法。此外还涵盖了阈值选择、常见陷阱、性能优化策略,并对比了 Parallel Stream 与 CompletableFuture 在现代 Java 并发中的应用。


本文适合已经掌握 Java 多线程基础(如 Thread、Runnable、synchronized),并初步了解 JUC 并发工具(如线程池、ConcurrentHashMap)的开发者。ForkJoin 框架是 Java 并发编程的高级主题,理解它将帮助你掌握'分而治之'的并行计算思想,为后续学习大数据处理框架(如 MapReduce)打下坚实基础。
通过本文的系统学习,你将能够:
在并发编程中,我们经常遇到一些可以'分而治之'的大任务:比如遍历超大数组求和、处理海量文件、计算复杂递归函数(如斐波那契数列)、并行排序等。将这些大任务拆分成小任务并行执行,最后合并结果,往往能获得巨大的性能提升。
传统的 ThreadPoolExecutor 虽然也能处理多任务,但它面临一个核心挑战:当任务之间存在父子依赖关系时,如何高效调度? 例如,一个任务分解成两个子任务,这两个子任务完成后才能继续父任务。如果用传统线程池,你需要手动管理这些依赖,代码复杂且容易出错。
ForkJoin 框架正是 JDK 为这种场景提供的专门解决方案。它由并发大师 Doug Lea 设计,自 JDK 7 引入,是 java.util.concurrent 包中最精巧、最高效的组件之一。
分治法(Divide-and-Conquer)是一种古老的算法思想,其核心可以用十二个字概括:分解、解决、合并。
这种思想天然适合并行处理。典型的应用包括归并排序、快速排序、大数求和、矩阵运算等。
工作窃取(Work-Stealing)算法是 ForkJoin 框架性能卓越的核心所在。它解决了传统线程池中线程负载不均的问题。
假设我们将一个大任务拆分成 10 个小任务,并启动了 5 个线程。由于每个任务的执行时间可能不同,有的线程很快完成了自己的任务,有的线程还在忙。如果空闲线程只是等待,就浪费了宝贵的 CPU 资源。工作窃取正是让空闲线程主动去帮助繁忙线程的一种机制。
ForkJoinWorkerThread)都维护着一个双端队列(Deque),用于存放分配给它的任务。工作窃取的优点:
缺点:在某些极端情况下(如队列只剩一个任务),仍存在竞争;同时维护多个双端队列也增加了系统开销。
ForkJoin 框架主要由三个核心组件构成:
ForkJoinPool 是 ForkJoin 框架的线程池实现,它继承了 AbstractExecutorService,因此也是一个特殊的 ExecutorService。但与 ThreadPoolExecutor 不同,它的内部不是用一个共享的任务队列,而是维护了一个工作队列数组(WorkQueue[]),每个工作队列对应一个工作线程。
// 方式一:使用默认构造器(并行度 = CPU 核心数)
ForkJoinPool pool1 = new ForkJoinPool();
// 方式二:指定并行度
ForkJoinPool pool2 = new ForkJoinPool(4); // 使用 4 个线程
// 方式三:使用公共池(推荐!)
ForkJoinPool commonPool = ForkJoinPool.commonPool();
关于公共池(commonPool):从 JDK 8 开始,ForkJoinPool 提供了一个静态的 commonPool() 方法,返回一个全局共享的线程池实例。官方强烈推荐大多数应用程序使用这个公共池,因为它可以节省资源,让多个 ForkJoin 任务共享同一个线程池,避免创建大量线程。公共池的线程在空闲时会慢慢回收,需要时再重新创建。Parallel Stream 底层使用的正是这个公共池。
| 方法 | 描述 |
|---|---|
execute(ForkJoinTask) | 异步执行任务,无返回值 |
submit(ForkJoinTask) | 异步执行任务,返回 Future 对象 |
invoke(ForkJoinTask) | 同步执行任务,等待任务完成并返回结果 |
invokeAll(ForkJoinTask...) | 批量提交多个子任务,等待所有完成 |
ForkJoinTask 是提交给 ForkJoinPool 执行的任务的基类。它提供了 fork()、join() 等核心方法,并实现了 Future 接口。在实际开发中,我们几乎从不直接继承 ForkJoinTask,而是继承它的两个抽象子类:
适用于需要返回计算结果的任务,如数组求和、斐波那契数列计算。
核心方法:protected abstract V compute(),你需要在这个方法中实现任务的分解和计算逻辑。
适用于只需要执行动作而不需要返回结果的任务,如遍历目录、数组元素批量修改。
核心方法:protected abstract void compute()。
ForkJoinWorkerThread),或者推入 ForkJoinPool 的提交队列。fork() 方法会立即返回,不会阻塞。关键理解:fork() 和 join() 的配对使用,加上工作窃取机制,使得 ForkJoin 框架能够用少量线程高效处理大量有依赖关系的任务。
这是执行 ForkJoinTask 的线程。每个工作线程都关联着一个自己的双端队列,用于存放它 fork 出来的子任务。工作线程的生命周期由 ForkJoinPool 统一管理。
理论知识讲完了,现在通过三个由浅入深的实战案例,带你掌握 ForkJoin 的具体用法。
这是 ForkJoin 最经典的入门案例。我们计算一个超大数组中所有元素的和。
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
/**
* 使用 Fork/Join 计算数组求和
*/
public class ArraySumCalculator extends RecursiveTask<Long> {
private final int[] array;
private final int start;
private final int end;
private static final int THRESHOLD = 10000; // 阈值:当数组长度小于此值时,不再拆分
public ArraySumCalculator(int[] array) {
this(array, 0, array.length);
}
private ArraySumCalculator(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
int length = end - start;
// 1. 如果任务足够小,直接计算(不再分解)
if (length <= THRESHOLD) {
return computeDirectly();
}
// 2. 任务拆分
start + length / ;
(array, start, mid);
(array, mid, end);
leftTask.fork();
rightTask.compute();
leftTask.join();
leftResult + rightResult;
}
{
;
( start; i < end; i++) {
sum += array[i];
}
sum;
}
{
[] array = [];
( ; i < array.length; i++) {
array[i] = i + ;
}
();
(array);
System.currentTimeMillis();
pool.invoke(task);
System.currentTimeMillis();
System.out.println( + result);
System.out.println( + (endTime - startTime) + );
() array.length * (array.length + ) / ;
System.out.println( + result.equals(expected));
pool.shutdown();
}
}
fork() 了左任务,而右任务由当前线程同步执行。这是一种常见的优化写法,比同时 fork 两个任务再 join 更高效。错误的写法:
leftTask.fork();
rightTask.fork(); // 这样效率低下!
Long leftResult = leftTask.join();
Long rightResult = rightTask.join();
这种写法会先 fork 两个子任务,然后 join 等待。问题是:fork 之后,两个子任务都进入了工作队列,等待被其他空闲线程窃取执行。如果此时有空闲线程,没问题;但如果没有空闲线程,而当前线程又在 join 等待,就会浪费一个线程资源。正确的做法是:fork 一个任务,然后当前线程同步执行另一个任务,这样就能确保当前线程在等待期间不会闲着。
斐波那契数列是一个天然的递归问题,非常适合用 ForkJoin 来演示。
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class FibonacciTask extends RecursiveTask<Integer> {
private final int n;
public FibonacciTask(int n) {
this.n = n;
}
@Override
protected Integer compute() {
if (n <= 1) {
return n;
}
// 创建子任务:f(n-1) 和 f(n-2)
FibonacciTask f1 = new FibonacciTask(n - 1);
FibonacciTask f2 = new FibonacciTask(n - 2);
// 异步执行 f1
f1.fork();
// 同步执行 f2
int result2 = f2.compute();
// 获取 f1 的结果
int result1 = f1.join();
return result1 + result2;
}
public static void main(String[] args) {
ForkJoinPool pool ();
;
pool.invoke( (n));
System.out.println( + n + + result);
}
}
注意:虽然这个例子展示了 ForkJoin 的使用,但斐波那契数列并不适合用 ForkJoin,因为它的计算量太小,而任务拆分开销太大,实际性能比普通递归还差。这个例子仅用于理解 API。
这个案例更有实际意义:统计一个目录及其子目录下所有.java 文件的数量。由于不需要返回值,我们使用 RecursiveAction。
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
public class FileCounter extends RecursiveAction {
private final File directory;
private final String extension;
private int count = 0; // 统计结果
public FileCounter(File directory, String extension) {
this.directory = directory;
this.extension = extension;
}
public int getCount() {
return count;
}
@Override
protected void compute() {
File[] files = directory.listFiles();
if (files == null) return;
List<FileCounter> subTasks = new ArrayList<>();
for (File file : files) {
if (file.isDirectory()) {
// 创建子任务处理子目录
FileCounter subTask = new FileCounter(file, extension);
subTask.fork();
subTasks.add(subTask);
} (file.getName().endsWith(extension)) {
count++;
}
}
(FileCounter subTask : subTasks) {
subTask.join();
count += subTask.getCount();
}
}
{
();
( (), );
pool.invoke(task);
System.out.println( + task.getCount() + );
}
}
ForkJoin 框架最适合解决以下类型的问题:
| 场景类型 | 示例 | 说明 |
|---|---|---|
| 计算密集型任务 | 大数组数学运算、矩阵乘法 | 任务需要大量 CPU 计算,分解后可以并行加速 |
| 可递归分解的任务 | 归并排序、快速排序、文件遍历 | 天然的分治结构 |
| 任务之间相互独立 | 图像处理(每个像素独立) | 无需同步,没有数据竞争 |
| 任务粒度适中 | 每个子任务计算量在数万到数百万次操作 | 太细则调度开销大,太粗则并行度不足 |
| 场景类型 | 原因 |
|---|---|
| I/O 密集型任务 | 线程会在 I/O 操作时阻塞,浪费 CPU,且工作窃取无法发挥作用 |
| 需要频繁同步的任务 | 锁竞争会抵消并行带来的好处 |
| 任务粒度太细 | 创建任务、调度、合并的开销超过计算本身 |
| 无法分解的串行任务 | 分治思想的前提就是可以分解 |
对于 I/O 密集型任务,可以考虑配合 ManagedBlocker 使用,或者改用 CompletableFuture。
阈值的选择是 ForkJoin 调优的关键。没有固定的公式,一般遵循以下原则:
getSurplusQueuedTaskCount() 方法判断当前线程的负载,动态决定是否继续拆分。如果在 compute() 方法中执行了 Thread.sleep()、等待 I/O 等阻塞操作,会导致工作线程被阻塞,无法执行其他任务,严重降低并行效率。解决方案是使用 ForkJoinPool.ManagedBlocker 接口,或者将阻塞部分放在 CompletableFuture 中处理。
// 错误的做法:fork 了子任务却没有 join
leftTask.fork();
rightTask.fork(); // 这里应该 join,但没有
递归调用太深可能导致 StackOverflowError。可以适当增大阈值,或者采用非递归的实现方式。
invokeAll() 方法是批量提交任务的便捷方式,它会等待所有任务完成。但要注意,invokeAll() 内部已经包含了 fork 操作,不需要再对子任务调用 fork。
// 正确用法
invokeAll(leftTask, rightTask); // 然后通过 leftTask.join() 获取结果
ForkJoinTask 在执行过程中可能抛出异常,但异常不会直接传播给调用者。需要通过 isCompletedAbnormally() 和 getException() 方法检查异常。
if (task.isCompletedAbnormally()) {
Throwable ex = task.getException();
ex.printStackTrace();
}
虽然 ForkJoin 框架内部不会死锁,但如果你在任务中等待其他任务的结果时形成了循环依赖,仍然可能死锁。确保任务依赖关系是树形的,而不是环形的。
为了充分发挥 ForkJoin 的性能,可以采取以下优化措施:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=N 调整公共池的并行度。compute() 而不是 fork().join():如前面案例所示,fork 一个任务,同步执行另一个,可以减少任务调度开销。ForkJoinPool.commonPool()。getPoolSize()、getActiveThreadCount() 等方法监控线程池状态,分析性能瓶颈。从 JDK 8 开始,Stream API 引入了并行流(.parallelStream()),其底层正是基于 ForkJoin 框架的公共池实现的。
// 使用并行流计算数组和
long sum = Arrays.stream(array).parallel().sum();
并行流封装了 ForkJoin 的复杂性,让开发者可以用声明式的方式编写并行代码。但需要注意的是,并行流默认使用公共池,对于某些 I/O 操作或阻塞操作,可能不是最佳选择。
CompletableFuture 是 JDK 8 引入的异步编程工具,它内部使用了 ForkJoinPool.commonPool() 作为默认的异步执行器。你可以通过 thenApplyAsync()、thenComposeAsync() 等方法指定使用自定义的线程池,包括 ForkJoinPool。
| 框架 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| ForkJoin | 可分解的计算密集型任务 | 高效利用 CPU,自动负载均衡 | 不适合 I/O 任务 |
| ThreadPoolExecutor | 通用任务处理 | 灵活,可定制 | 处理依赖任务复杂 |
| CompletableFuture | 异步任务编排 | 功能强大,支持链式调用 | 学习曲线较陡 |
| Parallel Stream | 集合数据处理 | 声明式,简洁 | 控制粒度较粗 |
ForkJoinPool 内部维护了一个 WorkQueue 数组 workQueues。每个 WorkQueue 是一个双端队列,存储着 ForkJoinTask。工作线程与队列的对应关系是:
ForkJoinPool 还维护了一个复杂的控制信号量 ctl,用于管理线程的状态(活跃、等待、终止等)。
当工作线程自己的队列为空时,会调用 scan() 方法尝试窃取。它会随机选择一个其他线程的队列,从尾部获取一个任务。为了防止竞争,这个操作使用了 Unsafe 的 CAS 方法。
如果窃取也失败,线程会进入等待状态,将自己挂起。当有新任务提交时,挂起的线程会被唤醒。
当我们调用 pool.invoke(task) 时,流程如下:
ForkJoinPool 的外部提交队列(偶数下标)fork() 会将子任务放入当前工作线程自己的队列(奇数下标)

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online
JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online
使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online
Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online