Python asyncio 完全指南

目录

  1. 什么是 asyncio
  2. 核心概念
  3. 基础用法
  4. 内部机理
  5. 与多线程/多进程的区别
  6. 高级特性
  7. 实战示例
  8. 最佳实践
  9. 总结

什么是 asyncio

asyncio 是 Python 3.4+ 引入的标准库,用于编写并发代码的异步 I/O 框架。它使用事件循环和协程实现单线程并发,特别适合处理 I/O 密集型任务。

核心优势:

  • 单线程内实现高并发
  • 避免线程切换开销
  • 更少的内存占用
  • 代码更易于理解和调试

核心概念

协程 (Coroutine)

使用 async def 定义的特殊函数,可以在执行过程中暂停和恢复。

asyncdefmy_coroutine():print("开始执行")await asyncio.sleep(1)print("执行完成")

事件循环 (Event Loop)

asyncio 的核心,负责调度和执行协程。

await 关键字

用于等待异步操作完成,只能在 async 函数内使用。

Task

对协程的封装,可以并发执行多个协程。

Future

表示一个异步操作的最终结果。


基础用法

1. 运行协程的三种方式

import asyncio asyncdefhello():print("Hello")await asyncio.sleep(1)print("World")# 方式1: Python 3.7+ 推荐 asyncio.run(hello())# 方式2: 手动管理事件循环 loop = asyncio.get_event_loop() loop.run_until_complete(hello()) loop.close()# 方式3: 在已有事件循环中# await hello() # 只能在异步函数内使用

2. 创建和管理任务

asyncdeftask1():await asyncio.sleep(2)return"任务1完成"asyncdeftask2():await asyncio.sleep(1)return"任务2完成"asyncdefmain():# 创建任务 t1 = asyncio.create_task(task1()) t2 = asyncio.create_task(task2())# 等待所有任务完成 results =await asyncio.gather(t1, t2)print(results) asyncio.run(main())

3. 并发执行

asyncdeffetch_data(n):print(f"开始获取数据 {n}")await asyncio.sleep(1)print(f"完成获取数据 {n}")returnf"数据 {n}"asyncdefmain():# gather: 并发执行,按顺序返回结果 results =await asyncio.gather( fetch_data(1), fetch_data(2), fetch_data(3))print(results) asyncio.run(main())

内部机理

事件循环的工作原理

┌─────────────────────────────────┐ │ 事件循环 (Event Loop) │ │ │ │ ┌──────────────────────────┐ │ │ │ 就绪队列 (Ready Queue) │ │ │ │ [task1, task2, task3] │ │ │ └──────────────────────────┘ │ │ │ │ ┌──────────────────────────┐ │ │ │ 等待队列 (Wait Queue) │ │ │ │ [task4, task5] │ │ │ └──────────────────────────┘ │ │ │ │ ┌──────────────────────────┐ │ │ │ I/O 选择器 │ │ │ │ (epoll/kqueue/select) │ │ │ └──────────────────────────┘ │ └─────────────────────────────────┘ 

执行流程

  1. 初始化: 创建事件循环
  2. 注册协程: 将协程包装成 Task 对象
  3. 调度执行:
    • 从就绪队列取出 Task
    • 执行到 await 处暂停
    • 注册到相应的等待队列
  4. I/O 多路复用: 监听 I/O 事件
  5. 唤醒协程: I/O 完成后,将 Task 移回就绪队列
  6. 循环往复: 直到所有任务完成

协程状态转换

asyncdefexample():print("1. 开始执行")# RUNNINGawait asyncio.sleep(1)# WAITING (挂起)print("2. 继续执行")# RUNNING (恢复)return"完成"# FINISHED

底层实现关键点

# 简化的事件循环伪代码classEventLoop:def__init__(self): self._ready = deque()# 就绪队列 self._selector = select.epoll()# I/O 选择器defrun_until_complete(self, coro): task = Task(coro) self._ready.append(task)while self._ready or self._has_pending_io():# 执行就绪的任务if self._ready: task = self._ready.popleft() task.step()# 等待 I/O 事件 events = self._selector.select(timeout=0)for event in events: self._ready.append(event.callback)

