Python asyncio 完整教程
什么是 asyncio
asyncio 是 Python 3.4 引入的标准库,用于编写并发代码。它使用单线程中的事件循环来实现并发,特别适合处理 I/O 密集型任务。
核心特点
| 特性 | 描述 |
|---|---|
| 单线程并发 | 使用事件循环在一个线程中处理多个任务 |
| 协程支持 | 通过 async/await 语法实现异步编程 |
| 非阻塞 I/O | 避免在等待 I/O 操作时阻塞整个程序 |
| 高并发性 | 轻松处理数千个并发连接 |
| 标准库集成 | Python 3.7+ 中与标准库深度集成 |
核心概念
1. 事件循环
定义:事件循环是 asyncio 的核心,负责运行异步任务和处理 I/O 操作。
工作原理:事件循环 continuously 检查任务是否准备好执行,如果没有准备好(比如在等待网络响应),它会切换到其他已准备好的任务执行。
示例:
import asyncio
async def main():
print("开始")
# 获取当前事件循环
loop = asyncio.get_running_loop()
print(f"当前事件循环:{loop}")
await asyncio.sleep(1)
print("结束")
asyncio.run(main())
2. 协程
定义:使用 async def 定义的函数,返回协程对象。
工作原理:协程执行到 await 表达式时,会暂停执行并返回控制权给事件循环,等待异步操作完成后继续执行。
示例:
import asyncio
async def fetch_data(url):
print(f"开始获取数据:{url}")
await asyncio.sleep(2) # 模拟网络请求
print(f"获取完成:{url}")
return f"数据来自 {url}"
async def main():
result = await fetch_data("https://api.example.com")
print(result)
asyncio.run(main())
3. 任务
定义:任务的协程的包装器,让协程可以并发执行。
工作原理:任务创建后会被放入事件循环,事件循环会在适当时机执行任务。
示例:
import asyncio
async def task(name, delay):
print(f"任务 {name} 开始")
await asyncio.sleep(delay)
print(f"任务 {name} 完成")
return f"任务 {name} 的结果"
async def main():
# 创建多个任务
task1 = asyncio.create_task(task("A", 2))
task2 = asyncio.create_task(task("B", 1))
# 等待所有任务完成
results = await asyncio.gather(task1, task2)
print(f"所有结果:{results}")
asyncio.run(main())
4. Future
定义:Future 代表一个最终会结果的异步操作的占位符。
工作原理:Future 可以被等待,当它完成时,等待它的协程会继续执行。
示例:
import asyncio
async def set_future_value(future):
await asyncio.sleep(2)
future.set_result("Future 的值已设置")
async def main():
# 创建 Future
future = asyncio.Future()
# 创建任务设置 Future 的值
task = asyncio.create_task(set_future_value(future))
# 等待 Future 完成
result = await future
print(f"Future 结果:{result}")
await task
asyncio.run(main())
概念关系图
事件循环 协程 任务 Future I/O 操作 定时器 await 表达式
基础功能详解
1. asyncio.run()
用途:运行协程的主入口,创建并运行事件循环。
基本语法:
asyncio.run(coro, *, debug=None)
简单示例:
import asyncio
async def hello():
print("Hello, asyncio!")
await asyncio.sleep(1)
print("Done!")
asyncio.run(hello())
最佳实践:
- 每个程序只调用一次
asyncio.run() - 在程序的主入口使用它
- 使用 debug=True 开启调试模式帮助发现潜在问题
2. asyncio.create_task()
用途:将协程包装成任务,实现并发执行。
基本语法:
asyncio.create_task(coro, *, name=None, context=None)
简单示例:
import asyncio
async def count(name, delay):
for i in range(3):
print(f"{name}: {i}")
await asyncio.sleep(delay)
async def main():
# 创建两个并发任务
task1 = asyncio.create_task(count("A", 0.5))
task2 = asyncio.create_task(count("B", 1))
# 等待任务完成
await asyncio.gather(task1, task2)
asyncio.run(main())
3. asyncio.gather()
用途:并发运行多个任务,收集所有结果。
基本语法:
asyncio.gather(*coros_or_futures, return_exceptions=False)
简单示例:
import asyncio
async def task1():
await asyncio.sleep(1)
return "任务 1 完成"
async def task2():
await asyncio.sleep(2)
return "任务 2 完成"
async def main():
# 并发运行两个任务
results = await asyncio.gather(task1(), task2())
print(f"结果:{results}")
asyncio.run(main())
异常处理:
# 使用 return_exceptions=True 获取所有结果,包括异常
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if isinstance(result, Exception):
print(f"异常:{result}")
else:
print(f"正常结果:{result}")
4. asyncio.wait_for()
用途:为异步操作设置超时时间。
简单示例:
import asyncio
async def slow_operation():
await asyncio.sleep(3)
return "操作完成"
async def main():
try:
# 设置 2 秒超时
result = await asyncio.wait_for(slow_operation(), timeout=2)
print(result)
except asyncio.TimeoutError:
print("操作超时!")
asyncio.run(main())
5. asyncio.sleep()
用途:非阻塞地暂停协程执行指定时间。
简单示例:
import asyncio
async def countdown():
for i in range(3, 0, -1):
print(f"{i}...")
await asyncio.sleep(1)
print("Go!")
asyncio.run(countdown())
重要提醒:使用 asyncio.sleep() 而不是 time.sleep() 来避免阻塞事件循环。
进阶主题
1. TaskGroup 和结构化并发(Python 3.11+)
TaskGroup 提供了结构化并发,它确保所有任务都成功完成,或者任何一个任务失败时自动取消其他所有任务。
示例:
import asyncio
async def fetch_data(url):
print(f"开始获取:{url}")
await asyncio.sleep(1)
if url == "error":
raise ValueError(f"无法获取 {url}")
return f"数据来自 {url}"
async def main():
# Python 3.11+ 使用 TaskGroup
try:
async with asyncio.TaskGroup() as tg:
# 创建多个相关任务
task1 = tg.create_task(fetch_data("url1"))
task2 = tg.create_task(fetch_data("url2"))
task3 = tg.create_task(fetch_data("error"))
# 这个会失败
# 只有当所有任务都成功时才会执行到这里
print(f"所有结果:{[task1.result(), task2.result(), task3.result()]}")
except ValueError as e:
print(f"捕获到错误:{e}")
# TaskGroup 自动取消了其他任务
asyncio.run(main())
2. 使用 asyncio 创建服务器
asyncio 提供了创建 TCP 和 UDP 服务器的能力,可以轻松构建高性能的网络服务。
示例:
import asyncio
async def handle_echo(reader, writer):
"""处理客户端连接"""
addr = writer.get_extra_info('peername')
print(f"连接来自:{addr}")
try:
while True:
# 读取客户端数据
data = await reader.read(100)
if not data:
break
message = data.decode()
print(f"收到来自 {addr}: {message}")
# 发送响应
response = f"Echo: {message}"
writer.write(response.encode())
await writer.drain()
except ConnectionResetError:
print(f"客户端 {addr} 断开连接")
finally:
print(f"关闭连接:{addr}")
writer.close()
await writer.wait_closed()
async def main():
# 创建 TCP 服务器
server = await asyncio.start_server(
handle_echo,
'127.0.0.1', # 监听地址
8888 # 监听端口
)
addr = server.sockets[0].getsockname()
print(f"服务器启动在 ")
server:
server.serve_forever()
asyncio.run(main())
3. 异步上下文管理器
异步上下文管理器使用 async with 语法,确保异步资源的正确管理。
示例:
import asyncio
import aiofiles
class AsyncDatabaseConnection:
def __init__(self, db_url):
self.db_url = db_url
self.connection = None
async def __aenter__(self):
"""进入上下文时连接数据库"""
print("连接数据库...")
# 模拟数据库连接
await asyncio.sleep(0.1)
self.connection = f"Connection to {self.db_url}"
print("数据库连接成功")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""退出上下文时关闭连接"""
print("关闭数据库连接...")
# 模拟关闭连接
await asyncio.sleep(0.1)
self.connection = None
print("数据库连接已关闭")
async def query(self, sql):
"""执行查询"""
print(f"执行查询:{sql}")
await asyncio.sleep(0.5) # 模拟查询耗时
():
AsyncDatabaseConnection() db:
result1 = db.query()
result2 = db.query()
()
()
asyncio.run(use_database())
4. 多线程和 asyncio 的结合
当遇到 CPU 密集型任务或必须使用的同步代码时,可以使用 loop.run_in_executor() 将它们放到线程池中执行。
示例:
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
# CPU 密集型函数
def cpu_intensive_task(n):
"""计算斐波那契数列"""
if n <= 1:
return n
return cpu_intensive_task(n-1) + cpu_intensive_task(n-2)
def blocking_io_task(filename):
"""模拟阻塞的 I/O 操作"""
time.sleep(2) # 模拟文件 I/O
return f"文件 {filename} 读取完成"
async def main():
# 创建线程池
loop = asyncio.get_running_loop()
with ThreadPoolExecutor(max_workers=4) as executor:
# 运行 CPU 密集型任务
print("开始执行 CPU 密集型任务...")
future1 = loop.run_in_executor(
executor, cpu_intensive_task, 35
)
# 运行阻塞 I/O 任务
print("开始执行阻塞 I/O 任务...")
future2 = loop.run_in_executor(
executor, blocking_io_task, "example.txt"
)
# 同时执行多个任务
print("执行其他异步任务...")
for i in range(3):
print(f"异步任务 ")
asyncio.sleep()
cpu_result = future1
io_result = future2
()
()
asyncio.run(main())
5. 高级错误处理和重试机制
在分布式系统中,网络请求可能因为各种原因失败,实现自动重试机制可以提高系统的可靠性。
示例:
import asyncio
import random
from typing import Callable, Any
class RetryError(Exception):
"""重试次数耗尽异常"""
pass
async def retry(
coro: Callable,
max_retries: int = 3,
delay: float = 1.0,
backoff_factor: float = 2.0,
exceptions: tuple = (Exception,)
):
"""带指数退避的重试装饰器"""
last_exception = None
for attempt in range(max_retries + 1):
try:
return await coro()
except exceptions as e:
last_exception = e
if attempt == max_retries:
break
# 计算延迟时间(指数退避)
wait_time = delay * (backoff_factor ** attempt)
print(f"第 {attempt + 1} 次尝试失败,{wait_time:.1f} 秒后重试...")
await asyncio.sleep(wait_time)
raise RetryError(f"重试 {max_retries} 次后仍然失败") from last_exception
async ():
asyncio.sleep()
random.random() < :
ConnectionError()
():
urls = [, , , , ]
url urls:
:
result = retry(
: unreliable_request(url),
max_retries=,
delay=,
backoff_factor=,
exceptions=(ConnectionError, TimeoutError)
)
()
RetryError e:
()
asyncio.run(main())

