跳到主要内容Java 响应式编程:Flux 与 SseEmitter 深度解析 | 极客日志JavaAIjava
Java 响应式编程:Flux 与 SseEmitter 深度解析
综述由AI生成深入探讨了 Java 中 Flux 和 SseEmitter 两种流式响应技术。文章首先分析了传统同步响应的痛点及流式响应的应用场景,如 AI 聊天机器人、大文件处理等。接着详细解析了 SseEmitter 的基本使用、生产级实现及底层 Servlet 异步机制,以及 Flux 的核心操作符、背压处理和底层 Reactive Streams 原理。通过性能对比测试,展示了 Flux 在高并发下的显著优势。最后提供了生产环境实战案例,包括项目改造、前端对接、异常处理与监控,并给出了常见问题解决方案及选型建议,帮助开发者根据实际需求选择合适的技术方案。
佛系玩家15 浏览 一、为什么需要 Flux 和 SseEmitter
1.1 传统同步响应的痛点
在传统的 Spring MVC 开发中,我们通常使用同步的请求 - 响应模式:
@PostMapping("/chat/message")
public Result<MessageVO> sendMessage(@RequestBody SendMessageReq req) {
String aiResponse = aiService.generateResponse(req.getContent());
return Result.success(new MessageVO(aiResponse));
}
存在的问题:
- 用户体验差:用户发送消息后需要等待很长时间才能看到完整回复
- 资源浪费:一个请求会长时间占用一个线程,降低服务器并发能力
- 超时风险:长时间处理可能触发 HTTP 超时(默认 30-60 秒)
- 无法感知进度:用户不知道系统是否在处理,容易误以为系统卡死
1.2 典型应用场景
以下场景迫切需要流式响应能力:
| 场景 | 问题描述 | 解决方案 |
|---|
| AI 聊天机器人 | GPT 类模型生成回复需要时间,逐字输出更友好 | Flux/SSE |
| 大文件处理 | 文件上传/下载进度实时反馈 | SSE |
| 实时监控 | 服务器指标、日志流实时推送 | SSE/WebSocket |
| 长任务执行 | 数据导入、报表生成等耗时操作的进度通知 | SSE |
| 实时通知 | 消息推送、订单状态更新 | SSE/WebSocket |
1.3 流式响应的价值
传统模式:客户端
流式模式:客户端
客户端 <
客户端 <
客户端 <
...
客户端 <
相关免费在线工具
- Keycode 信息
查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online
- Escape 与 Native 编解码
JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online
- JavaScript / HTML 格式化
使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online
- JavaScript 压缩与混淆
Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online
- RSA密钥对生成器
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
- Mermaid 预览与可视化编辑
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
- ✅ 用户立即看到响应,体验提升 300%
- ✅ 线程快速释放,服务器吞吐量提升 5-10 倍
- ✅ 支持超长响应,不受 HTTP 超时限制
- ✅ 实时反馈进度,降低用户焦虑
二、技术背景与概念
2.1 SSE (Server-Sent Events)
官方定义: SSE 是 HTML5 标准的一部分,允许服务器主动向客户端推送数据。
- 基于 HTTP 协议,无需额外协议支持
- 单向通信(服务器→客户端)
- 自动重连机制
- 支持事件 ID 和自定义事件类型
- 纯文本协议,简单易用
data: 这是第一条消息
data: 这是第二条消息
event: custom-event
data: {"message": "JSON 数据"}
id: 123
2.2 Reactive Streams 与 Flux
Reactive Streams 是 JVM 上的响应式编程规范,定义了 4 个核心接口:
public interface Publisher<T> {
void subscribe(Subscriber<? super T> subscriber);
}
public interface Subscriber<T> {
void onSubscribe(Subscription subscription);
void onNext(T item);
void onError(Throwable throwable);
void onComplete();
}
public interface Subscription {
void request(long n);
void cancel();
}
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}
Flux 是 Project Reactor 对 Publisher 的实现,代表 0-N 个元素的异步序列:
Flux<T>: 0..N 个元素的流
├─ onNext(T) * N : 发射 N 个元素
├─ onError(Throwable): 发生错误(终止)
└─ onComplete() : 完成信号(终止)
2.3 两者的关系
SseEmitter (Spring MVC 实现)
├─ 基于 Servlet 异步支持
├─ 适合传统 Spring MVC 项目
└─ 不依赖响应式框架
Flux (响应式流)
├─ 基于 Reactive Streams 规范
├─ 需要 Spring WebFlux 支持
└─ 完整的响应式编程能力
三、SseEmitter 深度解析
3.1 基本使用
3.1.1 简单示例
@RestController
@RequestMapping("/api/v1/chat")
public class ChatController {
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamMessage(@RequestParam String message) {
SseEmitter emitter = new SseEmitter(5 * 60 * 1000L);
CompletableFuture.runAsync(() -> {
try {
String response = "这是一个流式响应示例";
for (char c : response.toCharArray()) {
emitter.send(String.valueOf(c));
Thread.sleep(100);
}
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
});
return emitter;
}
}
produces = MediaType.TEXT_EVENT_STREAM_VALUE:必须设置 Content-Type 为 text/event-stream
SseEmitter(timeout):超时时间,0 表示永不超时(不推荐)
CompletableFuture.runAsync():异步执行,避免阻塞 Tomcat 线程
emitter.complete():必须调用,否则客户端连接不会关闭
3.1.2 前端对接代码
const eventSource = new EventSource('/api/v1/chat/stream?message=你好');
eventSource.onmessage = (event) => {
console.log('收到数据:', event.data);
document.getElementById('response').innerText += event.data;
};
eventSource.onerror = (error) => {
console.error('连接错误:', error);
eventSource.close();
};
eventSource.addEventListener('complete', () => {
console.log('流式响应完成');
eventSource.close();
});
3.2 生产级实现
3.2.1 完整的聊天流式接口
@Slf4j
@RestController
@RequestMapping("/api/v1/chat")
@RequiredArgsConstructor
public class StreamChatController {
private final ChatService chatService;
private final ExecutorService executorService;
@PostMapping(value = "/conversations/{sessionId}/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamChat(@PathVariable String sessionId, @RequestBody SendMessageReq req) {
long startTime = System.currentTimeMillis();
SseEmitter emitter = new SseEmitter(5 * 60 * 1000L);
emitter.onCompletion(() -> {
log.info("SSE 连接正常完成,sessionId={}, cost={}ms", sessionId, System.currentTimeMillis() - startTime);
});
emitter.onTimeout(() -> {
log.warn("SSE 连接超时,sessionId={}", sessionId);
emitter.complete();
});
emitter.onError(throwable -> {
log.error("SSE 连接异常,sessionId={}, error={}", sessionId, throwable.getMessage(), throwable);
});
executorService.execute(() -> {
try {
emitter.send(SseEmitter.event().name("start").data(Map.of("messageId", generateMessageId())));
StringBuilder fullResponse = new StringBuilder();
chatService.streamGenerate(sessionId, req.getContent(), chunk -> {
try {
fullResponse.append(chunk);
emitter.send(SseEmitter.event().name("message").data(Map.of("content", chunk)));
} catch (IOException e) {
throw new RuntimeException("发送数据失败", e);
}
});
chatService.saveMessage(sessionId, fullResponse.toString());
emitter.send(SseEmitter.event().name("complete").data(Map.of(
"totalChunks", fullResponse.length(),
"costTime", System.currentTimeMillis() - startTime
)));
emitter.complete();
} catch (Exception e) {
log.error("流式生成失败", e);
try {
emitter.send(SseEmitter.event().name("error").data(Map.of("message", e.getMessage())));
} catch (IOException ioException) {
log.error("发送错误事件失败", ioException);
}
emitter.completeWithError(e);
}
});
return emitter;
}
private String generateMessageId() {
return "msg_" + System.currentTimeMillis();
}
}
3.2.2 线程池配置
@Configuration
public class AsyncConfig {
@Bean(name = "sseExecutor")
public ExecutorService sseExecutor() {
return new ThreadPoolExecutor(
10,
50,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
new ThreadFactoryBuilder().setNameFormat("sse-executor-%d").build(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
}
3.3 高级特性
3.3.1 自定义事件类型
emitter.send(SseEmitter.event()
.id("123")
.name("chat-message")
.data(messageData)
.comment("这是注释")
.reconnectTime(3000L));
eventSource.addEventListener('chat-message', (event) => {
const data = JSON.parse(event.data);
console.log('收到聊天消息:', data);
});
3.3.2 断线重连机制
@GetMapping(value = "/stream-with-resume", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamWithResume(@RequestHeader(value = "Last-Event-ID", required = false) String lastEventId) {
SseEmitter emitter = new SseEmitter();
int startIndex = lastEventId != null ? Integer.parseInt(lastEventId) : 0;
CompletableFuture.runAsync(() -> {
try {
for (int i = startIndex; i < 100; i++) {
emitter.send(SseEmitter.event()
.id(String.valueOf(i))
.data("数据块 " + i));
Thread.sleep(100);
}
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
});
return emitter;
}
const eventSource = new EventSource('/stream-with-resume');
四、Flux 深度解析
4.1 引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
注意: Flux 需要 Spring WebFlux 支持,可以与 Spring MVC 共存,但需要注意:
- WebFlux 运行在 Netty 上(默认),也可以配置为 Tomcat
- 两者可以同时存在,但推荐全面切换到 WebFlux 以发挥最大性能
4.2 基本使用
4.2.1 简单示例
@RestController
@RequestMapping("/api/v1/reactive")
public class ReactiveChatController {
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamMessage(@RequestParam String message) {
return Flux.interval(Duration.ofMillis(100))
.map(i -> "字符" + i)
.take(10)
.doOnComplete(() -> log.info("流完成"));
}
}
Flux.interval():周期性发射元素
map():转换元素
take(n):限制元素数量
doOnComplete():完成时的副作用操作
4.2.2 实际 AI 聊天场景
@RestController
@RequestMapping("/api/v1/reactive/chat")
@RequiredArgsConstructor
public class ReactiveStreamController {
private final ReactiveAIService aiService;
@PostMapping(value = "/conversations/{sessionId}/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<MessageChunk>> streamChat(@PathVariable String sessionId, @RequestBody SendMessageReq req) {
return aiService.streamGenerate(req.getContent())
.map(chunk -> ServerSentEvent.<MessageChunk>builder()
.event("message")
.data(new MessageChunk(chunk))
.build())
.concatWith(Flux.just(ServerSentEvent.<MessageChunk>builder()
.event("complete")
.data(new MessageChunk(""))
.build()))
.doOnSubscribe(sub -> log.info("客户端订阅,sessionId={}", sessionId))
.doOnComplete(() -> log.info("流完成,sessionId={}", sessionId))
.doOnError(e -> log.error("流异常,sessionId={}", sessionId, e));
}
}
@Data
@AllArgsConstructor
class MessageChunk {
private String content;
}
4.3 核心操作符详解
4.3.1 创建 Flux
Flux<String> flux1 = Flux.fromIterable(Arrays.asList("A", "B", "C"));
Flux<String> flux2 = Flux.fromArray(new String[]{"A", "B", "C"});
Flux<String> flux3 = Flux.fromStream(Stream.of("A", "B", "C"));
Flux<Integer> flux4 = Flux.generate(() -> 0,
(state, sink) -> {
sink.next(state);
if (state == 10) sink.complete();
return state + 1;
});
Flux<String> flux5 = Flux.create(sink -> {
externalService.onData(data -> sink.next(data));
externalService.onComplete(() -> sink.complete());
externalService.onError(error -> sink.error(error));
});
Flux<Long> flux6 = Flux.interval(Duration.ofSeconds(1));
4.3.2 转换操作符
Flux<String> source = Flux.just("hello", "world");
Flux<String> mapped = source.map(String::toUpperCase);
Flux<String> flatMapped = source.flatMap(word -> Flux.fromArray(word.split("")));
Flux<String> concatMapped = source.concatMap(word -> Flux.fromArray(word.split("")));
Flux<String> filtered = source.filter(s -> s.length() > 4);
Flux<String> distinct = Flux.just("A", "B", "A").distinct();
Flux<String> taken = source.take(5);
Flux<String> skipped = source.skip(2);
Flux<List<String>> buffered = source.buffer(10);
4.3.3 组合操作符
Flux<String> flux1 = Flux.just("A", "B");
Flux<String> flux2 = Flux.just("C", "D");
Flux<String> concat = Flux.concat(flux1, flux2);
Flux<String> merged = Flux.merge(flux1, flux2);
Flux<String> zipped = Flux.zip(flux1, flux2, (a, b) -> a + b);
Flux<String> combined = Flux.combineLatest(flux1, flux2, (a, b) -> a + b);
4.3.4 错误处理
Flux<String> flux = Flux.just("A", "B", "C").map(s -> {
if (s.equals("B")) throw new RuntimeException("错误");
return s;
});
Flux<String> handled1 = flux.onErrorReturn("默认值");
Flux<String> handled2 = flux.onErrorResume(e -> Flux.just("备用 1", "备用 2"));
Flux<String> handled3 = flux.onErrorContinue((e, obj) -> log.error("处理 {} 时出错:{}", obj, e.getMessage()));
Flux<String> retried = flux.retry(3);
Flux<String> retriedWhen = flux.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)));
4.4 背压(Backpressure)处理
背压是响应式编程的核心特性,用于处理生产者速度 > 消费者速度的情况。
Flux<Integer> fastProducer = Flux.range(1, 1000).delayElements(Duration.ofMillis(1));
Flux<Integer> buffered = fastProducer
.onBackpressureBuffer(100, dropped -> log.warn("丢弃:{}", dropped));
Flux<Integer> dropped = fastProducer
.onBackpressureDrop(dropped -> log.warn("丢弃:{}", dropped));
Flux<Integer> latest = fastProducer.onBackpressureLatest();
Flux<Integer> error = fastProducer.onBackpressureError();
4.5 完整生产案例
@Service
@Slf4j
@RequiredArgsConstructor
public class ReactiveAIService {
private final OpenAIClient openAIClient;
private final MessageRepository messageRepository;
public Flux<String> streamGenerate(String sessionId, String prompt) {
return Flux.create(sink -> {
String messageId = generateMessageId();
StringBuilder fullResponse = new StringBuilder();
try {
openAIClient.streamChatCompletion(prompt, new StreamCallback() {
@Override
public void onChunk(String chunk) {
fullResponse.append(chunk);
sink.next(chunk);
}
@Override
public void onComplete() {
saveMessage(sessionId, messageId, fullResponse.toString()).subscribe(
saved -> log.info("消息保存成功:{}", messageId),
error -> log.error("消息保存失败", error)
);
sink.complete();
}
@Override
public void onError(Throwable error) {
sink.error(error);
}
});
} catch (Exception e) {
sink.error(e);
}
}).publishOn(Schedulers.boundedElastic())
.doOnSubscribe(sub -> log.info("开始生成,sessionId={}", sessionId))
.doOnComplete(() -> log.info("生成完成,sessionId={}", sessionId))
.doOnError(e -> log.error("生成失败,sessionId={}", sessionId, e));
}
private Mono<Message> saveMessage(String sessionId, String messageId, String content) {
return Mono.fromCallable(() -> {
Message message = new Message();
message.setMessageId(messageId);
message.setSessionId(sessionId);
message.setContent(content);
message.setRole(MessageRole.ASSISTANT);
return messageRepository.save(message);
}).subscribeOn(Schedulers.boundedElastic());
}
}
五、底层原理剖析
5.1 SseEmitter 底层原理
5.1.1 Servlet 异步机制
SseEmitter 基于 Servlet 3.0 的异步支持实现:
public class SseEmitter extends ResponseBodyEmitter {
public SseEmitter(Long timeout) {
super();
this.timeout = timeout;
}
@Override
protected void extendResponse(ServerHttpResponse outputMessage) {
super.extendResponse(outputMessage);
outputMessage.getHeaders().setContentType(MediaType.TEXT_EVENT_STREAM);
outputMessage.getHeaders().setCacheControl(CacheControl.noCache());
}
}
public abstract class ResponseBodyEmitter {
private final Set<DataWithMediaType> earlySendAttempts = new LinkedHashSet<>(4);
private Handler handler;
private boolean complete;
public void send(Object object) throws IOException {
if (this.handler != null) {
try {
this.handler.send(object, null);
} catch (IOException ex) {
throw ex;
} catch (Throwable ex) {
throw new IllegalStateException("Failed to send " + object, ex);
}
} else {
this.earlySendAttempts.add(new DataWithMediaType(object, null));
}
}
public void complete() {
if (this.handler != null) {
this.handler.complete();
} else {
this.complete = true;
}
}
}
5.1.2 异步 Servlet 工作流程
1. 客户端发起请求
2. Tomcat 接收请求,分配线程 A 处理
3. Controller 返回 SseEmitter
4. Spring MVC 调用 AsyncContext.startAsync()
5. 线程 A 释放,返回线程池
6. 业务逻辑在线程 B 中执行
7. 调用 emitter.send() 写入数据
8. 数据通过 AsyncContext 写入 TCP 缓冲区
9. Tomcat 的 Poller 线程监听 Socket 可写事件
10. 数据发送到客户端
11. emitter.complete() 关闭连接
public class ResponseBodyEmitterReturnValueHandler implements HandlerMethodReturnValueHandler {
@Override
public void handleReturnValue(Object returnValue, ...) throws Exception {
ResponseBodyEmitter emitter = (ResponseBodyEmitter) returnValue;
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(webRequest);
DeferredResult<?> deferredResult = new DeferredResult<>();
asyncManager.startDeferredResultProcessing(deferredResult, ...);
emitter.initialize(new HttpMessageConvertingHandler(outputMessage, ...));
}
}
5.1.3 数据发送流程
class HttpMessageConvertingHandler implements ResponseBodyEmitter.Handler {
@Override
public void send(Object data, MediaType mediaType) throws IOException {
if (data instanceof String) {
String text = (String) data;
String formattedData = "data: " + text + "\n\n";
byte[] bytes = formattedData.getBytes(StandardCharsets.UTF_8);
this.outputMessage.getBody().write(bytes);
this.outputMessage.getBody().flush();
}
}
@Override
public void complete() {
this.outputMessage.getBody().close();
}
}
- HTTP 响应默认是缓冲的,只有 flush 才会立即发送到客户端
- SSE 的实时性依赖于每次 send 都立即 flush
5.2 Flux 底层原理
5.2.1 Reactive Streams 协议
Flux 实现了 Reactive Streams 规范,核心是背压控制:
Flux<String> flux = Flux.just("A", "B", "C");
flux.subscribe(new Subscriber<String>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
s.request(1);
}
@Override
public void onNext(String item) {
System.out.println("收到:" + item);
subscription.request(1);
}
@Override
public void onError(Throwable t) {
System.err.println("错误:" + t);
}
@Override
public void onComplete() {
System.out.println("完成");
}
});
- 消费者通过
request(n) 主动拉取数据,避免被淹没
- 生产者必须尊重请求数量,不能无限推送
5.2.2 操作符链式原理
Flux<Integer> flux = Flux.range(1, 10)
.map(i -> i * 2)
.filter(i -> i > 5)
.take(5);
FluxTake(FluxFilter(FluxMap(FluxRange(1, 10))))
订阅 (subscribe):从最外层向内传播
FluxTake → FluxFilter → FluxMap → FluxRange
数据流 (onNext):从最内层向外传播
FluxRange → FluxMap → FluxFilter → FluxTake → Subscriber
5.2.3 线程调度原理
public abstract class Schedulers {
static Scheduler immediate() { return ImmediateScheduler.INSTANCE; }
static Scheduler single() { return SingleScheduler.INSTANCE; }
static Scheduler boundedElastic() { return BoundedElasticScheduler.INSTANCE; }
static Scheduler parallel() { return ParallelScheduler.INSTANCE; }
}
Flux.just("A")
.publishOn(Schedulers.boundedElastic())
.map(s -> {
System.out.println("map 线程:" + Thread.currentThread().getName());
return s.toLowerCase();
})
.subscribeOn(Schedulers.parallel())
.subscribe();
subscribeOn vs publishOn:
subscribeOn:影响源头(订阅操作的线程)
publishOn:影响下游(后续操作符的线程)
Flux.just("A")
.doOnNext(s -> log("1: " + Thread.currentThread().getName()))
.publishOn(Schedulers.single())
.doOnNext(s -> log("2: " + Thread.currentThread().getName()))
.subscribeOn(Schedulers.parallel())
.subscribe();
输出:
1: parallel-1 (受 subscribeOn 影响)
2: single-1 (受 publishOn 影响)
5.2.4 冷流 vs 热流
Flux<Integer> cold = Flux.range(1, 3).doOnSubscribe(s -> System.out.println("订阅了"));
cold.subscribe(i -> System.out.println("订阅者 1: " + i));
cold.subscribe(i -> System.out.println("订阅者 2: " + i));
ConnectableFlux<Integer> hot = Flux.range(1, 3).doOnSubscribe(s -> System.out.println("订阅了")).publish();
hot.subscribe(i -> System.out.println("订阅者 1: " + i));
hot.subscribe(i -> System.out.println("订阅者 2: " + i));
hot.connect();
六、性能对比与选型
6.1 性能对比
6.1.1 吞吐量测试
测试场景:10000 个并发请求,每个请求返回 100 个数据块
@State(Scope.Benchmark)
public class PerformanceTest {
@Benchmark
public void testSseEmitter(Blackhole blackhole) {
SseEmitter emitter = new SseEmitter();
for (int i = 0; i < 100; i++) {
emitter.send("data" + i);
}
emitter.complete();
}
@Benchmark
public void testFlux(Blackhole blackhole) {
Flux.range(0, 100).map(i -> "data" + i).subscribe(blackhole::consume);
}
}
| 指标 | SseEmitter | Flux | 说明 |
|---|
| 吞吐量 | 8,500 ops/s | 45,000 ops/s | Flux 快 5 倍+ |
| 内存占用 | 每连接 ~50KB | 每连接 ~10KB | Flux 更节省 |
| CPU 占用 | 65% | 35% | Flux 更高效 |
| 延迟 (P99) | 150ms | 30ms | Flux 更低 |
6.1.2 内存分析
SseEmitter 内存结构:
Thread Stack ~1MB (线程栈)
Response Buffer ~8KB (HTTP 响应缓冲)
Connection State ~40KB (连接状态)
总计:~1.05MB/连接
Flux 内存结构:
Subscription ~2KB (订阅对象)
Operator Chain ~5KB (操作符链)
Buffer ~3KB (可配置)
总计:~10KB/连接
C10K 问题:
SseEmitter: 10000 连接 = 10GB 内存
Flux: 10000 连接 = 100MB 内存
6.2 技术选型
6.2.1 选型决策树
需要流式响应?
├─ 否 → 使用普通 REST 接口
└─ 是 → 继续
| 现有项目是 Spring MVC?
├─ 是 → 考虑 SseEmitter
| │
| │ 并发量 < 1000?
| ├─ 是 → SseEmitter (简单易用)
| └─ 否 → 考虑迁移到 Flux
└─ 否(新项目) → Flux (性能更好)
| 团队熟悉响应式编程?
├─ 是 → 直接使用 Flux
└─ 否 → 先用 SseEmitter,逐步迁移
6.2.2 详细对比
| 维度 | SseEmitter | Flux | 推荐场景 |
|---|
| 易用性 | ⭐⭐⭐⭐⭐ 简单直观 | ⭐⭐⭐ 学习曲线陡 | 快速开发选 SseEmitter |
| 性能 | ⭐⭐⭐ 中等 | ⭐⭐⭐⭐⭐ 优秀 | 高并发选 Flux |
| 资源占用 | ⭐⭐ 每连接 1 个线程 | ⭐⭐⭐⭐⭐ 异步非阻塞 | 资源受限选 Flux |
| 背压控制 | ❌ 不支持 | ✅ 完整支持 | 需要流控选 Flux |
| 生态集成 | ⭐⭐⭐ Spring MVC | ⭐⭐⭐⭐⭐ WebFlux 生态 | 全栈响应式选 Flux |
| 错误处理 | ⭐⭐⭐ 简单 | ⭐⭐⭐⭐ 丰富 | 复杂逻辑选 Flux |
| 测试难度 | ⭐⭐⭐⭐ 容易 | ⭐⭐ 较难 | 快速验证选 SseEmitter |
| 可维护性 | ⭐⭐⭐⭐ 易理解 | ⭐⭐⭐ 需要经验 | 团队新手选 SseEmitter |
6.2.3 实际案例选型
- 并发:<500
- 团队:传统 Java 团队
- 选择:SseEmitter
- 理由:易于理解和维护,性能够用
- 并发:10000+
- 团队:有响应式经验
- 选择:Flux
- 理由:高性能、低资源占用、完整的背压控制
- 并发:<100
- 需求:推送服务器指标
- 选择:SseEmitter
- 理由:简单场景,快速实现
- 并发:50000+设备
- 数据:高频率传感器数据
- 选择:Flux + R2DBC
- 理由:全栈响应式,极致性能
七、生产环境实战
7.1 完整项目改造
7.1.1 改造前(同步模式)
@RestController
@RequestMapping("/api/v1/chat")
public class ChatController {
@PostMapping("/send")
public Result<MessageVO> sendMessage(@RequestBody SendMessageReq req) {
String response = aiService.generate(req.getContent());
return Result.success(new MessageVO(response));
}
}
- 用户体验差:等待 30 秒
- 资源浪费:占用线程 30 秒
- 容易超时:Nginx/Gateway 60 秒超时
7.1.2 改造后(流式模式)
@RestController
@RequestMapping("/api/v1/chat")
@RequiredArgsConstructor
public class StreamChatController {
private final StreamChatService chatService;
@PostMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<ChatResponse>> streamMessage(@RequestBody SendMessageReq req) {
return chatService.streamChat(req)
.map(chunk -> ServerSentEvent.<ChatResponse>builder()
.event("message")
.id(chunk.getMessageId())
.data(chunk)
.build())
.concatWith(completionEvent())
.onErrorResume(this::handleError);
}
private Flux<ServerSentEvent<ChatResponse>> completionEvent() {
return Flux.just(ServerSentEvent.<ChatResponse>builder()
.event("complete")
.data(ChatResponse.complete())
.build());
}
private Flux<ServerSentEvent<ChatResponse>> handleError(Throwable e) {
log.error("流式聊天异常", e);
return Flux.just(ServerSentEvent.<ChatResponse>builder()
.event("error")
.data(ChatResponse.error(e.getMessage()))
.build());
}
}
7.2 Service 层实现
@Service
@Slf4j
@RequiredArgsConstructor
public class StreamChatService {
private final OpenAIClient openAIClient;
private final ConversationRepository conversationRepository;
private final MessageRepository messageRepository;
private final RedisTemplate<String, Object> redisTemplate;
public Flux<ChatResponse> streamChat(SendMessageReq req) {
String messageId = generateMessageId();
String sessionId = req.getSessionId();
return Flux.create(sink -> {
StringBuilder fullResponse = new StringBuilder();
AtomicInteger chunkCount = new AtomicInteger(0);
try {
saveUserMessage(sessionId, req.getContent()).subscribe();
openAIClient.streamChatCompletion(buildPrompt(sessionId, req.getContent()), new StreamCallback() {
@Override
public void onChunk(String chunk) {
fullResponse.append(chunk);
ChatResponse response = ChatResponse.builder()
.messageId(messageId)
.sessionId(sessionId)
.content(chunk)
.chunkIndex(chunkCount.getAndIncrement())
.isComplete(false)
.build();
sink.next(response);
cacheChunk(messageId, chunkCount.get(), chunk);
}
@Override
public void onComplete() {
saveAssistantMessage(sessionId, messageId, fullResponse.toString())
.doOnSuccess(msg -> log.info("消息保存成功:{}", messageId))
.doOnError(e -> log.error("消息保存失败", e))
.subscribe();
ChatResponse finalResponse = ChatResponse.builder()
.messageId(messageId)
.sessionId(sessionId)
.content("")
.chunkIndex(chunkCount.get())
.isComplete(true)
.totalChunks(chunkCount.get())
.build();
sink.next(finalResponse);
sink.complete();
clearCache(messageId);
}
@Override
public void onError(Throwable error) {
log.error("AI 生成失败", error);
sink.error(new BusinessException("AI 服务异常:" + error.getMessage()));
}
});
} catch (Exception e) {
sink.error(e);
}
}).publishOn(Schedulers.boundedElastic())
.timeout(Duration.ofMinutes(5))
.doOnSubscribe(sub -> log.info("开始流式聊天,sessionId={}", sessionId))
.doOnComplete(() -> log.info("流式聊天完成,sessionId={}", sessionId))
.doOnError(e -> log.error("流式聊天异常,sessionId={}", sessionId, e));
}
private Mono<Message> saveUserMessage(String sessionId, String content) {
return Mono.fromCallable(() -> {
Message message = Message.builder()
.messageId(generateMessageId())
.sessionId(sessionId)
.role(MessageRole.USER)
.content(content)
.createTime(LocalDateTime.now())
.build();
return messageRepository.save(message);
}).subscribeOn(Schedulers.boundedElastic());
}
private Mono<Message> saveAssistantMessage(String sessionId, String messageId, String content) {
return Mono.fromCallable(() -> {
Message message = Message.builder()
.messageId(messageId)
.sessionId(sessionId)
.role(MessageRole.ASSISTANT)
.content(content)
.createTime(LocalDateTime.now())
.build();
return messageRepository.save(message);
}).subscribeOn(Schedulers.boundedElastic());
}
private void cacheChunk(String messageId, int index, String chunk) {
String key = "chat:stream:" + messageId;
redisTemplate.opsForList().rightPush(key, chunk);
redisTemplate.expire(key, Duration.ofMinutes(10));
}
private void clearCache(String messageId) {
String key = "chat:stream:" + messageId;
redisTemplate.delete(key);
}
private String generateMessageId() {
return "msg_" + System.currentTimeMillis() + "_" + RandomUtil.randomString(8);
}
}
7.3 前端对接实现
7.3.1 原生 JavaScript
class StreamChatClient {
constructor(apiBaseUrl) {
this.apiBaseUrl = apiBaseUrl;
this.eventSource = null;
}
sendStreamMessage(sessionId, content, callbacks) {
const url = `${this.apiBaseUrl}/api/v1/chat/stream`;
fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Accept': 'text/event-stream'
},
body: JSON.stringify({
sessionId: sessionId,
content: content
})
}).then(response => {
const reader = response.body.getReader();
const decoder = new TextDecoder();
const readChunk = () => {
reader.read().then(({ done, value }) => {
if (done) {
callbacks.onComplete?.();
return;
}
const chunk = decoder.decode(value);
const lines = chunk.split('\n');
let eventType = 'message';
let data = '';
for (const line of lines) {
if (line.startsWith('event:')) {
eventType = line.substring(6).trim();
} else if (line.startsWith('data:')) {
data = line.substring(5).trim();
} else if (line === '' && data) {
this.handleEvent(eventType, data, callbacks);
eventType = 'message';
data = '';
}
}
readChunk();
});
};
readChunk();
}).catch(error => {
console.error('Stream error:', error);
callbacks.onError?.(error);
});
}
handleEvent(eventType, data, callbacks) {
try {
const parsed = JSON.parse(data);
switch (eventType) {
case 'message':
callbacks.onMessage?.(parsed.content);
break;
case 'complete':
callbacks.onComplete?.();
break;
case 'error':
callbacks.onError?.(new Error(parsed.message));
break;
}
} catch (e) {
console.error('Parse error:', e);
}
}
}
const client = new StreamChatClient('http://localhost:8080');
client.sendStreamMessage('sess_123', '你好,介绍一下你自己', {
onMessage: (content) => {
document.getElementById('response').innerText += content;
},
onComplete: () => {
console.log('流式响应完成');
},
onError: (error) => {
console.error('错误:', error);
alert('发生错误:' + error.message);
}
});
7.3.2 React 实现
import React, { useState, useEffect, useRef } from 'react';
interface ChatMessage {
role: 'user' | 'assistant';
content: string;
isStreaming?: boolean;
}
export const StreamChat: React.FC = () => {
const [messages, setMessages] = useState<ChatMessage[]>([]);
const [inputValue, setInputValue] = useState('');
const [isLoading, setIsLoading] = useState(false);
const messagesEndRef = useRef<HTMLDivElement>(null);
useEffect(() => {
messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
}, [messages]);
const sendMessage = async () => {
if (!inputValue.trim() || isLoading) return;
const userMessage = inputValue;
setInputValue('');
setIsLoading(true);
setMessages(prev => [...prev, { role: 'user', content: userMessage }]);
const aiMessageIndex = messages.length + 1;
setMessages(prev => [...prev, { role: 'assistant', content: '', isStreaming: true }]);
try {
const response = await fetch('/api/v1/chat/stream', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Accept': 'text/event-stream'
},
body: JSON.stringify({
sessionId: 'sess_123',
content: userMessage
})
});
const reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (const line of lines) {
if (line.startsWith('data:')) {
const data = line.substring(5).trim();
if (data) {
try {
const parsed = JSON.parse(data);
if (parsed.content) {
setMessages(prev => {
const newMessages = [...prev];
newMessages[aiMessageIndex] = {
...newMessages[aiMessageIndex],
content: newMessages[aiMessageIndex].content + parsed.content
};
return newMessages;
});
}
if (parsed.isComplete) {
setMessages(prev => {
const newMessages = [...prev];
newMessages[aiMessageIndex].isStreaming = false;
return newMessages;
});
}
} catch (e) {
console.error('Parse error:', e);
}
}
}
}
}
} catch (error) {
console.error('Stream error:', error);
alert('发生错误:' + error);
} finally {
setIsLoading(false);
}
};
return (
<div className="chat-container">
<div className="messages">
{messages.map((msg, index) => (
<div key={index} className={`message message-${msg.role}`}>
<div className="message-content">
{msg.content}
{msg.isStreaming && <span className="cursor">▊</span>}
</div>
</div>
))}
<div ref={messagesEndRef} />
</div>
<div className="input-area">
<input
type="text"
value={inputValue}
onChange={(e) => setInputValue(e.target.value)}
onKeyPress={(e) => e.key === 'Enter' && sendMessage()}
placeholder="输入消息..."
disabled={isLoading}
/>
<button onClick= =>
{isLoading ? '发送中...' : '发送'}
);
};
7.4 异常处理与监控
7.4.1 超时处理
@Configuration
public class WebFluxConfig {
@Bean
public WebClient webClient() {
return WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(
HttpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.responseTimeout(Duration.ofMinutes(5))
.doOnConnected(conn -> conn
.addHandlerLast(new ReadTimeoutHandler(300))
.addHandlerLast(new WriteTimeoutHandler(300))
)
))
.build();
}
}
public Flux<String> streamWithTimeout() {
return Flux.create(sink -> {
}).timeout(Duration.ofMinutes(5), Flux.just("超时兜底数据"))
.onErrorResume(TimeoutException.class, e -> {
log.error("流式处理超时", e);
return Flux.just("处理时间过长,请稍后重试");
});
}
7.4.2 监控埋点
@Aspect
@Component
@Slf4j
public class StreamMonitorAspect {
@Around("@annotation(streamMonitor)")
public Object monitor(ProceedingJoinPoint pjp, StreamMonitor streamMonitor) throws Throwable {
String methodName = pjp.getSignature().getName();
long startTime = System.currentTimeMillis();
Object result = pjp.proceed();
if (result instanceof Flux) {
Flux<?> flux = (Flux<?>) result;
AtomicLong chunkCount = new AtomicLong(0);
AtomicLong totalBytes = new AtomicLong(0);
return flux
.doOnNext(item -> {
chunkCount.incrementAndGet();
if (item instanceof String) {
totalBytes.addAndGet(((String) item).length());
}
})
.doOnComplete(() -> {
long cost = System.currentTimeMillis() - startTime;
log.info("流式方法执行完成:method={}, chunks={}, bytes={}, cost={}ms", methodName, chunkCount.get(), totalBytes.get(), cost);
MetricsCollector.recordStreamMetrics(methodName, chunkCount.get(), totalBytes.get(), cost);
})
.doOnError(error -> {
log.error("流式方法执行失败:method={}, error={}", methodName, error.getMessage(), error);
MetricsCollector.recordStreamError(methodName, error);
});
}
return result;
}
}
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface StreamMonitor {
String value() default "";
}
@StreamMonitor("chat-stream")
public Flux<ChatResponse> streamChat(SendMessageReq req) {
}
八、常见问题与解决方案
8.1 SseEmitter 常见问题
Q1: SseEmitter 连接意外断开
- Nginx/Gateway 超时配置
- 网络不稳定
- 浏览器 Tab 切换(移动端)
location /api/ {
proxy_pass http://backend;
proxy_set_header Connection '';
proxy_http_version 1.1;
chunked_transfer_encoding off;
proxy_buffering off;
proxy_cache off;
proxy_connect_timeout 10s;
proxy_send_timeout 600s;
proxy_read_timeout 600s;
}
@Scheduled(fixedRate = 30000)
public void sendHeartbeat() {
emitterManager.getAllEmitters().forEach(emitter -> {
try {
emitter.send(SseEmitter.event().name("heartbeat").data("ping"));
} catch (IOException e) {
log.warn("发送心跳失败,移除连接", e);
emitterManager.remove(emitter);
}
});
}
Q2: 内存泄漏
原因: SseEmitter 未正确关闭,导致连接泄漏
@Component
public class SseEmitterManager {
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
public SseEmitter create(String sessionId, Long timeout) {
SseEmitter emitter = new SseEmitter(timeout);
emitter.onCompletion(() -> {
log.info("SSE 连接完成:{}", sessionId);
emitters.remove(sessionId);
});
emitter.onTimeout(() -> {
log.warn("SSE 连接超时:{}", sessionId);
emitters.remove(sessionId);
try {
emitter.complete();
} catch (Exception e) {
log.error("关闭超时连接失败", e);
}
});
emitter.onError(throwable -> {
log.error("SSE 连接异常:{}", sessionId, throwable);
emitters.remove(sessionId);
});
emitters.put(sessionId, emitter);
return emitter;
}
@Scheduled(fixedRate = 60000)
public void cleanup() {
long now = System.currentTimeMillis();
emitters.entrySet().removeIf(entry -> {
return false;
});
}
}
Q3: 数据丢失
public void send(String data) throws IOException {
emitter.send(SseEmitter.event().data(data));
}
8.2 Flux 常见问题
Q1: 背压溢出
现象: reactor.core.Exceptions$OverflowException: Queue is full
Flux<String> flux = Flux.create(sink -> {
}, FluxSink.OverflowStrategy.BUFFER)
.onBackpressureBuffer(1000, dropped -> log.warn("丢弃数据:{}", dropped));
Q2: 线程阻塞
Flux.create(sink -> {
String result = blockingHttpClient.get();
sink.next(result);
sink.complete();
})
Flux.create(sink -> {
asyncHttpClient.get().thenAccept(result -> {
sink.next(result);
sink.complete();
}).exceptionally(error -> {
sink.error(error);
return null;
});
})
.subscribeOn(Schedulers.boundedElastic())
Q3: 订阅未触发
@GetMapping("/test")
public void test() {
Flux.range(1, 10).map(i -> i * 2).doOnNext(System.out::println);
}
@GetMapping("/test")
public Flux<Integer> test() {
return Flux.range(1, 10).map(i -> i * 2);
}
flux.subscribe(
data -> System.out.println(data),
error -> System.err.println(error),
() -> System.out.println("完成")
);
8.3 生产环境最佳实践
1. 设置合理的超时时间
SseEmitter emitter = new SseEmitter(5 * 60 * 1000L);
flux.timeout(Duration.ofMinutes(5))
2. 限制并发连接数
@Component
public class ConnectionLimiter {
private final Semaphore semaphore = new Semaphore(1000);
public SseEmitter createWithLimit() throws InterruptedException {
if (!semaphore.tryAcquire(5, TimeUnit.SECONDS)) {
throw new BusinessException("服务器繁忙,请稍后重试");
}
SseEmitter emitter = new SseEmitter();
emitter.onCompletion(() -> semaphore.release());
emitter.onTimeout(() -> semaphore.release());
emitter.onError(e -> semaphore.release());
return emitter;
}
}
3. 日志与监控
Flux<String> flux = Flux.create(sink -> {
}).doOnSubscribe(sub -> {
log.info("开始流式处理:{}", contextInfo);
MetricsCollector.incrementActiveStreams();
}).doOnNext(item -> {
log.debug("发送数据块:{}", item);
MetricsCollector.recordChunk();
}).doOnComplete(() -> {
log.info("流式处理完成:{}", contextInfo);
MetricsCollector.decrementActiveStreams();
}).doOnError(error -> {
log.error("流式处理失败:{}", contextInfo, error);
MetricsCollector.recordError();
MetricsCollector.decrementActiveStreams();
});
4. 优雅关闭
@Component
public class GracefulShutdown implements ApplicationListener<ContextClosedEvent> {
@Autowired
private SseEmitterManager emitterManager;
@Override
public void onApplicationEvent(ContextClosedEvent event) {
log.info("应用关闭,断开所有 SSE 连接");
emitterManager.getAllEmitters().forEach(emitter -> {
try {
emitter.send(SseEmitter.event().name("shutdown").data("服务器即将重启,请重新连接"));
emitter.complete();
} catch (IOException e) {
log.error("发送关闭通知失败", e);
}
});
}
}
总结
核心要点
- SseEmitter:
- 基于 Servlet 异步,简单易用
- 适合中小型项目(并发<1000)
- 每个连接占用一个线程,资源开销较大
- 不支持背压控制
- Flux:
- 基于 Reactive Streams,性能卓越
- 适合高并发场景(并发>1000)
- 异步非阻塞,资源利用率高
- 完整的背压控制和错误处理
- 选型建议:
- 快速上手、团队经验不足 → SseEmitter
- 高性能、大规模并发 → Flux
- 新项目推荐直接使用 Flux
- 老项目可以先用 SseEmitter,逐步迁移
未来趋势
响应式编程已成为 Java 生态的重要方向,Spring、R2DBC、Kafka、Redis 等主流框架都已支持响应式。掌握 Flux 不仅能提升系统性能,也是技术成长的必经之路。
{sendMessage}
disabled
{isLoading}
</button>
</div>
</div>