与多线程/多进程的区别

对比表格

特性asyncio多线程 (threading)多进程 (multiprocessing)
并发模型协作式并发抢占式并发真正并行
运行环境单线程多线程多进程
GIL 影响无影响受 GIL 限制不受 GIL 限制
切换开销极小(用户态)较大(内核态)最大(进程切换)
内存占用中等
适用场景I/O 密集型I/O 密集型CPU 密集型
数据共享简单(同一线程)需要锁需要 IPC
调试难度容易困难中等

使用场景详解

asyncio 适合的场景
  • 网络请求(爬虫、API 调用)
  • 文件 I/O
  • 数据库查询
  • WebSocket 连接
  • 异步消息队列
# 典型的 asyncio 场景: 并发网络请求import aiohttp import asyncio asyncdeffetch_url(session, url):asyncwith session.get(url)as response:returnawait response.text()asyncdefmain(): urls =['http://example.com']*100asyncwith aiohttp.ClientSession()as session: tasks =[fetch_url(session, url)for url in urls] results =await asyncio.gather(*tasks)print(f"完成 {len(results)} 个请求") asyncio.run(main())
多线程适合的场景
  • 有阻塞 I/O 操作且无异步替代
  • 需要并发执行的库不支持 asyncio
  • 中等规模并发
import threading import requests deffetch_url(url): response = requests.get(url)print(f"完成: {url}") urls =['http://example.com']*10 threads =[threading.Thread(target=fetch_url, args=(url,))for url in urls]for t in threads: t.start()for t in threads: t.join()
多进程适合的场景
  • CPU 密集型计算
  • 需要绕过 GIL
  • 数据并行处理
from multiprocessing import Pool defcpu_intensive(n):returnsum(i*i for i inrange(n))with Pool(4)as pool: results = pool.map(cpu_intensive,[10000000]*4)print(results)

性能对比示例

import asyncio import threading import multiprocessing import time # I/O 密集型任务defio_bound_sync(): time.sleep(1)asyncdefio_bound_async():await asyncio.sleep(1)# 测试 asyncioasyncdeftest_asyncio(): start = time.time()await asyncio.gather(*[io_bound_async()for _ inrange(100)])print(f"asyncio: {time.time()- start:.2f}s")# 约 1 秒# 测试多线程deftest_threading(): start = time.time() threads =[threading.Thread(target=io_bound_sync)for _ inrange(100)]for t in threads: t.start()for t in threads: t.join()print(f"threading: {time.time()- start:.2f}s")# 约 1-2 秒# asyncio 在 I/O 密集型任务中表现最优

高级特性

1. 异步上下文管理器

classAsyncResource:asyncdef__aenter__(self):print("获取资源")await asyncio.sleep(1)return self asyncdef__aexit__(self, exc_type, exc_val, exc_tb):print("释放资源")await asyncio.sleep(1)asyncdefmain():asyncwith AsyncResource()as resource:print("使用资源") asyncio.run(main())

2. 异步迭代器

classAsyncRange:def__init__(self, start, end): self.current = start self.end = end def__aiter__(self):return self asyncdef__anext__(self):if self.current >= self.end:raise StopAsyncIteration await asyncio.sleep(0.1) self.current +=1return self.current -1asyncdefmain():asyncfor i in AsyncRange(0,5):print(i) asyncio.run(main())

3. 异步生成器

asyncdefasync_generator():for i inrange(5):await asyncio.sleep(1)yield i asyncdefmain():asyncfor value in async_generator():print(value) asyncio.run(main())

4. 超时控制

asyncdefslow_operation():await asyncio.sleep(5)return"完成"asyncdefmain():try:# 设置 2 秒超时 result =await asyncio.wait_for(slow_operation(), timeout=2)except asyncio.TimeoutError:print("操作超时") asyncio.run(main())

5. 信号量和锁

# 限制并发数量asyncdeflimited_task(sem, n):asyncwith sem:print(f"任务 {n} 开始")await asyncio.sleep(1)print(f"任务 {n} 完成")asyncdefmain(): sem = asyncio.Semaphore(3)# 最多 3 个并发await asyncio.gather(*[limited_task(sem, i)for i inrange(10)]) asyncio.run(main())

