实战:Spring Boot 2.7.8 原生 SSE 服务端开发

实战:Spring Boot 2.7.8 原生 SSE 服务端开发

目录

前言

一、SSE知识简介

1、SSE是什么

2、SSE工作原理

3、SSE适用场景

二、SpringBoot中SSE的实现

1、Maven中引入

2、SSE服务类实现

3、SSE控制器类实现

4、最简单页面实现

三、成果展示

1、SSE连接

2、群发消息

3、点对点消息

四、总结


前言

        在当今的互联网应用开发中,实时数据交互的需求日益增长。无论是股票交易系统中实时更新的股价信息,还是社交平台上的即时消息推送,亦或是物联网场景下传感器数据的实时传输,都对后端服务提出了更高的要求。传统的轮询机制虽然简单,但效率低下且资源消耗大;而WebSocket虽然功能强大,但在某些场景下显得过于复杂且实现成本较高。在这种背景下,Server-Sent Events(SSE)作为一种轻量级的、基于HTTP协议的单向实时通信技术,逐渐受到开发者的关注。

        SSE允许服务器主动向客户端推送数据,而无需客户端频繁发起请求,这不仅提高了数据传输的效率,还降低了服务器的负载。更重要的是,SSE的实现相对简单,它基于标准的HTTP协议,无需额外的协议支持,这使得它在许多场景下成为理想的解决方案。Spring Boot作为目前最流行的Java微服务框架之一,提供了强大的功能和极高的开发效率。在Spring Boot 2.7.8版本中,我们可以利用其原生支持,轻松实现SSE服务,从而为应用提供实时数据推送能力。

        本实战教程将带领读者从零开始,逐步构建一个基于Spring Boot 2.7.8的SSE服务端应用。我们将从基础的环境搭建开始,详细介绍如何创建一个Spring Boot项目,并引入必要的依赖。接着,我们会深入探讨SSE的核心概念,包括事件流的格式、数据推送的机制以及如何处理客户端的连接和重连。通过具体的代码示例,读者将学会如何在Spring Boot中配置和使用SSE,实现从服务器到客户端的实时数据推送。

        此外,我们还将讨论一些常见的问题和挑战,例如如何保证数据的实时性和准确性、如何处理高并发场景下的性能问题,以及如何确保服务的稳定性和可靠性。通过这些讨论,读者将能够更好地应对实际开发中可能遇到的各种情况,从而构建出高效、稳定且可扩展的SSE服务。无论你是刚刚接触Spring Boot的初学者,还是已经有一定经验的开发者,希望通过本教程的学习,你能够掌握SSE技术的核心要点,并将其应用到自己的项目中。让我们一起开启这场Spring Boot原生SSE服务端开发的实战之旅,探索实时数据交互的奥秘,提升你的应用性能和用户体验。

一、SSE知识简介

        在正式开始介绍在SpringBoot中如何实现SSE服务时,为了方便第一次学习的朋友对SSE的机制有一个基本的了解,这里我们首先对SSE进行一个简单的介绍。分别来简单讲讲SSE是什么?SSE的工作原理是什么以及SSE适用于什么场景。

1、SSE是什么

        SSE(Server-sent Events) 规范是 HTML 5 规范的一个组成部分,具体的规范文档见参考资源。该规范比较简单,主要由两个部分组成:第一个部分是服务器端与浏览器端之间的通讯协议,第二部分则是在浏览器端可供 JavaScript 使用的 EventSource 对象。通讯协议是基于纯文本的简单协议。服务器端的响应的内容类型是“text/event-stream”。响应文本的内容可以看成是一个事件流,由不同的事件所组成。每个事件由类型和数据两部分组成,同时每个事件可以有一个可选的标识符。不同事件的内容之间通过仅包含回车符和换行符的空行(“\r\n”)来分隔。每个事件的数据可能由多行组成。

