概述
在 Rust 的异步生态系统中,Tokio 的地位可以用一句话概括:它是 Rust 异步编程的事实标准运行时(Runtime),也是构建高性能网络应用的基石。
你可以把它类比为 Java 生态中的 Netty,或者是 JavaScript 生态中的 核心。它是目前 Rust 生态中最受欢迎、使用最广泛的异步运行时库。
Tokio 是 Rust 异步生态的事实标准运行时,负责任务调度和异步 I/O 驱动。涵盖环境配置、运行时启动、任务管理(spawn/join/select)、并发控制(Semaphore/Mutex)、异步 IO 编程及最佳实践,指导开发者构建高性能网络应用。

在 Rust 的异步生态系统中,Tokio 的地位可以用一句话概括:它是 Rust 异步编程的事实标准运行时(Runtime),也是构建高性能网络应用的基石。
你可以把它类比为 Java 生态中的 Netty,或者是 JavaScript 生态中的 核心。它是目前 Rust 生态中最受欢迎、使用最广泛的异步运行时库。
为了让你更全面地了解它,我从名气、地位、作用和核心特点四个方面为你详细介绍:
简单来说,Tokio 让你能够用较少的资源(线程、内存)处理 海量的并发任务(如成千上万个 TCP 连接)。
它主要扮演了以下三个角色:
| 特性 | 说明 | 为什么重要 |
|---|---|---|
| 高性能 | 基于事件驱动和非阻塞 I/O。 | 能够以极低的延迟处理数万甚至数十万的并发连接,非常适合构建微服务和代理服务器。 |
| 可靠 | 基于 Rust 的内存安全保证。 | 避免了传统 C/C++ 网络库常见的内存泄漏、空指针等崩溃问题,系统更加稳定。 |
| 易用 | 完美支持 async/await 语法。 | 让异步代码看起来像同步代码一样直观,降低了心智负担。 |
| 灵活 | 提供单线程和多线程运行时选项。 | 你可以根据场景选择:是追求极致的单核性能(单线程),还是充分利用多核(多线程)。 |
如果把编写一个异步应用比作经营一家餐厅:
虽然 Tokio 很强,但它不是万能的。你需要知道它的边界:
如果你想用 Rust 写一个 Web 服务器、数据库代理、消息队列或者任何需要处理大量网络连接的程序,Tokio 是你的首选。它是 Rust 异步生态的基石,学习它对于掌握现代 Rust 开发至关重要。
Tokio 采用模块化设计,可根据需求开启对应特性,核心依赖分为'运行时核心''宏支持''IO 扩展'等类别。在 Cargo.toml 中配置依赖,按需选择特性以减少体积。
满足基本异步任务运行需求,包含多线程运行时、async/await 宏支持:
[dependencies]
tokio = { version = "1.0", features = ["rt-multi-thread", "macros"] }
# rt-multi-thread:多线程运行时(推荐生产环境)
# macros:提供#[tokio::main]、#[tokio::test]等宏
实际开发中需补充 IO、网络、同步原语等特性,按需组合:
[dependencies]
tokio = {
version = "1.0",
features = [
"rt-multi-thread", # 多线程运行时
"macros", # 宏支持
"fs", # 异步文件 IO
"net", # 网络编程(TCP/UDP)
"sync", # 异步同步原语(Mutex、Semaphore 等)
"time", # 异步时间管理(定时任务)
"signal" # 信号处理(如中断信号)
]
}
提示:开发调试阶段可临时开启 "full" 特性(包含所有功能),生产环境需精简特性,避免冗余依赖。
Tokio 的核心是'异步运行时'——它负责调度异步任务、管理线程池、处理 IO 事件。异步任务基于 Future 特质,通过 async/await 语法简化编写,Tokio 运行时则确保任务高效执行。
通过 #[tokio::main] 宏可快速启动多线程运行时,这是最常用的入口方式;也可手动构建运行时以自定义配置(如线程数、栈大小)。
// 基础用法:#[tokio::main] 宏启动多线程运行时
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("Tokio 多线程运行时启动成功");
// 异步任务逻辑...
Ok(())
}
// 进阶:手动构建运行时(自定义线程数)
fn main() {
// 配置运行时:核心线程数=4,最大线程数=8
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
.max_blocking_threads(8)
.enable_all()
.build()
.unwrap();
// 在运行时中执行异步任务
rt.block_on(async {
println!("手动构建的运行时执行异步任务");
});
}
关键说明:block_on 方法会阻塞当前线程,直到异步任务执行完成,仅用于运行时入口或同步代码调用异步逻辑的场景。
#[tokio::main(worker_threads = 4)]
async fn main() {
println!("Hello world!");
}
multi_thread: 使用多线程调度器。这是默认模式,会为每个 CPU 核心启动一个工作线程。current_thread: 使用单线程调度器,所有任务都在当前线程上执行,没有额外的线程开销。#[tokio::main(flavor = "current_thread")]
async fn main() {
println!("使用单线程运行时");
}
意义作用:
tokio::spawn 包裹一个异步任务(Future 状态机),返回一个异步任务(JoinHandle),而且包裹的异步任务仅仅包裹之后是不能执行的,还需要让返回的异步任务通过 .await 来执行包裹的异步任务。而被包裹的异步任务本身就可以通过 .await 来执行,那么 tokio::spawn 包裹有什么意义呢?难道是画蛇添足吗?显然不是。
这是一个非常好的问题,直接触及了 Rust 异步编程的核心:并发(Concurrency)与异步(Asynchrony)的区别。
简单来说:直接 await 一个异步函数是 '顺序执行',而 tokio::spawn 包裹下,可以让多个被包裹的异步任务 '并发执行'。
| 方式 | 执行方式 | 适用场景 |
|---|---|---|
直接 await | 顺序执行(排队) | 任务有依赖、简单任务、必须等结果 |
tokio::spawn | 并发执行(并排跑) | 多个独立耗时任务、后台任务、提高效率 |
#[tokio::main]
async fn main() {
// 1. 创建异步任务(无返回值)
let task1 = async {
println!("任务 1 开始执行");
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
println!("任务 1 执行完成");
};
// 2. 创建带返回值的异步任务
let task2 = async {
println!("任务 2 开始执行");
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
println!("任务 2 执行完成");
42 // 返回值:i32
};
// 等待任务 1 完成(忽略返回值)
let _ = task1.await;
println!("任务 1 无返回值");
// 等待任务 2 完成并获取返回值
let result = task2.await;
println!("任务 2 返回值:{}", result);
}
#[tokio::main]
async fn main() {
// 1. 创建异步任务(无返回值)
let task1 = tokio::spawn(async {
println!("任务 1 开始执行");
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
println!("任务 1 执行完成");
});
// 2. 创建带返回值的异步任务
let task2 = tokio::spawn(async {
println!("任务 2 开始执行");
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
println!("任务 2 执行完成");
42 // 返回值:i32
});
// 等待任务 1 完成(忽略返回值)
let _ = task1.await;
println!("任务 1 无返回值");
// 等待任务 2 完成并获取返回值
let result = task2.await.unwrap();
println!("任务 2 返回值:{}", result);
}
注意:
spawn 包裹的任务若恐慌,会导致整个运行时终止(可通过 catch_unwind 捕获恐慌)。use tokio; 就能正常执行代码。简单来说,这是因为 #[tokio::main] 这个属性宏(Attribute Macro)在编译时'展开'成了底层的样板代码,而在这段生成的代码中,编译器能够找到并正确调用 tokio 库的路径。在宏展开后生成的代码中,所有对 Tokio 库的调用(例如 tokio::runtime::Builder)都使用了全路径(Fully Qualified Path)。这意味着,即使你的代码文件顶部没有 use tokio;,编译器也完全知道去哪里找到 tokio 这个 crate,因为宏已经明确地告诉了它。tokio::spawn 的返回值是 JoinHandle<T>。
tokio::spawn 启动的后台任务。.await 它来等待任务完成。.await 一个 JoinHandle,它会阻塞当前异步函数,直到后台任务完成,并返回任务的执行结果。
let handle = tokio::spawn(async { 1 + 2 });
let result = handle.await.unwrap(); // 等待任务完成,获取结果:3
println!("任务结果:{}", result);
这是 JoinHandle 最重要的特性之一:
join! 宏允许你并发地等待多个 Future,当所有 Future 都完成后,它才会返回。
use tokio::time::{sleep, Duration};
async fn fetch_user(user_id: u32) -> String {
println!("fetch_user start");
sleep(Duration::from_millis(500)).await;
println!("fetch_user end");
format!("User {}", user_id)
}
async fn fetch_orders(user_id: u32) -> Vec<String> {
println!("fetch_orders start");
sleep(Duration::from_millis(150)).await;
println!("fetch_orders end");
vec![format!("Order 1 for user {}", user_id)]
}
#[tokio::main]
async fn main() {
let user_id = 42;
// 并发执行两个任务,总耗时约等于耗时最长的那个(150ms)
let (user, orders) = tokio::join!(fetch_user(user_id), fetch_orders(user_id));
println!("User: {}", user);
println!("Orders: {:?}", orders);
}
异步任务的集合管理器。它极大地简化了动态数量的异步任务的管理,解决了传统方式(手动维护一个 Vec)的诸多痛点。 当 JoinSet 被丢弃时,其内部所有未完成的任务会自动被取消,有效防止资源泄露。
use tokio::task::JoinSet;
use std::time::Duration;
async fn process_item(id: u32) -> String {
// 模拟一些异步工作
tokio::time::sleep(Duration::from_millis(100)).await;
format!("处理完成:{}", id)
}
#[tokio::main]
async fn main() {
let mut join_set = JoinSet::new();
// 1. 动态添加 5 个任务
for i in 0..5 {
join_set.spawn(async move {
process_item(i).await;
});
}
// 2. 等待所有任务完成并收集结果
// join_all 会返回一个 Vec,包含每个任务的结果
let results = join_set.join_all().await;
// 3. 处理结果
for result in results {
println!("{}", result);
}
}
select! 宏用于监听多个 Future,当其中任意一个完成时,就会执行对应的分支代码。
需要注意的是,一旦某个分支被选中执行,其他未完成的 Future 会被取消。
这个机制适合处理'超时'或'任一结果先到即可'的场景,下面举这两个例子。
use tokio::time::{sleep, Duration};
async fn fetch_user(user_id: u32) -> String {
println!("fetch_user start");
sleep(Duration::from_millis(500)).await;
println!("fetch_user end");
format!("User {}", user_id)
}
async fn fetch_orders(user_id: u32) -> Vec<String> {
println!("fetch_orders start");
sleep(Duration::from_millis(150)).await;
println!("fetch_orders end");
vec![format!("Order 1 for user {}", user_id)]
}
#[tokio::main]
async fn main() {
let user_id = 42;
tokio::select! {
user = fetch_user(user_id) => println!("User: {}", user),
orders = fetch_orders(user_id) => println!("Orders: {:?}", orders),
}
println!("main end");
}
use tokio::time::{sleep, Duration, timeout};
async fn fetch_data() -> Result<String, &'static str> {
// 模拟一个可能耗时过长的操作
sleep(Duration::from_secs(10)).await;
Ok("Hello, World!".to_string())
}
#[tokio::main]
async fn main() {
tokio::select! {
// 分支 1:尝试获取数据
result = fetch_data() => {
match result {
Ok(data) => println!("成功获取数据:{}", data),
Err(e) => println!("获取数据失败:{}", e),
}
}
// 分支 2:一个 2 秒的超时定时器
_ = sleep(Duration::from_secs(2)) => {
println!("获取数据超时!");
}
}
}
简单来说:
spawn:负责处理异步的、非阻塞的 I/O 任务,它们在主工作线程池中运行。spawn_blocking:负责处理同步的、会阻塞的任务(如 CPU 密集型计算),它们被隔离到一个独立的线程池中运行,以保护主运行时。| 特性 | tokio::spawn | tokio::task::spawn_blocking |
|---|---|---|
| 任务类型 | 异步任务 (Async) 编写为 async 块或函数,内部使用 .await 进行非阻塞等待。 | 同步任务 (Sync/Blocking) 编写为普通的同步闭包或函数,可能会执行 std::thread::sleep、复杂的 CPU 计算等阻塞操作。 |
| 执行位置 | 主工作线程池 (Worker Pool) 任务由 Tokio 的调度器在少量的工作线程上快速切换执行。 | 专用阻塞线程池 (Blocking Pool) 任务被移动到一个独立的、专门为阻塞操作准备的线程池中执行。 |
| 适用场景 | I/O 密集型任务 如:网络请求、文件读写、数据库查询等。 | CPU 密集型或同步阻塞任务 如:复杂计算、数据压缩、调用不支持异步的库、std::fs 文件操作等。 |
| 对运行时的影响 | 协作式调度 任务会在 .await 点主动让出,保证其他任务公平执行。 | 隔离保护 阻塞任务被'移走',防止它霸占主工作线程,导致其他异步任务'饿死'。 |
| 返回值 | JoinHandle<T> 一个 Future,.await 后可获取异步任务的结果。 | JoinHandle<T> 同样是一个 Future,可以 .await 等待同步任务完成并获取结果。 |
如果 把 阻塞线程的任务 塞进 spawn,会导致 异步线程 '卡死',进而引发 调度器饥饿,甚至可能让整个程序崩溃。
使用案例:
use tokio::time::{self, Duration};
// 1. 定义一个耗时的同步计算函数 (这会阻塞线程)
// 注意:这不是 async 函数,它会一直占用 CPU 直到算完
fn cpu_intensive_task(n: u32) -> u64 {
println!("开始执行耗时的 CPU 计算 (斐波那契 {})...", n);
// 模拟复杂的计算过程 (简单的循环版斐波那契)
let (mut a, mut b) = (0, 1);
for _ in 0..n {
let temp = a + b;
a = b;
b = temp;
}
// ⚠️ 危险操作:在 async 函数中执行同步的 sleep
// 这会强制当前线程休眠 5 秒,期间无法做任何其他事
std::thread::sleep(Duration::from_millis(2000));
println!("CPU 计算完成!");
b
}
#[tokio::main]
async fn main() {
// --- 场景 A: 启动一个正常的异步任务 ---
// 这个任务负责每 500ms 打印一次,用来检测主线程是否卡顿
let monitor_handle = tokio::spawn(async move {
let mut interval = time::interval(Duration::from_millis(500));
for i in 0..10 {
interval.tick().await; // .await 让出控制权,不会卡住线程
println!("> {i} 监控任务:系统运行正常");
}
});
// --- 场景 B: 执行耗时的 CPU 计算 ---
// 我们需要把阻塞任务放在 spawn_blocking 里
// 这样它会被扔到专门的 "阻塞线程池" 里去跑,不会影响上面的 monitor_handle
let compute_handle = tokio::task::spawn_blocking(|| {
// 调用我们的同步函数
cpu_intensive_task(20) // 假设算第 40 个斐波那契数需要一点时间
});
// 等待两个任务都完成
// 注意:monitor 是非阻塞的,compute 是阻塞的,但被隔离了
let _ = monitor_handle.await;
let result = compute_handle.await.unwrap(); // 获取计算结果
println!("最终结果:{}", result);
}
从上面的输出可以看出 CPU 密集的任务没有阻塞主线程。
当你需要并发处理大量任务(例如,同时发起上千个网络请求)时,无限制地 spawn 任务可能会压垮系统资源或触发目标服务器的反爬虫机制。Semaphore(信号量)是控制并发数量的利器。
你可以把 Semaphore(信号量)想象成一个非常尽职的停车场管理员,或者更简单点,就是入场券的发放机。 它的核心作用就一个:控制同时使用某个资源的'人'数,防止一窝蜂地涌进去把系统搞崩。
use tokio::sync::Semaphore;
use std::sync::Arc;
use tokio::time::{self, Duration, Instant};
async fn download_file(file_id: u32, duration: u64, semaphore: Arc<Semaphore>) {
// 1. 先获取许可(在这里排队,直到有空位)
let _permit = semaphore.acquire().await.unwrap();
// 2. 获取许可后,再打印日志
// 此时该任务已经稳稳地占住了一个坑位
let current_active = 3 - semaphore.available_permits(); // 3 是最大数
let start = Instant::now();
println!("📥 [任务 {}] 开始工作 (耗时{}s),当前并发数:{}/3", file_id, duration, current_active);
// 3. 模拟耗时工作
time::sleep(Duration::from_secs(duration)).await;
println!("🚀 [任务 {}] 完成 (耗时{:.1}s)", file_id, start.elapsed().as_secs_f32());
}
#[tokio::main]
async fn main() {
let semaphore = Arc::new(Semaphore::new(3));
let durations = vec![2, 1, 3, 1, 2, 1, 4, 1, 2, 1];
println!("🚀 开始下载,最大并发数:3\n");
let mut join_set = tokio::task::JoinSet::new();
for (i, duration) in durations.into_iter().enumerate() {
let sem = semaphore.clone();
join_set.spawn(async move {
download_file(i as u32, duration, sem).await;
});
}
while let Some(_) = join_set.join_next().await {
// 等待全部完成
}
println!("\n🎉 全部完成!");
}
tokio::sync::mpsc 是 Tokio 异步运行时中最核心、最常用的通信工具之一。它的名字是'多生产者、单消费者'。
它的作用就是在一个或多个异步任务(生产者)与另一个异步任务(消费者)之间,建立一个异步、非阻塞的消息队列。
use tokio::sync::mpsc;
use std::time::Duration;
// 定义消息类型,可以是枚举,包含不同种类的信息
#[derive(Debug)]
enum Message {
Log {
level: String,
content: String,
},
Shutdown, // 用于通知消费者关闭
}
#[tokio::main]
async fn main() {
// 1. 创建一个有界通道
// 这里的 32 是缓冲区大小,如果发送太快,接收方没消费,缓冲区满了发送就会等待
let (tx, mut rx) = mpsc::channel(32);
// --- 启动消费者任务 (负责接收和处理) ---
// 注意:这里我们把接收端 rx 移动到了任务里
let handle = tokio::spawn(async move {
// 消费者逻辑
loop {
match rx.recv().await {
Some(Message::Log { level, content }) => {
println!("[消费者] 收到日志 [{}] : {}", level, content);
// 模拟处理耗时
tokio::time::sleep(Duration::from_millis(100)).await;
}
Some(Message::Shutdown) => {
println!("[消费者] 收到关闭指令,正在退出...");
break;
}
None => {
// 所有的发送端 (tx) 都被丢弃了,通道关闭
println!("[消费者] 所有发送端已关闭,退出循环。");
break;
}
}
}
});
// --- 模拟多个生产者任务 (负责发送) ---
// 我们克隆发送端,给每个任务一个副本
let tx1 = tx.clone();
let producer1 = tokio::spawn(async move {
for i in 1..=3 {
// 发送日志
let _ = tx1.send(Message::Log {
level: "INFO".to_string(),
content: format!("生产者 1 的任务 {}", i),
}).await;
tokio::time::sleep(Duration::from_millis(50)).await;
}
});
let producer2 = tokio::spawn(async move {
for i in 1..=3 {
let _ = tx.send(Message::Log {
level: "ERROR".to_string(),
content: format!("生产者 2 发现错误 {}", i),
}).await;
tokio::time::sleep(Duration::from_millis(70)).await;
}
});
// 等待生产者完成
let _ = tokio::join!(producer1, producer2);
// 等待消费者完成
let _ = handle.await;
println!("所有任务结束,程序退出。");
}
use std::sync::Arc;
use tokio::sync::Mutex;
#[tokio::main]
async fn main() {
// 1. 创建一个被 Arc 包裹的 Mutex
// Arc 用于多所有权,Mutex 用于异步互斥访问
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
// 2. 派生 10 个任务
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = tokio::spawn(async move {
// 3. 锁定 Mutex (异步等待)
// 这里会返回一个 Guard,只要 Guard 存在,锁就持有
let mut num = counter.lock().await;
*num += 1;
// 锁在 num 离开作用域时自动释放
});
handles.push(handle);
}
// 等待所有任务完成
for handle in handles {
handle.await.unwrap();
}
// 检查最终结果
println!("最终计数:{}", *counter.lock().await); // 应该输出 10
}
由于 Tokio 的线程池特性,任务可能在不同线程间跳转,因此不能使用标准库的 std::sync::Mutex,必须使用 Tokio 提供的异步互斥锁。
这是一个非常核心且容易混淆的概念。简单来说,问题的核心在于 '阻塞'。
std::sync::Mutex 会 阻塞线程,而 Tokio 的运行时(Runtime)最怕的就是 线程阻塞。
下面我用一个比喻和具体的代码逻辑来拆解为什么不能混用。
std::sync::Mutex (标准库互斥锁):
lock() 一个已经被线程 B 锁住的 Mutex 时,线程 A 会被操作系统直接挂起(睡眠),直到 B 释放锁。tokio::sync::Mutex (Tokio 互斥锁):
lock().await 一个被任务 D 锁住的 Mutex 时,任务 C 只是把自己标记为'待处理'然后让出控制权,而它所在的 操作系统线程 会立即去执行其他就绪的任务。std::sync::Mutex 是灾难?Tokio 的 多线程运行时(multi-threaded runtime) 通常只有几个 OS 线程(比如 4 个),它们组成了一个'线程池'。
如果在其中一个线程中使用了 std::sync::Mutex 并导致阻塞:
std::sync::Mutex。std::sync::Mutex 是跨线程阻塞的,线程 2 会被操作系统挂起,等待任务 A 释放锁。一句话总结:
std::sync::Mutex会把 Tokio 辛辛苦苦维护的'高并发线程池'中的线程一个个'冻住',导致吞吐量暴跌,甚至死锁。
std::sync::Mutex?虽然通常不推荐,但在以下两种情况下是可以使用的:
current_thread 运行时中(且只有一个任务持有锁):
flavor = "current_thread"),且你能保证锁的获取和释放都在同一个'执行流'中,理论上不会发生跨线程阻塞导致死锁。但即便如此,使用 tokio::sync::Mutex 仍然是更好的习惯。在 tokio::task::spawn_blocking 中:
use std::sync::Mutex;
use tokio::task;
#[tokio::main]
async fn main() {
let std_mutex = Arc::new(Mutex::new(0));
let handle = task::spawn_blocking({
let std_mutex = Arc::clone(&std_mutex);
move || {
// 在这里使用 std::sync::Mutex 是安全的
let mut data = std_mutex.lock().unwrap();
*data += 1;
}
});
handle.await.unwrap();
}
有些数据是非线程安全的(例如 C 库的句柄、某些不支持 Send 特性的 Rust 类型),或者你希望每个线程拥有自己独立的数据副本。
这时 标准库 的 thread_local! 不够用,因为 Tokio 的任务可能 跨线程 迁移。
tokio::task::LocalKey(通过 thread_local! 宏创建)提供的是任务本地存储,保证即使任务在不同线程间迁移,也能访问到同一个数据上下文。
use std::sync::Arc;
use tokio::sync::Mutex; // 引入 Tokio 的异步互斥锁
use std::time::Duration;
#[tokio::main]
async fn main() {
// 1. 创建共享状态
// 使用 Arc (原子引用计数) 来跨任务共享所有权
// 使用 Mutex 来保证同一时间只有一个任务能修改数据
let counter = Arc::new(Mutex::new(0));
// 存储任务句柄的向量
let mut handles = vec![];
// 2. 派生 10 个并发任务
for i in 0..10 {
let counter = Arc::clone(&counter); // 克隆 Arc,增加引用计数
// 每个任务都在后台运行
let handle = tokio::spawn(async move {
// 模拟一些异步工作(比如网络请求)
// 注意:这里不能使用 std::thread::sleep,那会阻塞线程
tokio::time::sleep(Duration::from_millis(100)).await;
// 3. 获取锁并修改数据
// lock() 是异步的,如果锁被占用,任务会自动让出控制权给其他任务
let mut num = counter.lock().await;
*num += 1;
println!("任务 {} 完成,当前计数:{}", i, *num);
});
handles.push(handle);
}
// 4. 等待所有任务完成
for handle in handles {
handle.await.unwrap();
}
// 5. 检查最终结果
println!("所有任务结束,最终计数:{}", *counter.lock().await);
}
use tokio::task::LocalKey; // 引入线程本地存储
use std::cell::RefCell; // 用于在单线程内实现内部可变性
use std::rc::Rc; // 用于单线程内的引用计数
// 1. 定义一个线程本地变量
// 这个变量的值在每个 Tokio 线程中都是独立的
thread_local! {
static MY_THREAD_LOCAL: RefCell<Rc<String>> = RefCell::new(Rc::new("Default Value".to_string()));
}
#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() {
// 指定使用多线程运行时,并启动 2 个 worker 线程
// 2. 派生两个任务
// 由于是多线程运行时,这两个任务很可能被调度到不同的 OS 线程上
let handle1 = tokio::spawn(async {
// 在任务 1 中修改线程本地变量
MY_THREAD_LOCAL.with(|f| {
// 获取当前线程的副本并修改
*f.borrow_mut() = Rc::new("Value from Task 1".to_string());
});
// 读取并打印
MY_THREAD_LOCAL.with(|f| {
println!("Task 1 sees: {}", f.borrow());
});
});
let handle2 = tokio::spawn(async {
// 模拟一点延迟,增加调度到不同线程的概率
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
// 读取
MY_THREAD_LOCAL.with(|f| {
println!("Task 2 sees: {}", f.borrow());
});
});
// 3. 等待任务完成
handle1.await.unwrap();
handle2.await.unwrap();
// 4. 在主线程中读取
// 主线程也有自己的线程本地变量副本
MY_THREAD_LOCAL.with(|f| {
println!("Main thread sees: {}", f.borrow());
});
}
Tokio 提供了完善的异步 IO 工具,涵盖文件 IO、TCP/UDP 网络通信、管道等场景,API 设计与标准库类似,但均为异步非阻塞模式,适合高并发场景。
开启 "fs" 特性后,可通过 tokio::fs 模块操作文件,所有方法均为异步,不会阻塞运行时线程。
use tokio::fs;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 1. 读取文件内容
let content = fs::read_to_string("test.txt").await?;
println!("文件内容:\n{}", content);
// 2. 写入文件(覆盖写入)
let data = "Tokio 异步文件 IO 示例";
fs::write("output.txt", data).await?;
println!("文件写入完成");
// 3. 创建目录并遍历
fs::create_dir_all("demo_dir/sub_dir").await?;
let entries = fs::read_dir("demo_dir").await?;
for entry in entries {
let entry = entry?;
println!("目录项:{}", entry.path().display());
}
Ok(())
}
提示:异步文件 IO 适合高并发读写场景,若为低频小文件操作,同步 std::fs 可能更简洁(无需异步调度开销)。
开启 "net" 特性后,Tokio 可轻松实现异步 TCP 通信,下面分别演示 TCP 服务器和客户端的实现。
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 绑定地址并监听
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("TCP 服务器启动,监听 127.0.0.1:8080");
// 循环接收客户端连接(异步阻塞,不占用线程)
loop {
let (stream, addr) = listener.accept().await?;
println!("新客户端连接:{}", addr);
// 为每个客户端创建独立任务处理(避免阻塞其他连接)
tokio::spawn(handle_client(stream));
}
}
// 处理单个客户端连接
async fn handle_client(mut stream: TcpStream) -> Result<(), Box<dyn std::error::Error>> {
let mut buf = [0; 1024]; // 缓冲区
// 读取客户端数据
let n = stream.read(&mut buf).await?;
if n == 0 {
return Ok(()); // 客户端关闭连接
}
println!("收到客户端数据:{}", String::from_utf8_lossy(&buf[..n]));
// 向客户端发送响应
let response = "已收到你的消息(来自 Tokio TCP 服务器)";
stream.write_all(response.as_bytes()).await?;
stream.flush().await?;
Ok(())
}
use tokio::net::TcpStream;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 连接服务器
let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
println!("已连接到 TCP 服务器");
// 发送数据到服务器
let data = "Hello, Tokio TCP Server!";
stream.write_all(data.as_bytes()).await?;
stream.flush().await?;
// 读取服务器响应
let mut buf = [0; 1024];
let n = stream.read(&mut buf).await?;
println!("收到服务器响应:{}", String::from_utf8_lossy(&buf[..n]));
Ok(())
}
运行逻辑:先启动服务器,再启动客户端,客户端发送消息后,服务器接收并回复,实现异步双向通信。
Tokio 提供了专为异步场景设计的同步原语(区别于 std::sync),以及灵活的定时任务能力,解决异步任务间的同步与时间管理问题。
开启 "sync" 特性后,可使用 tokio::sync 模块的原语,如 Mutex、Semaphore、Notify 等,均支持异步等待,不会阻塞运行时。
use tokio::sync::{Mutex, Semaphore};
use std::sync::Arc;
#[tokio::main]
async fn main() {
// 场景 1:异步 Mutex(多任务安全访问共享数据)
let counter = Arc::new(Mutex::new(0));
let mut tasks = Vec::new();
for i in 0..5 {
let counter = Arc::clone(&counter);
let task = tokio::spawn(async move {
let mut num = counter.lock().await; // 异步锁定,不阻塞线程
*num += 1;
println!("任务{}:计数器值 = {}", i, *num);
});
tasks.push(task);
}
tokio::join!(tasks.iter().map(|t| t.clone()).collect::<Vec<_>>()[0].await); // 简化示意
println!("最终计数器值:{}", *counter.lock().await);
// 场景 2:Semaphore(控制并发数)
let semaphore = Arc::new(Semaphore::new(2)); // 允许最大并发数=2
let mut tasks = Vec::new();
for i in 0..5 {
let semaphore = Arc::clone(&semaphore);
let task = tokio::spawn(async move {
let permit = semaphore.acquire().await.unwrap(); // 获取许可
println!("任务{}:开始执行(并发数控制)", i);
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
drop(permit); // 释放许可
});
tasks.push(task);
}
tokio::join!(tasks.iter().map(|t| t.clone()).collect::<Vec<_>>()[0].await); // 简化示意
}
关键区别:tokio::sync::Mutex 的 lock() 方法是异步的,会在锁定可用时唤醒任务;而 std::sync::Mutex 是同步锁定,会阻塞线程,不可在异步任务中使用。
开启 "time" 特性后,可通过 tokio::time 模块实现延迟执行、周期任务等功能,基于 Tokio 运行时的时间驱动机制。
use tokio::time;
#[tokio::main]
async fn main() {
// 场景 1:延迟执行(1 秒后执行任务)
println!("延迟任务开始等待...");
time::sleep(time::Duration::from_secs(1)).await;
println!("延迟任务执行完成");
// 场景 2:周期任务(每隔 2 秒执行一次,执行 3 次后退出)
let mut interval = time::interval(time::Duration::from_secs(2));
let mut count = 0;
loop {
interval.tick().await; // 等待周期触发
count += 1;
println!("周期任务执行,次数:{}", count);
if count >= 3 {
break;
}
}
// 场景 3:截止时间(任务必须在指定时间内完成)
let deadline = time::Instant::now() + time::Duration::from_secs(3);
let result = time::timeout_at(deadline, async {
time::sleep(time::Duration::from_secs(2)).await;
"任务在截止时间前完成"
}).await;
match result {
Ok(msg) => println!("{}", msg),
Err(_) => println!("任务超时未完成"),
}
}
Tokio 是 Rust 异步生态的基石,常与 reqwest、serde、chrono 等库联动,构建完整业务流程。下面演示两个典型实战场景。
use tokio;
use reqwest::Client;
use serde::Deserialize;
#[derive(Debug, Deserialize)]
struct IpInfo {
ip: String,
country: String,
city: String,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 创建可复用的 reqwest 客户端(基于 Tokio 异步运行时)
let client = Client::new();
// 并发发起多个 HTTP 请求
let urls = vec![
"https://ipapi.co/8.8.8.8/json/",
"https://ipapi.co/1.1.1.1/json/",
"https://ipapi.co/223.5.5.5/json/",
];
let tasks = urls.into_iter().map(|url| async move {
client.get(url).send().await?.json::<IpInfo>().await
});
let results = tokio::join!(tasks.collect::<Vec<_>>()[0].await); // 简化示意
for (idx, result) in results.into_iter().enumerate() {
match result {
Ok(info) => println!("IP {} 信息:{:?}", idx + 1, info),
Err(e) => eprintln!("IP {} 请求失败:{}", idx + 1, e),
}
}
Ok(())
}
use tokio;
use tokio::signal;
use log::{info, warn};
use env_logger::Builder;
use chrono::Local;
#[tokio::main]
async fn main() {
// 配置异步日志(结合 chrono 格式化时间戳)
Builder::new()
.format(|buf, record| {
let time = Local::now().format("%Y-%m-%d %H:%M:%S.%f").to_string();
writeln!(buf, "[{}] [{}] {}", time, record.level(), record.args())
})
.filter(None, log::LevelFilter::Info)
.init();
info!("服务启动成功,等待中断信号...");
// 监听系统中断信号(Ctrl+C)
let mut sigint = signal::unix::signal(signal::unix::SignalKind::interrupt())?;
let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())?;
// 等待任一中断信号
tokio::select! {
_ = sigint.recv() => warn!("收到 SIGINT 信号,准备退出"),
_ = sigterm.recv() => warn!("收到 SIGTERM 信号,准备退出"),
}
info!("服务已优雅退出");
}
std::thread::sleep、重型计算),需用 tokio::task::spawn_blocking 将阻塞逻辑移交到阻塞线程池。tokio::sync 原语,避免混用 std::sync 原语导致运行时阻塞。Semaphore 控制并发数,防止内存溢出。JoinHandle::await 捕获任务恐慌,避免单个任务异常导致整个服务崩溃。await 或任务未被调度到运行时,需确保异步任务被 block_on 或 spawn 触发。spawn_blocking 迁移逻辑。Mutex,需梳理锁定顺序,或使用 try_lock() 避免死锁。worker_threads 调整核心线程数,匹配 CPU 核心数(通常设为 CPU 核心数或 2 倍)。Tokio 作为 Rust 异步编程的核心库,通过高效的任务调度、完善的 IO 工具、安全的同步原语,为高并发场景提供了一站式解决方案。无论是构建异步网络服务、处理海量 IO 任务,还是实现复杂的任务编排,Tokio 都能凭借'性能与安全兼顾'的优势,成为生产环境的首选。
使用 Tokio 的核心是'理解异步运行时的调度逻辑'——避免阻塞、合理编排任务、善用生态工具,才能充分发挥其性能优势。掌握本文的实践内容后,你可以轻松构建出高效、可靠的 Rust 异步应用,应对各类高并发业务场景。

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online
将JSON字符串修饰为友好的可读格式。 在线工具,JSON美化和格式化在线工具,online