Java 线程池(第十篇):(收官篇)CompletableFuture 异步编排实战 —— 多任务并行、结果汇总、超时控制与线程池协作

completableFuture 异步编排实战 —— 多任务并行、结果汇总、超时控制与线程池协作

如果说前 1–9 篇解决的是 “线程池如何安全、稳定地跑”
那么这一篇解决的是:

如何把多个异步任务“编排”成一个可读、可控、可维护的并发流程。

这正是现代 Java 并发从 ThreadPoolExecutor → CompletableFuture 的进化方向。

一、为什么需要 CompletableFuture?

先看一个你一定写过的代码:

Future<User> f1 = pool.submit(() -> this.loadUser()); Future<User> f2 = pool.submit(() -> this.loadUser()); User user = f1.get(); Order order = f2.get(); 

问题很明显:

  • get() 阻塞
  • 顺序代码读起来像同步
  • 异常处理零散
  • 任务依赖一多,代码迅速失控

CompletableFuture 的核心价值只有一句话:

用“声明式”的方式,描述异步任务之间的关系,而不是用 get() 等结果。

二、CompletableFuture 和线程池的关系(先搞清楚)

1️⃣ CompletableFuture ≠ 线程池

  • CompletableFuture 是 异步任务编排工具
  • 线程池是 执行引擎

2️⃣ 默认线程池是 ForkJoinPool(不推荐直接用)

CompletableFuture.supplyAsync(() -> work()); 

默认使用:

ForkJoinPool.commonPool() 

👉 生产环境强烈建议:显式传入你自己的线程池(第五篇)

CompletableFuture.supplyAsync(() -> work(), ioPool); 

三、最核心的三种编排模式(80% 场景)

1️⃣ thenApply —— 单任务链式变换

CompletableFuture .supplyAsync(() -> 1, pool) .thenApply(x -> x + 1) .thenApply(x -> x * 2) .thenAccept(System.out::println); 
  • 同一条任务链
  • 上一步完成 → 执行下一步
  • 适合 数据转换

2️⃣ thenCompose —— 依赖型异步(避免嵌套)

CompletableFuture<User> f = loadUserAsync(id) .thenCompose(user -> loadProfileAsync(user)); 

等价于(但比它优雅得多):

CompletableFuture<User> f = loadUserAsync(id) .thenApply(user -> loadProfileAsync(user)) .get(); // ❌ 

一句话:

thenCompose = “异步版的 flatMap”

3️⃣ thenCombine —— 并行任务结果合并(非常常用)

CompletableFuture<User> userFuture = loadUserAsync(id); CompletableFuture<Order> orderFuture = loadOrderAsync(id); CompletableFuture<UserInfo> result = userFuture.thenCombine(orderFuture, (user, order) -> new UserInfo(user, order)); 

✔ 并行执行
✔ 都完成后才合并
✔ 没有 get()

四、allOf / anyOf:真正的“并行编排”

allOf:全部完成

CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3); all.thenRun(() -> System.out.println("all done")); 

⚠ 注意:allOf 不帮你收集结果,你需要自己 get(或 join)

List<Result> results = List.of(f1, f2, f3) .stream() .map(CompletableFuture::join) .toList(); 

anyOf:任意一个完成

CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3); all.thenRun(() -> System.out.println("all done")); 

⚠ 注意:allOf 不帮你收集结果,你需要自己 get(或 join)

List<Result> results = List.of(f1, f2, f3) .stream() .map(CompletableFuture::join) .toList(); 

anyOf:任意一个完成

CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2); any.thenAccept(r -> System.out.println("first = " + r)); 

常用于:

  • 多数据源兜底
  • 多节点竞速

五、异常处理:这是 CompletableFuture 的强项

1️⃣ exceptionally —— 兜底恢复

CompletableFuture .supplyAsync(() -> risky(), pool) .exceptionally(e -> { log.error("error", e); return defaultValue; }); 

2️⃣ handle —— 成功 / 失败都处理

future.handle((r, e) -> { if (e != null) { return fallback; } return r; }); 

一句工程经验:

CompletableFuture 的异常是“流的一部分”,不是打断流程。

六、超时控制(非常关键)

Java 9+ 推荐方式

future .orTimeout(2, TimeUnit.SECONDS) .exceptionally(e -> fallback); 

或者:

future .completeOnTimeout(fallback, 2, TimeUnit.SECONDS); 

比 Future.get(timeout) 的优势:

  • 不阻塞线程
  • 超时是异步语义的一部分

七、CompletableFuture + 线程池的最佳实践

✔ 1)明确线程池职责

  • IO 任务 → ioPool
  • CPU 任务 → cpuPool
  • 定时 → scheduledPool

CompletableFuture.supplyAsync(this::loadData, ioPool)

✔ 2)不要在 CompletableFuture 里 get()

// ❌ 反模式 future.thenApply(r -> anotherFuture.get()); 

✔ 3)异常必须收敛在链路末端

future .thenApply(...) .thenApply(...) .exceptionally(this::fallback); 

✔ 4)避免在 commonPool 跑重任务

ForkJoinPool 是共享资源,容易拖垮 JVM。