2、SSE工作原理

        客户端发起请求:客户端通过 EventSource API 向服务器发起一个 HTTP GET 请求,请求头中包含 Accept: text/event-stream,表明希望接收事件流。

        服务器响应:服务器收到请求后,保持连接开放,并设置响应头 Content-Type: text/event-streamCache-Control: no-cache,以确保数据流不会被缓存。

        数据推送:服务器通过保持开放的连接,以事件流的形式向客户端发送数据。每个事件由字段组成,如 data(消息内容)、event(事件类型)、id(消息编号)和 retry(重连间隔)。

        客户端接收:客户端通过监听事件流来获取数据,并在接收到事件后进行处理。

        自动重连:如果连接中断,客户端会根据 retry 字段的值自动尝试重新连接

3、SSE适用场景

        说到SSE的使用场景,必不可少的就要提一下它的一个如影随形的技术点,WebSocket。众所周知,WebSocket是一个全双弓的通道,可以同时收发消息,同时WebSocket在面向Https的安全领域处理起来也是非常严格的。因此,在一些场景中,比如我们不需要进行全双工操作,只需要被动的接受服务器端的信息推送即可。总结起来,SSE适用的场景如下:

        实时通知:如新闻更新、消息提醒、股票价格变动等,服务器可以实时向客户端推送最新信息。

        流式数据:如日志流、传感器数据等,客户端可以持续接收服务器发送的数据流。

        单向通信场景:当只需要服务器向客户端推送数据,而客户端无需向服务器发送数据时,SSE 是一个简单高效的选择。

        通过以上的介绍,相信大家对SSE的了解又加深了。介绍完这些基础知识后,下面我们来重点介绍一下如何在SpringBoot中进行SSE服务的实现。

二、SpringBoot中SSE的实现

        Spring Boot作为目前最流行的Java微服务框架之一,提供了强大的功能和极高的开发效率。在Spring Boot 2.7.8版本中,我们可以利用其原生支持,轻松实现SSE服务,从而为应用提供实时数据推送能力。因此本节将重点介绍如何在SpringBoot中进行SSE的实现,分别从Maven依赖引入、SSE服务类实现、SSE控制器类和基于Thymeleaf的最简单页面实现等4个方面进行介绍。

1、Maven中引入

        这里直接给出我们的一个比较简单的Pom依赖引入的示例,关键代码如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.yelang</groupId> <artifactId>baidu-sse-client</artifactId> <version>0.0.1-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.18</version> <!-- 兼容JDK 8的Spring Boot版本 --> <relativePath /> <!-- lookup parent from repository --> </parent> <properties> <java.version>1.8</java.version> <!-- 使用JDK 8 --> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> </properties> <dependencies> <!-- Spring Boot WebFlux (包含WebClient) --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency> <!-- Spring Boot Web (可选,如果也需要传统Web MVC) --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- Jackson JSON处理 --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> <!-- SpringBoot集成thymeleaf模板 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-thymeleaf</artifactId> </dependency> <!-- 测试依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> <!-- 确保使用JDK 8编译 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> </project>

2、SSE服务类实现

        引入相关资源后,接下来就是非常重要的SSE服务类的实现,这是整个SSE的核心。它不仅包含着连接的创建、销毁,同时还包括了消息的发送,包括群发和单点发送。下面我们就分别来介绍这些功能实现。核心代码如下:

