Rust 异步代码的测试与调试艺术
Rust 异步代码的测试与调试方法。内容涵盖异步测试与同步测试的区别、时序性与状态管理等核心挑战。讲解了基于 Tokio 的基础测试宏、运行时配置及超时管理。深入探讨了数据库、HTTP 接口及 Redis 消息的集成测试方案,包括边界条件与异常场景测试。此外,还介绍了 tracing 日志库、tokio-console 调试工具及堆栈跟踪技术。最后结合微服务项目实战,展示了模块化测试结构设计与最佳实践,旨在帮助开发者构建高效可靠的异步系统。

Rust 异步代码的测试与调试方法。内容涵盖异步测试与同步测试的区别、时序性与状态管理等核心挑战。讲解了基于 Tokio 的基础测试宏、运行时配置及超时管理。深入探讨了数据库、HTTP 接口及 Redis 消息的集成测试方案,包括边界条件与异常场景测试。此外,还介绍了 tracing 日志库、tokio-console 调试工具及堆栈跟踪技术。最后结合微服务项目实战,展示了模块化测试结构设计与最佳实践,旨在帮助开发者构建高效可靠的异步系统。

在 Rust 同步编程中,测试通常是顺序执行的,每个测试函数会阻塞线程直到完成,结果是确定的。而异步测试的结果可能受到任务调度、网络延迟、数据库连接等因素的影响,时序性和状态管理更加复杂。
同步测试示例:
#[cfg(test)]
mod tests {
#[test]
fn test_add() {
assert_eq!(1 + 1, 2);
}
}
异步测试示例(使用 Tokio 测试宏):
#[cfg(test)]
mod tests {
use tokio::time::sleep;
use std::time::Duration;
#[tokio::test]
async fn test_async_add() {
sleep(Duration::from_millis(100)).await;
assert_eq!(1 + 1, 2);
}
}
异步任务的执行顺序是不确定的,可能导致测试结果在不同的运行中有所不同。例如:
#[tokio::test]
async fn test_task_order() {
let mut vec = Vec::new();
tokio::spawn(async {
vec.push(1);
});
tokio::spawn(async {
vec.push(2);
});
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
assert_eq!(vec, vec![1, 2]); // 可能失败,因为任务执行顺序不确定
}
异步任务可能会修改共享状态,需要使用同步原语(如互斥锁、原子变量)来保证测试的正确性。例如:
use std::sync::Arc;
use tokio::sync::Mutex;
#[tokio::test]
async fn test_shared_state() {
let shared_vec = Arc::new(Mutex::new(Vec::new()));
let mut handles = Vec::new();
for i in 1..=3 {
let shared_vec_clone = shared_vec.clone();
handles.push(tokio::spawn(async move {
let mut vec = shared_vec_clone.lock().await;
vec.push(i);
}));
}
for handle in handles {
handle.await.unwrap();
}
let vec = shared_vec.lock().await;
assert_eq!(vec.len(), 3);
assert!(vec.contains(&1));
assert!(vec.contains(&2));
assert!(vec.contains(&3));
}
异步测试可能会创建外部资源(如数据库连接、网络连接),需要确保这些资源在测试后被正确清理。例如:
use sqlx::PgPool;
#[tokio::test]
async fn test_database_connection() {
let pool = PgPool::connect("postgresql://user:password@localhost:5432/test_db")
.await
.unwrap();
// 执行测试操作
let count = sqlx::query_scalar!("SELECT COUNT(*) FROM users")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(count, 0);
// 资源会在测试结束后自动清理
}
Tokio 提供了 #[tokio::test] 宏,用于简化异步测试的编写。该宏会自动创建一个异步运行时,并在测试结束后清理资源。
use tokio::time::sleep;
use std::time::Duration;
#[tokio::test]
async fn test_basic_async() {
println!("Test starting");
sleep(Duration::from_millis(100)).await;
println!("Test finished");
assert!(true);
}
我们可以通过属性参数配置 Tokio 测试运行时:
use tokio::time::sleep;
use std::time::Duration;
// 使用单线程运行时
#[tokio::test(flavor = "current_thread")]
async fn test_single_thread() {
sleep(Duration::from_millis(100)).await;
assert!(true);
}
// 忽略测试
#[tokio::test(ignore)]
async fn test_ignored() {
sleep(Duration::from_millis(100)).await;
assert!(true);
}
// 配置超时时间
#[tokio::test(timeout = 5000)] // 5 秒超时
async fn test_timeout() {
sleep(Duration::from_secs(10)).await; // 超过超时时间
assert!(true);
}
我们可以直接测试异步函数的功能:
use tokio::time::sleep;
use std::time::Duration;
async fn async_add(a: i32, b: i32) -> i32 {
sleep(Duration::from_millis(100)).await;
a + b
}
async fn async_multiply(a: i32, b: i32) -> i32 {
sleep(Duration::from_millis(50)).await;
a * b
}
#[tokio::test]
async fn test_async_add() {
assert_eq!(async_add(2, 3).await, 5);
}
#[tokio::test]
async fn test_async_multiply() {
assert_eq!(async_multiply(2, 3).await, 6);
}
异步任务可能会因为网络延迟、死锁等原因导致测试超时。我们可以使用 tokio::time::timeout 函数来管理超时:
use tokio::time::{timeout, Duration};
async fn long_running_task() -> i32 {
tokio::time::sleep(Duration::from_secs(5)).await;
42
}
#[tokio::test]
async fn test_timeout_task() {
let result = timeout(Duration::from_secs(3), long_running_task()).await;
assert!(result.is_err()); // 任务超时,结果为 Err(Elapsed)
}
#[tokio::test]
async fn test_timeout_success() {
let result = timeout(Duration::from_secs(6), long_running_task()).await;
assert_eq!(result.unwrap(), 42);
}
数据库操作的集成测试需要连接到实际的数据库,并在测试后清理数据。我们可以使用 SQLx 的测试宏和数据库迁移功能。
在 Cargo.toml 中添加依赖:
[dependencies]
sqlx = { version = "0.6", features = ["postgres", "runtime-tokio-rustls", "migrate", "chrono"] }
创建测试文件(tests/database.rs):
use sqlx::PgPool;
use tokio::time::sleep;
use std::time::Duration;
use common::models::User;
use common::db::create_pool;
use crate::common::errors::AppError;
#[sqlx::test]
async fn test_create_user(pool: PgPool) {
let user = User {
id: uuid::Uuid::new_v4(),
third_party_id: "test_user_1".to_string(),
name: "Test User 1".to_string(),
email: "[email protected]".to_string(),
phone: Some("1234567890".to_string()),
status: "active".to_string(),
created_at: chrono::Utc::now(),
updated_at: chrono::Utc::now(),
last_synced_at: chrono::Utc::now(),
};
sqlx::query!(r#"
INSERT INTO users (
id, third_party_id, name, email, phone, status,
created_at, updated_at, last_synced_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
"#, user.id, user.third_party_id, user.name, user.email, user.phone,
user.status, user.created_at, user.updated_at, user.last_synced_at)
.execute(&pool)
.await
.unwrap();
let result = sqlx::query_as!(User, "SELECT * FROM users WHERE third_party_id = $1", user.third_party_id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(result.id, user.id);
assert_eq!(result.third_party_id, user.third_party_id);
assert_eq!(result.name, user.name);
assert_eq!(result.email, user.email);
assert_eq!(result.phone, user.phone);
assert_eq!(result.status, user.status);
}
#[sqlx::test]
async fn test_delete_user(pool: PgPool) {
let user = User {
id: uuid::Uuid::new_v4(),
third_party_id: "test_user_2".to_string(),
name: "Test User 2".to_string(),
email: "[email protected]".to_string(),
phone: Some("1234567891".to_string()),
status: "active".to_string(),
created_at: chrono::Utc::now(),
updated_at: chrono::Utc::now(),
last_synced_at: chrono::Utc::now(),
};
sqlx::query!(r#"
INSERT INTO users (
id, third_party_id, name, email, phone, status,
created_at, updated_at, last_synced_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
"#, user.id, user.third_party_id, user.name, user.email, user.phone,
user.status, user.created_at, user.updated_at, user.last_synced_at)
.execute(&pool)
.await
.unwrap();
sqlx::query!("DELETE FROM users WHERE third_party_id = $1", user.third_party_id)
.execute(&pool)
.await
.unwrap();
let result = sqlx::query_as!(User, "SELECT * FROM users WHERE third_party_id = $1", user.third_party_id)
.fetch_optional(&pool)
.await
.unwrap();
assert!(result.is_none());
}
我们可以使用 Reqwest 库测试 HTTP API 的功能:
use reqwest::Client;
use tokio::time::sleep;
use std::time::Duration;
async fn test_create_user_api(client: &Client) {
let request_body = serde_json::json!({
"third_party_id": "api_test_user_1",
"name": "API Test User 1",
"email": "[email protected]",
"phone": "1234567892",
"status": "active"
});
let response = client
.post("http://localhost:3000/users")
.json(&request_body)
.send()
.await
.unwrap();
assert_eq!(response.status().as_u16(), 201); // Created
let response_body = response.json::<serde_json::Value>().await.unwrap();
assert_eq!(response_body["third_party_id"], "api_test_user_1");
assert_eq!(response_body["name"], "API Test User 1");
assert_eq!(response_body["email"], "[email protected]");
assert_eq!(response_body["phone"], "1234567892");
assert_eq!(response_body["status"], "active");
}
async fn test_get_user_api(client: &Client) {
let user_id = "api_test_user_1";
let response = client
.get(&format!("http://localhost:3000/users/{}", user_id))
.send()
.await
.unwrap();
assert_eq!(response.status().as_u16(), 200); // OK
let response_body = response.json::<serde_json::Value>().await.unwrap();
assert_eq!(response_body["third_party_id"], user_id);
}
async fn test_update_user_api(client: &Client) {
let user_id = "api_test_user_1";
let request_body = serde_json::json!({
"name": "Updated API Test User 1",
"phone": "9876543210"
});
let response = client
.put(&format!("http://localhost:3000/users/{}", user_id))
.json(&request_body)
.send()
.await
.unwrap();
assert_eq!(response.status().as_u16(), 200); // OK
let response_body = response.json::<serde_json::Value>().await.unwrap();
assert_eq!(response_body["third_party_id"], user_id);
assert_eq!(response_body["name"], "Updated API Test User 1");
assert_eq!(response_body["phone"], "9876543210");
}
async fn test_delete_user_api(client: &Client) {
let user_id = "api_test_user_1";
let response = client
.delete(&format!("http://localhost:3000/users/{}", user_id))
.send()
.await
.unwrap();
assert_eq!(response.status().as_u16(), 204); // No Content
let response = client
.get(&format!("http://localhost:3000/users/{}", user_id))
.send()
.await
.unwrap();
assert_eq!(response.status().as_u16(), 404); // Not Found
}
#[tokio::test]
async fn test_user_api() {
let client = Client::new();
sleep(Duration::from_secs(1)).await; // 等待服务器启动
test_create_user_api(&client).await;
test_get_user_api(&client).await;
test_update_user_api(&client).await;
test_delete_user_api(&client).await;
}
我们可以使用 Redis 的客户端测试消息的发布和订阅功能:
use redis::Client;
use tokio::time::timeout;
use std::time::Duration;
async fn test_publish_and_subscribe(client: &Client) {
let mut subscriber = client.get_tokio_connection().await.unwrap().into_pubsub();
subscriber.subscribe("test_channel").await.unwrap();
let publisher = client.get_tokio_connection().await.unwrap();
redis::cmd("PUBLISH")
.arg("test_channel")
.arg("test_message")
.query_async::<_, i64>(&mut publisher)
.await
.unwrap();
let msg = timeout(Duration::from_secs(1), subscriber.get_message())
.await
.unwrap()
.unwrap();
let payload: String = msg.get_payload().await.unwrap();
assert_eq!(payload, "test_message");
}
async fn test_channel_exists(client: &Client) {
let mut subscriber = client.get_tokio_connection().await.unwrap().into_pubsub();
let result = subscriber.subscribe("non_existent_channel").await;
assert!(result.is_ok());
let result = subscriber.unsubscribe("non_existent_channel").await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_redis_pubsub() {
let client = Client::open("redis://localhost:6379/0").unwrap();
test_publish_and_subscribe(&client).await;
test_channel_exists(&client).await;
}
边界条件测试是测试参数的最大值、最小值、空值等情况:
use reqwest::Client;
use tokio::time::sleep;
use std::time::Duration;
async fn test_empty_name(client: &Client) {
let request_body = serde_json::json!({
"third_party_id": "test_user_empty_name",
"name": "",
"email": "[email protected]",
"phone": "1234567893",
"status": "active"
});
let response = client
.post("http://localhost:3000/users")
.json(&request_body)
.send()
.await
.unwrap();
assert_eq!(response.status().as_u16(), 400); // Bad Request
}
async fn test_invalid_email(client: &Client) {
let request_body = serde_json::json!({
"third_party_id": "test_user_invalid_email",
"name": "Test User Invalid Email",
"email": "invalid_email",
"phone": "1234567894",
"status": "active"
});
let response = client
.post("http://localhost:3000/users")
.json(&request_body)
.send()
.await
.unwrap();
assert_eq!(response.status().as_u16(), 400); // Bad Request
}
async fn test_negative_amount(client: &Client) {
let request_body = serde_json::json!({
"user_id": "a8f7d9e0-1234-5678-90ab-cdef12345678",
"order_number": "ORD-002",
"amount": -100.0,
"currency": "USD",
"status": "pending"
});
let response = client
.post("http://localhost:3000/orders")
.json(&request_body)
.send()
.await
.unwrap();
assert_eq!(response.status().as_u16(), 400); // Bad Request
}
#[tokio::test]
async fn test_boundary_conditions() {
let client = Client::new();
sleep(Duration::from_secs(1)).await; // 等待服务器启动
test_empty_name(&client).await;
test_invalid_email(&client).await;
test_negative_amount(&client).await;
}
异常场景测试是测试服务不可用、超时、权限不足等情况:
use reqwest::Client;
use reqwest::Error;
async fn test_server_unavailable(client: &Client) {
let response = client.get("http://localhost:9999/health").send().await;
assert!(response.is_err());
let error = response.unwrap_err();
assert!(error.is_connect()); // 连接错误
}
async fn test_api_timeout(client: &Client) {
let response = tokio::time::timeout(
std::time::Duration::from_millis(500),
client.get("http://localhost:3000/long_running").send()
).await;
assert!(response.is_err());
let error = response.unwrap_err();
assert!(error.is_timeout());
}
async fn test_unauthorized_access(client: &Client) {
let response = client.get("http://localhost:3000/protected").send().await.unwrap();
assert_eq!(response.status().as_u16(), 401); // Unauthorized
}
#[tokio::test]
async fn test_exception_scenarios() {
let client = Client::new();
test_server_unavailable(&client).await;
// test_api_timeout(&client).await; // 需要服务器提供长时间运行的接口
// test_unauthorized_access(&client).await; // 需要服务器提供受保护的接口
}
tracing 是 Rust 的日志库,可以用于记录异步任务的执行信息,包括任务的创建、完成、错误等。我们可以使用 tokio-tracing 库来记录 Tokio 的任务执行信息。
在 Cargo.toml 中添加依赖:
[dependencies]
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
tokio-tracing = "0.1"
在代码中使用 tracing:
use tracing::info;
use tokio::time::sleep;
use std::time::Duration;
#[tokio::main]
async fn main() {
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::new("info"))
.with(tracing_subscriber::fmt::layer())
.init();
info!("Application started");
let mut handles = Vec::new();
for i in 1..=3 {
let handle = tokio::spawn(async move {
info!("Task {} started", i);
sleep(Duration::from_millis(100 * i)).await;
info!("Task {} finished", i);
i
});
handles.push(handle);
}
let results: Vec<_> = futures::future::join_all(handles)
.await
.into_iter()
.map(|r| r.unwrap())
.collect();
info!("All tasks finished. Results: {:?}", results);
}
运行程序并查看日志:
RUST_LOG=info cargo run
tokio-console 是 Tokio 提供的调试工具,可以用于定位异步任务的性能问题,包括任务的执行时间、等待时间、内存使用等。
安装 tokio-console:
cargo install tokio-console
在 Cargo.toml 中添加依赖:
[dependencies]
tokio = { version = "1.0", features = ["full", "trace"] }
运行程序并使用 tokio-console:
RUSTFLAGS="--cfg tokio_unstable" RUST_LOG=info cargo run -- tokio-console
异步代码的堆栈跟踪与同步代码不同,需要使用 tokio-tracing 或 backtrace 库来获取完整的堆栈信息。
在 Cargo.toml 中添加依赖:
[dependencies]
backtrace = "0.3"
使用 backtrace 获取堆栈信息:
use backtrace::Backtrace;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum AppError {
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
#[error("Database error: {0}")]
Database(#[from] sqlx::Error),
#[error("Custom error: {0}")]
Custom(String),
}
impl AppError {
pub fn with_backtrace(self) -> (Self, Backtrace) {
(self, Backtrace::new())
}
}
async fn foo() -> Result<(), AppError> {
bar().await
}
async fn bar() -> Result<(), AppError> {
baz().await
}
async fn baz() -> Result<(), AppError> {
Err(AppError::Custom("Test error".to_string()))
}
#[tokio::main]
async fn main() {
let (error, backtrace) = foo().await.unwrap_err().with_backtrace();
println!("Error: {:?}", error);
println!("Backtrace: {:?}", backtrace);
}
我们可以按照模块组织测试,每个模块包含单元测试和集成测试:
rust-async-microservices/
├── common/
│ ├── src/
│ │ ├── errors.rs
│ │ ├── models.rs
│ │ ├── db.rs
│ │ ├── redis.rs
│ │ └── http.rs
│ └── tests/
│ ├── errors_test.rs
│ ├── models_test.rs
│ ├── db_test.rs
│ ├── redis_test.rs
│ └── http_test.rs
├── user-sync-service/
│ ├── src/
│ │ ├── config.rs
│ │ ├── sync.rs
│ │ ├── scheduler.rs
│ │ └── metrics.rs
│ └── tests/
│ ├── config_test.rs
│ ├── sync_test.rs
│ ├── scheduler_test.rs
│ └── metrics_test.rs
├── order-processing-service/
│ ├── src/
│ │ ├── config.rs
│ │ ├── processing.rs
│ │ └── metrics.rs
│ └── tests/
│ ├── config_test.rs
│ ├── processing_test.rs
│ └── metrics_test.rs
└── monitoring-service/
├── src/
│ ├── config.rs
│ ├── status.rs
│ ├── websocket.rs
│ └── routes.rs
└── tests/
├── config_test.rs
├── status_test.rs
├── websocket_test.rs
└── routes_test.rs
common/tests/models_test.rs:
use common::models::{User, ThirdPartyUser, Order, OrderMessage};
use chrono::Utc;
#[test]
fn test_third_party_user_to_user() {
let third_party_user = ThirdPartyUser {
id: "test_user_1".to_string(),
name: "Test User 1".to_string(),
email: "[email protected]".to_string(),
phone: Some("1234567890".to_string()),
status: "active".to_string(),
created_at: Utc::now().to_rfc3339(),
updated_at: Utc::now().to_rfc3339(),
};
let user = User::try_from(third_party_user).unwrap();
assert_eq!(user.third_party_id, "test_user_1");
assert_eq!(user.name, "Test User 1");
assert_eq!(user.email, "[email protected]");
assert_eq!(user.phone, Some("1234567890".to_string()));
assert_eq!(user.status, "active");
}
#[test]
fn test_order_message_to_order() {
let order_message = OrderMessage {
user_id: "a8f7d9e0-1234-5678-90ab-cdef12345678".to_string(),
order_number: "ORD-001".to_string(),
amount: 100.0,
currency: "USD".to_string(),
status: "pending".to_string(),
};
let order = Order::try_from(order_message).unwrap();
assert_eq!(order.user_id.to_string(), "a8f7d9e0-1234-5678-90ab-cdef12345678");
assert_eq!(order.order_number, "ORD-001");
assert_eq!(order.amount, 100.0);
assert_eq!(order.currency, "USD");
assert_eq!(order.status, "pending");
}
common/tests/errors_test.rs:
use common::errors::AppError;
#[test]
fn test_error_conversion() {
let io_error = std::io::Error::new(std::io::ErrorKind::NotFound, "File not found");
let app_error = AppError::from(io_error);
assert!(matches!(app_error, AppError::Io(_)));
let sqlx_error = sqlx::Error::RowNotFound;
let app_error = AppError::from(sqlx_error);
assert!(matches!(app_error, AppError::Database(_)));
let custom_error = AppError::Custom("Test error".to_string());
assert!(matches!(custom_error, AppError::Custom(_)));
}
#[test]
fn test_error_display() {
let io_error = std::io::Error::new(std::io::ErrorKind::NotFound, "File not found");
let app_error = AppError::from(io_error);
assert!(app_error.to_string().contains("File not found"));
let custom_error = AppError::Custom("Test error".to_string());
assert_eq!(custom_error.to_string(), "Custom error: Test error");
}
user-sync-service/tests/sync_test.rs:
use user_sync_service::sync::sync_users;
use user_sync_service::config::AppConfig;
#[tokio::test]
async fn test_user_sync() {
let config = AppConfig::from_env().unwrap();
let result = sync_users(&config).await;
assert!(result.is_ok());
}
order-processing-service/tests/processing_test.rs:
use order_processing_service::processing::process_order_message;
use order_processing_service::config::AppConfig;
use common::redis::publish_message;
use common::models::OrderMessage;
#[tokio::test]
async fn test_order_processing() {
let config = AppConfig::from_env().unwrap();
let order_message = OrderMessage {
user_id: "a8f7d9e0-1234-5678-90ab-cdef12345678".to_string(),
order_number: "ORD-001".to_string(),
amount: 100.0,
currency: "USD".to_string(),
status: "pending".to_string(),
};
let redis_client = common::redis::create_client(config.redis.clone()).await.unwrap();
publish_message(&redis_client, "orders", &serde_json::to_string(&order_message).unwrap())
.await
.unwrap();
// 等待订单处理完成
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let pool = common::db::create_pool(config.db.clone()).await.unwrap();
let order = sqlx::query_as!(common::models::Order, "SELECT * FROM orders WHERE order_number = $1", order_message.order_number)
.fetch_optional(&pool)
.await
.unwrap();
assert!(order.is_some());
}
monitoring-service/tests/status_test.rs:
use monitoring_service::status::get_system_status;
use monitoring_service::config::AppConfig;
#[tokio::test]
async fn test_system_status() {
let config = AppConfig::from_env().unwrap();
let status = get_system_status(&config).await.unwrap();
assert!(status.user_sync_service.is_running);
assert!(status.order_processing_service.is_running);
assert!(status.monitoring_service.is_running);
assert!(status.total_users >= 0);
assert!(status.total_orders >= 0);
assert!(status.failed_tasks >= 0);
}
每个测试应该是独立的,不应该依赖其他测试的结果。我们可以使用测试数据库和测试 Redis 实例来隔离测试:
use sqlx::PgPool;
#[sqlx::test]
async fn test_create_user(pool: PgPool) {
// 测试操作
let count = sqlx::query_scalar!("SELECT COUNT(*) FROM users")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(count, 0);
// 插入用户
sqlx::query!(r#"
INSERT INTO users (
id, third_party_id, name, email, phone, status,
created_at, updated_at, last_synced_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
"#, uuid::Uuid::new_v4(), "test_user_1", "Test User 1", "[email protected]",
Some("1234567890"), "active", chrono::Utc::now(), chrono::Utc::now(), chrono::Utc::now())
.execute(&pool)
.await
.unwrap();
// 查询用户数量
let count = sqlx::query_scalar!("SELECT COUNT(*) FROM users")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(count, 1);
}
测试结束后,我们需要清理资源,如数据库连接、网络连接、文件句柄等。SQLx 的测试宏会自动清理测试数据库:
use sqlx::PgPool;
#[sqlx::test]
async fn test_resource_cleanup(pool: PgPool) {
// 测试操作
let count = sqlx::query_scalar!("SELECT COUNT(*) FROM users")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(count, 0);
sqlx::query!(r#"
INSERT INTO users (
id, third_party_id, name, email, phone, status,
created_at, updated_at, last_synced_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
"#, uuid::Uuid::new_v4(), "test_user_1", "Test User 1", "[email protected]",
Some("1234567890"), "active", chrono::Utc::now(), chrono::Utc::now(), chrono::Utc::now())
.execute(&pool)
.await
.unwrap();
// 测试结束后,SQLx 会自动清理数据库
}
我们可以使用 cargo test 的–test-threads 参数来启用并行测试:
cargo test --test-threads=4
我们可以使用测试数据复用的方法,避免每次测试都创建相同的数据:
use sqlx::PgPool;
use once_cell::sync::Lazy;
use common::models::User;
use chrono::Utc;
static TEST_USER: Lazy<User> = Lazy::new(|| User {
id: uuid::Uuid::new_v4(),
third_party_id: "test_user_reuse".to_string(),
name: "Test User Reuse".to_string(),
email: "[email protected]".to_string(),
phone: Some("1234567890".to_string()),
status: "active".to_string(),
created_at: Utc::now(),
updated_at: Utc::now(),
last_synced_at: Utc::now(),
});
#[sqlx::test]
async fn test_user_reuse(pool: PgPool) {
sqlx::query!(r#"
INSERT INTO users (
id, third_party_id, name, email, phone, status,
created_at, updated_at, last_synced_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
"#, TEST_USER.id, TEST_USER.third_party_id, TEST_USER.name, TEST_USER.email, TEST_USER.phone,
TEST_USER.status, TEST_USER.created_at, TEST_USER.updated_at, TEST_USER.last_synced_at)
.execute(&pool)
.await
.unwrap();
let user = sqlx::query_as!(User, "SELECT * FROM users WHERE third_party_id = $1", TEST_USER.third_party_id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(user.id, TEST_USER.id);
}
我们可以使用 cargo-tarpaulin 工具来分析测试覆盖率:
cargo install cargo-tarpaulin
cargo tarpaulin --ignore-tests
异步代码的测试与调试是 Rust 异步编程的重要环节。通过深入理解异步测试的本质与难点、基础异步测试框架、集成测试与边界条件测试、异步调试的核心工具、实战项目优化以及异步测试的最佳实践,我们可以编写出更高效、更安全的异步代码。
在实际项目中,我们应该根据项目的需求选择合适的测试方法,并注意测试隔离与资源清理、测试性能优化、测试覆盖率分析等方面的问题。同时,我们可以使用 Tokio 的调试工具和日志库来定位异步任务的性能问题。
希望本章的内容能够帮助您深入掌握 Rust 异步代码的测试与调试艺术,并在实际项目中应用。

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML 转 Markdown 互为补充。 在线工具,Markdown 转 HTML在线工具,online
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML 转 Markdown在线工具,online
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online
将JSON字符串修饰为友好的可读格式。 在线工具,JSON美化和格式化在线工具,online