八、一个完整实战 Demo

补充知识点:

Java 的“高阶函数”到底是什么:Runnable / Callable 就是函数参数

CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(this::loadUser, ioPool); CompletableFuture<Order> orderFuture = CompletableFuture.supplyAsync(this::loadOrder, ioPool); CompletableFuture<UserInfo> result = userFuture .thenCombine(orderFuture, UserInfo::new) .orTimeout(2, TimeUnit.SECONDS) .exceptionally(e -> { log.error("timeout or error", e); return UserInfo.empty(); }); result.thenAccept(info -> render(info)); 

这个 Demo 覆盖了:

  • 并行
  • 合并
  • 超时
  • 异常
  • 自定义线程池

九、和前 9 篇的“闭环关系”

你现在拥有的是一套完整体系:

  • 线程池(1–5)
  • 任务提交与异常(6–7)
  • 可观测性(8)
  • 背压(9)
  • 异步编排(10) ← 收官

一句总结:

ThreadPoolExecutor 决定“系统能不能跑”,
CompletableFuture 决定“并发代码能不能写得优雅、可维护”。

十、全专栏终极总结

线程池是并发执行的基础设施背压决定系统是否稳定监控决定问题是否可见CompletableFuture 决定异步逻辑是否可维护并发不是“多开线程”,而是“正确组织任务关系”

到这里,已经是一个完整、工程级、的体系。

Read more

C++:继承

C++:继承

Hello大家好! 很高兴与大家见面! 给生活添点快乐,开始今天的编程之路。 我的博客:<但愿. 我的专栏:C语言、题目精讲、算法与数据结构、C++ 欢迎点赞,关注 目录   一 继承的概念及定义        1.1继承的概念        1.2继承的定义               1.2.1定义格式               1.2.2类继承基类方式改变对应成员访问⽅式的变化               1.2.3  继承类模板【类继承类似】      二 基类和派⽣类间的转换          2.1不同的转换方式                 2.1.1会产生临时变量                 2.1.2不会产生临时变量(基类和派⽣类间的转换)                         2.1.2.1不会产生临时变量(

By Ne0inhk

C++物理引擎碰撞检测实战指南(从零搭建高精度检测系统)

第一章:C++物理引擎碰撞检测概述 在开发高性能的C++物理引擎时,碰撞检测是实现真实交互的核心模块之一。它负责判断两个或多个物体在虚拟空间中是否发生接触或穿透,从而触发后续的响应计算,如反弹、摩擦或形变。 基本原理与挑战 碰撞检测通常分为两个阶段:粗略检测(Broad Phase)和精细检测(Narrow Phase)。前者利用空间划分结构快速排除不可能相交的对象对,后者则精确计算潜在碰撞对象之间的几何交集。 * 粗略检测常用算法包括AABB树、网格哈希和四叉树 * 精细检测依赖于GJK、SAT或Minkowski和等数学方法 * 实时性要求高,需在每帧毫秒级内完成所有检测任务 典型AABB碰撞检测实现 轴对齐包围盒(AABB)是最基础且高效的碰撞判定方式,适用于大多数刚体模拟场景。以下是一个简单的二维AABB碰撞检测代码示例: // 定义AABB结构体 struct AABB { float minX, maxX; float minY, maxY; }; // 检测两个AABB是否重叠 bool checkCollision(const AABB& a, c

By Ne0inhk
【C++】内存管理

【C++】内存管理

生活是属于每个人自己的感受,不属于任何人的看法。 前言   这是我自己学习C++的第四篇博客总结。后期我会继续把C++学习笔记开源至博客上。   上一期笔记是关于C++的类和对象,没看的同学可以过去看看: 【C++】类和对象(二)-ZEEKLOG博客https://blog.ZEEKLOG.net/hsy1603914691/article/details/146941363 C/C++的内存分布  1. 栈----存放非静态局部变量、函数参数、返回值等等,栈是向下增长的。2. 内存映射段----是高效的I/O映射方式,用于装载一个共享的动态内存库。用户可使用系统接口创建共享共享内存,做进程间通信。3. 堆----用于程序运行时动态内存分配,堆是可向上增长的。4. 数据段(静态区)--存储全局变量和静态变量。5. 代码段(常量区)--存放可执行的代码和只读常量。 C语言中动态内存管理方式  1. malloc、calloc、

By Ne0inhk

C++ max函数超超超详细

C++ max函数超超超详细 一、简介 在C++编程中,max函数是一个非常有用的工具,主要用于找出两个或多个对象中的最大值。它存在于标准库中,在不同的头文件中包含不同的重载形式,不过主要涉及到<algorithm>和<utility>头文件。这个函数在处理各种数据类型,无论是基本数据类型还是自定义类型时都非常有用。 二、基础用法 (一)两个参数的基本类型 1. 当处理两个基本数据类型(如int、double等)时,std::max的用法相对简单。 * 代码解释: * 首先包含了<iostream>用于输入输出操作,<algorithm>用于使用std::max函数。 * 在main函数中定义了两个int类型的变量a和b,分别赋值为5和10。 * 然后调用std::max(a, b),这个函数会比较a和b的大小,

By Ne0inhk