跳到主要内容
极客日志极客日志
首页博客AI提示词GitHub精选代理工具
搜索
|注册
博客列表
Rust

Rust 异步代码的测试与调试实践

Rust 异步编程面临时序不确定、状态共享及资源清理等挑战。探讨基于 Tokio 的异步测试框架,涵盖单元测试、集成测试(数据库、HTTP、Redis)及边界异常场景。介绍 tracing 日志与 tokio-console 调试工具,结合实战项目展示模块化测试结构设计。强调测试隔离、资源自动清理及覆盖率分析,提供编写高效安全异步代码的最佳实践方案。

ArchDesign发布于 2026/3/24更新于 2026/5/35 浏览
Rust 异步代码的测试与调试实践

Rust 异步代码的测试与调试实践

一、异步测试的本质与难点

1.1 异步测试与同步测试的区别

在 Rust 同步编程中,测试通常是顺序执行的,每个测试函数会阻塞线程直到完成,结果是确定的。而异步测试的结果可能受到任务调度、网络延迟、数据库连接等因素的影响,时序性和状态管理更加复杂。

同步测试示例:

#[cfg(test)]
mod tests {
    #[test]
    fn test_add() {
        assert_eq!(1 + 1, 2);
    }
}

异步测试示例(使用 Tokio 测试宏):

#[cfg(test)]
mod tests {
    use tokio::time::sleep;
    use std::time::Duration;

    #[tokio::test]
    async fn test_async_add() {
        sleep(Duration::from_millis(100)).await;
        assert_eq!(1 + 1, 2);
    }
}
1.2 异步测试的核心挑战
1.2.1 时序性问题

异步任务的执行顺序是不确定的,可能导致测试结果在不同的运行中有所不同。例如:

#[tokio::test]
async fn test_task_order() {
    let mut vec = Vec::new();
    tokio::spawn(async {
        vec.push(1);
    });
    tokio::spawn(async {
        vec.();
    });
    tokio::time::(std::time::Duration::()).;
    
    (vec, [, ]);
}
push
2
sleep
from_millis
100
await
// 注意:这里可能失败,因为任务执行顺序不确定
assert_eq!
vec!
1
2
1.2.2 状态管理问题

异步任务可能会修改共享状态,需要使用同步原语(如互斥锁、原子变量)来保证测试的正确性。例如:

use std::sync::Arc;
use tokio::sync::Mutex;

#[tokio::test]
async fn test_shared_state() {
    let shared_vec = Arc::new(Mutex::new(Vec::new()));
    let mut handles = Vec::new();
    for i in 1..=3 {
        let shared_vec_clone = shared_vec.clone();
        handles.push(tokio::spawn(async move {
            let mut vec = shared_vec_clone.lock().await;
            vec.push(i);
        }));
    }
    for handle in handles {
        handle.await.unwrap();
    }
    let vec = shared_vec.lock().await;
    assert_eq!(vec.len(), 3);
    assert!(vec.contains(&1));
    assert!(vec.contains(&2));
    assert!(vec.contains(&3));
}
1.2.3 资源清理问题

异步测试可能会创建外部资源(如数据库连接、网络连接),需要确保这些资源在测试后被正确清理。例如:

use sqlx::PgPool;

#[tokio::test]
async fn test_database_connection() {
    let pool = PgPool::connect("postgresql://user:password@localhost:5432/test_db")
        .await
        .unwrap();
    // 执行测试操作
    let count = sqlx::query_scalar!("SELECT COUNT(*) FROM users")
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(count, 0);
    // 资源会在测试结束后自动清理
}

二、基础异步测试框架

2.1 Tokio 测试宏的使用

Tokio 提供了 #[tokio::test] 宏,用于简化异步测试的编写。该宏会自动创建一个异步运行时,并在测试结束后清理资源。

use tokio::time::sleep;
use std::time::Duration;

#[tokio::test]
async fn test_basic_async() {
    println!("Test starting");
    sleep(Duration::from_millis(100)).await;
    println!("Test finished");
    assert!(true);
}
2.1.1 配置 Tokio 测试运行时

我们可以通过属性参数配置 Tokio 测试运行时:

use tokio::time::sleep;
use std::time::Duration;

// 使用单线程运行时
#[tokio::test(flavor = "current_thread")]
async fn test_single_thread() {
    sleep(Duration::from_millis(100)).await;
    assert!(true);
}