6. 队列

asyncdefproducer(queue, n):for i inrange(n):await queue.put(i)print(f"生产: {i}")await asyncio.sleep(0.5)asyncdefconsumer(queue, name):whileTrue: item =await queue.get()print(f"{name} 消费: {item}")await asyncio.sleep(1) queue.task_done()asyncdefmain(): queue = asyncio.Queue()# 创建生产者和消费者 producers =[asyncio.create_task(producer(queue,5))] consumers =[asyncio.create_task(consumer(queue,f"消费者{i}"))for i inrange(2)]await asyncio.gather(*producers)await queue.join()# 等待所有任务处理完成for c in consumers: c.cancel() asyncio.run(main())

实战示例

示例1: 异步网络爬虫

import asyncio import aiohttp from bs4 import BeautifulSoup asyncdeffetch_page(session, url):try:asyncwith session.get(url, timeout=10)as response:returnawait response.text()except Exception as e:print(f"错误 {url}: {e}")returnNoneasyncdefparse_page(html):if html: soup = BeautifulSoup(html,'html.parser')return soup.title.string if soup.title else"无标题"returnNoneasyncdefcrawl_urls(urls):asyncwith aiohttp.ClientSession()as session: tasks =[fetch_page(session, url)for url in urls] pages =await asyncio.gather(*tasks) titles =await asyncio.gather(*[parse_page(page)for page in pages])return titles urls =['http://example.com','http://example.org','http://example.net',]# 执行爬虫# titles = asyncio.run(crawl_urls(urls))# print(titles)

示例2: 异步数据库操作

import asyncio import aiosqlite asyncdefcreate_table(db):await db.execute(''' CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY, name TEXT, email TEXT ) ''')await db.commit()asyncdefinsert_user(db, name, email):await db.execute('INSERT INTO users (name, email) VALUES (?, ?)',(name, email))await db.commit()asyncdeffetch_users(db):asyncwith db.execute('SELECT * FROM users')as cursor:returnawait cursor.fetchall()asyncdefmain():asyncwith aiosqlite.connect('test.db')as db:await create_table(db)# 并发插入await asyncio.gather( insert_user(db,'Alice','[email protected]'), insert_user(db,'Bob','[email protected]'), insert_user(db,'Charlie','[email protected]')) users =await fetch_users(db)for user in users:print(user)# asyncio.run(main())

示例3: 异步 Web 服务器

import asyncio asyncdefhandle_client(reader, writer): data =await reader.read(1024) message = data.decode() addr = writer.get_extra_info('peername')print(f"收到来自 {addr} 的消息: {message}") response =f"Echo: {message}" writer.write(response.encode())await writer.drain() writer.close()await writer.wait_closed()asyncdefstart_server(): server =await asyncio.start_server( handle_client,'127.0.0.1',8888) addr = server.sockets[0].getsockname()print(f"服务器启动在 {addr}")asyncwith server:await server.serve_forever()# asyncio.run(start_server())

示例4: 实时数据流处理

import asyncio import random asyncdefdata_stream():"""模拟数据流"""whileTrue:yield random.randint(1,100)await asyncio.sleep(0.5)asyncdefprocess_data(value):"""处理数据"""await asyncio.sleep(0.1)return value *2asyncdefmonitor_stream():"""监控和处理数据流"""buffer=[]asyncfor data in data_stream():print(f"接收数据: {data}")# 异步处理数据 task = asyncio.create_task(process_data(data))buffer.append(task)# 每 5 个数据批量处理iflen(buffer)>=5: results =await asyncio.gather(*buffer)print(f"处理结果: {results}")buffer=[]# 演示用,处理 20 个数据后停止if data >20:break# asyncio.run(monitor_stream())

示例5: 多任务协调

