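The Art of Error Handling in Rust Async Programming | 极客日志
Summary (AI-generated): An in-depth look at error handling in Rust async programming. It analyzes how async errors differ from synchronous ones and classifies I/O, timeout, cancellation, and business-logic errors. It walks through the standard patterns in async contexts using the Result type, the ? operator, and custom error types (thiserror/anyhow). Using Tokio's timeout and spawn, it shows how to aggregate results from concurrent tasks and propagate errors, then builds a RESTful API with Axum and SQLx to demonstrate unified error responses and middleware design. It closes with tracing-based logging and tokio-console debugging techniques for building robust async systems.
未来可期 Published 2026/3/16, updated 2026/5/4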
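The nature and classification of async errors
In synchronous Rust, a fallible function returns Result<T, E> directly: the Err variant carries the error, and the calling thread blocks until the operation finishes. In async Rust, the function returns a future whose output is a Result (Future<Output = Result<T, E>>). The task is suspended at each .await until the operation completes, and on top of ordinary failures such as I/O errors the caller also has to deal with async-specific ones such as timeouts and cancellation.
Synchronous vs. asynchronous
Reading a file synchronously blocks the thread: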
use std::fs::File;
use std::io::Read;
fn read_file_sync () -> Result <String , std::io::Error> {
let mut file = File::open ("test.txt" )?;
let mut content = String ::new ();
file.read_to_string (&mut content)?;
Ok (content)
}
fn main () {
match read_file_sync () {
Ok (content) => println! ("File content: {}" , content),
Err (e) => println! ("Error reading file: {}" , e),
}
}
The async version yields control to the runtime while it waits:
use tokio::fs::File;
use tokio::io::AsyncReadExt;

async fn read_file_async() -> Result<String, std::io::Error> {
    let mut file = File::open("test.txt").await?;
    let mut content = String::new();
    file.read_to_string(&mut content).await?;
    Ok(content)
}

#[tokio::main]
async fn main() {
    match read_file_async().await {
        Ok(content) => println!("File content: {}", content),
        Err(e) => println!("Error reading file: {}", e),
    }
}
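Common error types
I/O errors: dropped network connections, failed file reads or writes, and similar failures, usually represented by std::io::Error.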
use tokio::net::TcpStream;
async fn connect_to_server () -> Result <TcpStream, std::io::Error> {
let stream = TcpStream::connect ("127.0.0.1:8080" ).await ?;
Ok (stream)
}
Timeout errors: the operation did not finish within the allotted time. In that case Tokio's timeout function returns Err(tokio::time::error::Elapsed).
use tokio::time::{timeout, Duration};
async fn async_operation () {
tokio::time::sleep (Duration::from_secs (3 )).await ;
println! ("Operation completed" );
}
#[tokio::main]
async fn main () {
let result = timeout (Duration::from_secs (2 ), async_operation ()).await ;
match result {
Ok (_) => println! ("Success" ),
Err (e) => println! ("Error: {}" , e),
}
}
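Cancellation errors: the task was terminated deliberately. After JoinHandle::abort is called, awaiting the handle yields a tokio::task::JoinError whose is_cancelled() returns true (unless the task had already finished).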
use tokio::time::sleep;
use std::time::Duration;
async fn task () {
sleep (Duration::from_secs (5 )).await ;
println! ("Task completed" );
}
#[tokio::main]
async fn main () {
let handle = tokio::spawn (task ());
sleep (Duration::from_secs (2 )).await ;
handle.abort ();
let result = handle.await ;
match result {
Ok (_) => println! ("Success" ),
Err (e) => println! ("Error: {}" , e),
}
}
Business-logic errors: user not found, insufficient balance, and so on. These call for a custom error type.
#[derive(Debug)]
enum BusinessError {
UserNotFound,
InsufficientBalance,
InvalidPassword,
}
impl std ::fmt::Display for BusinessError {
fn fmt (&self , f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
BusinessError::UserNotFound => write! (f, "User not found" ),
BusinessError::InsufficientBalance => write! (f, "Insufficient balance" ),
BusinessError::InvalidPassword => write! (f, "Invalid password" ),
}
}
}
impl std ::error::Error for BusinessError {}
async fn login (username: &str , password: &str ) -> Result <(), BusinessError> {
if username != "admin" {
return Err (BusinessError::UserNotFound);
}
if password != "password" {
return Err (BusinessError::InvalidPassword);
}
Ok (())
}
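System errors: insufficient permissions and similar OS-level failures, which usually surface as std::io::Error (for example ErrorKind::PermissionDenied). Running out of memory is different: by default a failed allocation aborts the process instead of returning an Err.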
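Since several of the categories above arrive as std::io::Error, it can help to classify them by ErrorKind before deciding how to react. The following is a minimal sketch using only the standard library; the helper name is_retryable and the choice of which kinds count as transient are assumptions for illustration, not something the article prescribes.

```rust
use std::io::{Error, ErrorKind};

/// Hypothetical helper: decides whether an I/O failure looks transient.
/// The set of kinds below is an assumption; tune it for your service.
fn is_retryable(err: &Error) -> bool {
    matches!(
        err.kind(),
        ErrorKind::ConnectionRefused
            | ErrorKind::ConnectionReset
            | ErrorKind::ConnectionAborted
            | ErrorKind::TimedOut
            | ErrorKind::Interrupted
    )
}
```

A caller could retry with backoff when is_retryable returns true and propagate the error with ? otherwise.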
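Using the standard Result in async contexts
Using the ? operator
Inside an async function, ? behaves exactly as it does in a synchronous one: on Err it returns early from the function, converting the error into the function's error type via From.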
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use std::error::Error;
async fn read_file_and_parse () -> Result <Vec <u32 >, Box <dyn Error>> {
let mut file = File::open ("numbers.txt" ).await ?;
let mut content = String ::new ();
file.read_to_string (&mut content).await ?;
let numbers : Vec <u32 > = content
.lines ()
.map (|line| line.parse::<u32 >())
.collect::<Result <_, _>>()?;
Ok (numbers)
}
#[tokio::main]
async fn main () {
match read_file_and_parse ().await {
Ok (numbers) => println! ("Numbers: {:?}" , numbers),
Err (e) => println! ("Error: {}" , e),
}
}
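To make the conversion step visible, here is a small synchronous sketch that writes the same function twice: once with ?, and once with the match that ? roughly stands for. The function names are hypothetical, and the second version is a simplification of what the compiler actually generates.

```rust
use std::error::Error;

/// Uses `?`: a ParseIntError is converted into Box<dyn Error> automatically.
fn double_with_question_mark(s: &str) -> Result<u32, Box<dyn Error>> {
    let n: u32 = s.trim().parse()?;
    Ok(n * 2)
}

/// Roughly the same logic spelled out: return early and convert via From.
fn double_desugared(s: &str) -> Result<u32, Box<dyn Error>> {
    let n: u32 = match s.trim().parse::<u32>() {
        Ok(v) => v,
        Err(e) => return Err(Box::<dyn Error>::from(e)),
    };
    Ok(n * 2)
}
```

Both functions behave the same for valid and invalid input; in an async fn the only difference is that the expression before ? usually ends in .await.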
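The risks of unwrap/expect
Calling unwrap() or expect() inside an async task panics on failure. In a spawned Tokio task the panic is caught by the runtime and surfaces only as a JoinError when the JoinHandle is awaited, so it is easy to miss. Prefer match or ? instead.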
use tokio::fs::File;
use tokio::io::AsyncReadExt;
async fn read_file () {
let mut file = File::open ("nonexistent.txt" ).await .unwrap ();
let mut content = String ::new ();
file.read_to_string (&mut content).await .unwrap ();
println! ("File content: {}" , content);
}
async fn read_file_safe () -> Result <(), Box <dyn std::error::Error>> {
let mut file = File::open ("nonexistent.txt" ).await ?;
let mut content = String ::new ();
file.read_to_string (&mut content).await ?;
println! ("File content: {}" , content);
Ok (())
}
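Unifying errors with Box<dyn Error>
Box<dyn Error> is a common way to unify error types: it can hold any type that implements the Error trait.
use tokio::io::AsyncReadExt; // required for read_to_string below
use tokio::time::{timeout, Duration};
use std::error::Error;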
async fn async_operation () -> Result <(), Box <dyn Error>> {
let result = timeout (Duration::from_secs (2 ), tokio::time::sleep (Duration::from_secs (3 ))).await ?;
Ok (result)
}
async fn read_file () -> Result <String , Box <dyn Error>> {
let mut file = tokio::fs::File::open ("test.txt" ).await ?;
let mut content = String ::new ();
file.read_to_string (&mut content).await ?;
Ok (content)
}
async fn combined_operation () -> Result <(), Box <dyn Error>> {
async_operation ().await ?;
let content = read_file ().await ?;
println! ("File content: {}" , content);
Ok (())
}
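Designing custom async error types
Choosing between thiserror and anyhow
thiserror: suited to libraries. Its derive macro generates the Display, Error, and From implementations at compile time, so it adds no runtime overhead compared with hand-written code.
anyhow: suited to applications. It provides a single type-erased error type with context and error-chain support.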
[dependencies]
thiserror = "1.0"
anyhow = "1.0"
Defining errors with thiserror
The #[derive(Error)] macro implements the Error trait for you.
use thiserror::Error;
use tokio::time::error::Elapsed;
#[derive(Error, Debug)]
pub enum AppError {
#[error("IO error: {0}" )]
Io (#[from] std::io::Error),
#[error("Timeout error: {0}" )]
Timeout (#[from] Elapsed),
#[error("Business error: {0}" )]
Business (#[from] BusinessError),
#[error("System error: {0}" )]
System (String ),
}
#[derive(Error, Debug)]
pub enum BusinessError {
#[error("User not found" )]
UserNotFound,
#[error("Insufficient balance" )]
InsufficientBalance,
#[error("Invalid password" )]
InvalidPassword,
}
async fn login (username: &str , password: &str ) -> Result <(), AppError> {
if username != "admin" {
return Err (AppError::Business (BusinessError::UserNotFound));
}
if password != "password" {
return Err (AppError::Business (BusinessError::InvalidPassword));
}
Ok (())
}
async fn connect_to_server () -> Result <tokio::net::TcpStream, AppError> {
let stream = tokio::net::TcpStream::connect ("127.0.0.1:8080" ).await ?;
Ok (stream)
}
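For readers who want to see what the derive saves, the sketch below hand-writes approximately what #[derive(Error)] with #[error(...)] and #[from] would produce for a reduced two-variant enum. It uses only the standard library; open_config is a hypothetical function added for illustration, and the real macro output differs in detail.

```rust
use std::error::Error;
use std::fmt;
use std::io;

#[derive(Debug)]
enum AppError {
    Io(io::Error),
    System(String),
}

// What #[error("...")] provides: a Display implementation.
impl fmt::Display for AppError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            AppError::Io(e) => write!(f, "IO error: {}", e),
            AppError::System(msg) => write!(f, "System error: {}", msg),
        }
    }
}

// What #[from] implies: the wrapped error is exposed as the source.
impl Error for AppError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            AppError::Io(e) => Some(e),
            AppError::System(_) => None,
        }
    }
}

// What #[from] generates: the conversion that `?` relies on.
impl From<io::Error> for AppError {
    fn from(e: io::Error) -> Self {
        AppError::Io(e)
    }
}

/// Hypothetical function: `?` turns io::Error into AppError::Io.
fn open_config(path: &str) -> Result<std::fs::File, AppError> {
    Ok(std::fs::File::open(path)?)
}
```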
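Simplifying error handling with anyhow
anyhow's Result<T> is an alias for Result<T, anyhow::Error>, which is convenient for rapid prototyping.
use anyhow::Result;
use tokio::io::AsyncReadExt; // required for read_to_string below
use tokio::time::{timeout, Duration};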
async fn async_operation () -> Result <()> {
let result = timeout (Duration::from_secs (2 ), tokio::time::sleep (Duration::from_secs (3 ))).await ?;
Ok (result)
}
async fn read_file () -> Result <String > {
let mut file = tokio::fs::File::open ("test.txt" ).await ?;
let mut content = String ::new ();
file.read_to_string (&mut content).await ?;
Ok (content)
}
async fn combined_operation () -> Result <()> {
async_operation ().await ?;
let content = read_file ().await ?;
println! ("File content: {}" , content);
Ok (())
}
#[tokio::main]
async fn main () {
if let Err (e) = combined_operation ().await {
println! ("Error: {}" , e);
println! ("Chain:" );
for (i, cause) in e.chain ().enumerate () {
println! (" {}: {}" , i, cause);
}
}
}
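Layering libraries and applications
A common recommendation is to define precise error types with thiserror in library code, and to convert and display them with anyhow at the application layer.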
use thiserror::Error;
#[derive(Error, Debug)]
pub enum MyLibError {
#[error("Invalid input: {0}" )]
InvalidInput (String ),
#[error("Network error: {0}" )]
Network (#[from] reqwest::Error),
#[error("Database error: {0}" )]
Database (#[from] sqlx::Error),
}
pub async fn fetch_data () -> Result <String , MyLibError> {
let response = reqwest::get ("https://example.com/api/data" ).await ?;
let data = response.text ().await ?;
Ok (data)
}
use my_lib::fetch_data;
use anyhow::Result ;
async fn process_data () -> Result <()> {
let data = fetch_data ().await ?;
println! ("Data: {}" , data);
Ok (())
}
#[tokio::main]
async fn main () {
if let Err (e) = process_data ().await {
println! ("Error: {}" , e);
}
}
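Handling timeout and cancellation errors
Handling timeouts
tokio::time::timeout returns Result<T, Elapsed>, where T is the output of the wrapped future; the timeout case has to be handled explicitly.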
use tokio::time::{timeout, Duration};
use thiserror::Error;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Timeout error" )]
Timeout,
#[error("Other error: {0}" )]
Other (#[from] anyhow::Error),
}
async fn async_operation () -> Result <(), AppError> {
tokio::time::sleep (Duration::from_secs (5 )).await ;
Ok (())
}
async fn run_with_timeout () -> Result <(), AppError> {
let result = timeout (Duration::from_secs (3 ), async_operation ()).await ;
match result {
// The future finished in time: propagate its own Result instead of discarding it.
Ok(inner) => inner,
Err(_) => Err(AppError::Timeout),
}
}
#[tokio::main]
async fn main () {
match run_with_timeout ().await {
Ok (_) => println! ("Success" ),
Err (AppError::Timeout) => println! ("Error: Operation timed out" ),
Err (e) => println! ("Error: {}" , e),
}
}
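When the wrapped future itself returns a Result, timeout produces a nested Result, and it is easy to drop the inner error by accident. The sketch below isolates the flattening step without depending on Tokio: Elapsed is a local stand-in for tokio::time::error::Elapsed, and flatten_timeout and the Op variant are hypothetical names.

```rust
/// Local stand-in for tokio::time::error::Elapsed (assumption for this sketch).
#[derive(Debug, PartialEq)]
struct Elapsed;

#[derive(Debug, PartialEq)]
enum AppError {
    Timeout,
    Op(String),
}

/// Collapses the nested Result that a timeout wrapper yields into one level,
/// keeping the operation's own error when it finished in time.
fn flatten_timeout<T>(result: Result<Result<T, AppError>, Elapsed>) -> Result<T, AppError> {
    match result {
        Ok(inner) => inner,
        Err(Elapsed) => Err(AppError::Timeout),
    }
}
```

With Tokio the call site would look like flatten_timeout(timeout(limit, operation()).await), assuming the real Elapsed type is used in the signature.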
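Handling cancellation
JoinHandle::abort requests cancellation; inspect the JoinError returned by awaiting the handle to tell whether the task was cancelled.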
use tokio::time::sleep;
use std::time::Duration;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Task canceled" )]
Canceled,
#[error("Other error: {0}" )]
Other (#[from] anyhow::Error),
}
async fn task () -> Result <(), AppError> {
for i in 0 ..5 {
println! ("Task iteration {}" , i);
sleep (Duration::from_secs (1 )).await ;
}
Ok (())
}
async fn run_with_cancel () -> Result <(), AppError> {
let handle = tokio::spawn (task ());
sleep (Duration::from_secs (2 )).await ;
handle.abort ();
let result = handle.await ;
match result {
Ok(inner) => inner, // the task ran to completion: propagate its own Result
Err (e) => {
if e.is_cancelled () {
Err (AppError::Canceled)
} else {
Err (AppError::Other (e.into ()))
}
}
}
}
Combining the two
Combining a timeout with a cancellation signal (such as a oneshot channel) gives finer-grained control.
use tokio::time::{timeout, Duration};
use tokio::sync::oneshot;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Timeout error" )]
Timeout,
#[error("Task canceled" )]
Canceled,
#[error("Other error: {0}" )]
Other (#[from] anyhow::Error),
}
async fn async_operation (mut cancel_rx: oneshot::Receiver<()>) -> Result <(), AppError> {
for i in 0 ..5 {
println! ("Task iteration {}" , i);
tokio::select! {
_ = tokio::time::sleep (Duration::from_secs (1 )) => {},
_ = &mut cancel_rx => return Err (AppError::Canceled),
}
}
Ok (())
}
async fn run_with_timeout_and_cancel() -> Result<(), AppError> {
    let (cancel_tx, cancel_rx) = oneshot::channel();
    let mut handle = tokio::spawn(async_operation(cancel_rx));
    // Wait up to 3 seconds; borrowing the handle keeps it usable after a timeout.
    match timeout(Duration::from_secs(3), &mut handle).await {
        // The task finished in time: propagate its own Result.
        Ok(Ok(inner)) => inner,
        // The task was aborted or panicked.
        Ok(Err(e)) if e.is_cancelled() => Err(AppError::Canceled),
        Ok(Err(e)) => Err(AppError::Other(e.into())),
        // Deadline reached: signal cancellation, let the task wind down, report the timeout.
        Err(_) => {
            let _ = cancel_tx.send(());
            let _ = handle.await;
            Err(AppError::Timeout)
        }
    }
}
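The essential idea in the loop above is cooperative cancellation: the work checks a signal at well-defined points and returns a dedicated error when it is set. The sketch below shows the same shape in synchronous, standard-library-only code, with an AtomicBool standing in for the oneshot receiver; run_steps and the completed field are hypothetical names for illustration.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

#[derive(Debug, PartialEq)]
enum AppError {
    /// Carries how many steps finished before the cancel signal was seen.
    Canceled { completed: usize },
}

/// Runs `steps` units of work, checking the cancel flag before each one.
fn run_steps(steps: usize, cancel: &AtomicBool) -> Result<usize, AppError> {
    for i in 0..steps {
        if cancel.load(Ordering::Relaxed) {
            return Err(AppError::Canceled { completed: i });
        }
        // The real work for step `i` would go here.
    }
    Ok(steps)
}
```

Returning how much work completed is a design choice that lets the caller decide whether partial results are usable.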
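Aggregating errors from concurrent tasks
join! and try_join!
join!: waits for every future to finish and returns each result separately; errors are not aggregated.
try_join!: returns as soon as any future fails, yielding that error.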
use tokio::join;
use tokio::try_join;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Task1 error" )]
Task1,
#[error("Task2 error" )]
Task2,
}
async fn task1 () -> Result <(), AppError> {
tokio::time::sleep (tokio::time::Duration::from_secs (1 )).await ;
Ok (())
}
async fn task2 () -> Result <(), AppError> {
tokio::time::sleep (tokio::time::Duration::from_secs (2 )).await ;
Err (AppError::Task2)
}
#[tokio::main]
async fn main () {
println! ("Using join!:" );
let (res1, res2) = join!(task1 (), task2 ());
println! ("Task1 result: {:?}" , res1);
println! ("Task2 result: {:?}" , res2);
println! ("\nUsing try_join!:" );
let result = try_join!(task1 (), task2 ());
println! ("Result: {:?}" , result);
}
Aggregating a batch of tasks
Use futures::future::try_join_all to handle a dynamically sized list of tasks.
use futures::future::try_join_all;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Task {0} error" )]
Task (usize ),
}
async fn task (i: usize ) -> Result <usize , AppError> {
tokio::time::sleep (tokio::time::Duration::from_secs (i as u64 )).await ;
if i % 2 == 0 {
Ok (i)
} else {
Err (AppError::Task (i))
}
}
#[tokio::main]
async fn main () {
let tasks = (0 ..5 ).map (|i| task (i));
let result = try_join_all (tasks).await ;
println! ("Result: {:?}" , result);
}
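try_join_all stops at the first failure, so only one error is ever reported. When every failure matters, one option is to let all tasks finish, collect their individual results into a Vec, and split them afterwards. The helper below is a standard-library sketch of that splitting step; partition_results is a hypothetical name, and gathering the Vec (for example with a join-all style combinator) is left to the caller.

```rust
/// Splits a list of per-task results into successes and failures,
/// preserving the original order within each group.
fn partition_results<T, E>(results: Vec<Result<T, E>>) -> (Vec<T>, Vec<E>) {
    let mut successes = Vec::new();
    let mut failures = Vec::new();
    for result in results {
        match result {
            Ok(value) => successes.push(value),
            Err(error) => failures.push(error),
        }
    }
    (successes, failures)
}
```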
Capturing errors from spawn
tokio::spawn returns a JoinHandle; await it to obtain the task's result.
use tokio::time::sleep;
use std::time::Duration;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Task error" )]
Task,
}
async fn task () -> Result <(), AppError> {
sleep (Duration::from_secs (2 )).await ;
Err (AppError::Task)
}
#[tokio::main]
async fn main () {
let handle = tokio::spawn (task ());
let result = handle.await ;
println! ("Task result: {:?}" , result);
}
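Passing and propagating async errors
Building an error chain
Use thiserror's #[source] attribute to preserve the underlying error.
use std::error::Error as _; // brings source() into scope for the chain walk in main
use thiserror::Error;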
#[derive(Error, Debug)]
pub enum AppError {
#[error("Failed to fetch data: {0}" )]
FetchData (#[source] reqwest::Error),
#[error("Failed to parse data: {0}" )]
ParseData (#[source] serde_json::Error),
}
async fn fetch_and_parse_data () -> Result <serde_json::Value, AppError> {
let response = reqwest::get ("https://example.com/api/data" ).await .map_err (AppError::FetchData)?;
let data = response.text ().await .map_err (AppError::FetchData)?;
let json = serde_json::from_str (&data).map_err (AppError::ParseData)?;
Ok (json)
}
#[tokio::main]
async fn main () {
let result = fetch_and_parse_data ().await ;
match result {
Ok (json) => println! ("Data: {}" , serde_json::to_string_pretty (&json).unwrap ()),
Err (e) => {
println! ("Error: {}" , e);
println! ("Cause chain:" );
let mut cause = e.source ();
while let Some (c) = cause {
println! (" - {}" , c);
cause = c.source ();
}
}
}
}
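The chain walk in main can be factored into a reusable helper. The sketch below does this with the standard library only; FetchError and ParseFailure are hypothetical stand-ins for the reqwest and serde_json errors, and error_chain is an illustrative name.

```rust
use std::error::Error;
use std::fmt;

#[derive(Debug)]
struct ParseFailure; // hypothetical low-level error

impl fmt::Display for ParseFailure {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "invalid JSON")
    }
}

impl Error for ParseFailure {}

#[derive(Debug)]
struct FetchError {
    source: ParseFailure, // hypothetical wrapper around the low-level error
}

impl fmt::Display for FetchError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "failed to fetch data")
    }
}

impl Error for FetchError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        Some(&self.source)
    }
}

/// Collects the message of an error and of every cause reachable via source().
fn error_chain(err: &(dyn Error + 'static)) -> Vec<String> {
    let mut messages = vec![err.to_string()];
    let mut current = err.source();
    while let Some(cause) = current {
        messages.push(cause.to_string());
        current = cause.source();
    }
    messages
}
```

Joining the returned messages with ": " gives a single log line that still shows the whole chain.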
Passing errors between tasks
Use a channel to pass error state between async tasks.
use tokio::sync::oneshot;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Task error" )]
Task,
}
async fn task (tx: oneshot::Sender<Result <(), AppError>>) {
tokio::time::sleep (tokio::time::Duration::from_secs (2 )).await ;
let _ = tx.send (Err (AppError::Task));
}
#[tokio::main]
async fn main () {
let (tx, rx) = oneshot::channel ();
tokio::spawn (task (tx));
let result = rx.await .unwrap ();
println! ("Task result: {:?}" , result);
}
Passing errors from child to parent tasks
A parent task can capture a child task's error through its JoinHandle.
use tokio::time::sleep;
use std::time::Duration;
use thiserror::Error;
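#[derive(Error, Debug)]
pub enum AppError {
    #[error("Child task error")]
    ChildTask,
    // Needed so that `?` can convert a JoinError (panic or cancellation) into AppError.
    #[error("Child task panicked or was cancelled: {0}")]
    Join(#[from] tokio::task::JoinError),
}
async fn child_task() -> Result<(), AppError> {
    sleep(Duration::from_secs(1)).await;
    Err(AppError::ChildTask)
}
async fn parent_task() -> Result<(), AppError> {
    let handle = tokio::spawn(child_task());
    // `?` handles the JoinError; what remains is the child's own Result.
    handle.await?
}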
#[tokio::main]
async fn main () {
let result = parent_task ().await ;
println! ("Parent task result: {:?}" , result);
}
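Hands-on project: an error-handling system for an async API
We will build a RESTful API on Axum and SQLx with end-to-end error handling.
Architecture
HTTP framework: Axum
Database access: SQLx
Error definitions: thiserror
Serialization: serde
Logging and monitoring: tracing
Dependencies
[dependencies]
# axum::serve, used in main below, is available from axum 0.7
axum = { version = "0.7", features = ["ws"] }
tokio = { version = "1.0", features = ["full"] }
# the uuid and chrono features let sqlx map Uuid and DateTime<Utc> columns
sqlx = { version = "0.6", features = ["postgres", "runtime-tokio-rustls", "uuid", "chrono"] }
thiserror = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
uuid = { version = "1.1", features = ["v4", "serde"] }
chrono = { version = "0.4", features = ["serde"] }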
tracing = "0.1"
tracing-subscriber = { version = "0.3" , features = ["env-filter" , "json" ] }
Models and database
use sqlx::PgPool;
pub async fn create_pool (database_url: &str ) -> PgPool {
PgPool::connect (database_url).await .unwrap ()
}
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Debug, Serialize, Deserialize, sqlx::FromRow)]
pub struct User {
pub id: Uuid,
pub name: String ,
pub email: String ,
pub password: String ,
pub created_at: chrono::DateTime<chrono::Utc>,
pub updated_at: chrono::DateTime<chrono::Utc>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateUserRequest {
pub name: String ,
pub email: String ,
pub password: String ,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct UpdateUserRequest {
pub name: Option <String >,
pub email: Option <String >,
pub password: Option <String >,
}
Unified error responses
Define the error enum in src/errors.rs and implement IntoResponse for it:
use thiserror::Error;
use axum::{http::StatusCode, response::IntoResponse, Json};
use serde::{Serialize, Deserialize};
#[derive(Error, Debug)]
pub enum AppError {
#[error("Database error: {0}" )]
Database (#[from] sqlx::Error),
#[error("Request validation error: {0}" )]
Validation (String ),
#[error("User not found" )]
UserNotFound,
#[error("User already exists" )]
UserAlreadyExists,
#[error("Internal server error" )]
InternalServerError,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ErrorResponse {
pub error: String ,
pub code: u16 ,
pub message: String ,
}
impl IntoResponse for AppError {
    fn into_response(self) -> axum::response::Response {
        // Render the Display text first so that `self` is only borrowed below.
        let error = self.to_string();
        let (status, message) = match &self {
            AppError::Database(e) => {
                tracing::error!("Database error: {:?}", e);
                (StatusCode::INTERNAL_SERVER_ERROR, "Internal server error".to_string())
            }
            AppError::Validation(msg) => (StatusCode::BAD_REQUEST, msg.clone()),
            AppError::UserNotFound => (StatusCode::NOT_FOUND, "User not found".to_string()),
            AppError::UserAlreadyExists => (StatusCode::CONFLICT, "User already exists".to_string()),
            AppError::InternalServerError => {
                (StatusCode::INTERNAL_SERVER_ERROR, "Internal server error".to_string())
            }
        };
        let response = ErrorResponse { error, code: status.as_u16(), message };
        (status, Json(response)).into_response()
    }
}
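One way to keep this mapping easy to unit-test is to put it in plain methods that do not depend on the HTTP framework, and have into_response call them. The sketch below assumes a simplified copy of the enum in which the database variant carries a String instead of sqlx::Error; the method names status_code and public_message are hypothetical.

```rust
/// Simplified stand-in for the article's AppError (Database holds a String here).
#[derive(Debug)]
enum AppError {
    Database(String),
    Validation(String),
    UserNotFound,
    UserAlreadyExists,
    InternalServerError,
}

impl AppError {
    /// HTTP status code for each variant.
    fn status_code(&self) -> u16 {
        match self {
            AppError::Validation(_) => 400,
            AppError::UserNotFound => 404,
            AppError::UserAlreadyExists => 409,
            AppError::Database(_) | AppError::InternalServerError => 500,
        }
    }

    /// Message that is safe to show to clients; internal details stay in the logs.
    fn public_message(&self) -> String {
        match self {
            AppError::Validation(msg) => msg.clone(),
            AppError::UserNotFound => "User not found".to_string(),
            AppError::UserAlreadyExists => "User already exists".to_string(),
            AppError::Database(_) | AppError::InternalServerError => {
                "Internal server error".to_string()
            }
        }
    }
}
```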
Business logic layer
src/repository.rs handles the database interaction:
use sqlx::PgPool;
use uuid::Uuid;
use crate::models::*;
use crate::errors::AppError;
pub async fn create_user (pool: &PgPool, user: CreateUserRequest) -> Result <User, AppError> {
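// The "exists!" alias tells sqlx the column is non-null, so the macro yields a plain bool.
let exists = sqlx::query_scalar!(
    r#"SELECT EXISTS(SELECT 1 FROM users WHERE email = $1) AS "exists!""#,
    user.email
)
.fetch_one(pool)
.await?;
if exists {
    return Err(AppError::UserAlreadyExists);
}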
let user = sqlx::query_as!(User,
"INSERT INTO users (id, name, email, password, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, $6) RETURNING *" ,
Uuid::new_v4 (), user.name, user.email, user.password, chrono::Utc::now (), chrono::Utc::now ()
).fetch_one (pool).await ?;
Ok (user)
}
// fetch_one reports a missing row as sqlx::Error::RowNotFound; map that to UserNotFound.
fn not_found_or_database(e: sqlx::Error) -> AppError {
    match e {
        sqlx::Error::RowNotFound => AppError::UserNotFound,
        other => AppError::Database(other),
    }
}
pub async fn get_user_by_id(pool: &PgPool, id: Uuid) -> Result<User, AppError> {
    let user = sqlx::query_as!(User, "SELECT * FROM users WHERE id = $1", id)
        .fetch_one(pool)
        .await
        .map_err(not_found_or_database)?;
    Ok(user)
}
pub async fn update_user(pool: &PgPool, id: Uuid, user: UpdateUserRequest) -> Result<User, AppError> {
    let updated_user = sqlx::query_as!(User,
        "UPDATE users SET name = COALESCE($1, name), email = COALESCE($2, email), password = COALESCE($3, password), updated_at = $4 WHERE id = $5 RETURNING *",
        user.name, user.email, user.password, chrono::Utc::now(), id
    )
    .fetch_one(pool)
    .await
    .map_err(not_found_or_database)?;
    Ok(updated_user)
}
pub async fn delete_user (pool: &PgPool, id: Uuid) -> Result <(), AppError> {
let result = sqlx::query!("DELETE FROM users WHERE id = $1" , id).execute (pool).await ?;
if result.rows_affected () == 0 {
return Err (AppError::UserNotFound);
}
Ok (())
}
Service layer
src/service.rs is responsible for input validation:
use crate::repository::*;
use crate::models::*;
use crate::errors::AppError;
use sqlx::PgPool;
pub async fn create_user_service (pool: &PgPool, user: CreateUserRequest) -> Result <User, AppError> {
if user.name.is_empty () {
return Err (AppError::Validation ("Name is required" .to_string ()));
}
if user.email.is_empty () {
return Err (AppError::Validation ("Email is required" .to_string ()));
}
if !user.email.contains ("@" ) {
return Err (AppError::Validation ("Invalid email format" .to_string ()));
}
if user.password.len () < 6 {
return Err (AppError::Validation ("Password must be at least 6 characters long" .to_string ()));
}
create_user (pool, user).await
}
pub async fn get_user_by_id_service (pool: &PgPool, id: uuid::Uuid) -> Result <User, AppError> {
get_user_by_id (pool, id).await
}
pub async fn update_user_service (pool: &PgPool, id: uuid::Uuid, user: UpdateUserRequest) -> Result <User, AppError> {
if let Some (email) = &user.email {
if !email.contains ("@" ) {
return Err (AppError::Validation ("Invalid email format" .to_string ()));
}
}
if let Some (password) = &user.password {
if password.len () < 6 {
return Err (AppError::Validation ("Password must be at least 6 characters long" .to_string ()));
}
}
update_user (pool, id, user).await
}
pub async fn delete_user_service (pool: &PgPool, id: uuid::Uuid) -> Result <(), AppError> {
delete_user (pool, id).await
}
Routes and entry point
use axum::{routing::{get, post, put, delete}, extract::Path, extract::State, Json};
use uuid::Uuid;
use crate::models::*;
use crate::service::*;
use crate::errors::AppError;
use sqlx::PgPool;
pub fn create_routes () -> axum::Router<PgPool> {
axum::Router::new ()
.route ("/users" , post (create_user_handler))
.route ("/users/:id" , get (get_user_handler))
.route ("/users/:id" , put (update_user_handler))
.route ("/users/:id" , delete (delete_user_handler))
}
async fn create_user_handler (State (pool): State<PgPool>, Json (req): Json<CreateUserRequest>) -> Result <Json<User>, AppError> {
let user = create_user_service (&pool, req).await ?;
Ok (Json (user))
}
async fn get_user_handler (State (pool): State<PgPool>, Path (id): Path<Uuid>) -> Result <Json<User>, AppError> {
let user = get_user_by_id_service (&pool, id).await ?;
Ok (Json (user))
}
async fn update_user_handler (State (pool): State<PgPool>, Path (id): Path<Uuid>, Json (req): Json<UpdateUserRequest>) -> Result <Json<User>, AppError> {
let user = update_user_service (&pool, id, req).await ?;
Ok (Json (user))
}
async fn delete_user_handler (State (pool): State<PgPool>, Path (id): Path<Uuid>) -> Result <(), AppError> {
delete_user_service (&pool, id).await
}
use axum::Router;
use tracing_subscriber::prelude::*;
use tracing::info;
use crate::db::create_pool;
use crate::routes::create_routes;
mod db;
mod errors;
mod models;
mod repository;
mod routes;
mod service;
#[tokio::main]
async fn main () {
tracing_subscriber::registry ()
.with (tracing_subscriber::EnvFilter::new ("info" ))
.with (tracing_subscriber::fmt::layer ())
.init ();
let database_url = "postgresql://user:password@localhost:5432/mydb" ;
let pool = create_pool (database_url).await ;
info!("Database pool created" );
let app : Router<sqlx::PgPool> = create_routes ();
let listener = tokio::net::TcpListener::bind ("0.0.0.0:3000" ).await .unwrap ();
info!("API server running on http://0.0.0.0:3000" );
axum::serve (listener, app.with_state (pool)).await .unwrap ();
}
Database migration
mkdir -p migrations/20230101000000_create_users
CREATE TABLE users (
    id UUID PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    email VARCHAR(255) NOT NULL UNIQUE,
    password VARCHAR(255) NOT NULL,
    -- TIMESTAMPTZ is the column type that sqlx maps to chrono::DateTime<Utc>
    created_at TIMESTAMPTZ NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL
);
Testing the API
Create a user:
curl -X POST -H "Content-Type: application/json" -d '{"name": "Alice", "email": "[email protected] ", "password": "password123"}' http://localhost:3000/users
Fetch a user:
curl -X GET http://localhost:3000/users/{user_id}
Update a user:
curl -X PUT -H "Content-Type: application/json" -d '{"name": "Alice Smith"}' http://localhost:3000/users/{user_id}
Delete a user:
curl -X DELETE http://localhost:3000/users/{user_id}
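Debugging and troubleshooting tips
Logging errors with tracing
use tracing::error;
use tracing_subscriber::prelude::*; // provides .with() and .init() on the registry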
use thiserror::Error;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Database error: {0}" )]
Database (#[from] sqlx::Error),
}
async fn fetch_data() -> Result<(), AppError> {
    let pool = sqlx::PgPool::connect("postgresql://user:password@localhost:5432/mydb").await?;
    // Runtime-checked query: the query! macro would reject the missing table at compile time.
    sqlx::query("SELECT * FROM nonexistent_table").execute(&pool).await?;
    Ok(())
}
#[tokio::main]
async fn main() {
    tracing_subscriber::registry()
        .with(tracing_subscriber::EnvFilter::new("info"))
        .with(tracing_subscriber::fmt::layer())
        .init();
    if let Err(e) = fetch_data().await {
        // {:?} prints the variant together with the wrapped sqlx::Error.
        error!("Error fetching data: {:?}", e);
    }
}
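Locating problems with tokio-console
Install the console, then run the application (instrumented with the console-subscriber crate) with the tokio_unstable cfg flag enabled:
cargo install tokio-console
RUSTFLAGS="--cfg tokio_unstable" cargo run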
Formatting error messages
use serde_json;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Database error: {0}" )]
Database (#[from] sqlx::Error),
}
async fn fetch_data() -> Result<(), AppError> {
    let pool = sqlx::PgPool::connect("postgresql://user:password@localhost:5432/mydb").await?;
    // Runtime-checked query: the query! macro would reject the missing table at compile time.
    sqlx::query("SELECT * FROM nonexistent_table").execute(&pool).await?;
    Ok(())
}
#[tokio::main]
async fn main () {
if let Err (e) = fetch_data ().await {
let error_json = serde_json::json!({
"error" : e.to_string (),
"code" : 500 ,
"message" : "Internal server error" ,
"details" : format! ("{:?}" , e)
});
println! ("{}" , serde_json::to_string_pretty (&error_json).unwrap ());
}
}
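Common pitfalls and best practices
Crashing async tasks: handle errors inside the task or capture them through the JoinHandle, so that failures are never silent.
Vague error messages: use thiserror's #[source] to build a complete error chain, and record it with tracing.
Duplicated code: define a single error type that implements IntoResponse, and handle application-wide failures centrally through middleware.
Task leaks: make sure every spawned task has an explicit cancellation mechanism or a path to completion, and check JoinHandle status periodically.
Summary
Error handling in async Rust affects not only how robust the code is but also how maintainable the system remains. Understanding what makes async errors different, choosing sensibly among Result, thiserror, and anyhow, and combining them with Tokio's timeout and cancellation mechanisms can noticeably improve development efficiency. In real projects, a unified error response scheme backed by solid logging and debugging tools is key to building highly available async services.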