Python asyncio 完全指南
目录
什么是 asyncio
asyncio 是 Python 3.4+ 引入的标准库,用于编写并发代码的异步 I/O 框架。它使用事件循环和协程实现单线程并发,特别适合处理 I/O 密集型任务。
核心优势:
- 单线程内实现高并发
- 避免线程切换开销
- 更少的内存占用
- 代码更易于理解和调试
核心概念
协程 (Coroutine)
使用 async def 定义的特殊函数,可以在执行过程中暂停和恢复。
asyncdefmy_coroutine():print("开始执行")await asyncio.sleep(1)print("执行完成")事件循环 (Event Loop)
asyncio 的核心,负责调度和执行协程。
await 关键字
用于等待异步操作完成,只能在 async 函数内使用。
Task
对协程的封装,可以并发执行多个协程。
Future
表示一个异步操作的最终结果。
基础用法
1. 运行协程的三种方式
import asyncio asyncdefhello():print("Hello")await asyncio.sleep(1)print("World")# 方式1: Python 3.7+ 推荐 asyncio.run(hello())# 方式2: 手动管理事件循环 loop = asyncio.get_event_loop() loop.run_until_complete(hello()) loop.close()# 方式3: 在已有事件循环中# await hello() # 只能在异步函数内使用2. 创建和管理任务
asyncdeftask1():await asyncio.sleep(2)return"任务1完成"asyncdeftask2():await asyncio.sleep(1)return"任务2完成"asyncdefmain():# 创建任务 t1 = asyncio.create_task(task1()) t2 = asyncio.create_task(task2())# 等待所有任务完成 results =await asyncio.gather(t1, t2)print(results) asyncio.run(main())3. 并发执行
asyncdeffetch_data(n):print(f"开始获取数据 {n}")await asyncio.sleep(1)print(f"完成获取数据 {n}")returnf"数据 {n}"asyncdefmain():# gather: 并发执行,按顺序返回结果 results =await asyncio.gather( fetch_data(1), fetch_data(2), fetch_data(3))print(results) asyncio.run(main())内部机理
事件循环的工作原理
┌─────────────────────────────────┐ │ 事件循环 (Event Loop) │ │ │ │ ┌──────────────────────────┐ │ │ │ 就绪队列 (Ready Queue) │ │ │ │ [task1, task2, task3] │ │ │ └──────────────────────────┘ │ │ │ │ ┌──────────────────────────┐ │ │ │ 等待队列 (Wait Queue) │ │ │ │ [task4, task5] │ │ │ └──────────────────────────┘ │ │ │ │ ┌──────────────────────────┐ │ │ │ I/O 选择器 │ │ │ │ (epoll/kqueue/select) │ │ │ └──────────────────────────┘ │ └─────────────────────────────────┘ 执行流程
- 初始化: 创建事件循环
- 注册协程: 将协程包装成 Task 对象
- 调度执行:
- 从就绪队列取出 Task
- 执行到 await 处暂停
- 注册到相应的等待队列
- I/O 多路复用: 监听 I/O 事件
- 唤醒协程: I/O 完成后,将 Task 移回就绪队列
- 循环往复: 直到所有任务完成
协程状态转换
asyncdefexample():print("1. 开始执行")# RUNNINGawait asyncio.sleep(1)# WAITING (挂起)print("2. 继续执行")# RUNNING (恢复)return"完成"# FINISHED底层实现关键点
# 简化的事件循环伪代码classEventLoop:def__init__(self): self._ready = deque()# 就绪队列 self._selector = select.epoll()# I/O 选择器defrun_until_complete(self, coro): task = Task(coro) self._ready.append(task)while self._ready or self._has_pending_io():# 执行就绪的任务if self._ready: task = self._ready.popleft() task.step()# 等待 I/O 事件 events = self._selector.select(timeout=0)for event in events: self._ready.append(event.callback)与多线程/多进程的区别
对比表格
| 特性 | asyncio | 多线程 (threading) | 多进程 (multiprocessing) |
|---|---|---|---|
| 并发模型 | 协作式并发 | 抢占式并发 | 真正并行 |
| 运行环境 | 单线程 | 多线程 | 多进程 |
| GIL 影响 | 无影响 | 受 GIL 限制 | 不受 GIL 限制 |
| 切换开销 | 极小(用户态) | 较大(内核态) | 最大(进程切换) |
| 内存占用 | 低 | 中等 | 高 |
| 适用场景 | I/O 密集型 | I/O 密集型 | CPU 密集型 |
| 数据共享 | 简单(同一线程) | 需要锁 | 需要 IPC |
| 调试难度 | 容易 | 困难 | 中等 |
使用场景详解
asyncio 适合的场景
- 网络请求(爬虫、API 调用)
- 文件 I/O
- 数据库查询
- WebSocket 连接
- 异步消息队列
# 典型的 asyncio 场景: 并发网络请求import aiohttp import asyncio asyncdeffetch_url(session, url):asyncwith session.get(url)as response:returnawait response.text()asyncdefmain(): urls =['http://example.com']*100asyncwith aiohttp.ClientSession()as session: tasks =[fetch_url(session, url)for url in urls] results =await asyncio.gather(*tasks)print(f"完成 {len(results)} 个请求") asyncio.run(main())多线程适合的场景
- 有阻塞 I/O 操作且无异步替代
- 需要并发执行的库不支持 asyncio
- 中等规模并发
import threading import requests deffetch_url(url): response = requests.get(url)print(f"完成: {url}") urls =['http://example.com']*10 threads =[threading.Thread(target=fetch_url, args=(url,))for url in urls]for t in threads: t.start()for t in threads: t.join()多进程适合的场景
- CPU 密集型计算
- 需要绕过 GIL
- 数据并行处理
from multiprocessing import Pool defcpu_intensive(n):returnsum(i*i for i inrange(n))with Pool(4)as pool: results = pool.map(cpu_intensive,[10000000]*4)print(results)性能对比示例
import asyncio import threading import multiprocessing import time # I/O 密集型任务defio_bound_sync(): time.sleep(1)asyncdefio_bound_async():await asyncio.sleep(1)# 测试 asyncioasyncdeftest_asyncio(): start = time.time()await asyncio.gather(*[io_bound_async()for _ inrange(100)])print(f"asyncio: {time.time()- start:.2f}s")# 约 1 秒# 测试多线程deftest_threading(): start = time.time() threads =[threading.Thread(target=io_bound_sync)for _ inrange(100)]for t in threads: t.start()for t in threads: t.join()print(f"threading: {time.time()- start:.2f}s")# 约 1-2 秒# asyncio 在 I/O 密集型任务中表现最优高级特性
1. 异步上下文管理器
classAsyncResource:asyncdef__aenter__(self):print("获取资源")await asyncio.sleep(1)return self asyncdef__aexit__(self, exc_type, exc_val, exc_tb):print("释放资源")await asyncio.sleep(1)asyncdefmain():asyncwith AsyncResource()as resource:print("使用资源") asyncio.run(main())2. 异步迭代器
classAsyncRange:def__init__(self, start, end): self.current = start self.end = end def__aiter__(self):return self asyncdef__anext__(self):if self.current >= self.end:raise StopAsyncIteration await asyncio.sleep(0.1) self.current +=1return self.current -1asyncdefmain():asyncfor i in AsyncRange(0,5):print(i) asyncio.run(main())3. 异步生成器
asyncdefasync_generator():for i inrange(5):await asyncio.sleep(1)yield i asyncdefmain():asyncfor value in async_generator():print(value) asyncio.run(main())4. 超时控制
asyncdefslow_operation():await asyncio.sleep(5)return"完成"asyncdefmain():try:# 设置 2 秒超时 result =await asyncio.wait_for(slow_operation(), timeout=2)except asyncio.TimeoutError:print("操作超时") asyncio.run(main())5. 信号量和锁
# 限制并发数量asyncdeflimited_task(sem, n):asyncwith sem:print(f"任务 {n} 开始")await asyncio.sleep(1)print(f"任务 {n} 完成")asyncdefmain(): sem = asyncio.Semaphore(3)# 最多 3 个并发await asyncio.gather(*[limited_task(sem, i)for i inrange(10)]) asyncio.run(main())6. 队列
asyncdefproducer(queue, n):for i inrange(n):await queue.put(i)print(f"生产: {i}")await asyncio.sleep(0.5)asyncdefconsumer(queue, name):whileTrue: item =await queue.get()print(f"{name} 消费: {item}")await asyncio.sleep(1) queue.task_done()asyncdefmain(): queue = asyncio.Queue()# 创建生产者和消费者 producers =[asyncio.create_task(producer(queue,5))] consumers =[asyncio.create_task(consumer(queue,f"消费者{i}"))for i inrange(2)]await asyncio.gather(*producers)await queue.join()# 等待所有任务处理完成for c in consumers: c.cancel() asyncio.run(main())实战示例
示例1: 异步网络爬虫
import asyncio import aiohttp from bs4 import BeautifulSoup asyncdeffetch_page(session, url):try:asyncwith session.get(url, timeout=10)as response:returnawait response.text()except Exception as e:print(f"错误 {url}: {e}")returnNoneasyncdefparse_page(html):if html: soup = BeautifulSoup(html,'html.parser')return soup.title.string if soup.title else"无标题"returnNoneasyncdefcrawl_urls(urls):asyncwith aiohttp.ClientSession()as session: tasks =[fetch_page(session, url)for url in urls] pages =await asyncio.gather(*tasks) titles =await asyncio.gather(*[parse_page(page)for page in pages])return titles urls =['http://example.com','http://example.org','http://example.net',]# 执行爬虫# titles = asyncio.run(crawl_urls(urls))# print(titles)示例2: 异步数据库操作
import asyncio import aiosqlite asyncdefcreate_table(db):await db.execute(''' CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY, name TEXT, email TEXT ) ''')await db.commit()asyncdefinsert_user(db, name, email):await db.execute('INSERT INTO users (name, email) VALUES (?, ?)',(name, email))await db.commit()asyncdeffetch_users(db):asyncwith db.execute('SELECT * FROM users')as cursor:returnawait cursor.fetchall()asyncdefmain():asyncwith aiosqlite.connect('test.db')as db:await create_table(db)# 并发插入await asyncio.gather( insert_user(db,'Alice','[email protected]'), insert_user(db,'Bob','[email protected]'), insert_user(db,'Charlie','[email protected]')) users =await fetch_users(db)for user in users:print(user)# asyncio.run(main())示例3: 异步 Web 服务器
import asyncio asyncdefhandle_client(reader, writer): data =await reader.read(1024) message = data.decode() addr = writer.get_extra_info('peername')print(f"收到来自 {addr} 的消息: {message}") response =f"Echo: {message}" writer.write(response.encode())await writer.drain() writer.close()await writer.wait_closed()asyncdefstart_server(): server =await asyncio.start_server( handle_client,'127.0.0.1',8888) addr = server.sockets[0].getsockname()print(f"服务器启动在 {addr}")asyncwith server:await server.serve_forever()# asyncio.run(start_server())示例4: 实时数据流处理
import asyncio import random asyncdefdata_stream():"""模拟数据流"""whileTrue:yield random.randint(1,100)await asyncio.sleep(0.5)asyncdefprocess_data(value):"""处理数据"""await asyncio.sleep(0.1)return value *2asyncdefmonitor_stream():"""监控和处理数据流"""buffer=[]asyncfor data in data_stream():print(f"接收数据: {data}")# 异步处理数据 task = asyncio.create_task(process_data(data))buffer.append(task)# 每 5 个数据批量处理iflen(buffer)>=5: results =await asyncio.gather(*buffer)print(f"处理结果: {results}")buffer=[]# 演示用,处理 20 个数据后停止if data >20:break# asyncio.run(monitor_stream())示例5: 多任务协调
import asyncio asyncdeftask_with_priority(name, priority, duration):print(f"[优先级 {priority}] {name} 开始")await asyncio.sleep(duration)print(f"[优先级 {priority}] {name} 完成")returnf"{name} 结果"asyncdefcoordinator():# 创建不同优先级的任务 high_priority =[ task_with_priority(f"高优先级-{i}",1,1)for i inrange(3)] low_priority =[ task_with_priority(f"低优先级-{i}",3,2)for i inrange(3)]# 先执行高优先级任务 high_results =await asyncio.gather(*high_priority)print(f"高优先级完成: {high_results}")# 再执行低优先级任务 low_results =await asyncio.gather(*low_priority)print(f"低优先级完成: {low_results}") asyncio.run(coordinator())最佳实践
1. 错误处理
asyncdefsafe_task(n):try:if n ==3:raise ValueError("错误的值")await asyncio.sleep(1)returnf"任务 {n} 成功"except Exception as e:print(f"任务 {n} 失败: {e}")returnNoneasyncdefmain(): results =await asyncio.gather( safe_task(1), safe_task(2), safe_task(3), return_exceptions=True# 不会因为一个任务失败而停止)print(results) asyncio.run(main())2. 资源清理
classAsyncConnection:asyncdef__aenter__(self): self.conn =await self.connect()return self asyncdef__aexit__(self, exc_type, exc_val, exc_tb):await self.close()asyncdefconnect(self):print("建立连接")await asyncio.sleep(1)return"connection"asyncdefclose(self):print("关闭连接")await asyncio.sleep(1)asyncdefmain():asyncwith AsyncConnection()as conn:print(f"使用连接: {conn.conn}") asyncio.run(main())3. 避免阻塞
import asyncio from concurrent.futures import ThreadPoolExecutor # 错误: 阻塞操作asyncdefbad_example(): time.sleep(5)# 这会阻塞整个事件循环!# 正确: 使用 executor 运行阻塞操作asyncdefgood_example(): loop = asyncio.get_event_loop()with ThreadPoolExecutor()as executor: result =await loop.run_in_executor(executor, blocking_function)return result defblocking_function():import time time.sleep(5)return"完成"4. 合理设置超时
asyncdefwith_timeout():try: result =await asyncio.wait_for( long_running_task(), timeout=5.0)except asyncio.TimeoutError:print("任务超时,执行回退逻辑") result ="默认值"return result 5. 使用 TaskGroup (Python 3.11+)
asyncdefmain():asyncwith asyncio.TaskGroup()as tg: task1 = tg.create_task(some_coro()) task2 = tg.create_task(another_coro())# 所有任务完成或某个任务失败时退出print("所有任务完成")总结
asyncio 的核心优势
- 高性能: 单线程处理大量并发,避免线程切换开销
- 低资源消耗: 协程比线程更轻量,内存占用少
- 代码可读性: async/await 语法让异步代码像同步代码一样易读
- 丰富的生态: aiohttp、aiopg、aiomysql 等大量异步库支持
何时使用 asyncio
✅ 适合使用:
- 大量网络 I/O 操作(爬虫、API 服务)
- WebSocket 长连接
- 异步数据库操作
- 实时数据处理
- 微服务间通信
❌ 不适合使用:
- CPU 密集型计算(使用多进程)
- 简单的小型脚本
- 依赖大量同步库的项目
- 团队不熟悉异步编程
学习路径建议
- 基础阶段: 掌握 async/await、事件循环、Task 基本概念
- 进阶阶段: 学习异步上下文管理器、生成器、同步原语
- 实战阶段: 使用 aiohttp、aiopg 等库构建实际项目
- 优化阶段: 性能分析、错误处理、最佳实践
常见陷阱
- 在异步函数中使用阻塞调用
- 忘记使用 await 导致协程未执行
- 过度使用 asyncio(简单任务反而降低效率)
- 忽略异常处理导致任务静默失败
- 不理解事件循环的单线程特性
参考资源
结语: asyncio 是 Python 异步编程的强大工具,掌握它可以显著提升 I/O 密集型应用的性能。理解其内部机制和最佳实践,能帮助你写出高效、可维护的异步代码。