// 忽略测试
#[tokio::test(ignore)]
async fn test_ignored() {
    sleep(Duration::from_millis(100)).await;
    assert!(true);
}

// 配置超时时间
#[tokio::test(timeout = 5000)] // 5 秒超时
async fn test_timeout() {
    sleep(Duration::from_secs(10)).await; // 超过超时时间
    assert!(true);
}
2.2 基础异步函数测试

我们可以直接测试异步函数的功能:

use tokio::time::sleep;
use std::time::Duration;

async fn async_add(a: i32, b: i32) -> i32 {
    sleep(Duration::from_millis(100)).await;
    a + b
}

async fn async_multiply(a: i32, b: i32) -> i32 {
    sleep(Duration::from_millis(50)).await;
    a * b
}

#[tokio::test]
async fn test_async_add() {
    assert_eq!(async_add(2, 3).await, 5);
}

#[tokio::test]
async fn test_async_multiply() {
    assert_eq!(async_multiply(2, 3).await, 6);
}
2.3 异步任务的超时管理

异步任务可能会因为网络延迟、死锁等原因导致测试超时。我们可以使用 tokio::time::timeout 函数来管理超时:

use tokio::time::{timeout, Duration};

async fn long_running_task() -> i32 {
    tokio::time::sleep(Duration::from_secs(5)).await;
    42
}

#[tokio::test]
async fn test_timeout_task() {
    let result = timeout(Duration::from_secs(3), long_running_task()).await;
    assert!(result.is_err()); // 任务超时,结果为 Err(Elapsed)
}

#[tokio::test]
async fn test_timeout_success() {
    let result = timeout(Duration::from_secs(6), long_running_task()).await;
    assert_eq!(result.unwrap(), 42);
}

三、集成测试与边界条件测试

3.1 数据库操作的集成测试

数据库操作的集成测试需要连接到实际的数据库,并在测试后清理数据。我们可以使用 SQLx 的测试宏和数据库迁移功能。

在 Cargo.toml 中添加依赖:

[dependencies]
sqlx = { version = "0.6", features = ["postgres", "runtime-tokio-rustls", "migrate", "chrono"] }

创建测试文件(tests/database.rs):

use sqlx::PgPool;
use tokio::time::sleep;
use std::time::Duration;
use common::models::User;
use common::db::create_pool;
use crate::common::errors::AppError;

