跳到主要内容
Rust 异步编程的错误处理实战 | 极客日志
Rust
Rust 异步编程的错误处理实战 Rust 异步编程中错误处理至关重要。本文探讨同步与异步错误的区别,涵盖 IO、超时、取消及业务逻辑错误分类。通过 Result、? 操作符及 thiserror/anyhow 库实现标准化处理。结合 Tokio 的 timeout 和 spawn 机制,演示并发任务错误聚合与传递策略。最后通过 Axum + SQLx 实战项目,展示从自定义错误类型到统一响应中间件的完整体系,辅以 tracing 调试技巧,帮助开发者构建健壮的高并发服务。
Rust 异步编程的错误处理艺术
一、异步错误的本质与分类
1.1 异步错误与同步错误的区别
在 Rust 同步编程中,错误通常是通过 Result<T, E> 类型返回的,Err 变体包含了错误信息,程序会阻塞线程直到操作完成。而在异步编程中,操作的结果是一个 Future<Output = Result<T, E>>,程序会暂停任务直到操作完成,Err 变体可能是 IO 错误、超时错误、取消错误等异步场景特有的错误。
同步错误示例:
use std::fs::File;
use std::io::Read;
fn read_file_sync () -> Result <String , std::io::Error> {
let mut file = File::open ("test.txt" )?;
let mut content = String ::new ();
file.read_to_string (&mut content)?;
Ok (content)
}
fn main () {
match read_file_sync () {
Ok (content) => println! ("File content: {}" , content),
Err (e) => println! ("Error reading file: {}" , e),
}
}
异步错误示例:
use tokio::fs::File;
use tokio::io::AsyncReadExt;
async fn read_file_async () -> Result <String , std::io::Error> {
= File:: ( ). ?;
= :: ();
file. (& content). ?;
(content)
}
() {
(). {
(content) => ( , content),
(e) => ( , e),
}
}
let
mut
file
open
"test.txt"
await
let
mut
content
String
new
read_to_string
mut
await
Ok
#[tokio::main]
async
fn
main
match
read_file_async
await
Ok
println!
"File content: {}"
Err
println!
"Error reading file: {}"
1.2 异步错误的分类
1. IO 错误 IO 错误是异步编程中最常见的错误类型,包括网络连接失败、文件读取失败、数据库连接失败等。这类错误通常由 std::io::Error 表示。
use tokio::net::TcpStream;
async fn connect_to_server () -> Result <TcpStream, std::io::Error> {
let stream = TcpStream::connect ("127.0.0.1:8080" ).await ?;
Ok (stream)
}
2. 超时错误 超时错误是指异步操作在指定时间内没有完成。Tokio 提供了 tokio::time::timeout 函数来设置超时,超时会返回 tokio::time::error::Elapsed 错误。
use tokio::time::{timeout, Duration};
async fn async_operation () {
tokio::time::sleep (Duration::from_secs (3 )).await ;
println! ("Operation completed" );
}
#[tokio::main]
async fn main () {
let result = timeout (Duration::from_secs (2 ), async_operation ()).await ;
match result {
Ok (_) => println! ("Success" ),
Err (e) => println! ("Error: {}" , e),
}
}
3. 取消错误 取消错误是指异步任务被主动取消。Tokio 的 tokio::task::JoinHandle::abort 方法可以取消任务,取消会返回 tokio::task::JoinError 错误。
use tokio::time::sleep;
use std::time::Duration;
async fn task () {
sleep (Duration::from_secs (5 )).await ;
println! ("Task completed" );
}
#[tokio::main]
async fn main () {
let handle = tokio::spawn (task ());
sleep (Duration::from_secs (2 )).await ;
handle.abort ();
let result = handle.await ;
match result {
Ok (_) => println! ("Success" ),
Err (e) => println! ("Error: {}" , e),
}
}
4. 业务逻辑错误 业务逻辑错误是指应用程序的业务规则违反,例如用户不存在、余额不足、密码错误等。这类错误需要我们自定义错误类型来表示。
#[derive(Debug)]
enum BusinessError {
UserNotFound,
InsufficientBalance,
InvalidPassword,
}
impl std ::fmt::Display for BusinessError {
fn fmt (&self , f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
BusinessError::UserNotFound => write! (f, "User not found" ),
BusinessError::InsufficientBalance => write! (f, "Insufficient balance" ),
BusinessError::InvalidPassword => write! (f, "Invalid password" ),
}
}
}
impl std ::error::Error for BusinessError {}
async fn login (username: &str , password: &str ) -> Result <(), BusinessError> {
if username != "admin" {
return Err (BusinessError::UserNotFound);
}
if password != "password" {
return Err (BusinessError::InvalidPassword);
}
Ok (())
}
5. 系统错误 系统错误是指操作系统级别的错误,例如内存不足、权限不足等。这类错误通常由 std::io::Error 表示。
use tokio::fs::File;
async fn read_sensitive_file () -> Result <String , std::io::Error> {
let mut file = File::open ("/etc/shadow" ).await ?;
let mut content = String ::new ();
file.read_to_string (&mut content).await ?;
Ok (content)
}
二、异步上下文中的标准 Result 处理
2.1 ?操作符的异步应用 在异步函数中,? 操作符的使用方式与同步函数相同。当遇到 Err 变体时,它会将错误包装在 Result<_, Box<dyn Error>> 类型中并返回。
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use std::error::Error;
async fn read_file_and_parse () -> Result <Vec <u32 >, Box <dyn Error>> {
let mut file = File::open ("numbers.txt" ).await ?;
let mut content = String ::new ();
file.read_to_string (&mut content).await ?;
let numbers : Vec <u32 > = content
.lines ()
.map (|line| line.parse::<u32 >())
.collect::<Result <_, _>>()?;
Ok (numbers)
}
#[tokio::main]
async fn main () {
match read_file_and_parse ().await {
Ok (numbers) => println! ("Numbers: {:?}" , numbers),
Err (e) => println! ("Error: {}" , e),
}
}
2.2 unwrap/expect 的注意事项 在异步任务中使用 unwrap() 或 expect() 是非常危险的,因为它们会导致任务崩溃。我们应该尽量使用 match 语句或 ? 操作符来处理错误。
use tokio::fs::File;
use tokio::io::AsyncReadExt;
async fn read_file () {
let mut file = File::open ("nonexistent.txt" ).await .unwrap ();
let mut content = String ::new ();
file.read_to_string (&mut content).await .unwrap ();
println! ("File content: {}" , content);
}
#[tokio::main]
async fn main () {
read_file ().await ;
}
use tokio::fs::File;
use tokio::io::AsyncReadExt;
async fn read_file () -> Result <(), Box <dyn std::error::Error>> {
let mut file = File::open ("nonexistent.txt" ).await ?;
let mut content = String ::new ();
file.read_to_string (&mut content).await ?;
println! ("File content: {}" , content);
Ok (())
}
#[tokio::main]
async fn main () {
match read_file ().await {
Ok (_) => println! ("Success" ),
Err (e) => println! ("Error: {}" , e),
}
}
2.3 Box 的应用 Box<dyn Error> 是 Rust 中用于统一错误类型的常用方法。它可以容纳任何实现了 Error trait 的类型,包括标准库的错误类型和自定义错误类型。
use tokio::time::{timeout, Duration};
use std::error::Error;
async fn async_operation () -> Result <(), Box <dyn Error>> {
let result = timeout (Duration::from_secs (2 ), tokio::time::sleep (Duration::from_secs (3 ))).await ?;
Ok (result)
}
async fn read_file () -> Result <String , Box <dyn Error>> {
let mut file = tokio::fs::File::open ("test.txt" ).await ?;
let mut content = String ::new ();
file.read_to_string (&mut content).await ?;
Ok (content)
}
async fn combined_operation () -> Result <(), Box <dyn Error>> {
async_operation ().await ?;
let content = read_file ().await ?;
println! ("File content: {}" , content);
Ok (())
}
#[tokio::main]
async fn main () {
match combined_operation ().await {
Ok (_) => println! ("Success" ),
Err (e) => println! ("Error: {}" , e),
}
}
三、自定义异步错误类型的设计
3.1 thiserror 与 anyhow 的对比 在 Rust 中,有两个常用的错误处理库:thiserror 和 anyhow。
thiserror :用于定义自定义错误类型,支持 Error trait 的实现,适合库开发。
anyhow :用于快速原型开发,支持 Error 链,适合应用开发。
[dependencies]
thiserror = "1.0"
anyhow = "1.0"
3.2 用 thiserror 定义自定义错误类型 thiserror 提供了 #[derive(Error)] 宏,可以快速生成 Error trait 的实现。
use thiserror::Error;
use tokio::time::error::Elapsed;
#[derive(Error, Debug)]
pub enum AppError {
#[error("IO error: {0}" )]
Io (#[from] std::io::Error),
#[error("Timeout error: {0}" )]
Timeout (#[from] Elapsed),
#[error("Business error: {0}" )]
Business (#[from] BusinessError),
#[error("System error: {0}" )]
System (String ),
}
#[derive(Error, Debug)]
pub enum BusinessError {
#[error("User not found" )]
UserNotFound,
#[error("Insufficient balance" )]
InsufficientBalance,
#[error("Invalid password" )]
InvalidPassword,
}
async fn login (username: &str , password: &str ) -> Result <(), AppError> {
if username != "admin" {
return Err (AppError::Business (BusinessError::UserNotFound));
}
if password != "password" {
return Err (AppError::Business (BusinessError::InvalidPassword));
}
Ok (())
}
async fn connect_to_server () -> Result <tokio::net::TcpStream, AppError> {
let stream = tokio::net::TcpStream::connect ("127.0.0.1:8080" ).await ?;
Ok (stream)
}
3.3 用 anyhow 处理错误 anyhow 提供了 Result<T> 类型,它是 Result<T, anyhow::Error> 的别名。使用 anyhow 可以快速处理各种类型的错误。
use anyhow::Result ;
use tokio::time::{timeout, Duration};
async fn async_operation () -> Result <()> {
let result = timeout (Duration::from_secs (2 ), tokio::time::sleep (Duration::from_secs (3 ))).await ?;
Ok (result)
}
async fn read_file () -> Result <String > {
let mut file = tokio::fs::File::open ("test.txt" ).await ?;
let mut content = String ::new ();
file.read_to_string (&mut content).await ?;
Ok (content)
}
async fn combined_operation () -> Result <()> {
async_operation ().await ?;
let content = read_file ().await ?;
println! ("File content: {}" , content);
Ok (())
}
#[tokio::main]
async fn main () {
if let Err (e) = combined_operation ().await {
println! ("Error: {}" , e);
println! ("Chain:" );
for (i, cause) in e.chain ().enumerate () {
println! (" {}: {}" , i, cause);
}
}
}
3.4 自定义错误类型的实战应用 我们可以结合 thiserror 和 anyhow,在库中使用 thiserror 定义自定义错误类型,在应用中使用 anyhow 处理错误。
use thiserror::Error;
#[derive(Error, Debug)]
pub enum MyLibError {
#[error("Invalid input: {0}" )]
InvalidInput (String ),
#[error("Network error: {0}" )]
Network (#[from] reqwest::Error),
#[error("Database error: {0}" )]
Database (#[from] sqlx::Error),
}
pub async fn fetch_data () -> Result <String , MyLibError> {
let response = reqwest::get ("https://example.com/api/data" ).await ?;
let data = response.text ().await ?;
Ok (data)
}
use my_lib::fetch_data;
use anyhow::Result ;
async fn process_data () -> Result <()> {
let data = fetch_data ().await ?;
println! ("Data: {}" , data);
Ok (())
}
#[tokio::main]
async fn main () {
if let Err (e) = process_data ().await {
println! ("Error: {}" , e);
}
}
四、超时与取消错误的处理
4.1 超时错误的处理 Tokio 的 tokio::time::timeout 函数会返回 Result<T, Elapsed> 类型,我们需要对 Elapsed 错误进行处理。
use tokio::time::{timeout, Duration};
use thiserror::Error;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Timeout error" )]
Timeout,
#[error("Other error: {0}" )]
Other (#[from] anyhow::Error),
}
async fn async_operation () -> Result <(), AppError> {
tokio::time::sleep (Duration::from_secs (5 )).await ;
Ok (())
}
async fn run_with_timeout () -> Result <(), AppError> {
let result = timeout (Duration::from_secs (3 ), async_operation ()).await ;
match result {
Ok (_) => Ok (()),
Err (_) => Err (AppError::Timeout),
}
}
#[tokio::main]
async fn main () {
match run_with_timeout ().await {
Ok (_) => println! ("Success" ),
Err (AppError::Timeout) => println! ("Error: Operation timed out" ),
Err (e) => println! ("Error: {}" , e),
}
}
4.2 取消错误的处理 Tokio 的 tokio::task::JoinHandle::abort 方法会取消任务,取消会返回 tokio::task::JoinError 错误。
use tokio::time::sleep;
use std::time::Duration;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Task canceled" )]
Canceled,
#[error("Other error: {0}" )]
Other (#[from] anyhow::Error),
}
async fn task () -> Result <(), AppError> {
for i in 0 ..5 {
println! ("Task iteration {}" , i);
sleep (Duration::from_secs (1 )).await ;
}
Ok (())
}
async fn run_with_cancel () -> Result <(), AppError> {
let handle = tokio::spawn (task ());
sleep (Duration::from_secs (2 )).await ;
handle.abort ();
let result = handle.await ;
match result {
Ok (_) => Ok (()),
Err (e) => {
if e.is_cancelled () {
Err (AppError::Canceled)
} else {
Err (AppError::Other (e.into ()))
}
}
}
}
#[tokio::main]
async fn main () {
match run_with_cancel ().await {
Ok (_) => println! ("Success" ),
Err (AppError::Canceled) => println! ("Error: Task canceled" ),
Err (e) => println! ("Error: {}" , e),
}
}
4.3 超时与取消的组合处理 在实际应用中,我们可能需要同时处理超时和取消错误。
use tokio::time::{timeout, Duration};
use tokio::sync::oneshot;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Timeout error" )]
Timeout,
#[error("Task canceled" )]
Canceled,
#[error("Other error: {0}" )]
Other (#[from] anyhow::Error),
}
async fn async_operation (mut cancel_rx: oneshot::Receiver<()>) -> Result <(), AppError> {
for i in 0 ..5 {
println! ("Task iteration {}" , i);
tokio::select! {
_ = tokio::time::sleep (Duration::from_secs (1 )) => {},
_ = &mut cancel_rx => return Err (AppError::Canceled),
}
}
Ok (())
}
async fn run_with_timeout_and_cancel () -> Result <(), AppError> {
let (cancel_tx, cancel_rx) = oneshot::channel ();
let handle = tokio::spawn (async move {
let _ = cancel_tx.send (());
async_operation (cancel_rx).await
});
let result = timeout (Duration::from_secs (3 ), handle).await ;
match result {
Ok (Ok (_)) => Ok (()),
Ok (Err (e)) => {
if e.is_cancelled () {
Err (AppError::Canceled)
} else {
Err (AppError::Other (e.into ()))
}
}
Err (_) => Err (AppError::Timeout),
}
}
#[tokio::main]
async fn main () {
match run_with_timeout_and_cancel ().await {
Ok (_) => println! ("Success" ),
Err (AppError::Timeout) => println! ("Error: Operation timed out" ),
Err (AppError::Canceled) => println! ("Error: Task canceled" ),
Err (e) => println! ("Error: {}" , e),
}
}
五、并发任务的错误聚合
5.1 join! 与 try_join! 的错误处理
join! :等待所有任务完成,不处理错误。
try_join! :有一个任务失败就返回错误。
use tokio::join;
use tokio::try_join;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Task1 error" )]
Task1,
#[error("Task2 error" )]
Task2,
}
async fn task1 () -> Result <(), AppError> {
tokio::time::sleep (tokio::time::Duration::from_secs (1 )).await ;
Ok (())
}
async fn task2 () -> Result <(), AppError> {
tokio::time::sleep (tokio::time::Duration::from_secs (2 )).await ;
Err (AppError::Task2)
}
#[tokio::main]
async fn main () {
println! ("Using join!" );
let (res1, res2) = join!(task1 (), task2 ());
println! ("Task1 result: {:?}" , res1);
println! ("Task2 result: {:?}" , res2);
println! ("\nUsing try_join!" );
let result = try_join!(task1 (), task2 ());
println! ("Result: {:?}" , result);
}
5.2 多个任务的错误聚合 我们可以使用 futures::future::try_join_all 函数聚合多个任务的错误。
use futures::future::try_join_all;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Task {0} error" )]
Task (usize ),
}
async fn task (i: usize ) -> Result <usize , AppError> {
tokio::time::sleep (tokio::time::Duration::from_secs (i as u64 )).await ;
if i % 2 == 0 {
Ok (i)
} else {
Err (AppError::Task (i))
}
}
#[tokio::main]
async fn main () {
let tasks = (0 ..5 ).map (|i| task (i));
let result = try_join_all (tasks).await ;
println! ("Result: {:?}" , result);
}
5.3 spawn 的错误处理 Tokio 的 tokio::spawn 函数会返回 tokio::task::JoinHandle<T> 类型,我们需要使用 join() 方法等待任务完成并处理错误。
use tokio::time::sleep;
use std::time::Duration;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Task error" )]
Task,
}
async fn task () -> Result <(), AppError> {
sleep (Duration::from_secs (2 )).await ;
Err (AppError::Task)
}
#[tokio::main]
async fn main () {
let handle = tokio::spawn (task ());
let result = handle.await ;
println! ("Task result: {:?}" , result);
}
六、异步错误的传递与传播
6.1 错误链的构建 错误链是指错误之间的因果关系,我们可以使用 thiserror 的 #[source] 属性来构建错误链。
use thiserror::Error;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Failed to fetch data: {0}" )]
FetchData (#[source] reqwest::Error),
#[error("Failed to parse data: {0}" )]
ParseData (#[source] serde_json::Error),
}
async fn fetch_and_parse_data () -> Result <serde_json::Value, AppError> {
let response = reqwest::get ("https://example.com/api/data" ).await .map_err (AppError::FetchData)?;
let data = response.text ().await .map_err (AppError::FetchData)?;
let json = serde_json::from_str (&data).map_err (AppError::ParseData)?;
Ok (json)
}
#[tokio::main]
async fn main () {
let result = fetch_and_parse_data ().await ;
match result {
Ok (json) => println! ("Data: {}" , serde_json::to_string_pretty (&json).unwrap ()),
Err (e) => {
println! ("Error: {}" , e);
println! ("Cause chain:" );
let mut cause = e.source ();
while let Some (c) = cause {
println! (" {}" , c);
cause = c.source ();
}
}
}
}
6.2 异步任务间的错误传递 我们可以使用 tokio::sync::oneshot 通道在异步任务间传递错误。
use tokio::sync::oneshot;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Task error" )]
Task,
}
async fn task (tx: oneshot::Sender<Result <(), AppError>>) {
tokio::time::sleep (tokio::time::Duration::from_secs (2 )).await ;
let _ = tx.send (Err (AppError::Task));
}
#[tokio::main]
async fn main () {
let (tx, rx) = oneshot::channel ();
tokio::spawn (task (tx));
let result = rx.await .unwrap ();
println! ("Task result: {:?}" , result);
}
6.3 父子任务间的错误传递 当父任务创建子任务时,我们可以将子任务的错误传递到父任务。
use tokio::time::sleep;
use std::time::Duration;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Child task error" )]
ChildTask,
}
async fn child_task () -> Result <(), AppError> {
sleep (Duration::from_secs (1 )).await ;
Err (AppError::ChildTask)
}
async fn parent_task () -> Result <(), AppError> {
let handle = tokio::spawn (child_task ());
let result = handle.await ?;
Ok (result)
}
#[tokio::main]
async fn main () {
let result = parent_task ().await ;
println! ("Parent task result: {:?}" , result);
}
七、实战项目:异步 API 的错误处理体系
7.1 项目需求与架构设计 我们将构建一个异步 RESTful API,支持用户管理功能,包括创建用户、查询用户、更新用户和删除用户。我们将设计完整的错误处理体系,包括请求验证错误、数据库操作错误、业务逻辑错误的统一响应。
使用 Axum 作为 HTTP 框架
使用 SQLx 作为数据库访问库
使用 thiserror 定义自定义错误类型
使用 serde 定义 API 响应结构
实现错误中间件,统一处理所有错误
7.2 依赖配置与项目初始化 cargo new rust-async-api-error-handling
cd rust-async-api-error-handling
[dependencies]
axum = { version = "0.5" , features = ["ws" ] }
tokio = { version = "1.0" , features = ["full" ] }
sqlx = { version = "0.6" , features = ["postgres" , "runtime-tokio-rustls" ] }
thiserror = "1.0"
serde = { version = "1.0" , features = ["derive" ] }
serde_json = "1.0"
uuid = { version = "1.1" , features = ["v4" ] }
tracing = "0.1"
tracing-subscriber = { version = "0.3" , features = ["env-filter" , "json" ] }
7.3 数据库连接与模型定义 use sqlx::PgPool;
pub async fn create_pool (database_url: &str ) -> PgPool {
PgPool::connect (database_url).await .unwrap ()
}
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Debug, Serialize, Deserialize, sqlx::FromRow)]
pub struct User {
pub id: Uuid,
pub name: String ,
pub email: String ,
pub password: String ,
pub created_at: chrono::DateTime<chrono::Utc>,
pub updated_at: chrono::DateTime<chrono::Utc>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateUserRequest {
pub name: String ,
pub email: String ,
pub password: String ,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct UpdateUserRequest {
pub name: Option <String >,
pub email: Option <String >,
pub password: Option <String >,
}
7.4 错误类型与响应结构 use thiserror::Error;
use axum::{http::StatusCode, response::{IntoResponse, Response}};
use serde::{Serialize, Deserialize};
#[derive(Error, Debug)]
pub enum AppError {
#[error("Database error: {0}" )]
Database (#[from] sqlx::Error),
#[error("Request validation error: {0}" )]
Validation (String ),
#[error("User not found" )]
UserNotFound,
#[error("User already exists" )]
UserAlreadyExists,
#[error("Internal server error" )]
InternalServerError,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ErrorResponse {
pub error: String ,
pub code: u16 ,
pub message: String ,
}
impl IntoResponse for AppError {
fn into_response (self ) -> Response {
let (status, code, message) = match self {
AppError::Database (e) => {
tracing::error!("Database error: {:?}" , e);
(StatusCode::INTERNAL_SERVER_ERROR, 500 , "Internal server error" )
},
AppError::Validation (msg) => (StatusCode::BAD_REQUEST, 400 , msg.as_str ()),
AppError::UserNotFound => (StatusCode::NOT_FOUND, 404 , "User not found" ),
AppError::UserAlreadyExists => (StatusCode::CONFLICT, 409 , "User already exists" ),
AppError::InternalServerError => (StatusCode::INTERNAL_SERVER_ERROR, 500 , "Internal server error" ),
};
let response = ErrorResponse {
error: self .to_string (),
code,
message: message.to_string (),
};
(status, axum::Json (response)).into_response ()
}
}
7.5 业务逻辑实现 use sqlx::PgPool;
use uuid::Uuid;
use crate::models::*;
use crate::errors::AppError;
pub async fn create_user (pool: &PgPool, user: CreateUserRequest) -> Result <User, AppError> {
let exists = sqlx::query_scalar!("SELECT EXISTS(SELECT 1 FROM users WHERE email = $1)" , user.email).fetch_one (pool).await ?;
if exists {
return Err (AppError::UserAlreadyExists);
}
let user = sqlx::query_as!(User, "INSERT INTO users (id, name, email, password, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, $6) RETURNING *" , Uuid::new_v4 (), user.name, user.email, user.password, chrono::Utc::now (), chrono::Utc::now ()).fetch_one (pool).await ?;
Ok (user)
}
pub async fn get_user_by_id (pool: &PgPool, id: Uuid) -> Result <User, AppError> {
let user = sqlx::query_as!(User, "SELECT * FROM users WHERE id = $1" , id).fetch_one (pool).await .map_err (|e| {
if e.as_database_error ().is_some () && e.to_string ().contains ("no row found" ) {
AppError::UserNotFound
} else {
AppError::Database (e)
}
})?;
Ok (user)
}
pub async fn update_user (pool: &PgPool, id: Uuid, user: UpdateUserRequest) -> Result <User, AppError> {
let updated_user = sqlx::query_as!(User, "UPDATE users SET name = COALESCE($1, name), email = COALESCE($2, email), password = COALESCE($3, password), updated_at = $4 WHERE id = $5 RETURNING *" , user.name, user.email, user.password, chrono::Utc::now (), id).fetch_one (pool).await .map_err (|e| {
if e.as_database_error ().is_some () && e.to_string ().contains ("no row found" ) {
AppError::UserNotFound
} else {
AppError::Database (e)
}
})?;
Ok (updated_user)
}
pub async fn delete_user (pool: &PgPool, id: Uuid) -> Result <(), AppError> {
let result = sqlx::query!("DELETE FROM users WHERE id = $1" , id).execute (pool).await ?;
if result.rows_affected () == 0 {
return Err (AppError::UserNotFound);
}
Ok (())
}
use crate::repository::*;
use crate::models::*;
use crate::errors::AppError;
use sqlx::PgPool;
pub async fn create_user_service (pool: &PgPool, user: CreateUserRequest) -> Result <User, AppError> {
if user.name.is_empty () {
return Err (AppError::Validation ("Name is required" .to_string ()));
}
if user.email.is_empty () {
return Err (AppError::Validation ("Email is required" .to_string ()));
}
if !user.email.contains ("@" ) {
return Err (AppError::Validation ("Invalid email format" .to_string ()));
}
if user.password.len () < 6 {
return Err (AppError::Validation ("Password must be at least 6 characters long" .to_string ()));
}
create_user (pool, user).await
}
pub async fn get_user_by_id_service (pool: &PgPool, id: uuid::Uuid) -> Result <User, AppError> {
get_user_by_id (pool, id).await
}
pub async fn update_user_service (pool: &PgPool, id: uuid::Uuid, user: UpdateUserRequest) -> Result <User, AppError> {
if let Some (email) = &user.email {
if !email.contains ("@" ) {
return Err (AppError::Validation ("Invalid email format" .to_string ()));
}
}
if let Some (password) = &user.password {
if password.len () < 6 {
return Err (AppError::Validation ("Password must be at least 6 characters long" .to_string ()));
}
}
update_user (pool, id, user).await
}
pub async fn delete_user_service (pool: &PgPool, id: uuid::Uuid) -> Result <(), AppError> {
delete_user (pool, id).await
}
7.6 API 路由与中间件 use axum::{routing::{get, post, put, delete}, extract::Path, extract::State, Json};
use uuid::Uuid;
use crate::models::*;
use crate::service::*;
use crate::errors::AppError;
use sqlx::PgPool;
pub fn create_routes () -> axum::Router<PgPool> {
axum::Router::new ()
.route ("/users" , post (create_user_handler))
.route ("/users/:id" , get (get_user_handler))
.route ("/users/:id" , put (update_user_handler))
.route ("/users/:id" , delete (delete_user_handler))
}
async fn create_user_handler (State (pool): State<PgPool>, Json (req): Json<CreateUserRequest>) -> Result <Json<User>, AppError> {
let user = create_user_service (&pool, req).await ?;
Ok (Json (user))
}
async fn get_user_handler (State (pool): State<PgPool>, Path (id): Path<Uuid>) -> Result <Json<User>, AppError> {
let user = get_user_by_id_service (&pool, id).await ?;
Ok (Json (user))
}
async fn update_user_handler (State (pool): State<PgPool>, Path (id): Path<Uuid>, Json (req): Json<UpdateUserRequest>) -> Result <Json<User>, AppError> {
let user = update_user_service (&pool, id, req).await ?;
Ok (Json (user))
}
async fn delete_user_handler (State (pool): State<PgPool>, Path (id): Path<Uuid>) -> Result <(), AppError> {
delete_user_service (&pool, id).await
}
7.7 应用程序入口 use axum::Router;
use tracing_subscriber::prelude::*;
use tracing::info;
use crate::db::create_pool;
use crate::routes::create_routes;
mod db;
mod errors;
mod models;
mod repository;
mod routes;
mod service;
#[tokio::main]
async fn main () {
tracing_subscriber::registry ()
.with (tracing_subscriber::EnvFilter::new ("info" ))
.with (tracing_subscriber::fmt::layer ())
.init ();
let database_url = "postgresql://user:password@localhost:5432/mydb" ;
let pool = create_pool (database_url).await ;
info!("Database pool created" );
let app : Router<sqlx::PgPool> = create_routes ();
let listener = tokio::net::TcpListener::bind ("0.0.0.0:3000" ).await .unwrap ();
info!("API server running on http://0.0.0.0:3000" );
axum::serve (listener, app.with_state (pool)).await .unwrap ();
}
7.8 数据库初始化 mkdir -p migrations/20230101000000_create_users
创建 migrations/20230101000000_create_users/up.sql:
CREATE TABLE users (
id UUID PRIMARY KEY ,
name VARCHAR (255 ) NOT NULL ,
email VARCHAR (255 ) NOT NULL UNIQUE ,
password VARCHAR (255 ) NOT NULL ,
created_at TIMESTAMP NOT NULL ,
updated_at TIMESTAMP NOT NULL
);
创建 migrations/20230101000000_create_users/down.sql:
7.9 测试 API curl -X POST -H "Content-Type: application/json" -d '{"name": "Alice", "email": "[email protected] ", "password": "password123"}' http://localhost:3000/users
curl -X GET http://localhost:3000/users/{user_id}
curl -X PUT -H "Content-Type: application/json" -d '{"name": "Alice Smith"}' http://localhost:3000/users/{user_id}
curl -X DELETE http://localhost:3000/users/{user_id}
八、异步错误调试与排查
8.1 用 tracing 记录错误 我们可以使用 tokio-tracing 库记录错误的详细信息,包括错误的位置、堆栈信息等。
use tracing::error;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Database error: {0}" )]
Database (#[from] sqlx::Error),
}
async fn fetch_data () -> Result <(), AppError> {
let result = sqlx::query!("SELECT * FROM nonexistent_table" ).execute (&sqlx::PgPool::connect ("postgresql://user:password@localhost:5432/mydb" ).await ?).await ;
result?;
Ok (())
}
#[tokio::main]
async fn main () {
tracing_subscriber::registry ()
.with (tracing_subscriber::EnvFilter::new ("info" ))
.with (tracing_subscriber::fmt::layer ())
.init ();
if let Err (e) = fetch_data ().await {
error!("Error fetching data: {:?}" , e);
error!("Backtrace: {:?}" , e.backtrace ());
}
}
8.2 用 tokio-console 定位错误 tokio-console 是 Tokio 提供的调试工具,可以用于定位异步任务的错误。
cargo install tokio-console
RUSTFLAGS="--cfg tokio_unstable" cargo run -- tokio-console
8.3 错误信息的格式化输出 我们可以使用 format! 或 serde_json 库格式化错误信息,使错误信息更容易阅读。
use serde_json;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Database error: {0}" )]
Database (#[from] sqlx::Error),
}
async fn fetch_data () -> Result <(), AppError> {
let result = sqlx::query!("SELECT * FROM nonexistent_table" ).execute (&sqlx::PgPool::connect ("postgresql://user:password@localhost:5432/mydb" ).await ?).await ;
result?;
Ok (())
}
#[tokio::main]
async fn main () {
if let Err (e) = fetch_data ().await {
let error_json = serde_json::json!({"error" : e.to_string (), "code" : 500 , "message" : "Internal server error" , "details" : format! ("{:?}" , e)});
println! ("{}" , serde_json::to_string_pretty (&error_json).unwrap ());
}
}
九、常见问题与最佳实践
9.1 异步任务崩溃
在任务中添加错误处理,使用 match 语句或 ? 操作符处理错误。
使用 tokio::spawn 的 JoinHandle 来捕获任务的崩溃。
使用 tokio-tracing 库记录崩溃的详细信息。
9.2 错误信息不够详细
使用 thiserror 的 #[source] 属性构建错误链。
使用 tokio-tracing 库记录错误的位置和堆栈信息。
使用 serde_json 库格式化错误信息。
9.3 错误处理代码重复
定义统一的错误类型,使用 thiserror 库。
实现 IntoResponse trait,统一处理错误响应。
使用中间件处理全局错误。
9.4 异步任务泄漏
正确处理任务的取消,使用 tokio::task::JoinHandle::abort 方法。
确保任务有明确的终止条件。
使用 tokio-console 工具定位任务泄漏。
十、总结 异步错误处理是 Rust 异步编程的重要组成部分。通过深入理解异步错误的本质与分类、异步上下文中的标准 Result 处理、自定义异步错误类型的设计、超时与取消错误的处理、并发任务的错误聚合、异步错误的传递与传播、实战项目的错误处理体系以及异步错误调试与排查,我们可以编写出更高效、更安全的异步代码。
在实际项目中,我们应该根据项目的需求选择合适的错误处理方式。对于库开发,我们应该使用 thiserror 定义自定义错误类型,支持 Error trait 的实现。对于应用开发,我们可以使用 anyhow 快速处理各种类型的错误。同时,我们应该使用 tokio-tracing 库记录错误的详细信息,使用 tokio-console 工具定位异步任务的错误。
希望本章的内容能够帮助您深入掌握 Rust 异步编程的错误处理艺术,并在实际项目中应用。
相关免费在线工具 Base64 字符串编码/解码 将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
Base64 文件转换器 将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
Markdown转HTML 将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
HTML转Markdown 将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
JSON 压缩 通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online
JSON美化和格式化 将JSON字符串修饰为友好的可读格式。 在线工具,JSON美化和格式化在线工具,online