Rust 异步编程的错误处理艺术
深入探讨了 Rust 异步编程中的错误处理机制。内容涵盖异步与同步错误的区别、IO/超时/取消/业务逻辑等错误分类、标准 Result 在异步上下文的应用、自定义错误类型设计(thiserror/anyhow)、超时与取消的具体处理策略、并发任务的错误聚合方法以及父子任务间的错误传递。最后通过一个基于 Axum 和 SQLx 的实战项目展示了完整的异步 API 错误处理体系,并介绍了使用 tracing 和 tokio-console 进行调试排查的最佳实践。

深入探讨了 Rust 异步编程中的错误处理机制。内容涵盖异步与同步错误的区别、IO/超时/取消/业务逻辑等错误分类、标准 Result 在异步上下文的应用、自定义错误类型设计(thiserror/anyhow)、超时与取消的具体处理策略、并发任务的错误聚合方法以及父子任务间的错误传递。最后通过一个基于 Axum 和 SQLx 的实战项目展示了完整的异步 API 错误处理体系,并介绍了使用 tracing 和 tokio-console 进行调试排查的最佳实践。

在 Rust 同步编程中,错误通常是通过 Result<T, E> 类型返回的,Err 变体包含了错误信息,程序会阻塞线程直到操作完成。而在异步编程中,操作的结果是一个 Future<Output = Result<T, E>>,程序会暂停任务直到操作完成,Err 变体可能是 IO 错误、超时错误、取消错误等异步场景特有的错误。
同步错误示例:
use std::fs::File;
use std::io::Read;
// 同步读取文件,阻塞线程
fn read_file_sync() -> Result<String, std::io::Error> {
let mut file = File::open("test.txt")?;
let mut content = String::new();
file.read_to_string(&mut content)?;
Ok(content)
}
fn main() {
match read_file_sync() {
Ok(content) => println!("File content: {}", content),
Err(e) => println!("Error reading file: {}", e),
}
}
异步错误示例:
use tokio::fs::File;
use tokio::io::AsyncReadExt;
// 异步读取文件,暂停任务
async fn read_file_async() -> Result<String, std::io::Error> {
let mut file = File::open("test.txt").await?;
let mut content = String::new();
file.read_to_string(&mut content).await?;
Ok(content)
}
#[tokio::main]
async fn main() {
match read_file_async().await {
Ok(content) => println!("File content: {}", content),
Err(e) => println!("Error reading file: {}", e),
}
}
异步错误可以分为以下几类:
IO 错误是异步编程中最常见的错误类型,包括网络连接失败、文件读取失败、数据库连接失败等。这类错误通常由 std::io::Error 表示。
use tokio::net::TcpStream;
async fn connect_to_server() -> Result<TcpStream, std::io::Error> {
let stream = TcpStream::connect("127.0.0.1:8080").await?;
Ok(stream)
}
超时错误是指异步操作在指定时间内没有完成。Tokio 提供了 tokio::time::timeout 函数来设置超时,超时会返回 tokio::time::error::Elapsed 错误。
use tokio::time::{timeout, Duration};
async fn async_operation() {
tokio::time::sleep(Duration::from_secs(3)).await;
println!("Operation completed");
}
#[tokio::main]
async fn main() {
let result = timeout(Duration::from_secs(2), async_operation()).await;
match result {
Ok(_) => println!("Success"),
Err(e) => println!("Error: {}", e),
}
}
取消错误是指异步任务被主动取消。Tokio 的 tokio::task::JoinHandle::abort 方法可以取消任务,取消会返回 tokio::task::JoinError 错误。
use tokio::time::sleep;
use std::time::Duration;
async fn task() {
sleep(Duration::from_secs(5)).await;
println!("Task completed");
}
#[tokio::main]
async fn main() {
let handle = tokio::spawn(task());
sleep(Duration::from_secs(2)).await;
handle.abort();
let result = handle.await;
match result {
Ok(_) => println!("Success"),
Err(e) => println!("Error: {}", e),
}
}
业务逻辑错误是指应用程序的业务规则违反,例如用户不存在、余额不足、密码错误等。这类错误需要我们自定义错误类型来表示。
// 自定义业务逻辑错误类型
#[derive(Debug)]
enum BusinessError {
UserNotFound,
InsufficientBalance,
InvalidPassword,
}
impl std::fmt::Display for BusinessError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
BusinessError::UserNotFound => write!(f, "User not found"),
BusinessError::InsufficientBalance => write!(f, "Insufficient balance"),
BusinessError::InvalidPassword => write!(f, "Invalid password"),
}
}
}
impl std::error::Error for BusinessError {}
async fn login(username: &str, password: &str) -> Result<(), BusinessError> {
if username != "admin" {
return Err(BusinessError::UserNotFound);
}
if password != "password" {
return Err(BusinessError::InvalidPassword);
}
Ok(())
}
系统错误是指操作系统级别的错误,例如内存不足、权限不足等。这类错误通常由 std::io::Error 表示。
use tokio::fs::File;
async fn read_sensitive_file() -> Result<String, std::io::Error> {
let mut file = File::open("/etc/shadow").await?;
let mut content = String::new();
file.read_to_string(&mut content).await?;
Ok(content)
}
在异步函数中,? 操作符的使用方式与同步函数相同。当遇到 Err 变体时,它会将错误包装在 Result<_, Box<dyn Error>> 类型中并返回。
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use std::error::Error;
async fn read_file_and_parse() -> Result<Vec<u32>, Box<dyn Error>> {
let mut file = File::open("numbers.txt").await?;
let mut content = String::new();
file.read_to_string(&mut content).await?;
let numbers: Vec<u32> = content
.lines()
.map(|line| line.parse::<u32>())
.collect::<Result<_, _>>()?;
Ok(numbers)
}
#[tokio::main]
async fn main() {
match read_file_and_parse().await {
Ok(numbers) => println!("Numbers: {:?}", numbers),
Err(e) => println!("Error: {}", e),
}
}
在异步任务中使用 unwrap() 或 expect() 是非常危险的,因为它们会导致任务崩溃。我们应该尽量使用 match 语句或 ? 操作符来处理错误。
危险示例:
use tokio::fs::File;
use tokio::io::AsyncReadExt;
async fn read_file() {
let mut file = File::open("nonexistent.txt").await.unwrap(); // 会导致任务崩溃
let mut content = String::new();
file.read_to_string(&mut content).await.unwrap();
println!("File content: {}", content);
}
#[tokio::main]
async fn main() {
read_file().await;
}
安全示例:
use tokio::fs::File;
use tokio::io::AsyncReadExt;
async fn read_file() -> Result<(), Box<dyn std::error::Error>> {
let mut file = File::open("nonexistent.txt").await?;
let mut content = String::new();
file.read_to_string(&mut content).await?;
println!("File content: {}", content);
Ok(())
}
#[tokio::main]
async fn main() {
match read_file().await {
Ok(_) => println!("Success"),
Err(e) => println!("Error: {}", e),
}
}
Box<dyn Error> 是 Rust 中用于统一错误类型的常用方法。它可以容纳任何实现了 Error trait 的类型,包括标准库的错误类型和自定义错误类型。
use tokio::time::{timeout, Duration};
use std::error::Error;
async fn async_operation() -> Result<(), Box<dyn Error>> {
// 模拟超时错误
let result = timeout(Duration::from_secs(2), tokio::time::sleep(Duration::from_secs(3))).await?;
Ok(result)
}
async fn read_file() -> Result<String, Box<dyn Error>> {
let mut file = tokio::fs::File::open("test.txt").await?;
let mut content = String::new();
file.read_to_string(&mut content).await?;
Ok(content)
}
async fn combined_operation() -> Result<(), Box<dyn Error>> {
async_operation().await?;
let content = read_file().await?;
println!("File content: {}", content);
Ok(())
}
#[tokio::main]
async fn main() {
match combined_operation().await {
Ok(_) => println!("Success"),
Err(e) => println!("Error: {}", e),
}
}
在 Rust 中,有两个常用的错误处理库:thiserror 和 anyhow。
在 Cargo.toml 中添加依赖:
[dependencies]
thiserror = "1.0"
anyhow = "1.0"
thiserror 提供了 #[derive(Error)] 宏,可以快速生成 Error trait 的实现。
use thiserror::Error;
use tokio::time::error::Elapsed;
#[derive(Error, Debug)]
pub enum AppError {
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
#[error("Timeout error: {0}")]
Timeout(#[from] Elapsed),
#[error("Business error: {0}")]
Business(#[from] BusinessError),
#[error("System error: {0}")]
System(String),
}
#[derive(Error, Debug)]
pub enum BusinessError {
#[error("User not found")]
UserNotFound,
#[error("Insufficient balance")]
InsufficientBalance,
#[error("Invalid password")]
InvalidPassword,
}
async fn login(username: &str, password: &str) -> Result<(), AppError> {
if username != "admin" {
return Err(AppError::Business(BusinessError::UserNotFound));
}
if password != "password" {
return Err(AppError::Business(BusinessError::InvalidPassword));
}
Ok(())
}
async fn connect_to_server() -> Result<tokio::net::TcpStream, AppError> {
let stream = tokio::net::TcpStream::connect("127.0.0.1:8080").await?;
Ok(stream)
}
anyhow 提供了 Result<T> 类型,它是 Result<T, anyhow::Error> 的别名。使用 anyhow 可以快速处理各种类型的错误。
use anyhow::Result;
use tokio::time::{timeout, Duration};
async fn async_operation() -> Result<()> {
let result = timeout(Duration::from_secs(2), tokio::time::sleep(Duration::from_secs(3))).await?;
Ok(result)
}
async fn read_file() -> Result<String> {
let mut file = tokio::fs::File::open("test.txt").await?;
let mut content = String::new();
file.read_to_string(&mut content).await?;
Ok(content)
}
async fn combined_operation() -> Result<()> {
async_operation().await?;
let content = read_file().await?;
println!("File content: {}", content);
Ok(())
}
#[tokio::main]
async fn main() {
if let Err(e) = combined_operation().await {
println!("Error: {}", e);
println!("Chain:");
for (i, cause) in e.chain().enumerate() {
println!(" {}: {}", i, cause);
}
}
}
我们可以结合 thiserror 和 anyhow,在库中使用 thiserror 定义自定义错误类型,在应用中使用 anyhow 处理错误。
库代码(my-lib/src/lib.rs):
use thiserror::Error;
#[derive(Error, Debug)]
pub enum MyLibError {
#[error("Invalid input: {0}")]
InvalidInput(String),
#[error("Network error: {0}")]
Network(#[from] reqwest::Error),
#[error("Database error: {0}")]
Database(#[from] sqlx::Error),
}
pub async fn fetch_data() -> Result<String, MyLibError> {
let response = reqwest::get("https://example.com/api/data").await?;
let data = response.text().await?;
Ok(data)
}
应用代码(src/main.rs):
use my_lib::fetch_data;
use anyhow::Result;
async fn process_data() -> Result<()> {
let data = fetch_data().await?;
println!("Data: {}", data);
Ok(())
}
#[tokio::main]
async fn main() {
if let Err(e) = process_data().await {
println!("Error: {}", e);
}
}
Tokio 的 tokio::time::timeout 函数会返回 Result<T, Elapsed> 类型,我们需要对 Elapsed 错误进行处理。
use tokio::time::{timeout, Duration};
use thiserror::Error;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Timeout error")]
Timeout,
#[error("Other error: {0}")]
Other(#[from] anyhow::Error),
}
async fn async_operation() -> Result<(), AppError> {
// 模拟长时间运行的操作
tokio::time::sleep(Duration::from_secs(5)).await;
Ok(())
}
async fn run_with_timeout() -> Result<(), AppError> {
let result = timeout(Duration::from_secs(3), async_operation()).await;
match result {
Ok(_) => Ok(()),
Err(_) => Err(AppError::Timeout),
}
}
#[tokio::main]
async fn main() {
match run_with_timeout().await {
Ok(_) => println!("Success"),
Err(AppError::Timeout) => println!("Error: Operation timed out"),
Err(e) => println!("Error: {}", e),
}
}
Tokio 的 tokio::task::JoinHandle::abort 方法会取消任务,取消会返回 tokio::task::JoinError 错误。
use tokio::time::sleep;
use std::time::Duration;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Task canceled")]
Canceled,
#[error("Other error: {0}")]
Other(#[from] anyhow::Error),
}
async fn task() -> Result<(), AppError> {
for i in 0..5 {
println!("Task iteration {}", i);
sleep(Duration::from_secs(1)).await;
}
Ok(())
}
async fn run_with_cancel() -> Result<(), AppError> {
let handle = tokio::spawn(task());
sleep(Duration::from_secs(2)).await;
handle.abort();
let result = handle.await;
match result {
Ok(_) => Ok(()),
Err(e) => {
if e.is_cancelled() {
Err(AppError::Canceled)
} else {
Err(AppError::Other(e.into()))
}
}
}
}
#[tokio::main]
async fn main() {
match run_with_cancel().await {
Ok(_) => println!("Success"),
Err(AppError::Canceled) => println!("Error: Task canceled"),
Err(e) => println!("Error: {}", e),
}
}
在实际应用中,我们可能需要同时处理超时和取消错误。
use tokio::time::{timeout, Duration};
use tokio::sync::oneshot;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Timeout error")]
Timeout,
#[error("Task canceled")]
Canceled,
#[error("Other error: {0}")]
Other(#[from] anyhow::Error),
}
async fn async_operation(mut cancel_rx: oneshot::Receiver<()>) -> Result<(), AppError> {
for i in 0..5 {
println!("Task iteration {}", i);
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(1)) => {},
_ = &mut cancel_rx => return Err(AppError::Canceled),
}
}
Ok(())
}
async fn run_with_timeout_and_cancel() -> Result<(), AppError> {
let (cancel_tx, cancel_rx) = oneshot::channel();
let handle = tokio::spawn(async move {
let _ = cancel_tx.send(());
async_operation(cancel_rx).await
});
let result = timeout(Duration::from_secs(3), handle).await;
match result {
Ok(Ok(_)) => Ok(()),
Ok(Err(e)) => {
if e.is_cancelled() {
Err(AppError::Canceled)
} else {
Err(AppError::Other(e.into()))
}
}
Err(_) => Err(AppError::Timeout),
}
}
#[tokio::main]
async fn main() {
match run_with_timeout_and_cancel().await {
Ok(_) => println!("Success"),
Err(AppError::Timeout) => println!("Error: Operation timed out"),
Err(AppError::Canceled) => println!("Error: Task canceled"),
Err(e) => println!("Error: {}", e),
}
}
use tokio::join;
use tokio::try_join;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Task1 error")]
Task1,
#[error("Task2 error")]
Task2,
}
async fn task1() -> Result<(), AppError> {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
Ok(())
}
async fn task2() -> Result<(), AppError> {
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
Err(AppError::Task2)
}
#[tokio::main]
async fn main() {
println!("Using join!:");
let (res1, res2) = join!(task1(), task2());
println!("Task1 result: {:?}", res1);
println!("Task2 result: {:?}", res2);
println!("\nUsing try_join!:");
let result = try_join!(task1(), task2());
println!("Result: {:?}", result);
}
我们可以使用 futures::future::try_join_all 函数聚合多个任务的错误。
use futures::future::try_join_all;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Task {0} error")]
Task(usize),
}
async fn task(i: usize) -> Result<usize, AppError> {
tokio::time::sleep(tokio::time::Duration::from_secs(i as u64)).await;
if i % 2 == 0 {
Ok(i)
} else {
Err(AppError::Task(i))
}
}
#[tokio::main]
async fn main() {
let tasks = (0..5).map(|i| task(i));
let result = try_join_all(tasks).await;
println!("Result: {:?}", result);
}
Tokio 的 tokio::spawn 函数会返回 tokio::task::JoinHandle<T> 类型,我们需要使用 join() 方法等待任务完成并处理错误。
use tokio::time::sleep;
use std::time::Duration;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Task error")]
Task,
}
async fn task() -> Result<(), AppError> {
sleep(Duration::from_secs(2)).await;
Err(AppError::Task)
}
#[tokio::main]
async fn main() {
let handle = tokio::spawn(task());
let result = handle.await;
println!("Task result: {:?}", result);
}
错误链是指错误之间的因果关系,我们可以使用 thiserror 的 #[source] 属性来构建错误链。
use thiserror::Error;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Failed to fetch data: {0}")]
FetchData(#[source] reqwest::Error),
#[error("Failed to parse data: {0}")]
ParseData(#[source] serde_json::Error),
}
async fn fetch_and_parse_data() -> Result<serde_json::Value, AppError> {
let response = reqwest::get("https://example.com/api/data").await.map_err(AppError::FetchData)?;
let data = response.text().await.map_err(AppError::FetchData)?;
let json = serde_json::from_str(&data).map_err(AppError::ParseData)?;
Ok(json)
}
#[tokio::main]
async fn main() {
let result = fetch_and_parse_data().await;
match result {
Ok(json) => println!("Data: {}", serde_json::to_string_pretty(&json).unwrap()),
Err(e) => {
println!("Error: {}", e);
println!("Cause chain:");
let mut cause = e.source();
while let Some(c) = cause {
println!(" {}", c);
cause = c.source();
}
}
}
}
我们可以使用 tokio::sync::oneshot 通道在异步任务间传递错误。
use tokio::sync::oneshot;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Task error")]
Task,
}
async fn task(tx: oneshot::Sender<Result<(), AppError>>) {
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
let _ = tx.send(Err(AppError::Task));
}
#[tokio::main]
async fn main() {
let (tx, rx) = oneshot::channel();
tokio::spawn(task(tx));
let result = rx.await.unwrap();
println!("Task result: {:?}", result);
}
当父任务创建子任务时,我们可以将子任务的错误传递到父任务。
use tokio::time::sleep;
use std::time::Duration;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Child task error")]
ChildTask,
}
async fn child_task() -> Result<(), AppError> {
sleep(Duration::from_secs(1)).await;
Err(AppError::ChildTask)
}
async fn parent_task() -> Result<(), AppError> {
let handle = tokio::spawn(child_task());
let result = handle.await?;
Ok(result)
}
#[tokio::main]
async fn main() {
let result = parent_task().await;
println!("Parent task result: {:?}", result);
}
我们将构建一个异步 RESTful API,支持用户管理功能,包括创建用户、查询用户、更新用户和删除用户。我们将设计完整的错误处理体系,包括请求验证错误、数据库操作错误、业务逻辑错误的统一响应。
项目架构设计:
创建项目:
cargo new rust-async-api-error-handling
cd rust-async-api-error-handling
在 Cargo.toml 中添加依赖:
[dependencies]
axum = { version = "0.5", features = ["ws"] }
tokio = { version = "1.0", features = ["full"] }
sqlx = { version = "0.6", features = ["postgres", "runtime-tokio-rustls"] }
thiserror = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
uuid = { version = "1.1", features = ["v4"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
创建 src/db.rs:
use sqlx::PgPool;
pub async fn create_pool(database_url: &str) -> PgPool {
PgPool::connect(database_url).await.unwrap()
}
创建 src/models.rs:
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Debug, Serialize, Deserialize, sqlx::FromRow)]
pub struct User {
pub id: Uuid,
pub name: String,
pub email: String,
pub password: String,
pub created_at: chrono::DateTime<chrono::Utc>,
pub updated_at: chrono::DateTime<chrono::Utc>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateUserRequest {
pub name: String,
pub email: String,
pub password: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct UpdateUserRequest {
pub name: Option<String>,
pub email: Option<String>,
pub password: Option<String>,
}
创建 src/errors.rs:
use thiserror::Error;
use axum::{http::StatusCode, response::{IntoResponse, Response}};
use serde::{Serialize, Deserialize};
#[derive(Error, Debug)]
pub enum AppError {
#[error("Database error: {0}")]
Database(#[from] sqlx::Error),
#[error("Request validation error: {0}")]
Validation(String),
#[error("User not found")]
UserNotFound,
#[error("User already exists")]
UserAlreadyExists,
#[error("Internal server error")]
InternalServerError,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ErrorResponse {
pub error: String,
pub code: u16,
pub message: String,
}
impl IntoResponse for AppError {
fn into_response(self) -> Response {
let (status, code, message) = match self {
AppError::Database(e) => {
tracing::error!("Database error: {:?}", e);
(StatusCode::INTERNAL_SERVER_ERROR, 500, "Internal server error")
},
AppError::Validation(msg) => (StatusCode::BAD_REQUEST, 400, msg.as_str()),
AppError::UserNotFound => (StatusCode::NOT_FOUND, 404, "User not found"),
AppError::UserAlreadyExists => (StatusCode::CONFLICT, 409, "User already exists"),
AppError::InternalServerError => (StatusCode::INTERNAL_SERVER_ERROR, 500, "Internal server error"),
};
let response = ErrorResponse {
error: self.to_string(),
code: code,
message: message.to_string(),
};
(status, axum::Json(response)).into_response()
}
}
创建 src/repository.rs:
use sqlx::PgPool;
use uuid::Uuid;
use crate::models::*;
use crate::errors::AppError;
pub async fn create_user(pool: &PgPool, user: CreateUserRequest) -> Result<User, AppError> {
// 检查用户是否已经存在
let exists = sqlx::query_scalar!("SELECT EXISTS(SELECT 1 FROM users WHERE email = $1)", user.email).fetch_one(pool).await?;
if exists {
return Err(AppError::UserAlreadyExists);
}
// 创建用户
let user = sqlx::query_as!(User, "INSERT INTO users (id, name, email, password, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, $6) RETURNING *", Uuid::new_v4(), user.name, user.email, user.password, chrono::Utc::now(), chrono::Utc::now()).fetch_one(pool).await?;
Ok(user)
}
pub async fn get_user_by_id(pool: &PgPool, id: Uuid) -> Result<User, AppError> {
let user = sqlx::query_as!(User, "SELECT * FROM users WHERE id = $1", id).fetch_one(pool).await.map_err(|e| {
if e.as_database_error().is_some() && e.to_string().contains("no row found") {
AppError::UserNotFound
} else {
AppError::Database(e)
}
})?;
Ok(user)
}
pub async fn update_user(pool: &PgPool, id: Uuid, user: UpdateUserRequest) -> Result<User, AppError> {
let updated_user = sqlx::query_as!(User, "UPDATE users SET name = COALESCE($1, name), email = COALESCE($2, email), password = COALESCE($3, password), updated_at = $4 WHERE id = $5 RETURNING *", user.name, user.email, user.password, chrono::Utc::now(), id).fetch_one(pool).await.map_err(|e| {
if e.as_database_error().is_some() && e.to_string().contains("no row found") {
AppError::UserNotFound
} else {
AppError::Database(e)
}
})?;
Ok(updated_user)
}
pub async fn delete_user(pool: &PgPool, id: Uuid) -> Result<(), AppError> {
let result = sqlx::query!("DELETE FROM users WHERE id = $1", id).execute(pool).await?;
if result.rows_affected() == 0 {
return Err(AppError::UserNotFound);
}
Ok(())
}
创建 src/service.rs:
use crate::repository::*;
use crate::models::*;
use crate::errors::AppError;
use sqlx::PgPool;
pub async fn create_user_service(pool: &PgPool, user: CreateUserRequest) -> Result<User, AppError> {
// 验证请求参数
if user.name.is_empty() {
return Err(AppError::Validation("Name is required".to_string()));
}
if user.email.is_empty() {
return Err(AppError::Validation("Email is required".to_string()));
}
if !user.email.contains("@") {
return Err(AppError::Validation("Invalid email format".to_string()));
}
if user.password.len() < 6 {
return Err(AppError::Validation("Password must be at least 6 characters long".to_string()));
}
create_user(pool, user).await
}
pub async fn get_user_by_id_service(pool: &PgPool, id: uuid::Uuid) -> Result<User, AppError> {
get_user_by_id(pool, id).await
}
pub async fn update_user_service(pool: &PgPool, id: uuid::Uuid, user: UpdateUserRequest) -> Result<User, AppError> {
// 验证请求参数
if let Some(email) = &user.email {
if !email.contains("@") {
return Err(AppError::Validation("Invalid email format".to_string()));
}
}
if let Some(password) = &user.password {
if password.len() < 6 {
return Err(AppError::Validation("Password must be at least 6 characters long".to_string()));
}
}
update_user(pool, id, user).await
}
pub async fn delete_user_service(pool: &PgPool, id: uuid::Uuid) -> Result<(), AppError> {
delete_user(pool, id).await
}
创建 src/routes.rs:
use axum::{routing::{get, post, put, delete}, extract::Path, extract::State, Json};
use uuid::Uuid;
use crate::models::*;
use crate::service::*;
use crate::errors::AppError;
use sqlx::PgPool;
pub fn create_routes() -> axum::Router<PgPool> {
axum::Router::new()
.route("/users", post(create_user_handler))
.route("/users/:id", get(get_user_handler))
.route("/users/:id", put(update_user_handler))
.route("/users/:id", delete(delete_user_handler))
}
async fn create_user_handler(State(pool): State<PgPool>, Json(req): Json<CreateUserRequest>,) -> Result<Json<User>, AppError> {
let user = create_user_service(&pool, req).await?;
Ok(Json(user))
}
async fn get_user_handler(State(pool): State<PgPool>, Path(id): Path<Uuid>,) -> Result<Json<User>, AppError> {
let user = get_user_by_id_service(&pool, id).await?;
Ok(Json(user))
}
async fn update_user_handler(State(pool): State<PgPool>, Path(id): Path<Uuid>, Json(req): Json<UpdateUserRequest>,) -> Result<Json<User>, AppError> {
let user = update_user_service(&pool, id, req).await?;
Ok(Json(user))
}
async fn delete_user_handler(State(pool): State<PgPool>, Path(id): Path<Uuid>,) -> Result<(), AppError> {
delete_user_service(&pool, id).await
}
创建 src/main.rs:
use axum::Router;
use tracing_subscriber::prelude::*;
use tracing::info;
use crate::db::create_pool;
use crate::routes::create_routes;
mod db;
mod errors;
mod models;
mod repository;
mod routes;
mod service;
#[tokio::main]
async fn main() {
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::new("info"))
.with(tracing_subscriber::fmt::layer())
.init();
let database_url = "postgresql://user:password@localhost:5432/mydb";
let pool = create_pool(database_url).await;
info!("Database pool created");
let app: Router<sqlx::PgPool> = create_routes();
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
info!("API server running on http://0.0.0.0:3000");
axum::serve(listener, app.with_state(pool)).await.unwrap();
}
创建 migrations 目录:
mkdir -p migrations/20230101000000_create_users
创建 migrations/20230101000000_create_users/up.sql:
CREATE TABLE users (
id UUID PRIMARY KEY,
name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL UNIQUE,
password VARCHAR(255) NOT NULL,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL
);
创建 migrations/20230101000000_create_users/down.sql:
DROP TABLE users;
运行数据库迁移:
cargo sqlx migrate run
我们可以使用 curl 工具测试 API:
curl -X POST -H "Content-Type: application/json" -d '{"name": "Alice", "email": "[email protected]", "password": "password123"}' http://localhost:3000/users
curl -X GET http://localhost:3000/users/{user_id}
curl -X PUT -H "Content-Type: application/json" -d '{"name": "Alice Smith"}' http://localhost:3000/users/{user_id}
curl -X DELETE http://localhost:3000/users/{user_id}
我们可以使用 tokio-tracing 库记录错误的详细信息,包括错误的位置、堆栈信息等。
use tracing::error;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Database error: {0}")]
Database(#[from] sqlx::Error),
}
async fn fetch_data() -> Result<(), AppError> {
let result = sqlx::query!("SELECT * FROM nonexistent_table").execute(&sqlx::PgPool::connect("postgresql://user:password@localhost:5432/mydb").await?).await;
result?;
Ok(())
}
#[tokio::main]
async fn main() {
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::new("info"))
.with(tracing_subscriber::fmt::layer())
.init();
if let Err(e) = fetch_data().await {
error!("Error fetching data: {:?}", e);
error!("Backtrace: {:?}", e.backtrace());
}
}
tokio-console 是 Tokio 提供的调试工具,可以用于定位异步任务的错误。
安装 tokio-console:
cargo install tokio-console
运行程序并使用 tokio-console:
RUSTFLAGS="--cfg tokio_unstable" cargo run
# Then run tokio-console separately
我们可以使用 format! 或 serde_json 库格式化错误信息,使错误信息更容易阅读。
use serde_json;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Database error: {0}")]
Database(#[from] sqlx::Error),
}
async fn fetch_data() -> Result<(), AppError> {
let result = sqlx::query!("SELECT * FROM nonexistent_table").execute(&sqlx::PgPool::connect("postgresql://user:password@localhost:5432/mydb").await?).await;
result?;
Ok(())
}
#[tokio::main]
async fn main() {
if let Err(e) = fetch_data().await {
let error_json = serde_json::json!({
"error": e.to_string(),
"code": 500,
"message": "Internal server error",
"details": format!("{:?}", e)
});
println!("{}", serde_json::to_string_pretty(&error_json).unwrap());
}
}
问题:异步任务崩溃导致程序终止。
解决方案:
match 语句或 ? 操作符处理错误。tokio::spawn 的 JoinHandle 来捕获任务的崩溃。tokio-tracing 库记录崩溃的详细信息。问题:错误信息不够详细,无法定位问题所在。
解决方案:
thiserror 的 #[source] 属性构建错误链。tokio-tracing 库记录错误的位置和堆栈信息。serde_json 库格式化错误信息。问题:错误处理代码在多个地方重复。
解决方案:
thiserror 库。IntoResponse trait,统一处理错误响应。问题:异步任务泄漏导致内存泄漏。
解决方案:
tokio::task::JoinHandle::abort 方法。tokio-console 工具定位任务泄漏。异步错误处理是 Rust 异步编程的重要组成部分。通过深入理解异步错误的本质与分类、异步上下文中的标准 Result 处理、自定义异步错误类型的设计、超时与取消错误的处理、并发任务的错误聚合、异步错误的传递与传播、实战项目的错误处理体系以及异步错误调试与排查,我们可以编写出更高效、更安全的异步代码。
在实际项目中,我们应该根据项目的需求选择合适的错误处理方式。对于库开发,我们应该使用 thiserror 定义自定义错误类型,支持 Error trait 的实现。对于应用开发,我们可以使用 anyhow 快速处理各种类型的错误。同时,我们应该使用 tokio-tracing 库记录错误的详细信息,使用 tokio-console 工具定位异步任务的错误。
希望本章的内容能够帮助您深入掌握 Rust 异步编程的错误处理艺术,并在实际项目中应用。

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML 转 Markdown 互为补充。 在线工具,Markdown 转 HTML在线工具,online
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML 转 Markdown在线工具,online
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online
将JSON字符串修饰为友好的可读格式。 在线工具,JSON美化和格式化在线工具,online