Java 线程池(第十篇):(收官篇)CompletableFuture 异步编排实战 —— 多任务并行、结果汇总、超时控制与线程池协作
completableFuture 异步编排实战 —— 多任务并行、结果汇总、超时控制与线程池协作
如果说前 1–9 篇解决的是 “线程池如何安全、稳定地跑”,
那么这一篇解决的是:
如何把多个异步任务“编排”成一个可读、可控、可维护的并发流程。
这正是现代 Java 并发从 ThreadPoolExecutor → CompletableFuture 的进化方向。
一、为什么需要 CompletableFuture?
先看一个你一定写过的代码:
Future<User> f1 = pool.submit(() -> this.loadUser()); Future<User> f2 = pool.submit(() -> this.loadUser()); User user = f1.get(); Order order = f2.get(); 问题很明显:
- get() 阻塞
- 顺序代码读起来像同步
- 异常处理零散
- 任务依赖一多,代码迅速失控
CompletableFuture 的核心价值只有一句话:
用“声明式”的方式,描述异步任务之间的关系,而不是用 get() 等结果。
二、CompletableFuture 和线程池的关系(先搞清楚)
1️⃣ CompletableFuture ≠ 线程池
- CompletableFuture 是 异步任务编排工具
- 线程池是 执行引擎
2️⃣ 默认线程池是 ForkJoinPool(不推荐直接用)
CompletableFuture.supplyAsync(() -> work()); 默认使用:
ForkJoinPool.commonPool() 👉 生产环境强烈建议:显式传入你自己的线程池(第五篇)
CompletableFuture.supplyAsync(() -> work(), ioPool); 三、最核心的三种编排模式(80% 场景)
1️⃣ thenApply —— 单任务链式变换
CompletableFuture .supplyAsync(() -> 1, pool) .thenApply(x -> x + 1) .thenApply(x -> x * 2) .thenAccept(System.out::println); - 同一条任务链
- 上一步完成 → 执行下一步
- 适合 数据转换
2️⃣ thenCompose —— 依赖型异步(避免嵌套)
CompletableFuture<User> f = loadUserAsync(id) .thenCompose(user -> loadProfileAsync(user)); 等价于(但比它优雅得多):
CompletableFuture<User> f = loadUserAsync(id) .thenApply(user -> loadProfileAsync(user)) .get(); // ❌ 一句话:
thenCompose = “异步版的 flatMap”
3️⃣ thenCombine —— 并行任务结果合并(非常常用)
CompletableFuture<User> userFuture = loadUserAsync(id); CompletableFuture<Order> orderFuture = loadOrderAsync(id); CompletableFuture<UserInfo> result = userFuture.thenCombine(orderFuture, (user, order) -> new UserInfo(user, order)); ✔ 并行执行
✔ 都完成后才合并
✔ 没有 get()
四、allOf / anyOf:真正的“并行编排”
allOf:全部完成
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3); all.thenRun(() -> System.out.println("all done")); ⚠ 注意:allOf 不帮你收集结果,你需要自己 get(或 join)
List<Result> results = List.of(f1, f2, f3) .stream() .map(CompletableFuture::join) .toList(); anyOf:任意一个完成
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3); all.thenRun(() -> System.out.println("all done")); ⚠ 注意:allOf 不帮你收集结果,你需要自己 get(或 join)
List<Result> results = List.of(f1, f2, f3) .stream() .map(CompletableFuture::join) .toList(); anyOf:任意一个完成
CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2); any.thenAccept(r -> System.out.println("first = " + r)); 常用于:
- 多数据源兜底
- 多节点竞速
五、异常处理:这是 CompletableFuture 的强项
1️⃣ exceptionally —— 兜底恢复
CompletableFuture .supplyAsync(() -> risky(), pool) .exceptionally(e -> { log.error("error", e); return defaultValue; }); 2️⃣ handle —— 成功 / 失败都处理
future.handle((r, e) -> { if (e != null) { return fallback; } return r; }); 一句工程经验:
CompletableFuture 的异常是“流的一部分”,不是打断流程。
六、超时控制(非常关键)
Java 9+ 推荐方式
future .orTimeout(2, TimeUnit.SECONDS) .exceptionally(e -> fallback); 或者:
future .completeOnTimeout(fallback, 2, TimeUnit.SECONDS); 比 Future.get(timeout) 的优势:
- 不阻塞线程
- 超时是异步语义的一部分
七、CompletableFuture + 线程池的最佳实践
✔ 1)明确线程池职责
- IO 任务 → ioPool
- CPU 任务 → cpuPool
- 定时 → scheduledPool
CompletableFuture.supplyAsync(this::loadData, ioPool)
✔ 2)不要在 CompletableFuture 里 get()
// ❌ 反模式 future.thenApply(r -> anotherFuture.get()); ✔ 3)异常必须收敛在链路末端
future .thenApply(...) .thenApply(...) .exceptionally(this::fallback); ✔ 4)避免在 commonPool 跑重任务
ForkJoinPool 是共享资源,容易拖垮 JVM。
八、一个完整实战 Demo
补充知识点:
Java 的“高阶函数”到底是什么:Runnable / Callable 就是函数参数
CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(this::loadUser, ioPool); CompletableFuture<Order> orderFuture = CompletableFuture.supplyAsync(this::loadOrder, ioPool); CompletableFuture<UserInfo> result = userFuture .thenCombine(orderFuture, UserInfo::new) .orTimeout(2, TimeUnit.SECONDS) .exceptionally(e -> { log.error("timeout or error", e); return UserInfo.empty(); }); result.thenAccept(info -> render(info)); 这个 Demo 覆盖了:
- 并行
- 合并
- 超时
- 异常
- 自定义线程池
九、和前 9 篇的“闭环关系”
你现在拥有的是一套完整体系:
- 线程池(1–5)
- 任务提交与异常(6–7)
- 可观测性(8)
- 背压(9)
- 异步编排(10) ← 收官
一句总结:
ThreadPoolExecutor 决定“系统能不能跑”,
CompletableFuture 决定“并发代码能不能写得优雅、可维护”。
十、全专栏终极总结
线程池是并发执行的基础设施背压决定系统是否稳定监控决定问题是否可见CompletableFuture 决定异步逻辑是否可维护并发不是“多开线程”,而是“正确组织任务关系”
到这里,已经是一个完整、工程级、的体系。