跳到主要内容Spring Boot 集成 WebSocket 实现后台向前端推送 | 极客日志Java大前端java
Spring Boot 集成 WebSocket 实现后台向前端推送
Spring Boot 集成 WebSocket 的技术方案。涵盖原生 WebSocket 与 STOMP over WebSocket 两种模式,包括配置、消息收发、安全认证及集群部署。通过代码示例演示了如何构建实时通知系统,解决了 HTTP 轮询效率低的问题,实现了服务器主动推送数据给前端的功能,并提供了性能优化与常见问题排查指南。
人间过客1 浏览 1. 引言
随着互联网应用的不断发展,用户对实时性的要求越来越高。传统的 HTTP 协议是基于请求 - 响应模式的,客户端发起请求,服务器返回响应,连接即关闭。这种'拉取'模式在处理实时数据(如股票行情、即时消息、游戏对战、系统通知等)时显得力不从心:要么客户端频繁轮询造成资源浪费,要么服务器有新数据却无法主动通知客户端。
WebSocket 协议的出现完美解决了这一难题。它允许服务器主动向客户端推送数据,实现真正的双向通信。Spring Boot 作为当今最流行的 Java 微服务框架,对 WebSocket 提供了良好的支持。本文将深入浅出地讲解如何在 Spring Boot 中集成 WebSocket,实现后台向前端推送信息,涵盖原生 WebSocket、STOMP 协议、安全集成、集群部署等方方面面,力求让读者能够全面掌握这一技术。
2. WebSocket 基础
2.1 什么是 WebSocket?
WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议。它由 IETF 在 2011 年定为标准 RFC 6455,并被 Web API 定义为 W3C 标准。WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。
2.2 WebSocket 与 HTTP 的关系
WebSocket 与 HTTP 协议是相辅相成的。WebSocket 在建立连接时使用 HTTP 协议的 Upgrade 机制进行协议升级:客户端发起一个带有特殊头部(Connection: Upgrade 和 Upgrade: websocket)的 HTTP 请求,服务器如果支持 WebSocket,则返回 101 状态码(Switching Protocols),之后连接便从 HTTP 协议切换到 WebSocket 协议,后续通信不再使用 HTTP 格式。
- 相同点:
- 都基于 TCP 协议。
- 默认端口也是 80 和 443(ws 对应 80,wss 对应 443)。
- 不同点:
- HTTP 是半双工,WebSocket 是全双工。
- HTTP 需要频繁建立连接(尤其是 HTTP/1.1 虽支持 keep-alive,但仍是请求 - 响应模式),WebSocket 连接一旦建立,可以持续通信。
- WebSocket 消息没有 HTTP 那种复杂的头部,开销小。
2.3 WebSocket 的工作流程
- 握手阶段:客户端发起 HTTP 请求,携带
Upgrade: websocket 头,请求升级协议。
- 协议切换:服务器返回 101 状态码,同意切换,连接协议变为 WebSocket。
- 数据传输:客户端和服务器可以互相发送数据帧(数据单元),可以是文本或二进制。
- 关闭连接:任意一方可以发送关闭帧,另一方响应后关闭 TCP 连接。
2.4 WebSocket 的优点
- 实时性:服务器可以随时推送消息,延迟极低。
- 减少网络开销:相比轮询,减少了大量的 HTTP 头部传输。
- 全双工:双方可以同时发送数据,更自然。
- 跨域支持:可通过 CORS 或特殊配置支持跨域。
3. Spring Boot 对 WebSocket 的支持
Spring 框架从 4.0 开始引入了 WebSocket 模块,提供了一套简洁的 API 来集成 WebSocket。Spring Boot 则通过自动配置进一步简化了集成过程。主要支持两种方式:
- 原生 WebSocket:使用
@ServerEndpoint 注解,基于 Java WebSocket API(JSR-356)实现。
- STOMP over WebSocket:在 WebSocket 之上使用 STOMP 协议(简单文本定向消息协议),提供更高级的消息路由功能,类似于消息队列的订阅发布模型。
Spring 还提供了 SimpMessagingTemplate 用于向客户端推送消息,以及 @MessageMapping 注解处理客户端发送的消息。
4. 准备工作:创建 Spring Boot 项目
- Spring Web
- Spring Boot DevTools(可选)
- WebSocket(即
spring-boot-starter-websocket)
如果你使用 Maven,pom.xml 中的核心依赖如下:
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> </dependencies>
implementation 'org.springframework.boot:spring-boot-starter-websocket'
implementation 'org.springframework.boot:spring-boot-starter-web'
runtimeOnly 'org.springframework.boot:spring-boot-devtools'
5. 基于原生 WebSocket 的实现
我们先从最基础的原生 WebSocket 开始,了解握手、消息收发等基本流程。
5.1 配置 WebSocket 处理器
在 Spring Boot 中使用原生 WebSocket,通常有两种方式:一种是使用 @ServerEndpoint 注解,另一种是实现 WebSocketHandler 接口。这里我们采用更符合 Spring 风格的 @ServerEndpoint 方式。
首先,需要注册一个 ServerEndpointExporter Bean,它会自动扫描并注册带有 @ServerEndpoint 注解的类。在 Spring Boot 中,通常通过配置类完成:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
注意:如果你使用内嵌的 Servlet 容器(如 Tomcat),ServerEndpointExporter 会自动注册端点。如果部署到外部容器,可能需要额外配置,但 Spring Boot 内嵌容器通常够用。
5.2 编写 WebSocket 处理类
使用 @ServerEndpoint 注解标记一个类,并定义其路径。然后通过 @OnOpen、@OnMessage、@OnClose、@OnError 注解来处理相应的事件。
下面是一个简单的 WebSocket 服务端示例,它接收客户端消息并原样返回(回声),同时支持主动推送:
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
@ServerEndpoint("/ws/echo")
public class EchoWebSocket {
private static final CopyOnWriteArraySet<Session> sessions = new CopyOnWriteArraySet<>();
@OnOpen
public void onOpen(Session session) {
sessions.add(session);
System.out.println("新连接加入,当前连接数:" + sessions.size());
try {
session.getBasicRemote().sendText("连接成功,欢迎!");
} catch (IOException e) {
e.printStackTrace();
}
}
@OnMessage
public void onMessage(String message, Session session) {
System.out.println("收到消息:" + message);
try {
session.getBasicRemote().sendText("Echo: " + message);
} catch (IOException e) {
e.printStackTrace();
}
broadcast("用户说:" + message);
}
@OnClose
public void onClose(Session session) {
sessions.remove(session);
System.out.println("连接关闭,当前连接数:" + sessions.size());
}
@OnError
public void onError(Session session, Throwable error) {
error.printStackTrace();
}
public static void broadcast(String message) {
for (Session session : sessions) {
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
5.3 前端 JavaScript 示例
前端使用浏览器原生的 WebSocket API 连接后端:
<!DOCTYPE html>
<html>
<head>
<title>WebSocket Echo Test</title>
</head>
<body>
<input type="text" placeholder="输入消息" />
<button onclick="sendMessage()">发送</button>
<div id="messages"></div>
<script>
var ws = new WebSocket("ws://localhost:8080/ws/echo");
ws.onopen = function() {
appendMessage("连接已建立");
};
ws.onmessage = function(event) {
appendMessage("收到:" + event.data);
};
ws.onclose = function() {
appendMessage("连接关闭");
};
ws.onerror = function(error) {
appendMessage("错误:" + error);
};
function sendMessage() {
var input = document.getElementById("messageInput");
var msg = input.value;
ws.send(msg);
appendMessage("发送:" + msg);
input.value = '';
}
function appendMessage(msg) {
var div = document.getElementById("messages");
div.innerHTML += "<p>" + msg + "</p>";
}
</script>
</body>
</html>
启动 Spring Boot 应用,访问该 HTML 页面,即可测试双向通信。
5.4 后台主动推送消息
我们可以在任何地方(如定时任务、Controller 中)调用 EchoWebSocket.broadcast() 来向所有连接的客户端推送消息。例如,创建一个定时任务每秒推送当前时间:
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
@EnableScheduling
public class PushTask {
@Scheduled(fixedRate = 5000)
public void pushTime() {
String message = "当前时间:" + System.currentTimeMillis();
EchoWebSocket.broadcast(message);
}
}
这样每隔 5 秒就会向所有连接的前端推送时间信息。
5.5 原生方式的优缺点
- 优点:简单直接,依赖少,适合小规模应用。
- 缺点:需要自己管理会话、处理线程安全;缺乏高级消息路由(如广播给特定用户);与 Spring 的集成度不高(例如无法直接利用 Spring 的依赖注入,因为 WebSocket 实例不是 Spring 管理的,但可以通过静态方法或工具类解决)。
6. 基于 STOMP 的实现
STOMP(Simple Text Oriented Messaging Protocol)是一个简单的文本定向消息协议,它定义了一套基于帧的格式,可以在 WebSocket 之上使用。Spring 提供了对 STOMP over WebSocket 的支持,使得我们可以像使用消息队列一样处理消息,支持目的地(destination)、订阅等概念,极大简化了开发。
6.1 STOMP 简介
STOMP 类似于 HTTP,但更简单。它的帧格式如下:
COMMAND header1:value1 header2:value2 body
常用命令:SEND(发送消息)、SUBSCRIBE(订阅目的地)、UNSUBSCRIBE、MESSAGE(消息推送)、CONNECT、CONNECTED 等。
在 WebSocket 上使用 STOMP,我们可以将消息路由到不同的处理器,支持点对点和广播。
6.2 配置 WebSocket 消息代理
在 Spring Boot 中启用 STOMP over WebSocket,需要创建一个配置类,实现 WebSocketMessageBrokerConfigurer 接口,并重写相关方法。
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
@Configuration
@EnableWebSocketMessageBroker
public class StompWebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws-stomp")
.setAllowedOrigins("*")
.withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/app");
registry.enableSimpleBroker("/topic", "/queue");
registry.setUserDestinationPrefix("/user");
}
}
registerStompEndpoints:注册 STOMP 端点,客户端通过该端点连接。withSockJS() 表示如果浏览器不支持 WebSocket,可以降级使用 SockJS(基于其他传输方式模拟)。
configureMessageBroker:配置消息代理。
setApplicationDestinationPrefixes:客户端发送消息给服务器的路径前缀(如 /app/hello)。
enableSimpleBroker:启用内置的简单消息代理,并指定代理前缀(如 /topic 用于广播,/queue 用于点对点)。
setUserDestinationPrefix:设置用户目的地前缀,用于点对点推送(如 /user/{userId}/message)。
6.3 创建 Controller 处理消息
在 Spring MVC 中,我们可以使用 @MessageMapping 注解来处理客户端发送到特定目的地的消息。这些方法通常返回一个对象,该对象会被自动转换并发送到指定的目的地。
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Controller;
@Controller
public class WebSocketController {
@MessageMapping("/hello")
@SendTo("/topic/greetings")
public Greeting greeting(HelloMessage message) throws Exception {
Thread.sleep(1000);
return new Greeting("Hello, " + message.getName() + "!");
}
}
class HelloMessage {
private String name;
}
class Greeting {
private String content;
}
@MessageMapping("/hello"):表示当客户端发送到 /app/hello(因为我们在配置中设置了应用前缀 /app)的消息会路由到此方法。
@SendTo("/topic/greetings"):指定方法的返回值将发送到 /topic/greetings 目的地,所有订阅了该目的地的客户端都会收到。
如果不使用 @SendTo,也可以使用 SimpMessagingTemplate 手动发送。
6.4 使用 SimpMessagingTemplate 推送消息
SimpMessagingTemplate 是 Spring 提供的用于向客户端推送消息的工具类。我们可以将它注入到任何 Spring 管理的 Bean 中,灵活地推送消息。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class PushController {
@Autowired
private SimpMessagingTemplate messagingTemplate;
@GetMapping("/push")
public String pushToAll() {
messagingTemplate.convertAndSend("/topic/news", "突发新闻:Spring Boot 3.0 发布!");
return "推送成功";
}
@GetMapping("/push/user")
public String pushToUser(String userId) {
messagingTemplate.convertAndSendToUser(userId, "/message", "您有一条私信");
return "私信推送成功";
}
}
convertAndSend 用于广播,convertAndSendToUser 用于点对点。注意 convertAndSendToUser 默认会拼接成 /user/{userId}/message 这样的目的地,用户客户端需要订阅 /user/message(实际上前缀 /user 会被处理,后面详述)。
6.5 前端使用 SockJS 和 STOMP.js
前端需要引入 SockJS 和 STOMP.js 库。可以通过 CDN 或本地文件引入。
<!DOCTYPE html>
<html>
<head>
<title>STOMP over WebSocket Demo</title>
<script src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.6.1/sockjs.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
</head>
<body>
<div>
<input type="text" placeholder="你的名字" />
<button onclick="sendName()">发送</button>
</div>
<div id="greetings"></div>
<script>
var stompClient = null;
function connect() {
var socket = new SockJS('/ws-stomp');
stompClient = Stomp.over(socket);
stompClient.connect({}, function(frame) {
console.log('连接成功:' + frame);
stompClient.subscribe('/topic/greetings', function(greeting) {
showGreeting(JSON.parse(greeting.body).content);
});
stompClient.subscribe('/user/message', function(message) {
showGreeting("私信:" + message.body);
});
}, function(error) {
console.log('连接失败:' + error);
});
}
function sendName() {
var name = document.getElementById('name').value;
stompClient.send("/app/hello", {}, JSON.stringify({ 'name': name }));
}
function showGreeting(message) {
var div = document.getElementById('greetings');
div.innerHTML += "<p>" + message + "</p>";
}
window.onload = connect;
</script>
</body>
</html>
- 连接端点:
new SockJS('/ws-stomp'),对应服务端注册的端点。
- 连接成功后,订阅
/topic/greetings 和 /user/message。
- 发送消息:
stompClient.send("/app/hello", {}, JSON.stringify({ 'name': name })),目的地址为 /app/hello。
- 收到消息后解析并显示。
6.6 广播与点对点的深入理解
6.6.1 广播(Topic)
广播模式:所有订阅了某个主题的客户端都会收到消息。在配置中我们启用了简单消息代理,并指定了前缀 /topic。任何发送到 /topic/** 的消息都会被转发给所有订阅了该目的地的客户端。例如:
- 服务器调用
convertAndSend("/topic/news", "内容"),所有订阅 /topic/news 的客户端都会收到。
6.6.2 点对点(Queue)
点对点模式:消息只发送给特定的用户。Spring 内部通过 /user/{username}/** 这样的目的地来实现。客户端订阅时需要订阅 /user/queue/message,服务器发送时使用 convertAndSendToUser(username, "/queue/message", payload)。Spring 会自动将 /user 前缀转换为用户特定的目的地。
注意:点对点需要知道当前用户是谁。通常结合 Spring Security,通过认证的用户信息来确定用户标识。如果没有 Spring Security,也可以在连接时传递用户信息,但比较复杂。
6.6.3 使用 @SendToUser
在 @MessageMapping 方法上可以使用 @SendToUser,表示将返回值发送给当前发送消息的用户(而不是广播)。例如:
@MessageMapping("/private")
@SendToUser("/queue/private")
public String handlePrivate(String message) {
return "这是你的私密回声:" + message;
}
客户端需要订阅 /user/queue/private 来接收。
7. 安全集成(Spring Security + WebSocket)
在实际应用中,WebSocket 连接往往需要鉴权,确保只有合法用户才能连接和订阅某些目的地。Spring Security 提供了与 WebSocket 的集成。
7.1 添加 Spring Security 依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>
7.2 配置 Spring Security
创建一个简单的 Security 配置类,启用基本认证,并设置用户内存存储用于测试。
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.core.userdetails.User;
import org.springframework.security.core.userdetails.UserDetailsService;
import org.springframework.security.provisioning.InMemoryUserDetailsManager;
import org.springframework.security.web.SecurityFilterChain;
@Configuration
@EnableWebSecurity
public class SecurityConfig {
@Bean
public SecurityFilterChain filterChain(HttpSecurity http) throws Exception {
http
.authorizeRequests()
.antMatchers("/ws-stomp/**").authenticated()
.anyRequest().permitAll()
.and()
.formLogin()
.permitAll()
.and()
.logout()
.permitAll()
.and()
.csrf().disable();
return http.build();
}
@Bean
public UserDetailsService userDetailsService() {
InMemoryUserDetailsManager manager = new InMemoryUserDetailsManager();
manager.createUser(User.withUsername("user1").password("{noop}password1").roles("USER").build());
manager.createUser(User.withUsername("user2").password("{noop}password2").roles("USER").build());
return manager;
}
}
注意:{noop} 表示明文密码,仅为演示,生产环境应使用加密。
7.3 WebSocket 拦截器获取用户信息
为了让 WebSocket 知道当前用户是谁,我们需要在握手阶段将认证信息传递进去。Spring 提供了 DefaultHandshakeHandler,我们可以重写 determineUser 方法。更常见的做法是结合 Spring Security,在连接时通过 Cookie 或 Header 传递 token,然后解析。
在 STOMP 配置中,我们可以添加一个 ChannelInterceptor,用于在消息进入时设置用户。
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
@Configuration
@EnableWebSocketMessageBroker
public class StompWebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws-stomp")
.setAllowedOrigins("*")
.withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/app");
registry.enableSimpleBroker("/topic", "/queue");
registry.setUserDestinationPrefix("/user");
}
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(new UserInterceptor());
}
}
UserInterceptor 需要实现 ChannelInterceptor 接口,重写 preSend 方法,从消息头中获取认证信息,设置到 Message 中。
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
public class UserInterceptor implements ChannelInterceptor {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (accessor != null && accessor.getCommand() != null) {
switch (accessor.getCommand()) {
case CONNECT:
String token = accessor.getFirstNativeHeader("Authorization");
if (token != null && token.startsWith("Bearer ")) {
token = token.substring(7);
}
break;
case SUBSCRIBE:
break;
}
}
return message;
}
}
一种更简单的做法是:如果已经通过 HTTP 登录(如使用表单登录),那么在 WebSocket 握手时,由于同源,JSESSIONID Cookie 会自动携带,Spring Security 会识别出已登录的用户,自动将用户信息绑定到 WebSocket 会话上。只要连接路径与 HTTP 共享相同的会话,就可以。因此,很多情况下不需要额外配置,只需确保 WebSocket 端点在安全配置中允许已认证用户访问即可。
7.4 基于角色的目的地权限控制
除了认证,我们还可能需要对目的地进行权限控制,比如只有管理员才能订阅 /topic/admin。Spring Security 提供了 @PreAuthorize 等注解,但需要配合消息拦截器。
我们可以扩展 ChannelInterceptor,在 preSend 中检查目标地址和用户角色。
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (accessor != null && StompCommand.SUBSCRIBE.equals(accessor.getCommand())) {
String destination = accessor.getDestination();
Authentication auth = SecurityContextHolder.getContext().getAuthentication();
if (destination != null && destination.startsWith("/topic/admin") && (auth == null || !auth.getAuthorities().contains(new SimpleGrantedAuthority("ROLE_ADMIN")))) {
throw new AccessDeniedException("无权限订阅此主题");
}
}
return message;
}
8. 集群环境下的 WebSocket
在生产环境中,应用通常会部署多个实例以实现高可用和负载均衡。WebSocket 连接是状态化的(每个连接对应一个会话),在集群环境下会面临会话共享问题:用户连接到实例 A,但推送消息时可能由实例 B 发送,实例 B 没有该用户的会话信息,导致推送失败。
8.1 问题分析
- WebSocket 会话(
WebSocketSession)保存在每个节点的内存中,其他节点无法访问。
- 当使用简单消息代理(
enableSimpleBroker)时,消息只在当前节点内分发,无法跨节点广播给连接到其他节点的客户端。
- 点对点消息也可能发送到错误的节点。
8.2 解决方案概览
- 使用外部消息代理:如 RabbitMQ、ActiveMQ、Kafka 等,作为消息的中转站。所有节点都连接到同一个消息代理,订阅相关的主题或队列。当某个节点需要推送消息时,将消息发送到消息代理,代理再将消息广播给所有订阅了该主题的节点,各节点再将消息推送给各自的客户端。
- 会话信息集中存储:将会话信息(如用户与节点的映射关系)存储在 Redis 等集中式缓存中,推送时根据用户找到对应的节点,然后通过 HTTP 或 RPC 转发推送请求。但这种方式实现较复杂,不如使用消息代理直接。
- 使用 Stomp Broker Relay:Spring 提供了
StompBrokerRelayMessageHandler,可以配置将消息转发给外部的 STOMP 代理(如 RabbitMQ 的 STOMP 插件)。
8.3 使用 RabbitMQ 作为外部消息代理
8.3.1 安装并启用 RabbitMQ STOMP 插件
RabbitMQ 默认不支持 STOMP,需要启用插件:
rabbitmq-plugins enable rabbitmq_stomp
8.3.2 修改 Spring Boot 配置
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
修改 StompWebSocketConfig,使用 StompBrokerRelay 代替简单的内存代理:
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/app");
registry.enableStompBrokerRelay("/topic", "/queue")
.setRelayHost("localhost")
.setRelayPort(61613)
.setClientLogin("guest")
.setClientPasscode("guest")
.setSystemLogin("guest")
.setSystemPasscode("guest");
registry.setUserDestinationPrefix("/user");
}
这样配置后,消息的广播和点对点都将通过 RabbitMQ 进行分发,所有应用节点都连接到同一个 RabbitMQ,从而实现跨节点通信。
8.3.3 注意事项
- 需要确保 RabbitMQ 的 STOMP 插件监听端口(默认为 61613)可访问。
- 生产环境需配置正确的用户名密码,并考虑使用 SSL。
- 集群节点数量较多时,消息代理的性能成为瓶颈,需合理配置。
8.4 使用 Redis 存储会话
另一种思路是不用消息代理,而是将会话信息存储在 Redis 中。推送时,根据用户 ID 从 Redis 查找该用户当前连接到哪个节点(存储节点地址),然后通过 HTTP 调用该节点的 API 进行推送。这种方式需要自己实现节点间通信,但比较灵活。
不过,这种方法会引入额外的网络开销,且需要处理节点故障时的会话迁移。相比消息代理,实现复杂度更高。通常推荐使用消息代理方案。
9. 性能优化与最佳实践
9.1 心跳机制
WebSocket 本身有 Ping/Pong 帧用于心跳检测,保持连接不超时。STOMP 也支持心跳,客户端和服务器可以协商心跳间隔。在 Spring 中,可以配置心跳:
registry.enableSimpleBroker("/topic", "/queue")
.setHeartbeatValue(new long[]{10000, 10000});
对于 StompBrokerRelay,也需要配置心跳。
9.2 线程模型
Spring WebSocket 使用任务线程池处理消息。默认情况下,@EnableWebSocketMessageBroker 会配置一个 ThreadPoolTaskExecutor。我们可以自定义线程池参数,防止高并发下线程耗尽。
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.taskExecutor()
.corePoolSize(10)
.maxPoolSize(20)
.keepAliveSeconds(60);
}
@Override
public void configureClientOutboundChannel(ChannelRegistration registration) {
registration.taskExecutor()
.corePoolSize(10)
.maxPoolSize(20)
.keepAliveSeconds(60);
}
9.3 消息大小限制
WebSocket 协议本身对消息大小没有严格限制,但底层实现和代理可能有默认限制。在 Spring 中,可以通过 WebSocketTransportProperties 调整,或者配置 StompBrokerRelay 的 messageSizeLimit。
@Bean
public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler(
StompBrokerRelayRegistration registration,
MessageBrokerProperties properties) {
StompBrokerRelayMessageHandler handler = new StompBrokerRelayMessageHandler(registration);
handler.setMessageSizeLimit(1024 * 1024);
return handler;
}
9.4 连接数限制
WebSocket 是基于 TCP 的长连接,每个连接都会占用服务器资源。操作系统有最大文件描述符限制,应用服务器也有相关配置。可以通过以下方式优化:
- 使用 Nginx 等反向代理负载均衡,分散连接压力。
- 调整 Tomcat 的最大连接数(
server.tomcat.max-connections)。
- 合理设置心跳,及时清理僵尸连接。
9.5 压缩与编码
对于大量数据的传输,可以考虑使用二进制格式(如 Protocol Buffers、MessagePack)代替 JSON,减少数据大小。Spring 支持自定义消息转换器。
10. 测试 WebSocket
10.1 使用浏览器开发者工具
现代浏览器(Chrome、Firefox)的开发者工具中,可以在'Network'标签查看 WebSocket 帧,或者使用专门的 WebSocket 测试插件。
10.2 使用命令行工具
如 websocat、wscat(Node.js)等。例如:
wscat -c ws://localhost:8080/ws/echo
10.3 Spring Boot 集成测试
Spring 提供了 WebSocketStompClient 用于编写测试。我们可以模拟客户端连接、发送和接收消息。
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.simp.stomp.StompFrameHandler;
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.messaging.simp.stomp.StompSessionHandlerAdapter;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.messaging.WebSocketStompClient;
import java.lang.reflect.Type;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class WebSocketTest {
@Test
void testGreeting() throws Exception {
WebSocketStompClient stompClient = new WebSocketStompClient(new StandardWebSocketClient());
stompClient.setMessageConverter(new MappingJackson2MessageConverter());
String url = "ws://localhost:" + port + "/ws-stomp";
StompSession session = stompClient.connect(url, new StompSessionHandlerAdapter() {}).get(1, TimeUnit.SECONDS);
CompletableFuture<Greeting> future = new CompletableFuture<>();
session.subscribe("/topic/greetings", new StompFrameHandler() {
@Override
public Type getPayloadType(StompHeaders headers) {
return Greeting.class;
}
@Override
public void handleFrame(StompHeaders headers, Object payload) {
future.complete((Greeting) payload);
}
});
session.send("/app/hello", new HelloMessage("TestUser"));
Greeting greeting = future.get(5, TimeUnit.SECONDS);
assertThat(greeting.getContent()).contains("Hello, TestUser");
}
@LocalServerPort
private int port;
}
11. 常见问题与解决方案
11.1 连接失败(404)
- 检查端点路径是否正确,包括上下文路径(
server.servlet.context-path)。
- 检查是否启用了 SockJS,前端连接时是否使用了正确路径。
- 如果使用 STOMP,确认配置了
setAllowedOrigins,防止跨域问题。
11.2 消息丢失
- 确认客户端订阅了正确的目的地。
- 如果是点对点消息,确保
convertAndSendToUser 中的用户名正确,且客户端订阅了 /user/...。
- 检查网络或代理是否关闭了连接。
11.3 断线重连
前端 STOMP 客户端可以监听断开事件,并重新连接。例如:
stompClient.connect({}, function(frame) {
}, function(error) {
setTimeout(connect, 5000);
});
11.4 跨域问题
在 registerStompEndpoints 中通过 setAllowedOrigins("*") 允许所有域名,生产环境应指定具体域名。注意,SockJS 不支持 *,需要使用具体域名或使用 setAllowedOriginPatterns("*")。
11.5 客户端无法接收广播消息
- 确保客户端订阅了正确的主题,如
/topic/news。
- 检查服务器推送的目的地是否一致。
- 如果使用外部消息代理,检查代理配置。
11.6 性能瓶颈
- 如果单节点连接数过多,考虑水平扩展并使用消息代理。
- 调整 JVM 内存和 GC 参数。
- 使用异步、非阻塞 IO。
12. 实战案例:实时通知系统
下面我们通过一个简单的实时通知系统,综合运用上述知识。
12.1 需求描述
- 系统广播:所有在线用户都能收到的通知(如系统维护公告)。
- 个人私信:仅特定用户收到的通知(如评论回复)。
12.2 后端实现
- 配置类:使用 STOMP over WebSocket,配置端点、消息代理。
- 安全配置:集成 Spring Security,使用内存用户。
- 通知实体:
public class Notification {
private String type;
private String content;
private String targetUser;
private Date timestamp;
}
- Controller 处理私信发送(来自其他用户):
@MessageMapping("/private")
public void sendPrivate(@Payload Notification notification, Principal principal) {
notification.setTimestamp(new Date());
messagingTemplate.convertAndSendToUser(notification.getTargetUser(), "/queue/notifications", notification);
}
@RestController
@RequestMapping("/api/notify")
public class NotifyController {
@Autowired
private SimpMessagingTemplate messagingTemplate;
@PostMapping("/broadcast")
public String broadcast(@RequestBody String content) {
Notification notification = new Notification("BROADCAST", content, null, new Date());
messagingTemplate.convertAndSend("/topic/broadcast", notification);
return "广播成功";
}
}
12.3 前端实现
登录页面(省略),假设登录后跳转到主页。主页 HTML 包含:
function connect() {
var socket = new SockJS('/ws-stomp');
stompClient = Stomp.over(socket);
stompClient.connect({}, function(frame) {
stompClient.subscribe('/topic/broadcast', function(msg) {
var notif = JSON.parse(msg.body);
showNotification(notif.content);
});
stompClient.subscribe('/user/queue/notifications', function(msg) {
var notif = JSON.parse(msg.body);
showNotification("私信:" + notif.content);
});
});
}
12.4 测试
- 启动应用,用两个不同用户登录(如 user1 和 user2)。
- 调用广播 API,所有在线用户都能看到通知。
- 发送私信给 user1,只有 user1 能看到。
13. 总结与展望
本文从 WebSocket 的基本概念出发,详细介绍了在 Spring Boot 中集成 WebSocket 的两种方式:原生 WebSocket 和基于 STOMP 的消息代理方式。我们探讨了消息的广播与点对点推送、安全集成、集群部署以及性能优化。通过实战案例,展示了如何构建一个实时通知系统。
WebSocket 技术为实时 Web 应用提供了强大的支持,结合 Spring Boot 的便捷性,开发者可以快速构建出高实时性的应用。随着 Web 技术的不断发展,WebSocket 的应用场景将越来越广泛,比如在线游戏、协同编辑、实时监控等
微信扫一扫,关注极客日志
微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
相关免费在线工具
- Keycode 信息
查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online
- Escape 与 Native 编解码
JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online
- JavaScript / HTML 格式化
使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online
- JavaScript 压缩与混淆
Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
- Base64 文件转换器
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online