Mono 接口基础与实战
Mono 是 Project Reactor 响应式编程库中的核心接口之一,代表一个异步的、可能包含零个或一个元素的流。在响应式范式中,Mono 专门用于处理那些预期最多只有一个结果的异步操作。它遵循 Reactive Streams 规范,支持非阻塞的回压机制,让开发者能以声明式的方式构建高效的异步应用。
Mono 与 Flux 的区别
Mono 和 Flux 都是 Project Reactor 的核心抽象,但针对场景不同:
- Mono:处理零个或一个元素的异步序列。适合返回单个结果的异步操作,比如查询数据库中的一条记录、调用返回单个对象的 REST API,或者执行可能成功或失败但只产生单一结果的操作。
- Flux:处理零个或多个元素的异步序列。适合流式数据、集合操作或可能返回多个结果的场景,如消息队列消费、文件读取或数据库查询返回多条记录。
关键区别在于元素的数量预期:Mono 至多一个,而 Flux 是零到多个。
Mono 的使用
创建 Mono 的常见方法
创建 Mono 有多种方式,适应不同的业务场景:
// 1. 从确定值创建
Mono<String> mono1 = Mono.just("Hello, World!");
// 2. 创建空 Mono
Mono<String> mono2 = Mono.empty();
// 3. 从可能为 null 的值创建(如果 value 为 null,则返回空 Mono)
Mono<String> mono3 = Mono.justOrNull(someNullableValue);
// 4. 从可调用对象创建(延迟计算)
Mono<String> mono4 = Mono.fromCallable(() -> {
// 这里可以执行可能抛出异常的计算
return expensiveComputation();
});
// 5. 从 Future 创建
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Result");
Mono<String> mono5 = Mono.fromFuture(future);
// 6. 从 Supplier 创建
Mono<String> mono6 = Mono.fromSupplier(() -> generateValue());
// 7. 创建错误
Mono<String> mono7 = Mono.error(new RuntimeException("Something went wrong"));
// 8. 延迟创建(直到有订阅者时才执行)
Mono<String> mono8 = Mono.defer(() -> {
return Mono.just(System.currentTimeMillis() + " - generated");
});
每种工厂方法都有其适用场景。just() 用于已知常量,fromCallable() 适合封装可能抛异常的计算逻辑,而 defer() 则确保每次订阅都重新触发计算,避免共享状态带来的副作用。
Mono 的操作符与链式调用
Mono 提供了丰富的操作符,支持声明式的链式调用:
Mono<User> userMono = getUserById(userId)
.filter(user -> user.isActive())
.map(user -> {
();
dto.setName(user.getName());
dto.setEmail(user.getEmail());
dto;
})
.flatMap(dto -> {
sendNotification(dto.getEmail())
.map(success -> dto);
})
.doOnNext(dto -> {
log.info(, dto.getName());
})
.doOnError(error -> {
log.error(, error);
})
.timeout(Duration.ofSeconds())
.retry();


