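WebSocket in Practice: Building a Real-Time Communication System with Spring Boot
An online chat room built on Spring Boot and the WebSocket stack, aimed at high-concurrency real-time communication. The walkthrough covers STOMP configuration, a session management service, heartbeat detection, and a cluster deployment strategy. It explains how the front end and back end interact, addresses common problems such as connection timeouts, memory leaks, and lost messages, and provides code examples with best-practice recommendations.
ServerBase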
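1. Why WebSocket?
1.1 Limitations of HTTP
Before looking at WebSocket, consider where plain HTTP falls short in real-time scenarios.
Take a chat room in which user A sends a message to user B. Over HTTP this is usually implemented with polling: the client keeps asking the server "Any new messages?".
The drawbacks are clear:
- Wasted resources: most requests are empty polls that consume bandwidth without delivering anything.
- Poor timeliness: latency is bounded by the polling interval, so millisecond-level push is out of reach.
- Server load: every poll costs a full request/response cycle, so a large volume of mostly empty requests ties up server resources.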
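To put rough numbers on that waste, the sketch below counts how many polls come back empty. The class name and the figures used with it (client count, polling interval, message rate) are illustrative assumptions, not measurements from the chat room built in this article.

```java
/** Back-of-the-envelope estimate of wasted requests under HTTP polling. */
final class PollingCost {
    private PollingCost() {}

    /** Total polls issued by all clients in one hour. */
    static long pollsPerHour(int clients, int intervalSeconds) {
        return (long) clients * (3600 / intervalSeconds);
    }

    /**
     * Polls that come back empty, assuming each new message is picked up
     * by exactly one poll of the client it is addressed to.
     */
    static long emptyPollsPerHour(int clients, int intervalSeconds, int messagesPerClientPerHour) {
        long total = pollsPerHour(clients, intervalSeconds);
        long useful = Math.min(total, (long) clients * messagesPerClientPerHour);
        return total - useful;
    }
}
```

With 1,000 clients polling every 2 seconds and 60 messages per client per hour, `emptyPollsPerHour(1000, 2, 60)` yields 1,740,000 empty polls out of 1,800,000, roughly 97%.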
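1.2 How WebSocket Solves This
WebSocket addresses these problems by establishing a long-lived connection with a single handshake and keeping it full-duplex afterwards.
Key advantages:
- One connection, two-way communication: the server can push data to the client without the client asking repeatedly.
- Low latency: once connected, no further handshakes are needed and per-message overhead is small.
- Lightweight: frame headers are only a few bytes, which suits high-frequency interaction.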
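The "few bytes" claim can be made concrete from the frame layout in RFC 6455, section 5.2. The helper below is a minimal sketch (the class and method names are made up for illustration) that computes the header size of a single frame for a given payload length.

```java
/** Computes the header size of one WebSocket frame as laid out in RFC 6455, section 5.2. */
final class FrameOverhead {
    private FrameOverhead() {}

    static int headerBytes(long payloadLength, boolean masked) {
        if (payloadLength < 0) {
            throw new IllegalArgumentException("negative payload length");
        }
        int size = 2;                 // FIN/opcode byte + mask bit and 7-bit length byte
        if (payloadLength > 65_535) {
            size += 8;                // 64-bit extended payload length
        } else if (payloadLength > 125) {
            size += 2;                // 16-bit extended payload length
        }
        if (masked) {
            size += 4;                // masking key, required on client-to-server frames
        }
        return size;
    }
}
```

A short chat message sent by the browser therefore carries 6 bytes of framing, and the same message pushed by the server carries 2, compared with the full set of request and response headers that every HTTP poll repeats.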
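2. WebSocket Basics
2.1 What Is WebSocket?
WebSocket is a protocol for full-duplex communication over a single TCP connection, defined in RFC 6455. The connection starts as an HTTP request that is upgraded to WebSocket, after which the server can push data to the client at any time, which makes it a building block for real-time applications.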
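As part of the upgrade handshake, the server proves it understood the request by deriving the `Sec-WebSocket-Accept` response header from the client's `Sec-WebSocket-Key`. In a Spring Boot application the servlet container does this for you; the sketch below (class name chosen for illustration) only shows the computation that RFC 6455 specifies.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

/** Derives the Sec-WebSocket-Accept header value as specified in RFC 6455, section 4.2.2. */
final class HandshakeAccept {
    // Fixed GUID defined by the RFC; it is appended to the client's key before hashing.
    private static final String GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    private HandshakeAccept() {}

    static String acceptFor(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest((secWebSocketKey + GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 is not available", e);
        }
    }
}
```

For the sample key used in the RFC, `dGhlIHNhbXBsZSBub25jZQ==`, the result is `s3pPLMBiTxaQ9kYGzzhZRbK+xOo=`.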
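Key characteristics:
- Full-duplex: client and server can send and receive messages at the same time.
- Low latency: once the connection is established, transmission adds almost no extra overhead.
- Cross-origin connections: browsers do not apply the same-origin policy to WebSocket connections, so a page can connect to another origin; the server should therefore check the Origin header itself.
2.2 Connection States
A WebSocket connection moves through four states; knowing them helps when debugging connection problems: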
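console.log(WebSocket.CONNECTING); // 0: the connection is being established
console.log(WebSocket.OPEN);       // 1: the connection is open and ready to communicate
console.log(WebSocket.CLOSING);    // 2: the closing handshake is in progress
console.log(WebSocket.CLOSED);     // 3: the connection is closed or could not be opened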
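3. Hands-On: An Online Chat Room
We will build a complete online chat room with the following features:
- User connect/disconnect management
- Group broadcast and private chat
- Online user list
- Heartbeat detection
3.1 Overall Architecture
The system is layered so that each concern can be scaled and replaced independently:
- Access layer: handles the WebSocket handshake and authentication.
- Business layer: message dispatch, session management, and business logic.
- Data layer: Redis for session information and MySQL for persisting message history. In the sample code below, sessions are held in memory and message history is not persisted.
- Client layer: the browser talks to the back end over STOMP.
3.2 Environment Setup
Start with the Maven dependencies: the WebSocket starter and supporting libraries.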
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.0</version>
</parent>
<groupId>com.example</groupId>
<artifactId>websocket-chat</artifactId>
<version>1.0.0</version>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
</project>
The application.yml configuration file sets the server port and the WebSocket parameters:
server:
  port: 8080
spring:
  application:
    name: websocket-chat
  redis:
    host: localhost
    port: 6379
    database: 0
    timeout: 5000ms
    lettuce:
      pool:
        max-active: 8
        max-idle: 8
        min-idle: 0
websocket:
  endpoint: /ws
  allowed-origins: "*"
  heartbeat-interval: 30
  max-message-size: 65536
  send-buffer-size: 524288
  send-timeout: 20000
3.3 Core Data Structures
package com.example.websocket.model;
import lombok.Data;
import lombok.Builder;
import lombok.NoArgsConstructor;
import lombok.AllArgsConstructor;
import java.time.LocalDateTime;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ChatMessage {
private MessageType type;
private String senderId;
private String senderName;
private String receiverId;
private String content;
private LocalDateTime timestamp;
private String messageId;
public enum MessageType {
CHAT, JOIN, LEAVE, SYSTEM, HEARTBEAT, ONLINE_USERS, ERROR
}
}
package com.example.websocket.model;
import lombok.Data;
import lombok.Builder;
import org.springframework.web.socket.WebSocketSession;
import java.time.LocalDateTime;
@Data
@Builder
public class WebSocketSessionInfo {
private String sessionId;
private String userId;
private String username;
private LocalDateTime connectTime;
private LocalDateTime lastHeartbeatTime;
private String clientIp;
private transient WebSocketSession session;
}
3.4 Configuration Class
The WebSocket configuration is the heart of the setup; we use STOMP to give message routing a uniform structure.
package com.example.websocket.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Value("${websocket.endpoint:/ws}")
    private String endpoint;

    @Value("${websocket.allowed-origins:*}")
    private String allowedOrigins;

    @Value("${websocket.max-message-size:65536}")
    private int maxMessageSize;

    @Value("${websocket.send-buffer-size:524288}")
    private int sendBufferSize;

    // setSendTimeLimit takes an int (milliseconds), so this field must be an int too.
    @Value("${websocket.send-timeout:20000}")
    private int sendTimeout;

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // In-memory broker for the destinations clients subscribe to.
        registry.enableSimpleBroker("/topic", "/queue", "/user");
        // Messages sent to /app/** are routed to @MessageMapping methods.
        registry.setApplicationDestinationPrefixes("/app");
        registry.setUserDestinationPrefix("/user");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // SockJS endpoint; the browser client in section 3.9 connects here.
        registry.addEndpoint(endpoint).setAllowedOriginPatterns(allowedOrigins).withSockJS();
        // Native WebSocket endpoint for clients that do not need the SockJS fallback.
        registry.addEndpoint(endpoint + "-raw").setAllowedOriginPatterns(allowedOrigins);
    }

    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.setMessageSizeLimit(maxMessageSize)
                .setSendBufferSizeLimit(sendBufferSize)
                .setSendTimeLimit(sendTimeout);
    }
}
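Why STOMP?
STOMP (Simple Text Oriented Messaging Protocol) runs as a WebSocket subprotocol. It adds a message frame format, destinations (similar to topics and queues), and a subscription mechanism, so WebSocket behaves more like a message queue and is easier to extend with transactions and authentication.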
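To make the frame format concrete: a STOMP frame is a command line, zero or more `name:value` header lines, a blank line, the body, and a terminating NUL byte. The class below is a simplified sketch written for this article, not the parser Spring uses; it ignores header escaping and `content-length`, which a real implementation has to handle.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Simplified STOMP frame: COMMAND, headers, blank line, body, NUL terminator. */
final class StompFrame {
    final String command;
    final Map<String, String> headers;
    final String body;

    StompFrame(String command, Map<String, String> headers, String body) {
        this.command = command;
        this.headers = headers;
        this.body = body;
    }

    /** Serializes the frame to its wire representation. */
    String encode() {
        StringBuilder sb = new StringBuilder(command).append('\n');
        headers.forEach((name, value) -> sb.append(name).append(':').append(value).append('\n'));
        return sb.append('\n').append(body).append('\0').toString();
    }

    /** Parses a single frame; text after the first NUL byte is ignored. */
    static StompFrame decode(String raw) {
        int end = raw.indexOf('\0');
        String text = end >= 0 ? raw.substring(0, end) : raw;
        int split = text.indexOf("\n\n");
        String head = split >= 0 ? text.substring(0, split) : text;
        String body = split >= 0 ? text.substring(split + 2) : "";
        String[] lines = head.split("\n");
        Map<String, String> headers = new LinkedHashMap<>();
        for (int i = 1; i < lines.length; i++) {
            int colon = lines[i].indexOf(':');
            if (colon > 0) {
                // When a header repeats, the first occurrence wins.
                headers.putIfAbsent(lines[i].substring(0, colon), lines[i].substring(colon + 1));
            }
        }
        return new StompFrame(lines[0], headers, body);
    }
}
```

A client publishing to the group chat would send a `SEND` frame whose `destination` header is `/app/chat.send` and whose body is the JSON-encoded message; the `/app` prefix is what routes it to a controller method rather than directly to the broker.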
3.5 Message Handlers
Create a message controller for the business logic: group chat, private chat, and heartbeats.
package com.example.websocket.controller;

import com.example.websocket.model.ChatMessage;
import com.example.websocket.service.UserSessionService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.handler.annotation.*;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.annotation.SendToUser;
import org.springframework.stereotype.Controller;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.UUID;

@Slf4j
@Controller
@RequiredArgsConstructor
public class ChatController {

    private final SimpMessagingTemplate messagingTemplate;
    private final UserSessionService sessionService;

    // Group chat: stamp the message and broadcast it to every subscriber of /topic/public.
    @MessageMapping("/chat.send")
    @SendTo("/topic/public")
    public ChatMessage sendPublicMessage(@Payload ChatMessage message, SimpMessageHeaderAccessor headerAccessor) {
        log.info("Public chat message: {} -> {}", message.getSenderName(), message.getContent());
        message.setMessageId(UUID.randomUUID().toString());
        message.setTimestamp(LocalDateTime.now());
        message.setType(ChatMessage.MessageType.CHAT);
        return message;
    }

    // Private chat: deliver to the receiver and echo back to the sender.
    // convertAndSendToUser resolves its target by Principal name, so this assumes the
    // handshake assigns every session a Principal whose name equals the user ID.
    @MessageMapping("/chat.private")
    public void sendPrivateMessage(@Payload ChatMessage message, SimpMessageHeaderAccessor headerAccessor) {
        log.info("Private message: {} -> {}: {}", message.getSenderName(), message.getReceiverId(), message.getContent());
        message.setMessageId(UUID.randomUUID().toString());
        message.setTimestamp(LocalDateTime.now());
        message.setType(ChatMessage.MessageType.CHAT);
        messagingTemplate.convertAndSendToUser(message.getReceiverId(), "/queue/private", message);
        messagingTemplate.convertAndSendToUser(message.getSenderId(), "/queue/private", message);
    }

    @MessageMapping("/chat.join")
    @SendTo("/topic/public")
    public ChatMessage userJoin(@Payload ChatMessage message, SimpMessageHeaderAccessor headerAccessor) {
        String sessionId = headerAccessor.getSessionId();
        headerAccessor.getSessionAttributes().put("username", message.getSenderName());
        headerAccessor.getSessionAttributes().put("userId", message.getSenderId());
        // SimpMessageHeaderAccessor does not expose the underlying WebSocketSession,
        // so none is registered here; the service treats a null session as "not closable".
        sessionService.registerSession(message.getSenderId(), message.getSenderName(), sessionId, null);
        log.info("User joined: {} ({})", message.getSenderName(), message.getSenderId());
        return ChatMessage.builder()
                .type(ChatMessage.MessageType.JOIN)
                .senderId("system")
                .senderName("System")
                .content(message.getSenderName() + " joined the chat room")
                .timestamp(LocalDateTime.now())
                .messageId(UUID.randomUUID().toString())
                .build();
    }

    @MessageMapping("/heartbeat")
    public void handleHeartbeat(@Payload ChatMessage message, SimpMessageHeaderAccessor headerAccessor) {
        String sessionId = headerAccessor.getSessionId();
        sessionService.updateHeartbeat(sessionId);
        log.debug("Heartbeat received: {} - {}", sessionId, message.getSenderId());
    }

    // Replies only to the requesting session, which must subscribe to /user/queue/users.
    @MessageMapping("/users.online")
    public void getOnlineUsers(SimpMessageHeaderAccessor headerAccessor) {
        String sessionId = headerAccessor.getSessionId();
        // Without an authenticated Principal, the reply is routed by the session ID carried in the headers.
        SimpMessageHeaderAccessor replyHeaders = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
        replyHeaders.setSessionId(sessionId);
        replyHeaders.setLeaveMutable(true);
        messagingTemplate.convertAndSendToUser(
                sessionId, "/queue/users",
                Map.of("type", "ONLINE_USERS", "users", sessionService.getAllOnlineUsers()),
                replyHeaders.getMessageHeaders());
    }

    // @SendToUser returns the error to the session that caused it instead of broadcasting it.
    @MessageExceptionHandler
    @SendToUser("/queue/errors")
    public ChatMessage handleException(Exception e) {
        log.error("Error while handling message", e);
        return ChatMessage.builder()
                .type(ChatMessage.MessageType.ERROR)
                .senderId("system")
                .senderName("System")
                .content("Message handling failed: " + e.getMessage())
                .timestamp(LocalDateTime.now())
                .messageId(UUID.randomUUID().toString())
                .build();
    }
}
3.6 Session Management Service
We need a service that manages all active WebSocket connections: registration, removal, and heartbeat checks.
package com.example.websocket.service;

import com.example.websocket.model.WebSocketSessionInfo;
import com.example.websocket.model.ChatMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.WebSocketSession;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

@Slf4j
@Service
public class UserSessionService {

    // sessionId -> session details
    private final Map<String, WebSocketSessionInfo> sessions = new ConcurrentHashMap<>();
    // userId -> sessionId
    private final Map<String, String> userToSession = new ConcurrentHashMap<>();
    private final SimpMessagingTemplate messagingTemplate;

    @Value("${websocket.heartbeat-interval:30}")
    private int heartbeatInterval;

    public UserSessionService(SimpMessagingTemplate messagingTemplate) {
        this.messagingTemplate = messagingTemplate;
    }

    public void registerSession(String userId, String username, String sessionId, WebSocketSession session) {
        WebSocketSessionInfo sessionInfo = WebSocketSessionInfo.builder()
                .sessionId(sessionId)
                .userId(userId)
                .username(username)
                .connectTime(LocalDateTime.now())
                .lastHeartbeatTime(LocalDateTime.now())
                .clientIp(getClientIp(session))
                .session(session)
                .build();
        sessions.put(sessionId, sessionInfo);
        userToSession.put(userId, sessionId);
        log.info("Session registered: {} - {} - {}", userId, username, sessionId);
        broadcastOnlineUsers();
    }

    public void removeSession(String sessionId) {
        WebSocketSessionInfo sessionInfo = sessions.remove(sessionId);
        if (sessionInfo != null) {
            userToSession.remove(sessionInfo.getUserId());
            log.info("Session removed: {} - {}", sessionInfo.getUserId(), sessionInfo.getUsername());
            ChatMessage leaveMessage = ChatMessage.builder()
                    .type(ChatMessage.MessageType.LEAVE)
                    .senderId("system")
                    .senderName("System")
                    .content(sessionInfo.getUsername() + " left the chat room")
                    .timestamp(LocalDateTime.now())
                    .build();
            messagingTemplate.convertAndSend("/topic/public", leaveMessage);
            broadcastOnlineUsers();
        }
    }

    public void updateHeartbeat(String sessionId) {
        WebSocketSessionInfo sessionInfo = sessions.get(sessionId);
        if (sessionInfo != null) {
            sessionInfo.setLastHeartbeatTime(LocalDateTime.now());
        }
    }

    public List<Map<String, Object>> getAllOnlineUsers() {
        // The explicit <String, Object> keeps the element type aligned with the declared return type.
        return sessions.values().stream()
                .map(info -> Map.<String, Object>of(
                        "userId", info.getUserId(),
                        "username", info.getUsername(),
                        "connectTime", info.getConnectTime()))
                .collect(Collectors.toList());
    }

    private void broadcastOnlineUsers() {
        messagingTemplate.convertAndSend("/topic/users.online",
                Map.of("type", "ONLINE_USERS", "count", sessions.size(), "users", getAllOnlineUsers()));
    }

    private String getClientIp(WebSocketSession session) {
        if (session == null) {
            return "unknown";
        }
        Object ip = session.getAttributes().get("clientIp");
        return ip != null ? ip.toString() : "unknown";
    }

    // Runs every 10 seconds; @Scheduled only fires if @EnableScheduling is present on a configuration class.
    @Scheduled(fixedDelay = 10000)
    public void heartbeatCheck() {
        LocalDateTime now = LocalDateTime.now();
        sessions.entrySet().removeIf(entry -> {
            WebSocketSessionInfo info = entry.getValue();
            long seconds = ChronoUnit.SECONDS.between(info.getLastHeartbeatTime(), now);
            // A session is considered dead after three missed heartbeat intervals.
            if (seconds > heartbeatInterval * 3) {
                log.warn("Session timed out: {} - {}, last heartbeat: {}", info.getUserId(), info.getUsername(), info.getLastHeartbeatTime());
                try {
                    if (info.getSession() != null && info.getSession().isOpen()) {
                        info.getSession().close();
                    }
                } catch (Exception e) {
                    log.error("Failed to close session", e);
                }
                userToSession.remove(info.getUserId());
                return true;
            }
            return false;
        });
    }
}
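The timeout rule in `heartbeatCheck` (evict after three missed intervals) is easier to reason about and test when it is separated from Spring and from the wall clock. The class below is a minimal sketch of that idea; `HeartbeatRegistry` and its methods are hypothetical names, and the caller passes the current time in explicitly.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Tracks the last heartbeat per session and evicts sessions that have gone quiet. */
final class HeartbeatRegistry {
    private final Map<String, Instant> lastSeen = new ConcurrentHashMap<>();
    private final Duration timeout;

    HeartbeatRegistry(Duration heartbeatInterval, int missedBeatsAllowed) {
        this.timeout = heartbeatInterval.multipliedBy(missedBeatsAllowed);
    }

    /** Records a heartbeat (or the initial registration) for a session. */
    void touch(String sessionId, Instant now) {
        lastSeen.put(sessionId, now);
    }

    /** Removes and returns every session whose last heartbeat is older than the timeout. */
    List<String> evictExpired(Instant now) {
        List<String> evicted = new ArrayList<>();
        lastSeen.forEach((sessionId, seen) -> {
            boolean expired = Duration.between(seen, now).compareTo(timeout) > 0;
            // remove(key, value) skips sessions that were refreshed concurrently.
            if (expired && lastSeen.remove(sessionId, seen)) {
                evicted.add(sessionId);
            }
        });
        return evicted;
    }

    int size() {
        return lastSeen.size();
    }
}
```

With a 30-second interval and three allowed misses, a session last seen at time 0 survives a check at 90 seconds and is evicted by a check at 91 seconds, which matches the `seconds > heartbeatInterval * 3` condition in the service.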
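3.7 Event Listener
Listen for WebSocket lifecycle events, such as connection established and disconnected, so that resources are cleaned up automatically.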
package com.example.websocket.listener;

import com.example.websocket.service.UserSessionService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.messaging.SessionConnectedEvent;
import org.springframework.web.socket.messaging.SessionDisconnectEvent;

@Slf4j
@Component
@RequiredArgsConstructor
public class WebSocketEventListener {

    private final UserSessionService sessionService;

    @EventListener
    public void handleSessionConnected(SessionConnectedEvent event) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(event.getMessage());
        String sessionId = accessor.getSessionId();
        log.info("WebSocket connection established: {}", sessionId);
    }

    @EventListener
    public void handleSessionDisconnect(SessionDisconnectEvent event) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(event.getMessage());
        String sessionId = accessor.getSessionId();
        log.info("WebSocket connection closed: {}", sessionId);
        // Covers both explicit disconnects and dropped connections.
        sessionService.removeSession(sessionId);
    }
}
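3.8 Handshake Interceptor
Add a handshake interceptor that validates the token and logs the connection attempt before the WebSocket connection is accepted.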
package com.example.websocket.interceptor;

import lombok.extern.slf4j.Slf4j;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import javax.servlet.http.HttpServletRequest;
import java.util.Map;

@Slf4j
@Component
public class WebSocketHandshakeInterceptor implements HandshakeInterceptor {

    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
                                   WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        if (request instanceof ServletServerHttpRequest) {
            HttpServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest();
            // Both values are read from the query string of the handshake URL.
            String token = servletRequest.getParameter("token");
            String userId = servletRequest.getParameter("userId");
            if (token == null || token.isEmpty()) {
                log.warn("WebSocket handshake rejected: token is empty");
                return false;
            }
            if (!validateToken(token, userId)) {
                log.warn("WebSocket handshake rejected: token is invalid");
                return false;
            }
            // Attributes stored here become the WebSocket session attributes.
            attributes.put("userId", userId);
            attributes.put("token", token);
            attributes.put("clientIp", getClientIp(servletRequest));
            log.info("WebSocket handshake accepted: userId={}, ip={}", userId, attributes.get("clientIp"));
        }
        return true;
    }

    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
                               WebSocketHandler wsHandler, Exception exception) {
    }

    // Placeholder: accepts any non-empty token. Replace with real verification
    // (for example a signed token checked against userId) before relying on it.
    private boolean validateToken(String token, String userId) {
        return token != null && !token.isEmpty();
    }

    private String getClientIp(HttpServletRequest request) {
        String ip = request.getHeader("X-Forwarded-For");
        if (ip == null || ip.isEmpty() || "unknown".equalsIgnoreCase(ip)) {
            ip = request.getHeader("Proxy-Client-IP");
        }
        if (ip == null || ip.isEmpty() || "unknown".equalsIgnoreCase(ip)) {
            ip = request.getHeader("WL-Proxy-Client-IP");
        }
        if (ip == null || ip.isEmpty() || "unknown".equalsIgnoreCase(ip)) {
            ip = request.getRemoteAddr();
        }
        // X-Forwarded-For may hold a comma-separated chain; the first entry is the original client.
        if (ip != null && ip.contains(",")) {
            ip = ip.substring(0, ip.indexOf(',')).trim();
        }
        return ip;
    }
}
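Then register the interceptor on the endpoint in WebSocketConfig, replacing the earlier registerStompEndpoints method:
// Field added to WebSocketConfig
@Autowired
private WebSocketHandshakeInterceptor webSocketHandshakeInterceptor;

@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
    registry.addEndpoint(endpoint)
            .setAllowedOriginPatterns(allowedOrigins)
            .addInterceptors(webSocketHandshakeInterceptor)
            .withSockJS();
}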
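The `validateToken` method above is only a stub. One possible way to make the check meaningful, shown here as a sketch under stated assumptions rather than as the scheme this article prescribes, is to issue each user a token that is the HMAC-SHA256 of their user ID under a server-side secret. The class name, the token format, and the absence of an expiry time are all simplifications made for illustration.

```java
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.util.Base64;

/** Issues and checks tokens of the form base64url(HMAC-SHA256(secret, userId)). */
final class HmacTokens {
    private static final String ALGORITHM = "HmacSHA256";
    private final byte[] secret;

    HmacTokens(String secret) {
        this.secret = secret.getBytes(StandardCharsets.UTF_8);
    }

    String issue(String userId) {
        return Base64.getUrlEncoder().withoutPadding().encodeToString(sign(userId));
    }

    boolean isValid(String token, String userId) {
        if (token == null || userId == null) {
            return false;
        }
        byte[] presented;
        try {
            presented = Base64.getUrlDecoder().decode(token);
        } catch (IllegalArgumentException e) {
            return false; // not valid base64url, so it cannot be one of our tokens
        }
        // Constant-time comparison avoids leaking how many leading bytes matched.
        return MessageDigest.isEqual(sign(userId), presented);
    }

    private byte[] sign(String userId) {
        try {
            Mac mac = Mac.getInstance(ALGORITHM);
            mac.init(new SecretKeySpec(secret, ALGORITHM));
            return mac.doFinal(userId.getBytes(StandardCharsets.UTF_8));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("HMAC-SHA256 is not available", e);
        }
    }
}
```

Inside the interceptor, `validateToken(token, userId)` would then delegate to `isValid(token, userId)` on a shared instance, so a token issued for one user is rejected when presented with a different `userId`.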
3.9 Front-End Implementation
The front-end page uses HTML/CSS/JavaScript together with the SockJS and STOMP.js libraries.
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>WebSocket Chat Room</title>
<style>
body { font-family: 'Microsoft YaHei', sans-serif; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); min-height: 100vh; padding: 20px; }
#login-container, #chat-container { max-width: 800px; margin: 0 auto; background: white; border-radius: 10px; box-shadow: 0 15px 35px rgba(0,0,0,0.2); overflow: hidden; }
</style>
<script src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.5.1/sockjs.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
</head>
<body>
<div id="login-container" style="display: block;">
  <div class="header"><h2>WebSocket Chat Room</h2></div>
  <div class="content">
    <div class="form-group"><label>User ID</label><input type="text" id="userId" placeholder="Enter a user ID"></div>
    <div class="form-group"><label>Nickname</label><input type="text" id="username" placeholder="Enter a nickname"></div>
    <div class="form-group"><label>Token</label><input type="text" id="token" value="test-token"></div>
    <button onclick="connect()">Join chat room</button>
  </div>
</div>
<div id="chat-container" style="display: none;">
  <div class="chat-header">
    <span id="currentUser"></span>
    <span>Online: <span id="onlineCount">0</span></span>
    <button onclick="disconnect()">Disconnect</button>
  </div>
  <div class="chat-main">
    <div class="sidebar"><h3>Online users</h3><ul class="user-list" id="userList"></ul></div>
    <div class="message-area">
      <div class="messages" id="messages"></div>
      <div class="input-area">
        <input type="text" id="messageInput" placeholder="Type a message... click a user first for private chat">
        <button onclick="sendMessage()">Send</button>
      </div>
    </div>
  </div>
</div>
<script>
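let stompClient = null;
let currentUser = null;
let selectedUserId = null;
let heartbeatTimer = null;

function connect() {
  const userId = document.getElementById('userId').value.trim();
  const username = document.getElementById('username').value.trim();
  const token = document.getElementById('token').value.trim();
  if (!userId || !username || !token) { alert('Please fill in all fields'); return; }
  currentUser = { userId, username, token };
  // The handshake interceptor reads token and userId from the query string.
  const socket = new SockJS('http://localhost:8080/ws?token=' + encodeURIComponent(token)
    + '&userId=' + encodeURIComponent(userId));
  stompClient = Stomp.over(socket);
  stompClient.connect({token: token, userId: userId}, function(frame) {
    console.log('Connected:', frame);
    document.getElementById('login-container').style.display = 'none';
    document.getElementById('chat-container').style.display = 'block';
    document.getElementById('currentUser').textContent = `Current user: ${username}`;
    stompClient.subscribe('/topic/public', function(message) {
      displayMessage(JSON.parse(message.body));
    });
    stompClient.subscribe('/user/queue/private', function(message) {
      displayMessage(JSON.parse(message.body));
    });
    stompClient.subscribe('/topic/users.online', function(message) {
      updateUserList(JSON.parse(message.body));
    });
    stompClient.send('/app/chat.join', {}, JSON.stringify({
      senderId: userId, senderName: username, content: username + ' joined the chat room', type: 'JOIN'
    }));
    startHeartbeat();
    displaySystemMessage('Connected!');
    stompClient.send('/app/users.online', {});
  }, function(error) {
    console.error('Connection failed:', error);
    alert('Connection failed, please try again');
  });
}

function disconnect() {
  if (stompClient !== null) {
    // The server shown above has no /app/chat.leave handler; cleanup happens on the disconnect event.
    stompClient.send('/app/chat.leave', {}, JSON.stringify({
      senderId: currentUser.userId, senderName: currentUser.username
    }));
    stompClient.disconnect();
    stopHeartbeat();
    document.getElementById('login-container').style.display = 'block';
    document.getElementById('chat-container').style.display = 'none';
    document.getElementById('messages').innerHTML = '';
    document.getElementById('userList').innerHTML = '';
    stompClient = null;
  }
}

function sendMessage() {
  const input = document.getElementById('messageInput');
  const content = input.value.trim();
  if (!content || !stompClient) return;
  if (selectedUserId) {
    stompClient.send('/app/chat.private', {}, JSON.stringify({
      senderId: currentUser.userId, senderName: currentUser.username,
      receiverId: selectedUserId, content: content, type: 'CHAT'
    }));
  } else {
    stompClient.send('/app/chat.send', {}, JSON.stringify({
      senderId: currentUser.userId, senderName: currentUser.username,
      content: content, type: 'CHAT'
    }));
  }
  input.value = '';
}

// Appends a <div> whose text is set via textContent, so message content is never parsed as HTML.
function appendLine(parent, text) {
  const line = document.createElement('div');
  line.textContent = text;
  parent.appendChild(line);
}

function displayMessage(message) {
  const messagesDiv = document.getElementById('messages');
  const isOwn = message.senderId === currentUser.userId;
  const messageDiv = document.createElement('div');
  messageDiv.className = `message ${message.type === 'SYSTEM' ? 'system' : ''} ${isOwn ? 'own' : ''}`.trim();
  const time = message.timestamp ? new Date(message.timestamp).toLocaleTimeString() : new Date().toLocaleTimeString();
  if (message.type === 'SYSTEM') {
    appendLine(messageDiv, `[${time}] ${message.content}`);
  } else {
    appendLine(messageDiv, `${message.senderName} ${time}`);
    appendLine(messageDiv, message.content);
  }
  messagesDiv.appendChild(messageDiv);
  messagesDiv.scrollTop = messagesDiv.scrollHeight;
}

// Shows a local notice using the same rendering path as server messages.
function displaySystemMessage(text) {
  displayMessage({ type: 'SYSTEM', senderId: 'system', content: text });
}

function updateUserList(data) {
  const userList = document.getElementById('userList');
  const onlineCount = document.getElementById('onlineCount');
  onlineCount.textContent = data.count || 0;
  userList.innerHTML = '';
  (data.users || []).forEach(user => {
    if (user.userId !== currentUser.userId) {
      const item = document.createElement('li');
      item.textContent = user.username;
      item.onclick = () => selectUser(user.userId, user.username);
      userList.appendChild(item);
    }
  });
}

// Clicking a user toggles private chat with that user.
function selectUser(userId, username) {
  if (selectedUserId === userId) {
    selectedUserId = null;
    document.getElementById('messageInput').placeholder = 'Type a message...';
  } else {
    selectedUserId = userId;
    document.getElementById('messageInput').placeholder = `Private chat with ${username}...`;
  }
}

function startHeartbeat() {
  heartbeatTimer = setInterval(() => {
    if (stompClient && stompClient.connected) {
      stompClient.send('/app/heartbeat', {}, JSON.stringify({
        senderId: currentUser.userId, type: 'HEARTBEAT'
      }));
    }
  }, 30000);
}

function stopHeartbeat() {
  if (heartbeatTimer !== null) {
    clearInterval(heartbeatTimer);
    heartbeatTimer = null;
  }
}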
</script>
</body>
</html>
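3.10 Running and Testing
- Start the Spring Boot application.
- Open http://localhost:8080/chat.html in a browser (this assumes the page above is saved as chat.html under src/main/resources/static).
- Open several tabs to simulate multiple users.
- Exercise the features: user connect/disconnect, group messages, private messages, the online user list, and heartbeat detection.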
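4. Advanced Features
4.1 Cluster Deployment
Scaling horizontally is a challenge for WebSocket: each connection is pinned to one node, and the simple in-memory broker only knows that node's subscribers. The usual solution is to relay messages through an external STOMP message broker shared by all nodes. The configuration below assumes a broker with STOMP enabled on port 61613, for example RabbitMQ with its STOMP plugin. Note that the in-memory maps in UserSessionService are still per node, so the online user list would also need shared storage such as Redis.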
@Configuration
public class ClusterWebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableStompBrokerRelay("/topic", "/queue")
.setRelayHost("localhost")
.setRelayPort(61613)
.setClientLogin("guest")
.setClientPasscode("guest")
.setSystemLogin("guest")
.setSystemPasscode("guest");
registry.setApplicationDestinationPrefixes("/app");
registry.setUserDestinationPrefix("/user");
}
}
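4.2 Security
Add Spring Security to control access to messages. This requires Spring Security with its messaging module (spring-security-messaging), which is not part of the pom.xml in section 3.2.
@Configuration
public class WebSocketSecurityConfig extends AbstractSecurityWebSocketMessageBrokerConfigurer {

    @Override
    protected void configureInbound(MessageSecurityMetadataSourceRegistry messages) {
        messages
            // Frames without a destination must be allowed explicitly; otherwise the final denyAll() rejects them.
            .simpTypeMatchers(SimpMessageType.CONNECT, SimpMessageType.HEARTBEAT,
                    SimpMessageType.UNSUBSCRIBE, SimpMessageType.DISCONNECT).permitAll()
            // "/user/**" covers subscriptions such as /user/queue/private.
            .simpDestMatchers("/topic/**", "/queue/**", "/user/**").authenticated()
            .simpDestMatchers("/app/**").authenticated()
            .anyMessage().denyAll();
    }

    @Override
    protected boolean sameOriginDisabled() {
        // Returning true disables the CSRF check on CONNECT; do this only when the handshake is protected another way.
        return true;
    }
}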
4.3 Performance Monitoring
Integrate Micrometer to collect metrics so that system health can be observed.
@Service
public class WebSocketMetricsService {
private final MeterRegistry meterRegistry;
private Counter messageCounter;
private Counter connectionCounter;
public WebSocketMetricsService(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}
@PostConstruct
public void init() {
messageCounter = Counter.builder("websocket.messages").register(meterRegistry);
connectionCounter = Counter.builder("websocket.connections").register(meterRegistry);
}
public void incrementMessageCount() {
messageCounter.increment();
}
public void incrementConnectionCount() {
connectionCounter.increment();
}
}
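5. Common Problems and Solutions
5.1 Connection Timeouts
Symptom: the connection frequently drops on its own.
Solution: choose a heartbeat interval shorter than any idle timeout along the path (proxies, load balancers, and the server-side heartbeat check) so that both ends stay active.
setInterval(() => {
  if (stompClient && stompClient.connected) {
    // The /app/heartbeat handler deserializes the payload as a ChatMessage, so send JSON rather than a bare string.
    stompClient.send('/app/heartbeat', {}, JSON.stringify({ senderId: currentUser.userId, type: 'HEARTBEAT' }));
  }
}, 25000);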
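5.2 Memory Leaks
Symptom: memory usage keeps growing the longer the application runs.
Solution: remove dead sessions promptly so that zombie connections do not accumulate.
@Slf4j
@Component
public class SessionCleanupTask {

    // Registry of raw sessions keyed by session ID; the component that accepts connections is assumed to populate it.
    private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();

    // Runs once a minute; requires @EnableScheduling on a configuration class.
    @Scheduled(fixedDelay = 60000)
    public void cleanupStaleSessions() {
        sessions.entrySet().removeIf(entry -> {
            WebSocketSession session = entry.getValue();
            if (!session.isOpen()) {
                log.info("Removing closed session: {}", entry.getKey());
                return true;
            }
            return false;
        });
    }
}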
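5.3 Lost Messages
Symptom: the client does not receive some messages.
Solution: add an acknowledgement mechanism so the sender learns whether the server accepted each message and can resend when no acknowledgement arrives.
// Replaces the @SendTo-based handler for /chat.send in ChatController.
@MessageMapping("/chat.send")
public void sendMessageWithAck(ChatMessage message, SimpMessageHeaderAccessor headerAccessor) {
    String sessionId = headerAccessor.getSessionId();
    // The client may not have set an ID, and Map.of rejects null values, so assign one here.
    if (message.getMessageId() == null) {
        message.setMessageId(UUID.randomUUID().toString());
    }
    // Without an authenticated Principal, the reply is routed by the session ID carried in the headers.
    SimpMessageHeaderAccessor replyHeaders = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
    replyHeaders.setSessionId(sessionId);
    replyHeaders.setLeaveMutable(true);
    try {
        messagingTemplate.convertAndSend("/topic/public", message);
        // "DELIVERED" means the server handed the message to the broker, not that every subscriber received it.
        messagingTemplate.convertAndSendToUser(
            sessionId, "/queue/ack",
            Map.of("messageId", message.getMessageId(), "status", "DELIVERED"),
            replyHeaders.getMessageHeaders());
    } catch (Exception e) {
        messagingTemplate.convertAndSendToUser(
            sessionId, "/queue/error",
            Map.of("messageId", message.getMessageId(), "error", String.valueOf(e.getMessage())),
            replyHeaders.getMessageHeaders());
    }
}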
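An acknowledgement only helps if the sender keeps track of what is still unconfirmed. The class below sketches that bookkeeping in plain Java; `PendingAcks`, its methods, and the timeout value are hypothetical, and the actual resend would be done by whatever code owns the connection.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Remembers when each message was sent until its acknowledgement arrives. */
final class PendingAcks {
    private final Map<String, Long> sentAtMillis = new ConcurrentHashMap<>();
    private final long ackTimeoutMillis;

    PendingAcks(long ackTimeoutMillis) {
        this.ackTimeoutMillis = ackTimeoutMillis;
    }

    /** Call when a message is sent, or re-sent, to (re)start its timer. */
    void sent(String messageId, long nowMillis) {
        sentAtMillis.put(messageId, nowMillis);
    }

    /** Call when an ack arrives; returns false for unknown or duplicate acks. */
    boolean acknowledge(String messageId) {
        return sentAtMillis.remove(messageId) != null;
    }

    /** IDs of messages that have waited longer than the timeout and should be re-sent. */
    List<String> dueForResend(long nowMillis) {
        List<String> due = new ArrayList<>();
        sentAtMillis.forEach((messageId, sentAt) -> {
            if (nowMillis - sentAt > ackTimeoutMillis) {
                due.add(messageId);
            }
        });
        return due;
    }
}
```

Because a resent message can arrive twice, the receiving side should treat `messageId` as an idempotency key and ignore IDs it has already displayed.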
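6. Summary
6.1 Where WebSocket Fits
- Real-time chat: instant messaging, customer service systems
- Real-time notifications: system alerts, message push
- Real-time data: stock quotes, game state
- Collaborative editing: online documents, whiteboards
- Internet of Things: device status monitoring
6.2 Technology Selection
| Scenario | Recommended approach | Rationale |
|---|---|---|
| Simple applications | Native WebSocket | Lightweight and flexible |
| Enterprise applications | STOMP + Spring | Full-featured and easy to integrate |
| High-concurrency scenarios | Netty | High performance |
| Fallback support required | SockJS | Good compatibility |
6.3 Best Practices
- Connection management: manage sessions in one place and remove dead connections promptly.
- Heartbeats: keep connections active and detect disconnects.
- Message compression: enable compression for large messages.
- Flow control: limit message size and frequency.
- Monitoring and alerting: track connection counts and message volume in real time.
- Graceful shutdown: make sure resources are released correctly.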
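For the flow-control item, a per-session token bucket is one common way to cap message frequency while still allowing short bursts. The sketch below is an illustration, not code from the chat room above; the class name, capacity, and refill rate are assumptions, and the caller supplies the current time.

```java
/** Token bucket: allows bursts up to `capacity`, then a sustained rate of `refillPerSecond`. */
final class TokenBucket {
    private final int capacity;
    private final double refillPerMilli;
    private double tokens;
    private long lastRefillMillis;

    TokenBucket(int capacity, double refillPerSecond, long nowMillis) {
        this.capacity = capacity;
        this.refillPerMilli = refillPerSecond / 1000.0;
        this.tokens = capacity;          // start full so the first burst is allowed
        this.lastRefillMillis = nowMillis;
    }

    /** Returns true if the message may be processed, consuming one token. */
    synchronized boolean tryAcquire(long nowMillis) {
        long elapsed = Math.max(0, nowMillis - lastRefillMillis);
        tokens = Math.min(capacity, tokens + elapsed * refillPerMilli);
        lastRefillMillis = nowMillis;
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }
}
```

A message handler would keep one bucket per session ID and drop, or answer with an error, any message for which `tryAcquire` returns false.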