import asyncio asyncdeftask_with_priority(name, priority, duration):print(f"[优先级 {priority}] {name} 开始")await asyncio.sleep(duration)print(f"[优先级 {priority}] {name} 完成")returnf"{name} 结果"asyncdefcoordinator():# 创建不同优先级的任务 high_priority =[ task_with_priority(f"高优先级-{i}",1,1)for i inrange(3)] low_priority =[ task_with_priority(f"低优先级-{i}",3,2)for i inrange(3)]# 先执行高优先级任务 high_results =await asyncio.gather(*high_priority)print(f"高优先级完成: {high_results}")# 再执行低优先级任务 low_results =await asyncio.gather(*low_priority)print(f"低优先级完成: {low_results}") asyncio.run(coordinator())

最佳实践

1. 错误处理

asyncdefsafe_task(n):try:if n ==3:raise ValueError("错误的值")await asyncio.sleep(1)returnf"任务 {n} 成功"except Exception as e:print(f"任务 {n} 失败: {e}")returnNoneasyncdefmain(): results =await asyncio.gather( safe_task(1), safe_task(2), safe_task(3), return_exceptions=True# 不会因为一个任务失败而停止)print(results) asyncio.run(main())

2. 资源清理

classAsyncConnection:asyncdef__aenter__(self): self.conn =await self.connect()return self asyncdef__aexit__(self, exc_type, exc_val, exc_tb):await self.close()asyncdefconnect(self):print("建立连接")await asyncio.sleep(1)return"connection"asyncdefclose(self):print("关闭连接")await asyncio.sleep(1)asyncdefmain():asyncwith AsyncConnection()as conn:print(f"使用连接: {conn.conn}") asyncio.run(main())

3. 避免阻塞

import asyncio from concurrent.futures import ThreadPoolExecutor # 错误: 阻塞操作asyncdefbad_example(): time.sleep(5)# 这会阻塞整个事件循环!# 正确: 使用 executor 运行阻塞操作asyncdefgood_example(): loop = asyncio.get_event_loop()with ThreadPoolExecutor()as executor: result =await loop.run_in_executor(executor, blocking_function)return result defblocking_function():import time time.sleep(5)return"完成"

4. 合理设置超时

asyncdefwith_timeout():try: result =await asyncio.wait_for( long_running_task(), timeout=5.0)except asyncio.TimeoutError:print("任务超时,执行回退逻辑") result ="默认值"return result 

5. 使用 TaskGroup (Python 3.11+)

asyncdefmain():asyncwith asyncio.TaskGroup()as tg: task1 = tg.create_task(some_coro()) task2 = tg.create_task(another_coro())# 所有任务完成或某个任务失败时退出print("所有任务完成")

总结

asyncio 的核心优势

  1. 高性能: 单线程处理大量并发,避免线程切换开销
  2. 低资源消耗: 协程比线程更轻量,内存占用少
  3. 代码可读性: async/await 语法让异步代码像同步代码一样易读
  4. 丰富的生态: aiohttp、aiopg、aiomysql 等大量异步库支持

何时使用 asyncio

✅ 适合使用:

  • 大量网络 I/O 操作(爬虫、API 服务)
  • WebSocket 长连接
  • 异步数据库操作
  • 实时数据处理
  • 微服务间通信

❌ 不适合使用:

  • CPU 密集型计算(使用多进程)
  • 简单的小型脚本
  • 依赖大量同步库的项目
  • 团队不熟悉异步编程

学习路径建议

  1. 基础阶段: 掌握 async/await、事件循环、Task 基本概念
  2. 进阶阶段: 学习异步上下文管理器、生成器、同步原语
  3. 实战阶段: 使用 aiohttp、aiopg 等库构建实际项目
  4. 优化阶段: 性能分析、错误处理、最佳实践

常见陷阱

  • 在异步函数中使用阻塞调用
  • 忘记使用 await 导致协程未执行
  • 过度使用 asyncio(简单任务反而降低效率)
  • 忽略异常处理导致任务静默失败
  • 不理解事件循环的单线程特性

参考资源


结语: asyncio 是 Python 异步编程的强大工具,掌握它可以显著提升 I/O 密集型应用的性能。理解其内部机制和最佳实践,能帮助你写出高效、可维护的异步代码。

Could not load content