#[sqlx::test]
async fn test_create_user(pool: PgPool) {
    let user = User {
        id: uuid::Uuid::new_v4(),
        third_party_id: "test_user_1".to_string(),
        name: "Test User 1".to_string(),
        email: "[email protected]".to_string(),
        phone: Some("1234567890".to_string()),
        status: "active".to_string(),
        created_at: chrono::Utc::now(),
        updated_at: chrono::Utc::now(),
        last_synced_at: chrono::Utc::now(),
    };
    sqlx::query!(r#"
        INSERT INTO users (
            id, third_party_id, name, email, phone, status,
            created_at, updated_at, last_synced_at
        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
    "#, user.id, user.third_party_id, user.name, user.email, user.phone,
       user.status, user.created_at, user.updated_at, user.last_synced_at)
    .execute(&pool)
    .await
    .unwrap();

    let result = sqlx::query_as!(User, "SELECT * FROM users WHERE third_party_id = $1", user.third_party_id)
        .fetch_one(&pool)
        .await
        .unwrap();

    assert_eq!(result.id, user.id);
    assert_eq!(result.third_party_id, user.third_party_id);
    assert_eq!(result.name, user.name);
    assert_eq!(result.email, user.email);
    assert_eq!(result.phone, user.phone);
    assert_eq!(result.status, user.status);
}

#[sqlx::test]
async fn test_delete_user(pool: PgPool) {
    let user = User {
        id: uuid::Uuid::new_v4(),
        third_party_id: "test_user_2".to_string(),
        name: "Test User 2".to_string(),
        email: "[email protected]".to_string(),
        phone: Some("1234567891".to_string()),
        status: "active".to_string(),
        created_at: chrono::Utc::now(),
        updated_at: chrono::Utc::now(),
        last_synced_at: chrono::Utc::now(),
    };
    sqlx::query!(r#"
        INSERT INTO users (
            id, third_party_id, name, email, phone, status,
            created_at, updated_at, last_synced_at
        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
    "#, user.id, user.third_party_id, user.name, user.email, user.phone,
       user.status, user.created_at, user.updated_at, user.last_synced_at)
    .execute(&pool)
    .await
    .unwrap();

    sqlx::query!("DELETE FROM users WHERE third_party_id = $1", user.third_party_id)
        .execute(&pool)
        .await
        .unwrap();

    let result = sqlx::query_as!(User, "SELECT * FROM users WHERE third_party_id = $1", user.third_party_id)
        .fetch_optional(&pool)
        .await
        .unwrap();

    assert!(result.is_none());
}
3.2 HTTP 请求的集成测试

我们可以使用 Reqwest 库测试 HTTP API 的功能:

use reqwest::Client;
use tokio::time::sleep;
use std::time::Duration;

async fn test_create_user_api(client: &Client) {
    let request_body = serde_json::json!({
        "third_party_id": "api_test_user_1",
        "name": "API Test User 1",
        "email": "[email protected]",
        "phone": "1234567892",
        "status": "active"
    });
    let response = client
        .post("http://localhost:3000/users")
        .json(&request_body)
        .send()
        .await
        .unwrap();
    assert_eq!(response.status().as_u16(), 201); // Created

    let response_body = response.json::<serde_json::Value>().await.unwrap();
    assert_eq!(response_body["third_party_id"], "api_test_user_1");
    assert_eq!(response_body["name"], "API Test User 1");
    assert_eq!(response_body["email"], "[email protected]");
    assert_eq!(response_body["phone"], "1234567892");
    assert_eq!(response_body["status"], "active");
}

async fn test_get_user_api(client: &Client) {
    let user_id = "api_test_user_1";
    let response = client
        .get(&format!("http://localhost:3000/users/{}", user_id))
        .send()
        .await
        .unwrap();
    assert_eq!(response.status().as_u16(), 200); // OK

    let response_body = response.json::<serde_json::Value>().await.unwrap();
    assert_eq!(response_body["third_party_id"], user_id);
}

async fn test_update_user_api(client: &Client) {
    let user_id = "api_test_user_1";
    let request_body = serde_json::json!({
        "name": "Updated API Test User 1",
        "phone": "9876543210"
    });
    let response = client
        .put(&format!("http://localhost:3000/users/{}", user_id))
        .json(&request_body)
        .send()
        .await
        .unwrap();
    assert_eq!(response.status().as_u16(), 200); // OK

    let response_body = response.json::<serde_json::Value>().await.unwrap();
    assert_eq!(response_body["third_party_id"], user_id);
    assert_eq!(response_body["name"], "Updated API Test User 1");
    assert_eq!(response_body["phone"], "9876543210");
}

async fn test_delete_user_api(client: &Client) {
    let user_id = "api_test_user_1";
    let response = client
        .delete(&format!("http://localhost:3000/users/{}", user_id))
        .send()
        .await
        .unwrap();
    assert_eq!(response.status().as_u16(), 204); // No Content

    let response = client
        .get(&format!("http://localhost:3000/users/{}", user_id))
        .send()
        .await
        .unwrap();
    assert_eq!(response.status().as_u16(), 404); // Not Found
}

#[tokio::test]
async fn test_user_api() {
    let client = Client::new();
    sleep(Duration::from_secs(1)).await; // 等待服务器启动
    test_create_user_api(&client).await;
    test_get_user_api(&client).await;
    test_update_user_api(&client).await;
    test_delete_user_api(&client).await;
}
3.3 Redis 消息的集成测试

我们可以使用 Redis 的客户端测试消息的发布和订阅功能:

use redis::Client;
use tokio::time::timeout;
use std::time::Duration;

async fn test_publish_and_subscribe(client: &Client) {
    let mut subscriber = client.get_tokio_connection().await.unwrap().into_pubsub();
    subscriber.subscribe("test_channel").await.unwrap();

    let publisher = client.get_tokio_connection().await.unwrap();
    redis::cmd("PUBLISH")
        .arg("test_channel")
        .arg("test_message")
        .query_async::<_, i64>(&mut publisher)
        .await
        .unwrap();

    let msg = timeout(Duration::from_secs(1), subscriber.get_message())
        .await
        .unwrap()
        .unwrap();
    let payload: String = msg.get_payload().await.unwrap();
    assert_eq!(payload, "test_message");
}

async fn test_channel_exists(client: &Client) {
    let mut subscriber = client.get_tokio_connection().await.unwrap().into_pubsub();
    let result = subscriber.subscribe("non_existent_channel").await;
    assert!(result.is_ok());
    let result = subscriber.unsubscribe("non_existent_channel").await;
    assert!(result.is_ok());
}

#[tokio::test]
async fn test_redis_pubsub() {
    let client = Client::open("redis://localhost:6379/0").unwrap();
    test_publish_and_subscribe(&client).await;
    test_channel_exists(&client).await;
}
3.4 边界条件与异常场景测试
3.4.1 边界条件测试

边界条件测试是测试参数的最大值、最小值、空值等情况:

use reqwest::Client;
use tokio::time::sleep;
use std::time::Duration;

async fn test_empty_name(client: &Client) {
    let request_body = serde_json::json!({
        "third_party_id": "test_user_empty_name",
        "name": "",
        "email": "[email protected]",
        "phone": "1234567893",
        "status": "active"
    });
    let response = client
        .post("http://localhost:3000/users")
        .json(&request_body)
        .send()
        .await
        .unwrap();
    assert_eq!(response.status().as_u16(), 400); // Bad Request
}

async fn test_invalid_email(client: &Client) {
    let request_body = serde_json::json!({
        "third_party_id": "test_user_invalid_email",
        "name": "Test User Invalid Email",
        "email": "invalid_email",
        "phone": "1234567894",
        "status": "active"
    });
    let response = client
        .post("http://localhost:3000/users")
        .json(&request_body)
        .send()
        .await
        .unwrap();
    assert_eq!(response.status().as_u16(), 400); // Bad Request
}

async fn test_negative_amount(client: &Client) {
    let request_body = serde_json::json!({
        "user_id": "a8f7d9e0-1234-5678-90ab-cdef12345678",
        "order_number": "ORD-002",
        "amount": -100.0,
        "currency": "USD",
        "status": "pending"
    });
    let response = client
        .post("http://localhost:3000/orders")
        .json(&request_body)
        .send()
        .await
        .unwrap();
    assert_eq!(response.status().as_u16(), 400); // Bad Request
}

#[tokio::test]
async fn test_boundary_conditions() {
    let client = Client::new();
    sleep(Duration::from_secs(1)).await; // 等待服务器启动
    test_empty_name(&client).await;
    test_invalid_email(&client).await;
    test_negative_amount(&client).await;
}
3.4.2 异常场景测试

异常场景测试是测试服务不可用、超时、权限不足等情况:

use reqwest::Client;
use reqwest::Error;

async fn test_server_unavailable(client: &Client) {
    let response = client.get("http://localhost:9999/health").send().await;
    assert!(response.is_err());
    let error = response.unwrap_err();
    assert!(error.is_connect()); // 连接错误
}

async fn test_api_timeout(client: &Client) {
    let response = tokio::time::timeout(
        std::time::Duration::from_millis(500),
        client.get("http://localhost:3000/long_running").send()
    ).await;
    assert!(response.is_err());
    let error = response.unwrap_err();
    assert!(error.is_timeout());
}

async fn test_unauthorized_access(client: &Client) {
    let response = client.get("http://localhost:3000/protected").send().await.unwrap();
    assert_eq!(response.status().as_u16(), 401); // Unauthorized
}

#[tokio::test]
async fn test_exception_scenarios() {
    let client = Client::new();
    test_server_unavailable(&client).await;
    // test_api_timeout(&client).await; // 需要服务器提供长时间运行的接口
    // test_unauthorized_access(&client).await; // 需要服务器提供受保护的接口
}

四、异步调试的核心工具

4.1 使用 tracing 记录调试信息

tracing 是 Rust 的日志库,可以用于记录异步任务的执行信息,包括任务的创建、完成、错误等。我们可以使用 tokio-tracing 库来记录 Tokio 的任务执行信息。

在 Cargo.toml 中添加依赖:

[dependencies]
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
tokio-tracing = "0.1"

在代码中使用 tracing:

use tracing::info;
use tokio::time::sleep;
use std::time::Duration;

#[tokio::main]
async fn main() {
    tracing_subscriber::registry()
        .with(tracing_subscriber::EnvFilter::new("info"))
        .with(tracing_subscriber::fmt::layer())
        .init();

    info!("Application started");

    let mut handles = Vec::new();
    for i in 1..=3 {
        let handle = tokio::spawn(async move {
            info!("Task {} started", i);
            sleep(Duration::from_millis(100 * i)).await;
            info!("Task {} finished", i);
            i
        });
        handles.push(handle);
    }

    let results: Vec<_> = futures::future::join_all(handles)
        .await
        .into_iter()
        .map(|r| r.unwrap())
        .collect();

    info!("All tasks finished. Results: {:?}", results);
}

运行程序并查看日志:

RUST_LOG=info cargo run
4.2 使用 tokio-console 定位性能问题

tokio-console 是 Tokio 提供的调试工具,可以用于定位异步任务的性能问题,包括任务的执行时间、等待时间、内存使用等。

安装 tokio-console:

cargo install tokio-console

在 Cargo.toml 中添加依赖:

[dependencies]
tokio = { version = "1.0", features = ["full", "trace"] }

运行程序并使用 tokio-console:

RUSTFLAGS="--cfg tokio_unstable" RUST_LOG=info cargo run -- tokio-console
4.3 异步堆栈跟踪与错误定位

异步代码的堆栈跟踪与同步代码不同,需要使用 tokio-tracing 或 backtrace 库来获取完整的堆栈信息。

在 Cargo.toml 中添加依赖:

[dependencies]
backtrace = "0.3"

使用 backtrace 获取堆栈信息:

use backtrace::Backtrace;
use thiserror::Error;

#[derive(Error, Debug)]
pub enum AppError {
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
    #[error("Database error: {0}")]
    Database(#[from] sqlx::Error),
    #[error("Custom error: {0}")]
    Custom(String),
}

impl AppError {
    pub fn with_backtrace(self) -> (Self, Backtrace) {
        (self, Backtrace::new())
    }
}

async fn foo() -> Result<(), AppError> {
    bar().await
}

async fn bar() -> Result<(), AppError> {
    baz().await
}

async fn baz() -> Result<(), AppError> {
    Err(AppError::Custom("Test error".to_string()))
}

#[tokio::main]
async fn main() {
    let (error, backtrace) = foo().await.unwrap_err().with_backtrace();
    println!("Error: {:?}", error);
    println!("Backtrace: {:?}", backtrace);
}

五、实战项目优化:为项目添加测试

5.1 测试结构设计

我们可以按照模块组织测试,每个模块包含单元测试和集成测试:

rust-async-microservices/
├── common/
│   ├── src/
│   │   ├── errors.rs
│   │   ├── models.rs
│   │   ├── db.rs
│   │   ├── redis.rs
│   │   └── http.rs
│   └── tests/
│       ├── errors_test.rs
│       ├── models_test.rs
│       ├── db_test.rs
│       ├── redis_test.rs
│       └── http_test.rs
├── user-sync-service/
│   ├── src/
│   │   ├── config.rs
│   │   ├── sync.rs
│   │   ├── scheduler.rs
│   │   └── metrics.rs
│   └── tests/
│       ├── config_test.rs
│       ├── sync_test.rs
│       ├── scheduler_test.rs
│       └── metrics_test.rs
├── order-processing-service/
│   ├── src/
│   │   ├── config.rs
│   │   ├── processing.rs
│   │   └── metrics.rs
│   └── tests/
│       ├── config_test.rs
│       ├── processing_test.rs
│       └── metrics_test.rs
└── monitoring-service/
    ├── src/
    │   ├── config.rs
    │   ├── status.rs
    │   ├── websocket.rs
    │   └── routes.rs
    └── tests/
        ├── config_test.rs
        ├── status_test.rs
        ├── websocket_test.rs
        └── routes_test.rs
5.2 公共模块的单元测试
5.2.1 模型测试

common/tests/models_test.rs:

use common::models::{User, ThirdPartyUser, Order, OrderMessage};
use chrono::Utc;

#[test]
fn test_third_party_user_to_user() {
    let third_party_user = ThirdPartyUser {
        id: "test_user_1".to_string(),
        name: "Test User 1".to_string(),
        email: "[email protected]".to_string(),
        phone: Some("1234567890".to_string()),
        status: "active".to_string(),
        created_at: Utc::now().to_rfc3339(),
        updated_at: Utc::now().to_rfc3339(),
    };
    let user = User::try_from(third_party_user).unwrap();
    assert_eq!(user.third_party_id, "test_user_1");
    assert_eq!(user.name, "Test User 1");
    assert_eq!(user.email, "[email protected]");
    assert_eq!(user.phone, Some("1234567890".to_string()));
    assert_eq!(user.status, "active");
}

#[test]
fn test_order_message_to_order() {
    let order_message = OrderMessage {
        user_id: "a8f7d9e0-1234-5678-90ab-cdef12345678".to_string(),
        order_number: "ORD-001".to_string(),
        amount: 100.0,
        currency: "USD".to_string(),
        status: "pending".to_string(),
    };
    let order = Order::try_from(order_message).unwrap();
    assert_eq!(order.user_id.to_string(), "a8f7d9e0-1234-5678-90ab-cdef12345678");
    assert_eq!(order.order_number, "ORD-001");
    assert_eq!(order.amount, 100.0);
    assert_eq!(order.currency, "USD");
    assert_eq!(order.status, "pending");
}
5.2.2 错误处理测试

common/tests/errors_test.rs:

use common::errors::AppError;

#[test]
fn test_error_conversion() {
    let io_error = std::io::Error::new(std::io::ErrorKind::NotFound, "File not found");
    let app_error = AppError::from(io_error);
    assert!(matches!(app_error, AppError::Io(_)));

    let sqlx_error = sqlx::Error::RowNotFound;
    let app_error = AppError::from(sqlx_error);
    assert!(matches!(app_error, AppError::Database(_)));

    let custom_error = AppError::Custom("Test error".to_string());
    assert!(matches!(custom_error, AppError::Custom(_)));
}

#[test]
fn test_error_display() {
    let io_error = std::io::Error::new(std::io::ErrorKind::NotFound, "File not found");
    let app_error = AppError::from(io_error);
    assert!(app_error.to_string().contains("File not found"));

    let custom_error = AppError::Custom("Test error".to_string());
    assert_eq!(custom_error.to_string(), "Custom error: Test error");
}
5.3 用户同步服务的集成测试

user-sync-service/tests/sync_test.rs:

use user_sync_service::sync::sync_users;
use user_sync_service::config::AppConfig;

#[tokio::test]
async fn test_user_sync() {
    let config = AppConfig::from_env().unwrap();
    let result = sync_users(&config).await;
    assert!(result.is_ok());
}
5.4 订单处理服务的集成测试

order-processing-service/tests/processing_test.rs:

use order_processing_service::processing::process_order_message;
use order_processing_service::config::AppConfig;
use common::redis::publish_message;
use common::models::OrderMessage;

#[tokio::test]
async fn test_order_processing() {
    let config = AppConfig::from_env().unwrap();
    let order_message = OrderMessage {
        user_id: "a8f7d9e0-1234-5678-90ab-cdef12345678".to_string(),
        order_number: "ORD-001".to_string(),
        amount: 100.0,
        currency: "USD".to_string(),
        status: "pending".to_string(),
    };
    let redis_client = common::redis::create_client(config.redis.clone()).await.unwrap();
    publish_message(&redis_client, "orders", &serde_json::to_string(&order_message).unwrap(),)
        .await
        .unwrap();

    // 等待订单处理完成
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;

    let pool = common::db::create_pool(config.db.clone()).await.unwrap();
    let order = sqlx::query_as!(common::models::Order, "SELECT * FROM orders WHERE order_number = $1", order_message.order_number)
        .fetch_optional(&pool)
        .await
        .unwrap();

    assert!(order.is_some());
}
5.5 实时监控服务的集成测试

monitoring-service/tests/status_test.rs:

use monitoring_service::status::get_system_status;
use monitoring_service::config::AppConfig;

#[tokio::test]
async fn test_system_status() {
    let config = AppConfig::from_env().unwrap();
    let status = get_system_status(&config).await.unwrap();
    assert!(status.user_sync_service.is_running);
    assert!(status.order_processing_service.is_running);
    assert!(status.monitoring_service.is_running);
    assert!(status.total_users >= 0);
    assert!(status.total_orders >= 0);
    assert!(status.failed_tasks >= 0);
}

六、异步测试的最佳实践

6.1 测试隔离与资源清理
6.1.1 测试隔离

每个测试应该是独立的,不应该依赖其他测试的结果。我们可以使用测试数据库和测试 Redis 实例来隔离测试:

use sqlx::PgPool;

#[sqlx::test]
async fn test_create_user(pool: PgPool) {
    // 测试操作
    let count = sqlx::query_scalar!("SELECT COUNT(*) FROM users")
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(count, 0);

    // 插入用户
    sqlx::query!(r#"
        INSERT INTO users (
            id, third_party_id, name, email, phone, status,
            created_at, updated_at, last_synced_at
        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
    "#, uuid::Uuid::new_v4(), "test_user_1", "Test User 1", "[email protected]",
       Some("1234567890"), "active", chrono::Utc::now(), chrono::Utc::now(), chrono::Utc::now())
    .execute(&pool)
    .await
    .unwrap();

    // 查询用户数量
    let count = sqlx::query_scalar!("SELECT COUNT(*) FROM users")
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(count, 1);
}
6.1.2 资源清理

测试结束后,我们需要清理资源,如数据库连接、网络连接、文件句柄等。SQLx 的测试宏会自动清理测试数据库:

use sqlx::PgPool;

#[sqlx::test]
async fn test_resource_cleanup(pool: PgPool) {
    // 测试操作
    let count = sqlx::query_scalar!("SELECT COUNT(*) FROM users")
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(count, 0);

    sqlx::query!(r#"
        INSERT INTO users (
            id, third_party_id, name, email, phone, status,
            created_at, updated_at, last_synced_at
        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
    "#, uuid::Uuid::new_v4(), "test_user_1", "Test User 1", "[email protected]",
       Some("1234567890"), "active", chrono::Utc::now(), chrono::Utc::now(), chrono::Utc::now())
    .execute(&pool)
    .await
    .unwrap();

    // 测试结束后,SQLx 会自动清理数据库
}
6.2 测试性能优化
6.2.1 并行测试

我们可以使用 cargo test 的 --test-threads 参数来启用并行测试:

cargo test --test-threads=4
6.2.2 测试数据复用

我们可以使用测试数据复用的方法,避免每次测试都创建相同的数据:

use sqlx::PgPool;
use once_cell::sync::Lazy;
use common::models::User;
use chrono::Utc;

static TEST_USER: Lazy<User> = Lazy::new(|| User {
    id: uuid::Uuid::new_v4(),
    third_party_id: "test_user_reuse".to_string(),
    name: "Test User Reuse".to_string(),
    email: "[email protected]".to_string(),
    phone: Some("1234567890".to_string()),
    status: "active".to_string(),
    created_at: Utc::now(),
    updated_at: Utc::now(),
    last_synced_at: Utc::now(),
});

#[sqlx::test]
async fn test_user_reuse(pool: PgPool) {
    sqlx::query!(r#"
        INSERT INTO users (
            id, third_party_id, name, email, phone, status,
            created_at, updated_at, last_synced_at
        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
    "#, TEST_USER.id, TEST_USER.third_party_id, TEST_USER.name, TEST_USER.email, TEST_USER.phone,
       TEST_USER.status, TEST_USER.created_at, TEST_USER.updated_at, TEST_USER.last_synced_at)
    .execute(&pool)
    .await
    .unwrap();

    let user = sqlx::query_as!(User, "SELECT * FROM users WHERE third_party_id = $1", TEST_USER.third_party_id)
        .fetch_one(&pool)
        .await
        .unwrap();

    assert_eq!(user.id, TEST_USER.id);
}
6.3 测试覆盖率分析

我们可以使用 cargo-tarpaulin 工具来分析测试覆盖率:

cargo install cargo-tarpaulin
cargo tarpaulin --ignore-tests

七、总结

异步代码的测试与调试是 Rust 异步编程的重要环节。通过深入理解异步测试的本质与难点、基础异步测试框架、集成测试与边界条件测试、异步调试的核心工具、实战项目优化以及异步测试的最佳实践,我们可以编写出更高效、更安全的异步代码。

在实际项目中,我们应该根据项目的需求选择合适的测试方法,并注意测试隔离与资源清理、测试性能优化、测试覆盖率分析等方面的问题。同时,我们可以使用 Tokio 的调试工具和日志库来定位异步任务的性能问题。

目录

  1. Rust 异步代码的测试与调试实践
  2. 一、异步测试的本质与难点
  3. 1.1 异步测试与同步测试的区别
  4. 1.2 异步测试的核心挑战
  5. 1.2.1 时序性问题
  6. 1.2.2 状态管理问题
  7. 1.2.3 资源清理问题
  8. 二、基础异步测试框架
  9. 2.1 Tokio 测试宏的使用
  10. 2.1.1 配置 Tokio 测试运行时
  11. 2.2 基础异步函数测试
  12. 2.3 异步任务的超时管理
  13. 三、集成测试与边界条件测试
  14. 3.1 数据库操作的集成测试
  15. 3.2 HTTP 请求的集成测试
  16. 3.3 Redis 消息的集成测试
  17. 3.4 边界条件与异常场景测试
  18. 3.4.1 边界条件测试
  19. 3.4.2 异常场景测试
  20. 四、异步调试的核心工具
  21. 4.1 使用 tracing 记录调试信息
  22. 4.2 使用 tokio-console 定位性能问题
  23. 4.3 异步堆栈跟踪与错误定位
  24. 五、实战项目优化:为项目添加测试
  25. 5.1 测试结构设计
  26. 5.2 公共模块的单元测试
  27. 5.2.1 模型测试
  28. 5.2.2 错误处理测试
  29. 5.3 用户同步服务的集成测试
  30. 5.4 订单处理服务的集成测试
  31. 5.5 实时监控服务的集成测试
  32. 六、异步测试的最佳实践
  33. 6.1 测试隔离与资源清理
  34. 6.1.1 测试隔离
  35. 6.1.2 资源清理
  36. 6.2 测试性能优化
  37. 6.2.1 并行测试
  38. 6.2.2 测试数据复用
  39. 6.3 测试覆盖率分析
  40. 七、总结
  • 💰 8折买阿里云服务器限时8折了解详情
  • GPT-5.5 超高智商模型1元抵1刀ChatGPT中转购买
  • 代充Chatgpt Plus/pro 帐号了解详情
  • 🤖 一键搭建Deepseek满血版了解详情
  • 一键打造专属AI 智能体了解详情
极客日志微信公众号二维码

微信扫一扫,关注极客日志

微信公众号「极客日志V2」,在微信中扫描左侧二维码关注。展示文案:极客日志V2 zeeklog

更多推荐文章

查看全部
  • Windows 环境部署 Qwen2.5 对话机器人
  • Spring Boot Web 后端开发核心注解详解
  • Web 服务与 I/O 模型详解及 Nginx 实战
  • LeetCode 92 链表区间反转:递归反转与哨兵技巧
  • 算法实战:双指针解决三数之和与四数之和
  • Figma设计稿转前端代码:基于Cursor IDE MCP功能的自动化方案
  • RoboTwin 双臂机器人基准平台完整配置指南
  • 从零开始模拟实现 STL vector 容器
  • LeetCode 最长公共前缀:三种解法详解
  • AIGC 时代下 R 语言在数据科学中的应用实践
  • Linux 线程安全与线程同步
  • NVIDIA 发布开放模型和数据,加速语言、生物与机器人 AI 创新
  • Stable Diffusion 秋叶整合包:环境配置与使用指南
  • Hunyuan-MT-7B-WEBUI 远程访问配置与安全策略
  • AI 驱动的产品落地全流程:从需求挖掘到上线管控实战
  • Moon VR Video Player 使用指南:支持 8K/12K 多音轨及外挂字幕
  • VMware 虚拟机安装 Ubuntu 24.04 详细指南
  • 基于 Python Flask 的二手教材交易平台设计与实现
  • VLA 机器人革命:10 篇关键视觉 - 语言 - 动作模型解析
  • OpenClaw Web UI 访问报错 Not Found 排查与修复

相关免费在线工具

  • Base64 字符串编码/解码

    将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online

  • Base64 文件转换器

    将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online

  • Markdown转HTML

    将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online

  • HTML转Markdown

    将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online

  • JSON 压缩

    通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online

  • JSON美化和格式化

    将JSON字符串修饰为友好的可读格式。 在线工具,JSON美化和格式化在线工具,online