Rust 异步编程实战:构建高性能 WebSocket 服务
基于 Rust 异步编程构建高性能 WebSocket 服务的实战指南。内容涵盖 WebSocket 协议概述、使用 Axum 框架开发服务端(支持消息广播、心跳检测)、使用 Tungstenite 库开发客户端及重连机制。最后通过实时聊天应用案例,展示了用户管理、消息收发及性能优化方法。文章提供了完整的代码示例和常见问题解决方案,帮助开发者掌握 Rust 网络编程技能。

基于 Rust 异步编程构建高性能 WebSocket 服务的实战指南。内容涵盖 WebSocket 协议概述、使用 Axum 框架开发服务端(支持消息广播、心跳检测)、使用 Tungstenite 库开发客户端及重连机制。最后通过实时聊天应用案例,展示了用户管理、消息收发及性能优化方法。文章提供了完整的代码示例和常见问题解决方案,帮助开发者掌握 Rust 网络编程技能。

💡 WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议,它允许服务器主动向客户端发送消息,而不需要客户端先发起请求。这种通信方式适用于实时应用,如聊天应用、实时通知、在线游戏等。
WebSocket 协议的主要特点:
| 特性 | HTTP | WebSocket |
|---|---|---|
| 通信方式 | 客户端发起请求,服务器响应 | 全双工通信,服务器可以主动发送消息 |
| 连接类型 | 无状态,每次请求建立新连接 | 持久连接,连接建立后保持打开状态 |
| 延迟 | 高,因为每次请求需要建立连接 | 低,连接建立后直接通信 |
| 适用场景 | 静态资源请求、RESTful API | 实时应用,如聊天、通知、游戏等 |
Axum 是 Rust 社区中常用的异步 HTTP 框架,它提供了简单易用的 API 来实现 WebSocket 服务端。
在 Cargo.toml 中添加依赖:
[dependencies]
axum = { version = "0.5", features = ["ws"] }
tokio = { version = "1.0", features = ["full"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
实现 WebSocket 服务端:
use axum::{routing::get, extract::ws::{WebSocket, Message, WebSocketUpgrade}, response::IntoResponse, Router};
use tracing_subscriber::prelude::*;
use tracing::info;
async fn ws_handler(ws: WebSocketUpgrade) -> impl IntoResponse {
info!("New WebSocket connection");
ws.on_upgrade(|socket| handle_socket(socket))
}
async fn handle_socket(mut socket: WebSocket) {
// 发送欢迎消息
if let Err(e) = socket.send(Message::Text("Welcome to WebSocket server".to_string())).await {
info!("Error sending welcome message: {}", e);
return;
}
while let Some(msg) = socket.recv().await {
match msg {
Ok(msg) => {
match msg {
Message::Text(text) => {
info!("Received message: {}", text);
// 回复消息
let reply = format!("You said: {}", text);
if let Err(e) = socket.send(Message::Text(reply)).await {
info!("Error sending message: {}", e);
break;
}
}
Message::Binary(data) => {
info!("Received binary message ({} bytes)", data.len());
}
Message::Ping(data) => {
info!("Received ping");
if let Err(e) = socket.send(Message::Pong(data)).await {
info!("Error sending pong: {}", e);
break;
}
}
Message::Pong(_) => {
info!("Received pong");
}
Message::Close(_) => {
info!("Connection closed");
break;
}
}
}
Err(e) => {
info!("Error receiving message: {}", e);
break;
}
}
}
info!("WebSocket connection closed");
}
#[tokio::main]
async fn main() {
// 初始化日志
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::new("info"))
.with(tracing_subscriber::fmt::layer())
.init();
// 创建路由
let app = Router::new().route("/ws", get(ws_handler));
// 启动服务器
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
info!("WebSocket server running on http://0.0.0.0:3000");
axum::serve(listener, app).await.unwrap();
}
在实际应用中,我们需要管理多个 WebSocket 连接,并支持消息广播。我们可以使用 tokio::sync::broadcast 通道来实现消息广播。
use axum::{routing::get, extract::ws::{WebSocket, Message, WebSocketUpgrade}, response::IntoResponse, Router};
use tracing_subscriber::prelude::*;
use tracing::info;
use tokio::sync::broadcast;
async fn ws_handler(ws: WebSocketUpgrade, tx: broadcast::Sender<String>) -> impl IntoResponse {
info!("New WebSocket connection");
let rx = tx.subscribe();
ws.on_upgrade(|socket| handle_socket(socket, tx, rx))
}
async fn handle_socket(mut socket: WebSocket, tx: broadcast::Sender<String>, mut rx: broadcast::Receiver<String>) {
// 发送欢迎消息
if let Err(e) = socket.send(Message::Text("Welcome to WebSocket server".to_string())).await {
info!("Error sending welcome message: {}", e);
return;
}
tokio::spawn(async move {
while let Ok(msg) = rx.recv().await {
if let Err(e) = socket.send(Message::Text(msg)).await {
info!("Error sending broadcast message: {}", e);
break;
}
}
});
while let Some(msg) = socket.recv().await {
match msg {
Ok(msg) => {
match msg {
Message::Text(text) => {
info!("Received message: {}", text);
// 广播消息
if let Err(e) = tx.send(text) {
info!("Error broadcasting message: {}", e);
break;
}
}
Message::Binary(data) => {
info!("Received binary message ({} bytes)", data.len());
}
Message::Ping(data) => {
info!("Received ping");
if let Err(e) = socket.send(Message::Pong(data)).await {
info!("Error sending pong: {}", e);
break;
}
}
Message::Pong(_) => {
info!("Received pong");
}
Message::Close(_) => {
info!("Connection closed");
break;
}
}
}
Err(e) => {
info!("Error receiving message: {}", e);
break;
}
}
}
info!("WebSocket connection closed");
}
#[tokio::main]
async fn main() {
// 初始化日志
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::new("info"))
.with(tracing_subscriber::fmt::layer())
.init();
// 创建广播通道
let (tx, _) = broadcast::channel(100);
// 创建路由
let app = Router::new().route("/ws", get(ws_handler)).with_state(tx);
// 启动服务器
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
info!("WebSocket server running on http://0.0.0.0:3000");
axum::serve(listener, app).await.unwrap();
}
为了确保 WebSocket 连接的有效性,我们需要实现心跳检测与连接超时机制。
use axum::{routing::get, extract::ws::{WebSocket, Message, WebSocketUpgrade}, response::IntoResponse, Router};
use tracing_subscriber::prelude::*;
use tracing::info;
use tokio::sync::broadcast;
use tokio::time::{interval, Duration};
async fn ws_handler(ws: WebSocketUpgrade, tx: broadcast::Sender<String>) -> impl IntoResponse {
info!("New WebSocket connection");
let rx = tx.subscribe();
ws.on_upgrade(|socket| handle_socket(socket, tx, rx))
}
async fn handle_socket(mut socket: WebSocket, tx: broadcast::Sender<String>, mut rx: broadcast::Receiver<String>) {
// 发送欢迎消息
if let Err(e) = socket.send(Message::Text("Welcome to WebSocket server".to_string())).await {
info!("Error sending welcome message: {}", e);
return;
}
// 心跳检测
let mut heartbeat_interval = interval(Duration::from_secs(10));
// 广播接收任务
let broadcast_task = tokio::spawn(async move {
while let Ok(msg) = rx.recv().await {
if let Err(e) = socket.send(Message::Text(msg)).await {
info!("Error sending broadcast message: {}", e);
break;
}
}
});
// 消息处理任务
let message_task = tokio::spawn(async move {
while let Some(msg) = socket.recv().await {
match msg {
Ok(msg) => {
match msg {
Message::Text(text) => {
info!("Received message: {}", text);
if let Err(e) = tx.send(text) {
info!("Error broadcasting message: {}", e);
break;
}
}
Message::Binary(data) => {
info!("Received binary message ({} bytes)", data.len());
}
Message::Ping(data) => {
info!("Received ping");
if let Err(e) = socket.send(Message::Pong(data)).await {
info!("Error sending pong: {}", e);
break;
}
}
Message::Pong(_) => {
info!("Received pong");
}
Message::Close(_) => {
info!("Connection closed");
break;
}
}
}
Err(e) => {
info!("Error receiving message: {}", e);
break;
}
}
}
});
// 心跳检测任务
let heartbeat_task = tokio::spawn(async move {
loop {
tokio::select! {
_ = heartbeat_interval.tick() => {
if let Err(e) = socket.send(Message::Ping(vec![])).await {
info!("Error sending ping: {}", e);
break;
}
}
_ = message_task => {
break;
}
_ = broadcast_task => {
break;
}
}
}
});
// 等待所有任务完成
let _ = tokio::try_join!(heartbeat_task, message_task, broadcast_task);
info!("WebSocket connection closed");
}
#[tokio::main]
async fn main() {
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::new("info"))
.with(tracing_subscriber::fmt::layer())
.init();
let (tx, _) = broadcast::channel(100);
let app = Router::new().route("/ws", get(ws_handler)).with_state(tx);
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
info!("WebSocket server running on http://0.0.0.0:3000");
axum::serve(listener, app).await.unwrap();
}
Tungstenite 是 Rust 社区中常用的 WebSocket 客户端库,它提供了异步和同步两种 API。
在 Cargo.toml 中添加依赖:
[dependencies]
tungstenite = "0.18"
tokio = { version = "1.0", features = ["full"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
实现 WebSocket 客户端:
use tungstenite::connect;
use tungstenite::Message;
use url::Url;
use tracing_subscriber::prelude::*;
use tracing::info;
#[tokio::main]
async fn main() {
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::new("info"))
.with(tracing_subscriber::fmt::layer())
.init();
info!("Connecting to WebSocket server");
let (mut socket, response) = connect(Url::parse("ws://127.0.0.1:3000/ws").unwrap()).unwrap();
info!("Connected to WebSocket server, response: {:?}", response);
// 发送消息
socket.write_message(Message::Text("Hello, WebSocket!".to_string())).unwrap();
// 接收消息
loop {
match socket.read_message() {
Ok(msg) => {
match msg {
Message::Text(text) => {
info!("Received message: {}", text);
}
Message::Binary(data) => {
info!("Received binary message ({} bytes)", data.len());
}
Message::Ping(data) => {
info!("Received ping");
socket.write_message(Message::Pong(data)).unwrap();
}
Message::Pong(_) => {
info!("Received pong");
}
Message::Close(_) => {
info!("Connection closed");
break;
}
}
}
Err(e) => {
info!("Error receiving message: {}", e);
break;
}
}
}
info!("Client disconnected");
}
Tungstenite 也提供了异步 API,我们可以使用 Tokio 的异步运行时来实现异步 WebSocket 客户端。
use tokio_tungstenite::connect_async;
use tungstenite::protocol::Message;
use url::Url;
use tracing_subscriber::prelude::*;
use tracing::info;
#[tokio::main]
async fn main() {
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::new("info"))
.with(tracing_subscriber::fmt::layer())
.init();
info!("Connecting to WebSocket server");
let (ws_stream, response) = connect_async(Url::parse("ws://127.0.0.1:3000/ws").unwrap()).await.unwrap();
info!("Connected to WebSocket server, response: {:?}", response);
let (mut write, mut read) = ws_stream.split();
// 发送消息任务
tokio::spawn(async move {
write.send(Message::Text("Hello, WebSocket!".to_string())).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
write.send(Message::Text("Another message".to_string())).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
write.send(Message::Close(None)).await.unwrap();
});
// 接收消息任务
while let Some(msg) = read.next().await {
match msg {
Ok(msg) => {
match msg {
Message::Text(text) => {
info!("Received message: {}", text);
}
Message::Binary(data) => {
info!("Received binary message ({} bytes)", data.len());
}
Message::Ping(data) => {
info!("Received ping");
write.send(Message::Pong(data)).await.unwrap();
}
Message::Pong(_) => {
info!("Received pong");
}
Message::Close(_) => {
info!("Connection closed");
break;
}
}
}
Err(e) => {
info!("Error receiving message: {}", e);
break;
}
}
}
info!("Client disconnected");
}
在实际应用中,WebSocket 连接可能会由于网络问题而断开,我们需要实现重连机制。
use tokio_tungstenite::connect_async;
use tungstenite::protocol::Message;
use url::Url;
use tracing_subscriber::prelude::*;
use tracing::info;
use tokio::time::{sleep, Duration};
async fn connect_and_handle() -> Result<(), Box<dyn std::error::Error>> {
info!("Connecting to WebSocket server");
let (ws_stream, response) = connect_async(Url::parse("ws://127.0.0.1:3000/ws").unwrap()).await?;
info!("Connected to WebSocket server, response: {:?}", response);
let (mut write, mut read) = ws_stream.split();
// 发送消息任务
tokio::spawn(async move {
write.send(Message::Text("Hello, WebSocket!".to_string())).await.unwrap();
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
write.send(Message::Text("Ping".to_string())).await.unwrap();
}
});
// 接收消息任务
while let Some(msg) = read.next().await {
match msg {
Ok(msg) => {
match msg {
Message::Text(text) => {
info!("Received message: {}", text);
}
Message::Binary(data) => {
info!("Received binary message ({} bytes)", data.len());
}
Message::Ping(data) => {
info!("Received ping");
write.send(Message::Pong(data)).await.unwrap();
}
Message::Pong(_) => {
info!("Received pong");
}
Message::Close(_) => {
info!("Connection closed");
break;
}
}
}
Err(e) => {
info!("Error receiving message: {}", e);
break;
}
}
}
info!("Client disconnected");
Ok(())
}
#[tokio::main]
async fn main() {
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::new("info"))
.with(tracing_subscriber::fmt::layer())
.init();
loop {
if let Err(e) = connect_and_handle().await {
info!("Connection error: {}, retrying in 5 seconds", e);
sleep(Duration::from_secs(5)).await;
}
}
}
我们将构建一个简单的实时聊天应用,支持以下功能:
项目架构设计:
创建 src/main.rs:
use axum::{routing::{get, post}, extract::{ws::{WebSocket, Message, WebSocketUpgrade}, State}, response::{IntoResponse, Html}, Router};
use tracing_subscriber::prelude::*;
use tracing::info;
use tokio::sync::{broadcast, Mutex};
use tokio::time::{interval, Duration};
use std::collections::HashMap;
use std::sync::Arc;
#[derive(Debug, Clone)]
struct User {
id: String,
name: String,
}
type Users = Arc<Mutex<HashMap<String, User>>>;
async fn index() -> Html<&'static str> {
Html(r#"
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Real-Time Chat</title>
<style>
body {
font-family: Arial, sans-serif;
margin: 0;
padding: 0;
background-color: #f5f5f5;
}
.container {
max-width: 800px;
margin: 0 auto;
padding: 20px;
}
h1 {
color: #333;
text-align: center;
}
#chat {
height: 400px;
overflow-y: scroll;
border: 1px solid #ddd;
padding: 10px;
margin-bottom: 10px;
background-color: white;
}
.message {
margin-bottom: 10px;
padding: 10px;
background-color: #f0f0f0;
border-radius: 5px;
}
.system {
color: #666;
font-style: italic;
}
.user {
font-weight: bold;
}
#message-input {
width: 80%;
padding: 10px;
font-size: 14px;
}
#send-btn {
padding: 10px 20px;
font-size: 14px;
background-color: #007bff;
color: white;
border: none;
border-radius: 5px;
cursor: pointer;
}
#send-btn:hover {
background-color: #0056b3;
}
</style>
</head>
<body>
<div>
<h1>Real-Time Chat</h1>
<div></div>
<input type="text" placeholder="Enter your message">
<button>Send</button>
</div>
<script>
const ws = new WebSocket('ws://' + location.host + '/ws');
const chat = document.getElementById('chat');
const messageInput = document.getElementById('message-input');
const sendBtn = document.getElementById('send-btn');
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
const messageDiv = document.createElement('div');
messageDiv.className = 'message';
if (data.type === 'system') {
messageDiv.className += ' system';
messageDiv.textContent = data.content;
} else if (data.type === 'user') {
messageDiv.innerHTML = `<span>${data.user}:</span> ${data.content}`;
}
chat.appendChild(messageDiv);
chat.scrollTop = chat.scrollHeight;
};
sendBtn.addEventListener('click', () => {
const message = messageInput.value.trim();
if (message) {
ws.send(JSON.stringify({ type: 'message', content: message }));
messageInput.value = '';
}
});
messageInput.addEventListener('keypress', (e) => {
if (e.key === 'Enter') {
sendBtn.click();
}
});
</script>
</body>
</html>
"#)
}
async fn ws_handler(
ws: WebSocketUpgrade,
users: State<Users>,
tx: State<broadcast::Sender<String>>,
) -> impl IntoResponse {
info!("New WebSocket connection");
let user_id = uuid::Uuid::new_v4().to_string();
let user_name = format!("User{}", user_id.chars().take(8).collect::<String>());
users.lock().await.insert(user_id.clone(), User{ id: user_id.clone(), name: user_name.clone()});
let rx = tx.get_ref().subscribe();
ws.on_upgrade(|socket| handle_socket(socket, users, tx, rx, user_id, user_name))
}
async fn handle_socket(mut socket: WebSocket, users: State<Users>, tx: State<broadcast::Sender<String>>, mut rx: broadcast::Receiver<String>, user_id: String, user_name: String) {
// 发送系统消息通知用户加入
let join_message = serde_json::json!({"type":"system","content":format!("{} joined the chat", user_name)});
tx.get_ref().send(serde_json::to_string(&join_message).unwrap()).unwrap();
// 发送欢迎消息
let welcome_message = serde_json::json!({"type":"system","content":format!("Welcome, {}!", user_name)});
socket.send(Message::Text(serde_json::to_string(&welcome_message).unwrap())).await.unwrap();
// 心跳检测
let mut heartbeat_interval = interval(Duration::from_secs(10));
// 广播接收任务
let broadcast_task = tokio::spawn(async move {
while let Ok(msg) = rx.recv().await {
if let Err(e) = socket.send(Message::Text(msg)).await {
info!("Error sending broadcast message: {}", e);
break;
}
});
// 消息处理任务
let message_task = tokio::spawn(async move {
while let Some(msg) = socket.recv().await {
match msg {
Ok(msg) => {
match msg {
Message::Text(text) => {
info!("Received message: {}", text);
let data: serde_json::Value = serde_json::from_str(&text).unwrap();
if data["type"] == "message" {
let user_message = serde_json::json!({"type":"user","user": user_name,"content": data["content"]});
tx.get_ref().send(serde_json::to_string(&user_message).unwrap()).unwrap();
}
}
Message::Binary(data) => {
info!("Received binary message ({} bytes)", data.len());
}
Message::Ping(data) => {
info!("Received ping");
socket.send(Message::Pong(data)).await.unwrap();
}
Message::Pong(_) => {
info!("Received pong");
}
Message::Close(_) => {
info!("Connection closed");
break;
}
}
}
Err(e) => {
info!("Error receiving message: {}", e);
break;
}
}
});
// 心跳检测任务
let heartbeat_task = tokio::spawn(async move {
loop {
tokio::select!{
_ = heartbeat_interval.tick() => {
if let Err(e) = socket.send(Message::Ping(vec![])).await {
info!("Error sending ping: {}", e);
break;
}
}
_ = message_task => {
break;
}
_ = broadcast_task => {
break;
}
}
});
// 等待所有任务完成
let _ = tokio::try_join!(heartbeat_task, message_task, broadcast_task);
// 发送系统消息通知用户离开
let leave_message = serde_json::json!({"type":"system","content":format!("{} left the chat", user_name)});
tx.get_ref().send(serde_json::to_string(&leave_message).unwrap()).unwrap();
// 从用户列表中删除
users.lock().await.remove(&user_id);
info!("WebSocket connection closed");
}
#[tokio::main]
async fn main() {
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::new("info"))
.with(tracing_subscriber::fmt::layer())
.init();
let users = Arc::new(Mutex::new(HashMap::new()));
let (tx, _) = broadcast::channel(100);
let app = Router::new()
.route("/", get(index))
.route("/ws", get(ws_handler))
.with_state(users)
.with_state(tx);
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
info!("WebSocket server running on http://0.0.0.0:3000");
axum::serve(listener, app).await.unwrap();
}
在 Cargo.toml 中添加依赖:
[dependencies]
axum = { version = "0.5", features = ["ws"] }
tokio = { version = "1.0", features = ["full"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
uuid = { version = "1.1", features = ["v4"] }
客户端界面已经包含在服务端的 index 函数中,使用 HTML 和 JavaScript 实现。当用户访问 http://localhost:3000 时,会显示聊天界面。
我们可以使用 ab(Apache Bench)工具测试服务端的 HTTP 性能:
ab -n 1000 -c 100 http://localhost:3000/
或者使用 wrk 工具测试 WebSocket 连接的性能:
wrk -t 12 -c 400 -d 30s http://localhost:3000/ws
性能优化方法:
问题:客户端无法与服务器建立 WebSocket 连接。
解决方案:
问题:客户端发送消息失败。
解决方案:
问题:WebSocket 连接在一段时间后自动断开。
解决方案:
问题:客户端接收到的消息顺序不正确。
解决方案:
问题:WebSocket 服务的性能不佳。
解决方案:
WebSocket 协议为实时应用提供了高效的通信方式,Rust 的异步编程能力使得构建高性能 WebSocket 服务变得简单。在本章中,我们介绍了 WebSocket 协议的基本概念、异步服务端和客户端的开发,以及实战项目的实现。
我们使用 Axum 框架实现了 WebSocket 服务端,支持消息广播、心跳检测和连接超时机制。我们还使用 Tungstenite 库实现了异步 WebSocket 客户端,支持重连机制。最后,我们构建了一个实时聊天应用,展示了 WebSocket 在实际项目中的应用。
通过学习本章内容,读者可以掌握 Rust 异步编程中 WebSocket 服务的开发方法,并了解常见问题和最佳实践。希望读者能够将这些知识应用到实际项目中,构建高性能的实时应用。

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML 转 Markdown 互为补充。 在线工具,Markdown 转 HTML在线工具,online
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML 转 Markdown在线工具,online
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online
将JSON字符串修饰为友好的可读格式。 在线工具,JSON美化和格式化在线工具,online