package org.yelang.service; import org.springframework.stereotype.Service; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @Service public class SseService { // 保存所有连接的 emitter private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>(); private final AtomicInteger counter = new AtomicInteger(0); /** * -创建新的 SSE 连接 */ public SseEmitter createEmitter(String clientId) { // 设置超时时间(0表示永不超时) SseEmitter emitter = new SseEmitter(0L); emitters.put(clientId, emitter); // 设置完成和超时回调 emitter.onCompletion(() -> { emitters.remove(clientId); System.out.println("SSE连接完成: " + clientId); }); emitter.onTimeout(() -> { emitters.remove(clientId); System.out.println("SSE连接超时: " + clientId); }); emitter.onError((e) -> { emitters.remove(clientId); System.out.println("SSE连接错误: " + clientId + ", 错误: " + e.getMessage()); }); return emitter; } /** * -发送消息给所有客户端 */ public void sendToAll(String message) { emitters.forEach((clientId, emitter) -> { try { SseEmitter.SseEventBuilder event = SseEmitter.event().data(message) .id(String.valueOf(counter.incrementAndGet())).name("message").reconnectTime(5000L); emitter.send(event); } catch (IOException e) { emitter.completeWithError(e); emitters.remove(clientId); } }); } /** * -发送消息给特定客户端 */ public void sendToClient(String clientId, String message) { SseEmitter emitter = emitters.get(clientId); if (emitter != null) { try { SseEmitter.SseEventBuilder event = SseEmitter.event().data(message) .id(String.valueOf(counter.incrementAndGet())).name("message"); emitter.send(event); } catch (IOException e) { emitter.completeWithError(e); emitters.remove(clientId); } } } /** * -获取当前连接数 */ public int getConnectionCount() { return emitters.size(); } }

        需要说明的是,这里为了演示方便,连接时间设置为永不超时。于此同时,为了方便对连接进行统一的管理,这里我们使用一个HashMap来进行保存。对比群发和私发的消息的区别就是,群发是向所有客户端统一进行消息的推送,而私发就只有通信双方才了解。

3、SSE控制器类实现

        与常规的MVC应用一样,我们在后台也要实现一个SSE后台,这样才能为前端页面提供连接和发布消息。这里我们提供以下方法:

序号方法名参数说明
1public String index(Model model)跳转SSE管理首页
2public SseEmitter streamSse(@RequestParam(value = "clientId", required = false) String clientId)建立 SSE 连接
3public String broadcastMessage(@RequestParam String message)广播消息
4public String sendToClient(@RequestParam String clientId, @RequestParam String message)点对点发送消息
5public int getConnectionCount()获取SSE连接数

        下面给出实例代码:

package org.yelang.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.MediaType; import org.springframework.stereotype.Controller; import org.springframework.ui.Model; import org.springframework.web.bind.annotation.*; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import org.yelang.service.SseService; import java.util.UUID; @Controller @RequestMapping("/sseman") public class SseController { @Autowired private SseService sseService; /** * -首页 */ @GetMapping("/index") public String index(Model model) { model.addAttribute("connectionCount", sseService.getConnectionCount()); return "sse/index"; } /** * -建立 SSE 连接 */ @GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public SseEmitter streamSse(@RequestParam(value = "clientId", required = false) String clientId) { if (clientId == null || clientId.trim().isEmpty()) { clientId = UUID.randomUUID().toString(); } return sseService.createEmitter(clientId); } /** * -发送消息给所有客户端 */ @PostMapping("/broadcast") @ResponseBody public String broadcastMessage(@RequestParam String message) { sseService.sendToAll(message); return "消息已广播"; } /** * - 发送消息给特定客户端 */ @PostMapping("/send-to-client") @ResponseBody public String sendToClient(@RequestParam String clientId, @RequestParam String message) { sseService.sendToClient(clientId, message); return "消息已发送给客户端: " + clientId; } /** * -获取连接数 */ @GetMapping("/connection-count") @ResponseBody public int getConnectionCount() { return sseService.getConnectionCount(); } }

4、最简单页面实现

        下面我们以Thymeleaf为例,重点讲解前端界面的SSE集成。大家可以根据实际的业务需要来修改。为了进行页面的展示,首先来定义页面样式,设置如下:

<style> body { font-family: Arial, sans-serif; max-width: 90%; margin: 0 auto; padding: 5px; } .container { border: 1px solid #ddd; border-radius: 5px; padding: 10px; margin-bottom: 10px; } .message-area { height: 300px; border: 1px solid #ccc; padding: 10px; overflow-y: auto; background-color: #f9f9f9; } .input-group { margin-bottom: 10px; } input, button { padding: 8px; margin: 5px; } button { background-color: #007bff; color: white; border: none; border-radius: 3px; cursor: pointer; } button:hover { background-color: #0056b3; } .status { color: #28a745; font-weight: bold; } .error { color: #dc3545; } .message { margin: 5px 0; padding: 5px; border-left: 3px solid #007bff; background-color: white; } /* 新增样式:双列布局 */ .row-container { display: flex; gap: 20px; } .row-container .container { flex: 1; margin-bottom: 10px; } /* 响应式设计:小屏幕时恢复单列布局 */ @media (max-width: 768px) { .row-container { flex-direction: column; } } </style>

        同时,在页面区域,我们分类连接区、发送区和消息展示区,这里我们提供两个面板进行数据的搜集和发送相应的事件。

<!-- 新增:将连接状态和发送消息放在同一行 --> <div> <div> <h4>连接状态</h4> <div> <span>当前连接数: </span> <span th:text="${connectionCount}">0</span> </div> <div> <span>我的客户端ID: </span> <span>未连接</span> </div> <div> <button onclick="connectSSE()">连接 SSE</button> <button onclick="disconnectSSE()">断开连接</button> <button onclick="refreshConnectionCount()">刷新连接数</button> </div> </div> <div> <h4>发送消息</h4> <div> <input type="text" placeholder="输入要广播的消息"> <button onclick="broadcastMessage()">广播给所有客户端</button> </div> <div> <input type="text" placeholder="目标客户端ID"> <input type="text" placeholder="输入私密消息"> <button onclick="sendPrivateMessage()">发送给特定客户端</button> </div> </div> </div> <div> <h4>接收消息</h4> <div> <!-- 消息将在这里显示 --> </div> </div>

        方便进行页面的标记,这里我们创建一个生成clientid的方法,使用随机数生成的方式,参考代码如下:

function generateClientId() { return 'client_' + Math.random().toString(36).substr(2, 9); }

        在html中创建sse连接,并与后台连接的代码如下:

let eventSource = null; let clientId = null; var ctx = "/bdsse/sseman"; function connectSSE() { if (eventSource) { addMessage('警告', '已经连接到SSE服务器'); return; } // 生成客户端ID clientId = generateClientId(); document.getElementById('clientId').textContent = clientId; // 创建 EventSource 连接 eventSource = new EventSource(ctx + '/sse?clientId=' + clientId); // 处理消息事件 eventSource.onmessage = function(event) { addMessage('服务器消息', event.data); }; // 处理自定义事件 eventSource.addEventListener('message', function(event) { addMessage('自定义消息', event.data); }); // 处理连接打开 eventSource.onopen = function(event) { addMessage('系统', 'SSE连接已建立'); refreshConnectionCount(); }; // 处理错误 eventSource.onerror = function(event) { if (eventSource.readyState === EventSource.CLOSED) { addMessage('系统', 'SSE连接已关闭'); } else { addMessage('错误', 'SSE连接错误: ' + event); } }; addMessage('系统', '正在连接SSE服务器...'); }

        通过这样就可以通过请求后台接口连接上SSE服务端,如果想要断开连接可以调用以下方法:

 function disconnectSSE() { console.log("断开连接"); if (eventSource) { eventSource.close(); eventSource = null; addMessage('系统', 'SSE连接已断开'); refreshConnectionCount(); } else { addMessage('警告', '没有活动的SSE连接'); } }

        更多具体的应用和代码将在下一节中进行详细讲解。

三、成果展示

        本节将结合具体的页面和相关SSE的处理方法讲解一个实际的消息发送与接收实例。

1、SSE连接

        在控制台中启动服务后,在浏览器中输入访问地址,即可打开如下界面:

        可以看到在消息栏中已经显示连接SSE服务器成功。接下来就可以进行消息广播和点对点发送。

2、群发消息

        群发消息很容易理解,就是通知所有连接的客户端,同时在客户端显示发送的消息。群发消息的处理代码如下:

function broadcastMessage() { const message = document.getElementById('broadcastMessage').value; if (!message) { alert('请输入消息'); return; } fetch(ctx + '/broadcast', { method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded', }, body: 'message=' + encodeURIComponent(message) }) .then(response => response.text()) .then(data => { addMessage('操作', data); document.getElementById('broadcastMessage').value = ''; }) .catch(error => { addMessage('错误', '发送失败: ' + error); }); }

        为了方便演示,我们打开两个标签页,在新打开的标签中也连接SSE服务器,界面如下:

        在任意一个客户端中输入需要群发的消息,比如:hello world,大家好。然后点击“广播给所有客户端”按钮,再来看每个客户端可以收到以下内容:

3、点对点消息

        使用SSE除了可以实现群发消息之外,也可以实现向某指定客户端发送消息,即点对点私发消息,实现代码如下:

 function sendPrivateMessage() { const targetClientId = document.getElementById('targetClientId').value; const message = document.getElementById('privateMessage').value; if (!targetClientId || !message) { alert('请输入客户端ID和消息'); return; } fetch(ctx + '/send-to-client', { method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded', }, body: 'clientId=' + encodeURIComponent(targetClientId) + '&message=' + encodeURIComponent(message) }) .then(response => response.text()) .then(data => { addMessage('操作', data); document.getElementById('privateMessage').value = ''; }) .catch(error => { addMessage('错误', '发送失败: ' + error); }); }

        将目标客户端的clientID记下来之后,就可以实现向这个客户端进行发送,然后点击发送特定客户端,在接收端和发送端可以看到以下消息:

        来检查一下其它第三方的客户端,能否接收消息:

        从图上可以看到,目标客户端成功接收消息,而非目标客户端则没有收到消息,即实现了点对点私发消息。

四、总结

        以上就是文本的主要内容,本实战教程将带领读者从零开始,逐步构建一个基于Spring Boot 2.7.8的SSE服务端应用。我们将从基础的环境搭建开始,详细介绍如何创建一个Spring Boot项目,并引入必要的依赖。接着,我们会深入探讨SSE的核心概念,包括事件流的格式、数据推送的机制以及如何处理客户端的连接和重连。通过具体的代码示例,读者将学会如何在Spring Boot中配置和使用SSE,实现从服务器到客户端的实时数据推送。行文仓促,定有不足之处,欢迎各位朋友在评论区批评指正,不胜感激。

Read more

MySQL 从入门到精通完全教程

目录 1. 前言 2. MySQL 基础认知 3. MySQL 安装与配置 4. MySQL 核心语法 5. 高级查询技巧 6. MySQL 函数 7. 数据约束 8. 事务管理 9. 索引优化 10. 存储过程与函数 11. 用户与权限管理 12. 性能优化实战 13. 常见问题与解决方案 1. 前言 1.1 什么是MySQL? MySQL 是一款开源的关系型数据库管理系统(RDBMS),基于SQL(结构化查询语言)实现数据管理,广泛应用于Web开发(如PHP+MySQL、Python+MySQL),特点是轻量、高效、跨平台、

By Ne0inhk
MySQL 进阶:库与表的DDL核心操作全指南(含实战案例)

MySQL 进阶:库与表的DDL核心操作全指南(含实战案例)

🔥草莓熊Lotso:个人主页 ❄️个人专栏: 《C++知识分享》《Linux 入门到实践:零基础也能懂》 ✨生活是默默的坚持,毅力是永久的享受! 🎬 博主简介: 文章目录 * 前言: * 一. 数据库(库)的核心操作 * 1.1 创建数据库:指定字符集与校验规则 * 1.1.1 语法格式 * 1.1.2 实战案例 * 1.2 字符集与校验规则:影响查询和排序 * 1.2.1 查看系统默认配置 * 1.2.2 查看支持的字符集和校验规则 * 1.2.3 校验规则的实际影响 * 1.3 操纵数据库:查询、修改、

By Ne0inhk
SpringBoot 整合 Langchain4j 实现会话记忆存储深度解析

SpringBoot 整合 Langchain4j 实现会话记忆存储深度解析

目录 一、前言 二、AI大模型会话记忆介绍 2.1 AI 大模型的会话记忆是什么 2.2 AI 大模型为什么需要会话记忆 2.3 AI 大模型会话记忆常用实现方案 2.4 LangChain4j 会话记忆介绍 2.4.1 LangChain4j 会话记忆介绍 2.4.2 LangChain4j 会话记忆类型 三、Langchain4j 会话记忆操作案例使用 3.1 前置准备 3.1.1 导入依赖文件 3.1.2 添加配置文件 3.1.3 前置案例 3.

By Ne0inhk