Spring Boot集成WebSocket,实现后台向前端推送信息

1. 引言

随着互联网应用的不断发展,用户对实时性的要求越来越高。传统的HTTP协议是基于请求-响应模式的,客户端发起请求,服务器返回响应,连接即关闭。这种“拉取”模式在处理实时数据(如股票行情、即时消息、游戏对战、系统通知等)时显得力不从心:要么客户端频繁轮询造成资源浪费,要么服务器有新数据却无法主动通知客户端。

WebSocket协议的出现完美解决了这一难题。它允许服务器主动向客户端推送数据,实现真正的双向通信。Spring Boot作为当今最流行的Java微服务框架,对WebSocket提供了良好的支持。本文将深入浅出地讲解如何在Spring Boot中集成WebSocket,实现后台向前端推送信息,涵盖原生WebSocket、STOMP协议、安全集成、集群部署等方方面面,力求让读者能够全面掌握这一技术。

2. WebSocket基础

2.1 什么是WebSocket?

WebSocket是一种在单个TCP连接上进行全双工通信的协议。它由IETF在2011年定为标准RFC 6455,并被Web API定义为W3C标准。WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。

2.2 WebSocket与HTTP的关系

WebSocket与HTTP协议是相辅相成的。WebSocket在建立连接时使用HTTP协议的Upgrade机制进行协议升级:客户端发起一个带有特殊头部(Connection: UpgradeUpgrade: websocket)的HTTP请求,服务器如果支持WebSocket,则返回101状态码(Switching Protocols),之后连接便从HTTP协议切换到WebSocket协议,后续通信不再使用HTTP格式。

  • 相同点
    • 都基于TCP协议。
    • 默认端口也是80和443(ws对应80,wss对应443)。
  • 不同点
    • HTTP是半双工,WebSocket是全双工。
    • HTTP需要频繁建立连接(尤其是HTTP/1.1虽支持keep-alive,但仍是请求-响应模式),WebSocket连接一旦建立,可以持续通信。
    • WebSocket消息没有HTTP那种复杂的头部,开销小。

2.3 WebSocket的工作流程

  1. 握手阶段:客户端发起HTTP请求,携带Upgrade: websocket头,请求升级协议。
  2. 协议切换:服务器返回101状态码,同意切换,连接协议变为WebSocket。
  3. 数据传输:客户端和服务器可以互相发送数据帧(数据单元),可以是文本或二进制。
  4. 关闭连接:任意一方可以发送关闭帧,另一方响应后关闭TCP连接。

2.4 WebSocket的优点

  • 实时性:服务器可以随时推送消息,延迟极低。
  • 减少网络开销:相比轮询,减少了大量的HTTP头部传输。
  • 全双工:双方可以同时发送数据,更自然。
  • 跨域支持:可通过CORS或特殊配置支持跨域。

3. Spring Boot对WebSocket的支持

Spring框架从4.0开始引入了WebSocket模块,提供了一套简洁的API来集成WebSocket。Spring Boot则通过自动配置进一步简化了集成过程。主要支持两种方式:

  • 原生WebSocket:使用@ServerEndpoint注解,基于Java WebSocket API(JSR-356)实现。
  • STOMP over WebSocket:在WebSocket之上使用STOMP协议(简单文本定向消息协议),提供更高级的消息路由功能,类似于消息队列的订阅发布模型。

Spring还提供了SimpMessagingTemplate用于向客户端推送消息,以及@MessageMapping注解处理客户端发送的消息。

4. 准备工作:创建Spring Boot项目

