说明
这是一个使用 Java JDK 8 和 Spring Boot 实现的 WebSocket 演示项目。目的是为解决多端消息通讯的问题。
WebSocket 是一种基于 TCP 的全双工通信协议,核心作用是解决传统 HTTP 协议'请求 - 响应'模式的局限性,实现客户端与服务器之间的实时、双向、低延迟数据传输。
一、功能介绍
功能特性
- 基于 Maven 的 Spring Boot 项目骨架。
- 纯 WebSocket 端点 /ws,支持用户隔离,http:使用 ws,https:使用 wss。
- 支持分片设置和缓冲区大小设置,解决传输内容限制。
- 提供静态测试页面 index.html,用于连接、发送消息、查看消息。
项目结构
- pom.xml:Spring Boot 3.3,依赖 spring-boot-starter-web 和 spring-boot-starter-websocket。
- src/main/java/com/example/websocket/WebSocketApplication.java:应用入口。
- src/main/java/com/example/websocket/WebSocketConfig.java:注册 WebSocket 处理器,端点为 /ws。
- src/main/java/com/example/websocket/ChatWebSocketHandler.java:文本消息处理,广播到所有会话。
- src/main/resources/static/index.html:页面内置 JS,连接 ws://{host}/ws,可发送、显示消息。
关键代码位置
- 启动类:src/main/java/com/example/websocket/WebSocketApplication.java
- WebSocket 配置:src/main/java/com/example/websocket/WebSocketConfig.java
- 文本消息处理器:src/main/java/com/example/websocket/ChatWebSocketHandler.java
- 静态页面:src/main/resources/static/index.html
测试连接
- 打开 http://localhost:8800,使用页面上的'连接/发送'测试。
- WebSocket 地址:ws://localhost:8080/ws
二、运行测试
可通过 UserId 来创建独立的联接,进行用户隔离。
三、核心代码说明
由于 websocket 对传输的内容有限制,若内容较大可进行缓冲区大小设置,并对不同文本进行分片处理。
ChatWebSocketHandler.java
package com.example.websocket;
import java.io.ByteArrayOutputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.web.socket.BinaryMessage;
org.springframework.web.socket.CloseStatus;
org.springframework.web.socket.TextMessage;
org.springframework.web.socket.WebSocketSession;
org.springframework.web.socket.handler.AbstractWebSocketHandler;
com.fasterxml.jackson.databind.ObjectMapper;
{
ConcurrentHashMap<String, Set<WebSocketSession>> userSessions = <>();
();
ConcurrentHashMap<String, StringBuilder> textFragments = <>();
ConcurrentHashMap<String, ByteArrayOutputStream> binaryFragments = <>();
Exception {
resolveUserId(session);
(uid == || uid.isEmpty()) {
session.close(CloseStatus.BAD_DATA);
;
}
session.getAttributes().put(, uid);
userSessions.computeIfAbsent(uid, k -> ConcurrentHashMap.newKeySet()).add(session);
}
Exception {
session.getId();
(!message.isLast()) {
textFragments.computeIfAbsent(id, k -> ()).append(message.getPayload());
;
}
textFragments.remove(id);
sb != ? sb.append(message.getPayload()).toString() : message.getPayload();
routePayload(session, payload);
}
Exception {
session.getId();
message.getPayload();
[] chunk = [buf.remaining()];
buf.get(chunk);
binaryFragments.computeIfAbsent(id, k -> ());
acc.write(chunk);
(message.isLast()) {
[] all = acc.toByteArray();
binaryFragments.remove(id);
(all, StandardCharsets.UTF_8);
routePayload(session, payload);
}
}
{
;
}
Exception {
session.getAttributes().get();
(v == ) ;
String.valueOf(v);
Set<WebSocketSession> set = userSessions.get(uid);
(set != ) {
set.remove(session);
(set.isEmpty()) userSessions.remove(uid);
}
}
String {
session.getUri();
(uri == ) ;
uri.getQuery();
(q == || q.isEmpty()) ;
String[] parts = q.split();
(String p : parts) {
p.indexOf();
(i > ) {
p.substring(, i);
p.substring(i + );
(.equals(k)) val;
}
}
;
}
Exception {
session.getAttributes().get();
(v == ) ;
String.valueOf(v);
();
message.setFromUserId(fromUid);
{
MAPPER.readValue(payload, Message.class);
message.setToUserId(receivedMsg.getToUserId());
message.setContent(receivedMsg.getContent());
message.setType(receivedMsg.getType());
} (Exception e) {
message.setContent(payload);
}
message.getToUserId();
toUid != && !toUid.isEmpty();
Set<WebSocketSession> targets;
(isP2P) {
targets = userSessions.get(toUid);
} {
targets = userSessions.get(fromUid);
}
MAPPER.writeValueAsString(message);
(outStr);
(targets == || targets.isEmpty()) {
(session.isOpen()) {
session.sendMessage(msg);
}
;
}
(WebSocketSession s : targets) {
(s.isOpen()) {
s.sendMessage(msg);
}
}
(isP2P && session.isOpen()) {
session.sendMessage(msg);
}
}
}


