CompletableFuture 的本质是将等待和执行分离,让多个任务可以并行执行,最后统一汇总结果。这种设计让异步编程从复杂的手动管理变成了简单的链式组合。
一、实战场景(Practical Scenarios):从简单到复杂
通过实际场景理解 CompletableFuture 的应用,从多数据源并行查询到异步任务链式处理,每个场景都展示了 CompletableFuture 解决实际问题的能力。
本文介绍 Java CompletableFuture 的实战应用。涵盖多数据源并行查询、异步任务链式处理、超时降级及批量处理四大场景。阐述线程池管理、异常处理及避免阻塞的最佳实践。提供性能优化建议,包括合理配置线程池、避免过度嵌套及批量处理优化,帮助开发者提升系统并发性能与稳定性。
CompletableFuture 的本质是将等待和执行分离,让多个任务可以并行执行,最后统一汇总结果。这种设计让异步编程从复杂的手动管理变成了简单的链式组合。
通过实际场景理解 CompletableFuture 的应用,从多数据源并行查询到异步任务链式处理,每个场景都展示了 CompletableFuture 解决实际问题的能力。
并行查询多个数据源,总耗时约等于最慢的那个查询,而不是所有查询的累加。这是 CompletableFuture 最典型的应用场景。
public UserProfile getUserProfile(Long userId) {
// 并行查询三个数据源
CompletableFuture<UserInfo> userFuture = CompletableFuture.supplyAsync(() -> userService.getUserInfo(userId)); // 耗时 200ms
CompletableFuture<List<Order>> ordersFuture = CompletableFuture.supplyAsync(() -> orderService.getUserOrders(userId)); // 耗时 300ms
CompletableFuture<PointsInfo> pointsFuture = CompletableFuture.supplyAsync(() -> pointsService.getUserPoints(userId)); // 耗时 150ms
// 等待所有查询完成并组合结果
return CompletableFuture.allOf(userFuture, ordersFuture, pointsFuture).thenApply(v -> {
UserProfile profile = new UserProfile();
profile.setUserInfo(userFuture.join());
profile.setOrders(ordersFuture.join());
profile.setPoints(pointsFuture.join());
return profile;
}).join(); // 总耗时:300ms(最慢的那个),而不是 650ms(串行)
}
性能提升:
适用场景:
将多个异步任务串联,前一个任务的输出作为后一个任务的输入,形成清晰的处理链。这是理解 thenCompose 使用场景的关键示例。
public void processUserRequest(Long userId, String request) {
CompletableFuture.supplyAsync(() -> {
// 步骤 1: 验证用户
return userService.validateUser(userId);
}).thenCompose(isValid -> {
if (!isValid) {
throw new RuntimeException("用户验证失败");
}
// 步骤 2: 获取用户数据(异步)
return CompletableFuture.supplyAsync(() -> dataService.getUserData(userId));
}).thenAccept(data -> {
// 步骤 3: 发送通知
notificationService.sendNotification(userId, "数据处理完成");
}).exceptionally(ex -> {
// 异常处理
log.error("处理失败", ex);
notificationService.sendNotification(userId, "处理失败:" + ex.getMessage());
return null;
});
}
为什么使用 thenCompose 而不是 thenApply?
thenAccept 不会执行exceptionally 捕获并处理thenApply,这个查询会在验证任务的线程中同步执行,阻塞线程thenCompose,查询会在新的线程中异步执行,不阻塞对比示例:
// ❌ 错误用法:使用 thenApply(会产生嵌套的 Future)
.thenApply(isValid -> {
if (!isValid) {
throw new RuntimeException("用户验证失败");
}
return CompletableFuture.supplyAsync(() -> dataService.getUserData(userId));
// 返回类型:CompletableFuture<CompletableFuture<UserData>>
})
// ✅ 正确用法:使用 thenCompose(扁平化 Future)
.thenCompose(isValid -> {
if (!isValid) {
throw new RuntimeException("用户验证失败");
}
return CompletableFuture.supplyAsync(() -> dataService.getUserData(userId));
// 返回类型:CompletableFuture<UserData>
})
适用场景:
设置超时时间,超时后使用降级方案,提高系统可用性。这是生产环境中必须考虑的场景。
public String getDataWithFallback(String key) {
CompletableFuture<String> remoteFuture = CompletableFuture.supplyAsync(() -> remoteService.getData(key)); // 可能很慢或失败
CompletableFuture<String> localFuture = CompletableFuture.supplyAsync(() -> localCache.getData(key)); // 降级方案
// 设置超时时间
CompletableFuture<String> timeoutFuture = remoteFuture
.orTimeout(500, TimeUnit.MILLISECONDS)
.exceptionally(ex -> {
if (ex instanceof TimeoutException) {
log.warn("远程服务超时,使用本地缓存");
return localFuture.join();
}
return localFuture.join();
});
return timeoutFuture.join();
}
适用场景:
批量提交任务,并行执行,统一收集结果,提高处理效率。这是 CompletableFuture 在批量处理场景中的典型应用。
public List<String> processBatchTasks(List<String> tasks) {
// 批量提交任务
List<CompletableFuture<String>> futures = tasks.stream()
.map(task -> CompletableFuture.supplyAsync(() -> processTask(task)))
.collect(Collectors.toList());
// 统一等待和收集
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
}
性能提升:
适用场景:
根据任务特点选择合适的线程池大小和类型,避免线程泄漏和资源浪费。这是 CompletableFuture 性能优化的关键。
// ✅ 正确:为 IO 密集型任务使用自定义线程池
ExecutorService ioExecutor = new ThreadPoolExecutor(
10, 50, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadFactoryBuilder().setNameFormat("io-pool-%d").build()
);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// IO 密集型任务
return httpClient.get(url);
}, ioExecutor);
// ✅ 正确:为 CPU 密集型任务使用 ForkJoinPool
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
// CPU 密集型任务
return computeHeavyTask();
});
最佳实践:
ForkJoinPool,线程数 = CPU 核心数务必处理异常,避免异常被吞掉,影响业务逻辑。
// ✅ 正确:使用 exceptionally 处理异常
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 可能抛出异常的操作
return riskyOperation();
}).exceptionally(ex -> {
// 异常处理
log.error("操作失败", ex);
return "默认值";
});
// ✅ 正确:使用 handle 统一处理成功和异常
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "结果")
.handle((result, ex) -> {
if (ex != null) {
log.error("处理异常", ex);
return "异常处理";
}
return result;
});
最佳实践:
尽量使用回调而不是阻塞等待,避免阻塞主线程。这是 CompletableFuture 的核心优势之一。
// ❌ 错误:在主线程中阻塞等待
String result = future.get(); // 阻塞主线程
// ✅ 正确:使用回调处理
future.thenAccept(result -> {
// 处理结果,不阻塞主线程
processResult(result);
});
适用场景:
join() 等待,因为是后台任务根据任务特点选择合适的线程池大小和类型,避免资源浪费和性能瓶颈。这是性能优化的第一步。
ForkJoinPoolThreadPoolExecutor核心结论:链式调用不要过深,影响可读性和性能。扁平化处理可以让代码更清晰,性能更好。
// ❌ 过度嵌套
CompletableFuture.supplyAsync(() -> ...)
.thenCompose(r1 -> CompletableFuture.supplyAsync(() -> ...)
.thenCompose(r2 -> CompletableFuture.supplyAsync(() -> ...)
.thenCompose(r3 -> ...)));
// ✅ 扁平化处理
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> ...);
CompletableFuture<String> f2 = f1.thenCompose(r1 -> CompletableFuture.supplyAsync(() -> ...));
CompletableFuture<String> f3 = f2.thenCompose(r2 -> CompletableFuture.supplyAsync(() -> ...));
// 批量提交任务
List<CompletableFuture<String>> futures = tasks.stream()
.map(task -> CompletableFuture.supplyAsync(() -> processTask(task)))
.collect(Collectors.toList());
// 统一等待和收集
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online
JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online
使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online
Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online