Java 线程池(第十篇):(收官篇)CompletableFuture 异步编排实战 —— 多任务并行、结果汇总、超时控制与线程池协作

completableFuture 异步编排实战 —— 多任务并行、结果汇总、超时控制与线程池协作

如果说前 1–9 篇解决的是 “线程池如何安全、稳定地跑”
那么这一篇解决的是:

如何把多个异步任务“编排”成一个可读、可控、可维护的并发流程。

这正是现代 Java 并发从 ThreadPoolExecutor → CompletableFuture 的进化方向。

一、为什么需要 CompletableFuture?

先看一个你一定写过的代码:

Future<User> f1 = pool.submit(() -> this.loadUser()); Future<User> f2 = pool.submit(() -> this.loadUser()); User user = f1.get(); Order order = f2.get(); 

问题很明显:

  • get() 阻塞
  • 顺序代码读起来像同步
  • 异常处理零散
  • 任务依赖一多,代码迅速失控

CompletableFuture 的核心价值只有一句话:

用“声明式”的方式,描述异步任务之间的关系,而不是用 get() 等结果。

二、CompletableFuture 和线程池的关系(先搞清楚)

1️⃣ CompletableFuture ≠ 线程池

  • CompletableFuture 是 异步任务编排工具
  • 线程池是 执行引擎

2️⃣ 默认线程池是 ForkJoinPool(不推荐直接用)

CompletableFuture.supplyAsync(() -> work()); 

默认使用:

ForkJoinPool.commonPool() 

👉 生产环境强烈建议:显式传入你自己的线程池(第五篇)

CompletableFuture.supplyAsync(() -> work(), ioPool); 

三、最核心的三种编排模式(80% 场景)

1️⃣ thenApply —— 单任务链式变换

CompletableFuture .supplyAsync(() -> 1, pool) .thenApply(x -> x + 1) .thenApply(x -> x * 2) .thenAccept(System.out::println); 
  • 同一条任务链
  • 上一步完成 → 执行下一步
  • 适合 数据转换

2️⃣ thenCompose —— 依赖型异步(避免嵌套)

CompletableFuture<User> f = loadUserAsync(id) .thenCompose(user -> loadProfileAsync(user)); 

等价于(但比它优雅得多):

CompletableFuture<User> f = loadUserAsync(id) .thenApply(user -> loadProfileAsync(user)) .get(); // ❌ 

一句话:

thenCompose = “异步版的 flatMap”

3️⃣ thenCombine —— 并行任务结果合并(非常常用)

CompletableFuture<User> userFuture = loadUserAsync(id); CompletableFuture<Order> orderFuture = loadOrderAsync(id); CompletableFuture<UserInfo> result = userFuture.thenCombine(orderFuture, (user, order) -> new UserInfo(user, order)); 

✔ 并行执行
✔ 都完成后才合并
✔ 没有 get()

四、allOf / anyOf:真正的“并行编排”

allOf:全部完成

CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3); all.thenRun(() -> System.out.println("all done")); 

⚠ 注意:allOf 不帮你收集结果,你需要自己 get(或 join)

List<Result> results = List.of(f1, f2, f3) .stream() .map(CompletableFuture::join) .toList(); 

anyOf:任意一个完成

CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3); all.thenRun(() -> System.out.println("all done")); 

⚠ 注意:allOf 不帮你收集结果,你需要自己 get(或 join)

List<Result> results = List.of(f1, f2, f3) .stream() .map(CompletableFuture::join) .toList(); 

anyOf:任意一个完成

CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2); any.thenAccept(r -> System.out.println("first = " + r)); 

常用于:

  • 多数据源兜底
  • 多节点竞速

五、异常处理:这是 CompletableFuture 的强项

1️⃣ exceptionally —— 兜底恢复

CompletableFuture .supplyAsync(() -> risky(), pool) .exceptionally(e -> { log.error("error", e); return defaultValue; }); 

2️⃣ handle —— 成功 / 失败都处理

future.handle((r, e) -> { if (e != null) { return fallback; } return r; }); 

一句工程经验:

CompletableFuture 的异常是“流的一部分”,不是打断流程。

六、超时控制(非常关键)

Java 9+ 推荐方式

future .orTimeout(2, TimeUnit.SECONDS) .exceptionally(e -> fallback); 

或者:

future .completeOnTimeout(fallback, 2, TimeUnit.SECONDS); 

比 Future.get(timeout) 的优势:

  • 不阻塞线程
  • 超时是异步语义的一部分

七、CompletableFuture + 线程池的最佳实践

✔ 1)明确线程池职责

  • IO 任务 → ioPool
  • CPU 任务 → cpuPool
  • 定时 → scheduledPool

CompletableFuture.supplyAsync(this::loadData, ioPool)

✔ 2)不要在 CompletableFuture 里 get()

// ❌ 反模式 future.thenApply(r -> anotherFuture.get()); 

✔ 3)异常必须收敛在链路末端

future .thenApply(...) .thenApply(...) .exceptionally(this::fallback); 

✔ 4)避免在 commonPool 跑重任务

ForkJoinPool 是共享资源,容易拖垮 JVM。

八、一个完整实战 Demo

补充知识点:

Java 的“高阶函数”到底是什么:Runnable / Callable 就是函数参数

CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(this::loadUser, ioPool); CompletableFuture<Order> orderFuture = CompletableFuture.supplyAsync(this::loadOrder, ioPool); CompletableFuture<UserInfo> result = userFuture .thenCombine(orderFuture, UserInfo::new) .orTimeout(2, TimeUnit.SECONDS) .exceptionally(e -> { log.error("timeout or error", e); return UserInfo.empty(); }); result.thenAccept(info -> render(info)); 

这个 Demo 覆盖了:

  • 并行
  • 合并
  • 超时
  • 异常
  • 自定义线程池

九、和前 9 篇的“闭环关系”

你现在拥有的是一套完整体系:

  • 线程池(1–5)
  • 任务提交与异常(6–7)
  • 可观测性(8)
  • 背压(9)
  • 异步编排(10) ← 收官

一句总结:

ThreadPoolExecutor 决定“系统能不能跑”,
CompletableFuture 决定“并发代码能不能写得优雅、可维护”。

十、全专栏终极总结

线程池是并发执行的基础设施背压决定系统是否稳定监控决定问题是否可见CompletableFuture 决定异步逻辑是否可维护并发不是“多开线程”,而是“正确组织任务关系”

到这里,已经是一个完整、工程级、的体系。

Read more

AI驱动开发实战:基于飞算JavaAI的在线考试系统设计与实现

AI驱动开发实战:基于飞算JavaAI的在线考试系统设计与实现

摘要 在软件工程领域,我们正处在一个由人工智能驱动的深刻变革时代。AI不再仅仅是应用的功能,更逐渐成为缔造应用的强大工具。本文以飞算科技(CalEx Tech)举办的「飞算JavaAI炫技赛」为契机,完整记录了笔者如何选用“在线考试系统的设计与实现”这一课题,借助IntelliJ IDEA中的飞算JavaAI插件,从一句自然语言指令开始,历经AI驱动的需求分析、接口设计、数据库建模、全量代码生成,最终成功部署并上线一个功能完备的系统。本文旨在通过详实的步骤、深度的代码剖析和最终的成果展示,为同行者揭示AI辅助编程在真实项目中的巨大潜力与最佳实践。 第一章:序曲——为开发环境注入AI之力 任何伟大的工程都始于坚实的基础。在我们的AI编程之旅开启之前,首要任务是将我们的“利器”——飞算JavaAI插件,无缝集成到我们最熟悉的集成开发环境(IDE)IntelliJ IDEA中。 1.1 环境准备 在开始之前,请确保您的开发环境满足以下基本条件: * IDE: IntelliJ IDEA 2021.x 或更高版本。 * JDK: Java

By Ne0inhk
JAVA API (三):从基础爬虫构建到带条件数据提取 —— 详解 URL、正则与爬取策略

JAVA API (三):从基础爬虫构建到带条件数据提取 —— 详解 URL、正则与爬取策略

个人主页-爱因斯晨 文章专栏-Java学习 相关文章:API (一) 相关文章:API(二) 持续努力中,感谢支持 一、爬虫基础 (一)爬虫的基本概念 * 定义:爬虫是按照一定规则自动抓取网络信息的程序,在 Java 环境下,可借助URL、HttpURLConnection等 API 来实现。 * 应用场景:广泛应用于数据采集,如电商平台的价格监控、各类新闻的聚合;还可用于信息分析,如舆情监测等。 (二)Java 实现简单爬虫的步骤 建立网络连接:利用URL类确定目标网页的地址,再通过openConnection()方法获取HttpURLConnection对象。 URL url =newURL("https://example.com");HttpURLConnection conn =(HttpURLConnection) url.openConnection(); 设置请求参数:

By Ne0inhk

Java 线程池中 execute() 和 submit() 的区别(源码 & 实战全解析)

前言 在 Java 并发编程中,线程池是核心技术之一,而 execute() 和 submit() 是线程池最常用的两个方法。很多开发者只停留在表面认识——execute 抛异常,submit 返回 Future,但这种理解远远不够。 本文将从源码层面深度解析这两个方法的本质差异,并通过实战案例演示它们的适用场景。 一、核心差异一览 维度execute()submit()返回值voidFuture异常传播任务内异常会直接抛出到 UncaughtExceptionHandler,主线程无法感知异常被 FutureTask 捕获并存储,调用 get() 时才抛出 ExecutionException任务类型仅支持 Runnable支持 Runnable 和 Callable适用场景不关心结果的异步任务(如日志发送、数据清理)需要获取结果或处理异常的任务(如计算、RPC 调用)接口定义Executor 接口ExecutorService 接口 二、源码层面解析 2.1 submit(

By Ne0inhk
Java 大视界 -- 基于 Java+Storm 构建实时日志分析平台:从日志采集到告警可视化(440)

Java 大视界 -- 基于 Java+Storm 构建实时日志分析平台:从日志采集到告警可视化(440)

Java 大视界 -- 基于 Java+Storm 构建实时日志分析平台:从日志采集到告警可视化(440) * 引言: * 正文: * 一、实时日志分析平台的核心架构设计 * 1.1 架构分层与核心组件 * 1.2 组件选型的实战思考(10 余年经验沉淀,数据真实有出处) * 二、日志采集层:Flume 的高可用配置(生产级优化) * 2.1 Flume 的核心配置(抗住十万级 / 秒流量,注释完整) * 2.2 Flume 的高可用部署(避免单点故障,实战步骤清晰) * 2.2.1 多 Agent 冗余部署 * 2.2.2 Nginx

By Ne0inhk