跳到主要内容
Python asyncio 异步编程核心指南 | 极客日志
Python
Python asyncio 异步编程核心指南 Python asyncio 是标准库中的异步 I/O 框架,基于事件循环和协程实现单线程高并发。了 asyncio 的核心概念、运行机制及与多线程/多进程的区别,重点针对 I/O 密集型场景提供了实战示例。内容包括任务管理、超时控制、同步原语使用以及常见陷阱规避,帮助开发者编写高效、可维护的异步代码。
不羁 发布于 2026/1/8 更新于 2026/4/25 1 浏览什么是 asyncio
asyncio 是 Python 3.4+ 引入的标准库,用于编写并发代码的异步 I/O 框架。它利用事件循环和协程在单线程内实现高并发,特别适合处理网络请求、文件读写等 I/O 密集型任务。
核心优势:
单线程内实现高并发,避免线程切换开销
内存占用更少,资源消耗低
代码结构清晰,易于理解和调试
核心概念
协程 (Coroutine)
使用 async def 定义的特殊函数,可以在执行过程中暂停和恢复,不会阻塞整个程序。
async def my_coroutine ():
print ("开始执行" )
await asyncio.sleep(1 )
print ("执行完成" )
事件循环 (Event Loop)
这是 asyncio 的心脏,负责调度和执行协程。它不断检查就绪队列中的任务并分发执行。
await 关键字
用于等待异步操作完成,只能在 async 函数内部使用。遇到 await 时,当前协程会挂起,让出控制权给事件循环。
Task 与 Future
Task 是对协程的封装,可以并发执行多个协程;Future 则表示一个异步操作的最终结果。
基础用法
运行协程的几种方式
import asyncio
async def hello ():
print ("Hello" )
await asyncio.sleep(1 )
print ("World" )
asyncio.run(hello())
loop = asyncio.get_event_loop()
loop.run_until_complete(hello())
loop.close()
创建和管理任务
async def task1 ():
await asyncio.sleep( )
():
asyncio.sleep( )
():
t1 = asyncio.create_task(task1())
t2 = asyncio.create_task(task2())
results = asyncio.gather(t1, t2)
(results)
asyncio.run(main())
2
return
"任务 1 完成"
async
def
task2
await
1
return
"任务 2 完成"
async
def
main
await
print
并发执行 async def fetch_data (n ):
print (f"开始获取数据 {n} " )
await asyncio.sleep(1 )
print (f"完成获取数据 {n} " )
return f"数据 {n} "
async def main ():
results = await asyncio.gather(
fetch_data(1 ),
fetch_data(2 ),
fetch_data(3 )
)
print (results)
asyncio.run(main())
内部机理
事件循环的工作原理 事件循环维护着就绪队列和等待队列。当协程遇到 await 时,会被移入等待队列;一旦 I/O 操作完成,它会被唤醒并放回就绪队列继续执行。
┌─────────────────────────────────┐
│ 事件循环 (Event Loop) │
│ │
│ ┌──────────────────────────┐ │
│ │ 就绪队列 (Ready Queue) │ │
│ │ [task1, task2, task3] │ │
│ └──────────────────────────┘ │
│ │
│ ┌──────────────────────────┐ │
│ │ 等待队列 (Wait Queue) │ │
│ │ [task4, task5] │ │
│ └──────────────────────────┘ │
│ │
│ ┌──────────────────────────┐ │
│ │ I /O 选择器 │ │
│ │ (epoll/kqueue/select) │ │
│ └──────────────────────────┘ │
└─────────────────────────────────┘
执行流程
初始化 :创建事件循环实例。
注册协程 :将协程包装成 Task 对象加入就绪队列。
调度执行 :从就绪队列取出 Task 执行,直到遇到 await 暂停。
I/O 多路复用 :监听 I/O 事件,不阻塞 CPU。
唤醒协程 :I/O 完成后,将 Task 移回就绪队列。
循环往复 :直到所有任务完成。
底层实现关键点 from collections import deque
import select
class EventLoop :
def __init__ (self ):
self ._ready = deque()
self ._selector = select.epoll()
def run_until_complete (self, coro ):
task = Task(coro)
self ._ready.append(task)
while self ._ready or self ._has_pending_io():
if self ._ready:
task = self ._ready.popleft()
task.step()
events = self ._selector.select(timeout=0 )
for event in events:
self ._ready.append(event.callback)
与多线程/多进程的区别 特性 asyncio 多线程 (threading) 多进程 (multiprocessing) 并发模型 协作式并发 抢占式并发 真正并行 运行环境 单线程 多线程 多进程 GIL 影响 无影响 受 GIL 限制 不受 GIL 限制 切换开销 极小 (用户态) 较大 (内核态) 最大 (进程切换) 内存占用 低 中等 高 适用场景 I/O 密集型 I/O 密集型 CPU 密集型 数据共享 简单 (同一线程) 需要锁 需要 IPC 调试难度 容易 困难 中等
使用场景详解
网络请求 (爬虫、API 调用)
文件 I/O
数据库查询
WebSocket 连接
异步消息队列
import aiohttp
import asyncio
async def fetch_url (session, url ):
async with session.get(url) as response:
return await response.text()
async def main ():
urls = ['http://example.com' ] * 100
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
print (f"完成 {len (results)} 个请求" )
asyncio.run(main())
有阻塞 I/O 操作且无异步替代
需要并发执行的库不支持 asyncio
中等规模并发
import threading
import requests
def fetch_url (url ):
response = requests.get(url)
print (f"完成:{url} " )
urls = ['http://example.com' ] * 10
threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls]
for t in threads:
t.start()
for t in threads:
t.join()
CPU 密集型计算
需要绕过 GIL
数据并行处理
from multiprocessing import Pool
def cpu_intensive (n ):
return sum (i*i for i in range (n))
with Pool(4 ) as pool:
results = pool.map (cpu_intensive, [10000000 ]*4 )
print (results)
性能对比示例 import asyncio
import threading
import time
def io_bound_sync ():
time.sleep(1 )
async def io_bound_async ():
await asyncio.sleep(1 )
async def test_asyncio ():
start = time.time()
await asyncio.gather(*[io_bound_async() for _ in range (100 )])
print (f"asyncio: {time.time()-start:.2 f} s" )
def test_threading ():
start = time.time()
threads = [threading.Thread(target=io_bound_sync) for _ in range (100 )]
for t in threads:
t.start()
for t in threads:
t.join()
print (f"threading: {time.time()-start:.2 f} s" )
高级特性
1. 异步上下文管理器 class AsyncResource :
async def __aenter__ (self ):
print ("获取资源" )
await asyncio.sleep(1 )
return self
async def __aexit__ (self, exc_type, exc_val, exc_tb ):
print ("释放资源" )
await asyncio.sleep(1 )
async def main ():
async with AsyncResource() as resource:
print ("使用资源" )
asyncio.run(main())
2. 异步迭代器 class AsyncRange :
def __init__ (self, start, end ):
self .current = start
self .end = end
def __aiter__ (self ):
return self
async def __anext__ (self ):
if self .current >= self .end:
raise StopAsyncIteration
await asyncio.sleep(0.1 )
self .current += 1
return self .current - 1
async def main ():
async for i in AsyncRange(0 , 5 ):
print (i)
asyncio.run(main())
3. 异步生成器 async def async_generator ():
for i in range (5 ):
await asyncio.sleep(1 )
yield i
async def main ():
async for value in async_generator():
print (value)
asyncio.run(main())
4. 超时控制 async def slow_operation ():
await asyncio.sleep(5 )
return "完成"
async def main ():
try :
result = await asyncio.wait_for(slow_operation(), timeout=2 )
except asyncio.TimeoutError:
print ("操作超时" )
asyncio.run(main())
5. 信号量和锁
async def limited_task (sem, n ):
async with sem:
print (f"任务 {n} 开始" )
await asyncio.sleep(1 )
print (f"任务 {n} 完成" )
async def main ():
sem = asyncio.Semaphore(3 )
await asyncio.gather(*[limited_task(sem, i) for i in range (10 )])
asyncio.run(main())
6. 队列 async def producer (queue, n ):
for i in range (n):
await queue.put(i)
print (f"生产:{i} " )
await asyncio.sleep(0.5 )
async def consumer (queue, name ):
while True :
item = await queue.get()
print (f"{name} 消费:{item} " )
await asyncio.sleep(1 )
queue.task_done()
async def main ():
queue = asyncio.Queue()
producers = [asyncio.create_task(producer(queue, 5 ))]
consumers = [asyncio.create_task(consumer(queue, f"消费者{i} " )) for i in range (2 )]
await asyncio.gather(*producers)
await queue.join()
for c in consumers:
c.cancel()
asyncio.run(main())
实战示例
示例:异步网络爬虫 import asyncio
import aiohttp
from bs4 import BeautifulSoup
async def fetch_page (session, url ):
try :
async with session.get(url, timeout=10 ) as response:
return await response.text()
except Exception as e:
print (f"错误 {url} : {e} " )
return None
async def parse_page (html ):
if html:
soup = BeautifulSoup(html, 'html.parser' )
return soup.title.string if soup.title else "无标题"
return None
async def crawl_urls (urls ):
async with aiohttp.ClientSession() as session:
tasks = [fetch_page(session, url) for url in urls]
pages = await asyncio.gather(*tasks)
titles = await asyncio.gather(*[parse_page(page) for page in pages])
return titles
urls = ['http://example.com' , 'http://example.org' , 'http://example.net' ]
示例:异步数据库操作 import asyncio
import aiosqlite
async def create_table (db ):
await db.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY,
name TEXT,
email TEXT
)
''' )
await db.commit()
async def insert_user (db, name, email ):
await db.execute('INSERT INTO users (name, email) VALUES (?, ?)' , (name, email))
await db.commit()
async def fetch_users (db ):
async with db.execute('SELECT * FROM users' ) as cursor:
return await cursor.fetchall()
async def main ():
async with aiosqlite.connect('test.db' ) as db:
await create_table(db)
await asyncio.gather(
insert_user(db, 'Alice' , '[email protected] ' ),
insert_user(db, 'Bob' , '[email protected] ' ),
insert_user(db, 'Charlie' , '[email protected] ' )
)
users = await fetch_users(db)
for user in users:
print (user)
示例:异步 Web 服务器 import asyncio
async def handle_client (reader, writer ):
data = await reader.read(1024 )
message = data.decode()
addr = writer.get_extra_info('peername' )
print (f"收到来自 {addr} 的消息:{message} " )
response = f"Echo: {message} "
writer.write(response.encode())
await writer.drain()
writer.close()
await writer.wait_closed()
async def start_server ():
server = await asyncio.start_server(handle_client, '127.0.0.1' , 8888 )
addr = server.sockets[0 ].getsockname()
print (f"服务器启动在 {addr} " )
async with server:
await server.serve_forever()
示例:实时数据流处理 import asyncio
import random
async def data_stream ():
"""模拟数据流"""
while True :
yield random.randint(1 , 100 )
await asyncio.sleep(0.5 )
async def process_data (value ):
"""处理数据"""
await asyncio.sleep(0.1 )
return value * 2
async def monitor_stream ():
"""监控和处理数据流"""
buffer = []
async for data in data_stream():
print (f"接收数据:{data} " )
task = asyncio.create_task(process_data(data))
buffer.append(task)
if len (buffer) >= 5 :
results = await asyncio.gather(*buffer)
print (f"处理结果:{results} " )
buffer = []
if data > 20 :
break
示例:多任务协调 import asyncio
async def task_with_priority (name, priority, duration ):
print (f"[优先级 {priority} ] {name} 开始" )
await asyncio.sleep(duration)
print (f"[优先级 {priority} ] {name} 完成" )
return f"{name} 结果"
async def coordinator ():
high_priority = [
task_with_priority(f"高优先级-{i} " , 1 , 1 ) for i in range (3 )
]
low_priority = [
task_with_priority(f"低优先级-{i} " , 3 , 2 ) for i in range (3 )
]
high_results = await asyncio.gather(*high_priority)
print (f"高优先级完成:{high_results} " )
low_results = await asyncio.gather(*low_priority)
print (f"低优先级完成:{low_results} " )
asyncio.run(coordinator())
最佳实践
1. 错误处理 async def safe_task (n ):
try :
if n == 3 :
raise ValueError("错误的值" )
await asyncio.sleep(1 )
return f"任务 {n} 成功"
except Exception as e:
print (f"任务 {n} 失败:{e} " )
return None
async def main ():
results = await asyncio.gather(
safe_task(1 ),
safe_task(2 ),
safe_task(3 ),
return_exceptions=True
)
print (results)
asyncio.run(main())
2. 资源清理 class AsyncConnection :
async def __aenter__ (self ):
self .conn = await self .connect()
return self
async def __aexit__ (self, exc_type, exc_val, exc_tb ):
await self .close()
async def connect (self ):
print ("建立连接" )
await asyncio.sleep(1 )
return "connection"
async def close (self ):
print ("关闭连接" )
await asyncio.sleep(1 )
async def main ():
async with AsyncConnection() as conn:
print (f"使用连接:{conn.conn} " )
asyncio.run(main())
3. 避免阻塞 import asyncio
from concurrent.futures import ThreadPoolExecutor
async def bad_example ():
import time
time.sleep(5 )
async def good_example ():
loop = asyncio.get_event_loop()
with ThreadPoolExecutor() as executor:
result = await loop.run_in_executor(executor, blocking_function)
return result
def blocking_function ():
import time
time.sleep(5 )
return "完成"
4. 合理设置超时 async def with_timeout ():
try :
result = await asyncio.wait_for(long_running_task(), timeout=5.0 )
except asyncio.TimeoutError:
print ("任务超时,执行回退逻辑" )
result = "默认值"
return result
5. 使用 TaskGroup (Python 3.11+) async def main ():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(some_coro())
task2 = tg.create_task(another_coro())
print ("所有任务完成" )
总结
asyncio 的核心优势
高性能 : 单线程处理大量并发,避免线程切换开销
低资源消耗 : 协程比线程更轻量,内存占用少
代码可读性 : async/await 语法让异步代码像同步代码一样易读
丰富的生态 : aiohttp、aiopg、aiomysql 等大量异步库支持
何时使用 asyncio
大量网络 I/O 操作 (爬虫、API 服务)
WebSocket 长连接
异步数据库操作
实时数据处理
微服务间通信
CPU 密集型计算 (使用多进程)
简单的小型脚本
依赖大量同步库的项目
团队不熟悉异步编程
学习路径建议
基础阶段 : 掌握 async/await、事件循环、Task 基本概念
进阶阶段 : 学习异步上下文管理器、生成器、同步原语
实战阶段 : 使用 aiohttp、aiopg 等库构建实际项目
优化阶段 : 性能分析、错误处理、最佳实践
常见陷阱
在异步函数中使用阻塞调用
忘记使用 await 导致协程未执行
过度使用 asyncio (简单任务反而降低效率)
忽略异常处理导致任务静默失败
不理解事件循环的单线程特性
参考资源 相关免费在线工具 curl 转代码 解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
Base64 字符串编码/解码 将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
Base64 文件转换器 将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
Markdown转HTML 将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
HTML转Markdown 将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
JSON 压缩 通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online