我们从一个基本的Spring Boot项目开始。可以使用Spring Initializr(https://start.spring.io/)或者IDE(如IntelliJ IDEA)创建项目,选择以下依赖:

  • Spring Web
  • Spring Boot DevTools(可选)
  • WebSocket(即spring-boot-starter-websocket

如果你使用Maven,pom.xml中的核心依赖如下:

xml

<dependencies> <!-- Spring Boot WebSocket Starter --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> <!-- Spring Boot Web Starter (通常包含在内,但显式添加也无妨) --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- 可选:用于简化开发的热部署 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <!-- 可选:JSON处理 --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> </dependencies>

如果使用Gradle,对应依赖:

groovy

implementation 'org.springframework.boot:spring-boot-starter-websocket' implementation 'org.springframework.boot:spring-boot-starter-web' runtimeOnly 'org.springframework.boot:spring-boot-devtools'

5. 基于原生WebSocket的实现

我们先从最基础的原生WebSocket开始,了解握手、消息收发等基本流程。

5.1 配置WebSocket处理器

在Spring Boot中使用原生WebSocket,通常有两种方式:一种是使用@ServerEndpoint注解,另一种是实现WebSocketHandler接口。这里我们采用更符合Spring风格的@ServerEndpoint方式。

首先,需要注册一个ServerEndpointExporter Bean,它会自动扫描并注册带有@ServerEndpoint注解的类。在Spring Boot中,通常通过配置类完成:

java

import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; @Configuration public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }

注意:如果你使用内嵌的Servlet容器(如Tomcat),ServerEndpointExporter会自动注册端点。如果部署到外部容器,可能需要额外配置,但Spring Boot内嵌容器通常够用。

5.2 编写WebSocket处理类

使用@ServerEndpoint注解标记一个类,并定义其路径。然后通过@OnOpen@OnMessage@OnClose@OnError注解来处理相应的事件。

下面是一个简单的WebSocket服务端示例,它接收客户端消息并原样返回(回声),同时支持主动推送:

java

import javax.websocket.*; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.concurrent.CopyOnWriteArraySet; @ServerEndpoint("/ws/echo") public class EchoWebSocket { // 存储所有连接的会话,用于广播 private static final CopyOnWriteArraySet<Session> sessions = new CopyOnWriteArraySet<>(); @OnOpen public void onOpen(Session session) { sessions.add(session); System.out.println("新连接加入,当前连接数:" + sessions.size()); // 可以给客户端发送欢迎消息 try { session.getBasicRemote().sendText("连接成功,欢迎!"); } catch (IOException e) { e.printStackTrace(); } } @OnMessage public void onMessage(String message, Session session) { System.out.println("收到消息:" + message); // 回声:将消息返回给发送者 try { session.getBasicRemote().sendText("Echo: " + message); } catch (IOException e) { e.printStackTrace(); } // 也可以广播给所有人(可选) broadcast("用户说:" + message); } @OnClose public void onClose(Session session) { sessions.remove(session); System.out.println("连接关闭,当前连接数:" + sessions.size()); } @OnError public void onError(Session session, Throwable error) { error.printStackTrace(); } // 广播消息给所有客户端 public static void broadcast(String message) { for (Session session : sessions) { try { session.getBasicRemote().sendText(message); } catch (IOException e) { e.printStackTrace(); } } } }

5.3 前端JavaScript示例

前端使用浏览器原生的WebSocket API连接后端:

html

<!DOCTYPE html> <html> <head> <title>WebSocket Echo Test</title> </head> <body> <input type="text" placeholder="输入消息" /> <button οnclick="sendMessage()">发送</button> <div></div> <script> var ws = new WebSocket("ws://localhost:8080/ws/echo"); ws.onopen = function() { appendMessage("连接已建立"); }; ws.onmessage = function(event) { appendMessage("收到: " + event.data); }; ws.onclose = function() { appendMessage("连接关闭"); }; ws.onerror = function(error) { appendMessage("错误: " + error); }; function sendMessage() { var input = document.getElementById("messageInput"); var msg = input.value; ws.send(msg); appendMessage("发送: " + msg); input.value = ''; } function appendMessage(msg) { var div = document.getElementById("messages"); div.innerHTML += "<p>" + msg + "</p>"; } </script> </body> </html>

启动Spring Boot应用,访问该HTML页面,即可测试双向通信。

5.4 后台主动推送消息

我们可以在任何地方(如定时任务、Controller中)调用EchoWebSocket.broadcast()来向所有连接的客户端推送消息。例如,创建一个定时任务每秒推送当前时间:

java

import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @Component @EnableScheduling public class PushTask { @Scheduled(fixedRate = 5000) public void pushTime() { String message = "当前时间:" + System.currentTimeMillis(); EchoWebSocket.broadcast(message); } }

这样每隔5秒就会向所有连接的前端推送时间信息。

5.5 原生方式的优缺点

  • 优点:简单直接,依赖少,适合小规模应用。
  • 缺点:需要自己管理会话、处理线程安全;缺乏高级消息路由(如广播给特定用户);与Spring的集成度不高(例如无法直接利用Spring的依赖注入,因为WebSocket实例不是Spring管理的,但可以通过静态方法或工具类解决)。

6. 基于STOMP的实现

STOMP(Simple Text Oriented Messaging Protocol)是一个简单的文本定向消息协议,它定义了一套基于帧的格式,可以在WebSocket之上使用。Spring提供了对STOMP over WebSocket的支持,使得我们可以像使用消息队列一样处理消息,支持目的地(destination)、订阅等概念,极大简化了开发。

6.1 STOMP简介

STOMP类似于HTTP,但更简单。它的帧格式如下:

text

COMMAND header1:value1 header2:value2 body

常用命令:SEND(发送消息)、SUBSCRIBE(订阅目的地)、UNSUBSCRIBEMESSAGE(消息推送)、CONNECTCONNECTED等。

在WebSocket上使用STOMP,我们可以将消息路由到不同的处理器,支持点对点和广播。

6.2 配置WebSocket消息代理

在Spring Boot中启用STOMP over WebSocket,需要创建一个配置类,实现WebSocketMessageBrokerConfigurer接口,并重写相关方法。

java

import org.springframework.context.annotation.Configuration; import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; import org.springframework.web.socket.config.annotation.StompEndpointRegistry; import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer; @Configuration @EnableWebSocketMessageBroker public class StompWebSocketConfig implements WebSocketMessageBrokerConfigurer { @Override public void registerStompEndpoints(StompEndpointRegistry registry) { // 注册一个STOMP端点,客户端通过这个端点进行连接 registry.addEndpoint("/ws-stomp") .setAllowedOrigins("*") // 允许跨域 .withSockJS(); // 启用SockJS支持,用于降级方案 } @Override public void configureMessageBroker(MessageBrokerRegistry registry) { // 设置应用目的地前缀,即客户端发送消息的目的地前缀 registry.setApplicationDestinationPrefixes("/app"); // 启用简单消息代理,并设置消息代理前缀(订阅前缀) registry.enableSimpleBroker("/topic", "/queue"); // 点对点使用的订阅前缀(默认是/user/) registry.setUserDestinationPrefix("/user"); } }

  • registerStompEndpoints:注册STOMP端点,客户端通过该端点连接。withSockJS()表示如果浏览器不支持WebSocket,可以降级使用SockJS(基于其他传输方式模拟)。
  • configureMessageBroker:配置消息代理。
    • setApplicationDestinationPrefixes:客户端发送消息给服务器的路径前缀(如/app/hello)。
    • enableSimpleBroker:启用内置的简单消息代理,并指定代理前缀(如/topic用于广播,/queue用于点对点)。
    • setUserDestinationPrefix:设置用户目的地前缀,用于点对点推送(如/user/{userId}/message)。

6.3 创建Controller处理消息

在Spring MVC中,我们可以使用@MessageMapping注解来处理客户端发送到特定目的地的消息。这些方法通常返回一个对象,该对象会被自动转换并发送到指定的目的地。

java

import org.springframework.messaging.handler.annotation.MessageMapping; import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.stereotype.Controller; @Controller public class WebSocketController { /** * 客户端发送消息到 /app/hello,服务器处理后,将结果广播给所有订阅了 /topic/greetings 的客户端 */ @MessageMapping("/hello") @SendTo("/topic/greetings") public Greeting greeting(HelloMessage message) throws Exception { // 模拟处理延迟 Thread.sleep(1000); return new Greeting("Hello, " + message.getName() + "!"); } } // 消息体类 class HelloMessage { private String name; // getter/setter... } class Greeting { private String content; // constructor, getter/setter... }

  • @MessageMapping("/hello"):表示当客户端发送到/app/hello(因为我们在配置中设置了应用前缀/app)的消息会路由到此方法。
  • @SendTo("/topic/greetings"):指定方法的返回值将发送到/topic/greetings目的地,所有订阅了该目的地的客户端都会收到。

如果不使用@SendTo,也可以使用SimpMessagingTemplate手动发送。

6.4 使用SimpMessagingTemplate推送消息

SimpMessagingTemplate是Spring提供的用于向客户端推送消息的工具类。我们可以将它注入到任何Spring管理的Bean中,灵活地推送消息。

java

import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.simp.SimpMessagingTemplate; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.GetMapping; @RestController public class PushController { @Autowired private SimpMessagingTemplate messagingTemplate; @GetMapping("/push") public String pushToAll() { // 向所有订阅了 /topic/news 的客户端推送消息 messagingTemplate.convertAndSend("/topic/news", "突发新闻:Spring Boot 3.0发布!"); return "推送成功"; } @GetMapping("/push/user") public String pushToUser(String userId) { // 向特定用户推送消息,用户订阅了 /user/{userId}/message 才能收到 messagingTemplate.convertAndSendToUser(userId, "/message", "您有一条私信"); return "私信推送成功"; } }

convertAndSend用于广播,convertAndSendToUser用于点对点。注意convertAndSendToUser默认会拼接成/user/{userId}/message这样的目的地,用户客户端需要订阅/user/message(实际上前缀/user会被处理,后面详述)。

6.5 前端使用SockJS和STOMP.js

前端需要引入SockJS和STOMP.js库。可以通过CDN或本地文件引入。

html

<!DOCTYPE html> <html> <head> <title>STOMP over WebSocket Demo</title> <script src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.6.1/sockjs.min.js"></script> <script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script> </head> <body> <div> <input type="text" placeholder="你的名字" /> <button οnclick="sendName()">发送</button> </div> <div></div> <script> var stompClient = null; function connect() { var socket = new SockJS('/ws-stomp'); // 连接SockJS端点 stompClient = Stomp.over(socket); stompClient.connect({}, function(frame) { console.log('连接成功: ' + frame); // 订阅服务器的广播主题 /topic/greetings stompClient.subscribe('/topic/greetings', function(greeting) { showGreeting(JSON.parse(greeting.body).content); }); // 订阅用户的私有消息(当前用户,服务器会根据连接自动识别用户) stompClient.subscribe('/user/message', function(message) { showGreeting("私信: " + message.body); }); }, function(error) { console.log('连接失败: ' + error); }); } function sendName() { var name = document.getElementById('name').value; stompClient.send("/app/hello", {}, JSON.stringify({ 'name': name })); } function showGreeting(message) { var div = document.getElementById('greetings'); div.innerHTML += "<p>" + message + "</p>"; } // 页面加载完成后自动连接 window.onload = connect; </script> </body> </html>

  • 连接端点:new SockJS('/ws-stomp'),对应服务端注册的端点。
  • 连接成功后,订阅/topic/greetings/user/message
  • 发送消息:stompClient.send("/app/hello", {}, JSON.stringify({ 'name': name })),目的地址为/app/hello
  • 收到消息后解析并显示。

6.6 广播与点对点的深入理解

6.6.1 广播(Topic)

广播模式:所有订阅了某个主题的客户端都会收到消息。在配置中我们启用了简单消息代理,并指定了前缀/topic。任何发送到/topic/**的消息都会被转发给所有订阅了该目的地的客户端。例如:

  • 服务器调用convertAndSend("/topic/news", "内容"),所有订阅/topic/news的客户端都会收到。
6.6.2 点对点(Queue)

点对点模式:消息只发送给特定的用户。Spring内部通过/user/{username}/**这样的目的地来实现。客户端订阅时需要订阅/user/queue/message,服务器发送时使用convertAndSendToUser(username, "/queue/message", payload)。Spring会自动将/user前缀转换为用户特定的目的地。

注意:点对点需要知道当前用户是谁。通常结合Spring Security,通过认证的用户信息来确定用户标识。如果没有Spring Security,也可以在连接时传递用户信息,但比较复杂。

6.6.3 使用@SendToUser

@MessageMapping方法上可以使用@SendToUser,表示将返回值发送给当前发送消息的用户(而不是广播)。例如:

java

@MessageMapping("/private") @SendToUser("/queue/private") public String handlePrivate(String message) { return "这是你的私密回声:" + message; }

客户端需要订阅/user/queue/private来接收。

7. 安全集成(Spring Security + WebSocket)

在实际应用中,WebSocket连接往往需要鉴权,确保只有合法用户才能连接和订阅某些目的地。Spring Security提供了与WebSocket的集成。

7.1 添加Spring Security依赖

xml

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-security</artifactId> </dependency>

7.2 配置Spring Security

创建一个简单的Security配置类,启用基本认证,并设置用户内存存储用于测试。

java

import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.security.config.annotation.web.builders.HttpSecurity; import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity; import org.springframework.security.core.userdetails.User; import org.springframework.security.core.userdetails.UserDetailsService; import org.springframework.security.provisioning.InMemoryUserDetailsManager; import org.springframework.security.web.SecurityFilterChain; @Configuration @EnableWebSecurity public class SecurityConfig { @Bean public SecurityFilterChain filterChain(HttpSecurity http) throws Exception { http .authorizeRequests() .antMatchers("/ws-stomp/**").authenticated() // WebSocket端点需要认证 .anyRequest().permitAll() .and() .formLogin() .permitAll() .and() .logout() .permitAll() .and() .csrf().disable(); // 简单起见禁用CSRF,生产环境需配置 return http.build(); } @Bean public UserDetailsService userDetailsService() { InMemoryUserDetailsManager manager = new InMemoryUserDetailsManager(); manager.createUser(User.withUsername("user1").password("{noop}password1").roles("USER").build()); manager.createUser(User.withUsername("user2").password("{noop}password2").roles("USER").build()); return manager; } }

注意:{noop}表示明文密码,仅为演示,生产环境应使用加密。

7.3 WebSocket拦截器获取用户信息

为了让WebSocket知道当前用户是谁,我们需要在握手阶段将认证信息传递进去。Spring提供了DefaultHandshakeHandler,我们可以重写determineUser方法。更常见的做法是结合Spring Security,在连接时通过Cookie或Header传递token,然后解析。

在STOMP配置中,我们可以添加一个ChannelInterceptor,用于在消息进入时设置用户。

java

import org.springframework.context.annotation.Configuration; import org.springframework.messaging.simp.config.ChannelRegistration; import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.web.socket.config.annotation.StompEndpointRegistry; import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer; @Configuration @EnableWebSocketMessageBroker public class StompWebSocketConfig implements WebSocketMessageBrokerConfigurer { @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/ws-stomp") .setAllowedOrigins("*") .withSockJS(); } @Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.setApplicationDestinationPrefixes("/app"); registry.enableSimpleBroker("/topic", "/queue"); registry.setUserDestinationPrefix("/user"); } @Override public void configureClientInboundChannel(ChannelRegistration registration) { // 添加自定义的拦截器,用于从消息中提取用户信息 registration.interceptors(new UserInterceptor()); } }

UserInterceptor需要实现ChannelInterceptor接口,重写preSend方法,从消息头中获取认证信息,设置到Message中。

java

import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.simp.stomp.StompHeaderAccessor; import org.springframework.messaging.support.ChannelInterceptor; import org.springframework.messaging.support.MessageHeaderAccessor; import org.springframework.security.core.Authentication; import org.springframework.security.core.context.SecurityContextHolder; public class UserInterceptor implements ChannelInterceptor { @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); // 如果是连接帧,可以从header中获取token,然后设置认证用户 if (accessor != null && accessor.getCommand() != null) { switch (accessor.getCommand()) { case CONNECT: // 假设客户端在连接时通过header传递了token,例如 "Authorization: Bearer xxx" String token = accessor.getFirstNativeHeader("Authorization"); if (token != null && token.startsWith("Bearer ")) { token = token.substring(7); // 解析token获取用户信息,此处简化:根据token查数据库或JWT解析 // 假设解析出用户名为user1 // 然后创建Authentication对象 // 这里简单模拟:从内存中获取用户 // 实际中需要从JWT解析出用户名,再加载UserDetails // 然后创建UsernamePasswordAuthenticationToken // 并将authentication设置到SecurityContext中 // 同时设置accessor.setUser(authentication) // ... } break; case SUBSCRIBE: // 可以检查订阅目的地权限 break; // 其他命令... } } return message; } }

一种更简单的做法是:如果已经通过HTTP登录(如使用表单登录),那么在WebSocket握手时,由于同源,JSESSIONID Cookie会自动携带,Spring Security会识别出已登录的用户,自动将用户信息绑定到WebSocket会话上。只要连接路径与HTTP共享相同的会话,就可以。因此,很多情况下不需要额外配置,只需确保WebSocket端点在安全配置中允许已认证用户访问即可。

7.4 基于角色的目的地权限控制

除了认证,我们还可能需要对目的地进行权限控制,比如只有管理员才能订阅/topic/admin。Spring Security提供了@PreAuthorize等注解,但需要配合消息拦截器。

我们可以扩展ChannelInterceptor,在preSend中检查目标地址和用户角色。

java

@Override public Message<?> preSend(Message<?> message, MessageChannel channel) { StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); if (accessor != null && StompCommand.SUBSCRIBE.equals(accessor.getCommand())) { String destination = accessor.getDestination(); Authentication auth = SecurityContextHolder.getContext().getAuthentication(); if (destination != null && destination.startsWith("/topic/admin") && (auth == null || !auth.getAuthorities().contains(new SimpleGrantedAuthority("ROLE_ADMIN")))) { // 没有权限,拒绝订阅 throw new AccessDeniedException("无权限订阅此主题"); } } return message; }

8. 集群环境下的WebSocket

在生产环境中,应用通常会部署多个实例以实现高可用和负载均衡。WebSocket连接是状态化的(每个连接对应一个会话),在集群环境下会面临会话共享问题:用户连接到实例A,但推送消息时可能由实例B发送,实例B没有该用户的会话信息,导致推送失败。

8.1 问题分析

  • WebSocket会话(WebSocketSession)保存在每个节点的内存中,其他节点无法访问。
  • 当使用简单消息代理(enableSimpleBroker)时,消息只在当前节点内分发,无法跨节点广播给连接到其他节点的客户端。
  • 点对点消息也可能发送到错误的节点。

8.2 解决方案概览

  • 使用外部消息代理:如RabbitMQ、ActiveMQ、Kafka等,作为消息的中转站。所有节点都连接到同一个消息代理,订阅相关的主题或队列。当某个节点需要推送消息时,将消息发送到消息代理,代理再将消息广播给所有订阅了该主题的节点,各节点再将消息推送给各自的客户端。
  • 会话信息集中存储:将会话信息(如用户与节点的映射关系)存储在Redis等集中式缓存中,推送时根据用户找到对应的节点,然后通过HTTP或RPC转发推送请求。但这种方式实现较复杂,不如使用消息代理直接。
  • 使用Stomp Broker Relay:Spring提供了StompBrokerRelayMessageHandler,可以配置将消息转发给外部的STOMP代理(如RabbitMQ的STOMP插件)。

8.3 使用RabbitMQ作为外部消息代理

8.3.1 安装并启用RabbitMQ STOMP插件

RabbitMQ默认不支持STOMP,需要启用插件:

bash

rabbitmq-plugins enable rabbitmq_stomp

8.3.2 修改Spring Boot配置

添加RabbitMQ依赖:

xml

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!-- RabbitMQ的STOMP支持通常不需要额外依赖,但可能需要spring-rabbit -->

修改StompWebSocketConfig,使用StompBrokerRelay代替简单的内存代理:

java

@Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.setApplicationDestinationPrefixes("/app"); // 使用RabbitMQ作为外部消息代理 registry.enableStompBrokerRelay("/topic", "/queue") .setRelayHost("localhost") .setRelayPort(61613) // STOMP插件默认端口 .setClientLogin("guest") .setClientPasscode("guest") .setSystemLogin("guest") .setSystemPasscode("guest"); registry.setUserDestinationPrefix("/user"); }

这样配置后,消息的广播和点对点都将通过RabbitMQ进行分发,所有应用节点都连接到同一个RabbitMQ,从而实现跨节点通信。

8.3.3 注意事项
  • 需要确保RabbitMQ的STOMP插件监听端口(默认为61613)可访问。
  • 生产环境需配置正确的用户名密码,并考虑使用SSL。
  • 集群节点数量较多时,消息代理的性能成为瓶颈,需合理配置。

8.4 使用Redis存储会话

另一种思路是不用消息代理,而是将会话信息存储在Redis中。推送时,根据用户ID从Redis查找该用户当前连接到哪个节点(存储节点地址),然后通过HTTP调用该节点的API进行推送。这种方式需要自己实现节点间通信,但比较灵活。

不过,这种方法会引入额外的网络开销,且需要处理节点故障时的会话迁移。相比消息代理,实现复杂度更高。通常推荐使用消息代理方案。

9. 性能优化与最佳实践

9.1 心跳机制

WebSocket本身有Ping/Pong帧用于心跳检测,保持连接不超时。STOMP也支持心跳,客户端和服务器可以协商心跳间隔。在Spring中,可以配置心跳:

java

registry.enableSimpleBroker("/topic", "/queue") .setHeartbeatValue(new long[]{10000, 10000}); // 客户端和服务器均每10秒发送一次心跳

对于StompBrokerRelay,也需要配置心跳。

9.2 线程模型

Spring WebSocket使用任务线程池处理消息。默认情况下,@EnableWebSocketMessageBroker会配置一个ThreadPoolTaskExecutor。我们可以自定义线程池参数,防止高并发下线程耗尽。

java

@Override public void configureClientInboundChannel(ChannelRegistration registration) { registration.taskExecutor() .corePoolSize(10) .maxPoolSize(20) .keepAliveSeconds(60); } @Override public void configureClientOutboundChannel(ChannelRegistration registration) { registration.taskExecutor() .corePoolSize(10) .maxPoolSize(20) .keepAliveSeconds(60); }

9.3 消息大小限制

WebSocket协议本身对消息大小没有严格限制,但底层实现和代理可能有默认限制。在Spring中,可以通过WebSocketTransportProperties调整,或者配置StompBrokerRelaymessageSizeLimit

java

@Bean public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler( StompBrokerRelayRegistration registration, MessageBrokerProperties properties) { StompBrokerRelayMessageHandler handler = new StompBrokerRelayMessageHandler(registration); handler.setMessageSizeLimit(1024 * 1024); // 1MB return handler; }

9.4 连接数限制

WebSocket是基于TCP的长连接,每个连接都会占用服务器资源。操作系统有最大文件描述符限制,应用服务器也有相关配置。可以通过以下方式优化:

  • 使用Nginx等反向代理负载均衡,分散连接压力。
  • 调整Tomcat的最大连接数(server.tomcat.max-connections)。
  • 合理设置心跳,及时清理僵尸连接。

9.5 压缩与编码

对于大量数据的传输,可以考虑使用二进制格式(如Protocol Buffers、MessagePack)代替JSON,减少数据大小。Spring支持自定义消息转换器。

10. 测试WebSocket

10.1 使用浏览器开发者工具

现代浏览器(Chrome、Firefox)的开发者工具中,可以在“Network”标签查看WebSocket帧,或者使用专门的WebSocket测试插件。

10.2 使用命令行工具

websocatwscat(Node.js)等。例如:

bash

wscat -c ws://localhost:8080/ws/echo

10.3 Spring Boot集成测试

Spring提供了WebSocketStompClient用于编写测试。我们可以模拟客户端连接、发送和接收消息。

java

import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.messaging.converter.MappingJackson2MessageConverter; import org.springframework.messaging.simp.stomp.StompFrameHandler; import org.springframework.messaging.simp.stomp.StompHeaders; import org.springframework.messaging.simp.stomp.StompSession; import org.springframework.messaging.simp.stomp.StompSessionHandlerAdapter; import org.springframework.web.socket.client.standard.StandardWebSocketClient; import org.springframework.web.socket.messaging.WebSocketStompClient; import java.lang.reflect.Type; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import static org.assertj.core.api.Assertions.assertThat; @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) class WebSocketTest { @Test void testGreeting() throws Exception { WebSocketStompClient stompClient = new WebSocketStompClient(new StandardWebSocketClient()); stompClient.setMessageConverter(new MappingJackson2MessageConverter()); String url = "ws://localhost:" + port + "/ws-stomp"; StompSession session = stompClient.connect(url, new StompSessionHandlerAdapter() {}).get(1, TimeUnit.SECONDS); CompletableFuture<Greeting> future = new CompletableFuture<>(); session.subscribe("/topic/greetings", new StompFrameHandler() { @Override public Type getPayloadType(StompHeaders headers) { return Greeting.class; } @Override public void handleFrame(StompHeaders headers, Object payload) { future.complete((Greeting) payload); } }); session.send("/app/hello", new HelloMessage("TestUser")); Greeting greeting = future.get(5, TimeUnit.SECONDS); assertThat(greeting.getContent()).contains("Hello, TestUser"); } // 需要注入端口 @LocalServerPort private int port; }

11. 常见问题与解决方案

11.1 连接失败(404)

  • 检查端点路径是否正确,包括上下文路径(server.servlet.context-path)。
  • 检查是否启用了SockJS,前端连接时是否使用了正确路径。
  • 如果使用STOMP,确认配置了setAllowedOrigins,防止跨域问题。

11.2 消息丢失

  • 确认客户端订阅了正确的目的地。
  • 如果是点对点消息,确保convertAndSendToUser中的用户名正确,且客户端订阅了/user/...
  • 检查网络或代理是否关闭了连接。

11.3 断线重连

前端STOMP客户端可以监听断开事件,并重新连接。例如:

javascript

stompClient.connect({}, function(frame) { // 连接成功 }, function(error) { // 连接失败,延时重连 setTimeout(connect, 5000); });

同时,可以在服务端设置心跳,检测失效连接并清理。

11.4 跨域问题

registerStompEndpoints中通过setAllowedOrigins("*")允许所有域名,生产环境应指定具体域名。注意,SockJS不支持*,需要使用具体域名或使用setAllowedOriginPatterns("*")

11.5 客户端无法接收广播消息

  • 确保客户端订阅了正确的主题,如/topic/news
  • 检查服务器推送的目的地是否一致。
  • 如果使用外部消息代理,检查代理配置。

11.6 性能瓶颈

  • 如果单节点连接数过多,考虑水平扩展并使用消息代理。
  • 调整JVM内存和GC参数。
  • 使用异步、非阻塞IO。

12. 实战案例:实时通知系统

下面我们通过一个简单的实时通知系统,综合运用上述知识。

12.1 需求描述

用户登录系统后,可以收到两类通知:

  • 系统广播:所有在线用户都能收到的通知(如系统维护公告)。
  • 个人私信:仅特定用户收到的通知(如评论回复)。

12.2 后端实现

  1. 配置类:使用STOMP over WebSocket,配置端点、消息代理。
  2. 安全配置:集成Spring Security,使用内存用户。
  3. 通知实体

java

public class Notification { private String type; // "BROADCAST" 或 "PRIVATE" private String content; private String targetUser; // 私信目标用户,广播时为空 private Date timestamp; // 构造、getter、setter }

  1. Controller处理私信发送(来自其他用户)

java

@MessageMapping("/private") public void sendPrivate(@Payload Notification notification, Principal principal) { // 谁发送的?可通过principal获取 notification.setTimestamp(new Date()); // 发送给目标用户 messagingTemplate.convertAndSendToUser(notification.getTargetUser(), "/queue/notifications", notification); }

  1. 广播API

java

@RestController @RequestMapping("/api/notify") public class NotifyController { @Autowired private SimpMessagingTemplate messagingTemplate; @PostMapping("/broadcast") public String broadcast(@RequestBody String content) { Notification notification = new Notification("BROADCAST", content, null, new Date()); messagingTemplate.convertAndSend("/topic/broadcast", notification); return "广播成功"; } }

12.3 前端实现

登录页面(省略),假设登录后跳转到主页。主页HTML包含:

javascript

function connect() { var socket = new SockJS('/ws-stomp'); stompClient = Stomp.over(socket); stompClient.connect({}, function(frame) { // 订阅广播 stompClient.subscribe('/topic/broadcast', function(msg) { var notif = JSON.parse(msg.body); showNotification(notif.content); }); // 订阅私有通知 stompClient.subscribe('/user/queue/notifications', function(msg) { var notif = JSON.parse(msg.body); showNotification("私信:" + notif.content); }); }); }

12.4 测试

  • 启动应用,用两个不同用户登录(如user1和user2)。
  • 调用广播API,所有在线用户都能看到通知。
  • 发送私信给user1,只有user1能看到。

13. 总结与展望

本文从WebSocket的基本概念出发,详细介绍了在Spring Boot中集成WebSocket的两种方式:原生WebSocket和基于STOMP的消息代理方式。我们探讨了消息的广播与点对点推送、安全集成、集群部署以及性能优化。通过实战案例,展示了如何构建一个实时通知系统。

WebSocket技术为实时Web应用提供了强大的支持,结合Spring Boot的便捷性,开发者可以快速构建出高实时性的应用。随着Web技术的不断发展,WebSocket的应用场景将越来越广泛,比如在线游戏、协同编辑、实时监控等

Read more

DeepSeek各版本说明与优缺点分析_deepseek各版本区别

DeepSeek各版本说明与优缺点分析 DeepSeek是最近人工智能领域备受瞩目的一个语言模型系列,其在不同版本的发布过程中,逐步加强了对多种任务的处理能力。本文将详细介绍DeepSeek的各版本,从版本的发布时间、特点、优势以及不足之处,为广大AI技术爱好者和开发者提供一份参考指南。 1. DeepSeek-V1:起步与编码强劲 DeepSeek-V1是DeepSeek的起步版本,这里不过多赘述,主要分析它的优缺点。 发布时间: 2024年1月 特点: DeepSeek-V1是DeepSeek系列的首个版本,预训练于2TB的标记数据,主打自然语言处理和编码任务。它支持多种编程语言,具有强大的编码能力,适合程序开发人员和技术研究人员使用。 优势: * 强大编码能力:支持多种编程语言,能够理解和生成代码,适合开发者进行自动化代码生成与调试。 * 高上下文窗口:支持高达128K标记的上下文窗口,能够处理较为复杂的文本理解和生成任务。 缺点: * 多模态能力有限:该版本主要集中在文本处理上,缺少对图像、语音等多模态任务的支持。 * 推理能力较弱:尽管在自然语言

By Ne0inhk
【DeepSeek应用】100个 DeepSeek 官方推荐的工具箱

【DeepSeek应用】100个 DeepSeek 官方推荐的工具箱

【DeepSeek应用】Deepseek R1 本地部署(Ollama+Docker+OpenWebUI) 【DeepSeek应用】DeepSeek 搭建个人知识库(Ollama+CherryStudio) 【DeepSeek应用】100个 DeepSeek 官方推荐的工具箱 【DeepSeek应用】Zotero+Deepseek 阅读与分析文献 【DeepSeek应用】100个 DeepSeek 官方推荐的工具箱 * 1. DeepSeek 工具箱:应用程序 * 2. DeepSeek 工具箱:AI Agent 框架 * 3. DeepSeek 工具箱:RAG 框架 * 4. DeepSeek 工具箱:即时通讯软件 * 5. DeepSeek 工具箱:浏览器插件 * 6. DeepSeek 工具箱:

By Ne0inhk
“现在的AI就像1880年的笨重工厂!”微软CSO斯坦福泼冷水:别急着造神

“现在的AI就像1880年的笨重工厂!”微软CSO斯坦福泼冷水:别急着造神

大模型仍未对上商业的齿轮? 编译 | 王启隆 来源 | youtu.be/aWqfH0aSGKI 出品丨AI 科技大本营(ID:rgznai100) 现在的硅谷,空气里都飘着一股“再不上车就晚了”的焦躁感。 最近 OpenClaw 风头正旺,强势登顶 GitHub,终结了 React 神话,许多人更是觉得“AI 自己干活赚钱”的日子就在明天了。 特别是在斯坦福商学院(GSB)这种地方,台下坐着的都是成天琢磨怎么用下一个技术风口搞个独角兽出来的狠人。 微软的首席科学官(CSO)Eric Horvitz 被请到了这个几乎全美最想用 AI 变现的礼堂里。作为从上世纪 80 年代就开始搞 AI 的绝对老炮、也是微软技术底座的“扫地僧”,这位老哥并没有顺着台下的胃口,去吹捧下个月大模型又要颠覆什么行业,而是兜头给大家浇了一盆带点学术味的冷水。 他讲了一个挺有画面感的比喻:大家都在聊

By Ne0inhk
Godot被AI代码“围攻”!维护者崩溃发声:“不知道还能坚持多久”

Godot被AI代码“围攻”!维护者崩溃发声:“不知道还能坚持多久”

整理 | 郑丽媛 出品 | ZEEKLOG(ID:ZEEKLOGnews) 当大模型能在几秒钟内生成一段“看起来像那么回事”的补丁时,开源社区却开始付出另一种代价。 最近,开源游戏引擎 Godot 的核心维护团队公开吐槽:他们正被大量“AI 生成的低质量代码”淹没。那些代码往往结构完整、注释齐全、描述洋洋洒洒,但真正的问题是——提交者可能并不理解自己交上来的内容。 这件事,并不是简单的“有人偷懒用 AI 写代码”。它正在触及开源协作最核心的东西:信任。 一场悄无声息的“AI 洪水” 事情的导火索来自一条 Bluesky 讨论帖。 Godot 主要维护者之一、同时也是 Godot 商业支持公司 W4 Games 联合创始人的 Rémi Verschelde 表示,所谓的“AI slop”

By Ne0inhk