Python 协程与异步编程实战笔记
I/O 密集型场景里,同步代码最容易把时间浪费在等待上。异步编程的价值很直接:把这段等待时间拿回来,让程序在网络、文件、数据库这些地方卡住时,先去做别的事。
异步编程在做什么
它不是'更快地执行同一件事',而是尽量别让线程傻等。发起耗时操作后,程序把控制权交回事件循环,等结果准备好再继续往下跑。这样做的好处是结构更轻,线程切换也少,尤其适合高并发但单次操作不重的任务。
常见场景主要是这些:
- 高并发网络通信,比如 HTTP、WebSocket
- 大文件读写
- 数据库查询
协程是异步代码的基本单位
Python 里用 async def 定义协程函数。调用它不会立刻执行,得到的是一个协程对象。真正跑起来,要靠 await 和事件循环配合。
import asyncio
async def hello():
print('Hello, World!')
await asyncio.sleep(1)
print('Hello again!')
# 运行协程
asyncio.run(hello())
协程遇到 await 时会主动让出控制权,等被等待的操作完成后再恢复。这个切换不是线程抢占,代码也不会被拆得太碎,读起来比回调顺手得多。
多个协程一起跑
如果想并发执行多个协程,asyncio.gather() 是最常用的入口之一:
import asyncio
async def count(name):
print(f'{name} Counting...')
await asyncio.sleep(1)
print(f'{name} Counted!')
async def main():
await asyncio.gather(count('A'), count('B'), count('C'))
asyncio.run(main())
任务调度和管理
直接 await 适合顺序控制;想让协程后台跑起来,通常会把它包成 Task。这样更方便取消、超时和统一管理。
创建与取消任务
import asyncio
async def task_func():
try:
print('Task running...')
await asyncio.sleep(5)
except asyncio.CancelledError:
print('Task cancelled!')
async def main():
task = asyncio.create_task(task_func())
await asyncio.sleep(0.5)
task.cancel()
try:
await task
except asyncio.CancelledError:
print('Main: Task cancelled!')
asyncio.run(main())
设置超时
有些任务不能一直挂着,wait_for 用来兜底比较合适:
import asyncio
async def slow_task():
await asyncio.sleep(2)
return 'Done'
async def main():
try:
result = await asyncio.wait_for(slow_task(), timeout=1)
print(result)
except asyncio.TimeoutError:
print('Task timed out!')
asyncio.run(main())
事件循环
事件循环负责调度所有协程。平时用 asyncio.run() 就够了,它会替你把循环建起来、跑完、再收掉。只有在调试老代码或嵌套环境时,才更需要直接碰 loop。
import asyncio
loop = asyncio.get_event_loop()
async def hello():
print('Hello from loop')
loop.run_until_complete(hello())
asyncio 和 aiohttp
asyncio 是标准库,负责底层调度;aiohttp 则更像现成的工具箱,适合做异步 HTTP 客户端和服务端。
安装依赖
pip install aiohttp
发送 HTTP 请求
ClientSession 会复用连接,避免每次请求都重新握手。这个细节在批量请求里挺重要,省下来的不是一点半点。
import aiohttp
import asyncio
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
async with aiohttp.ClientSession() as session:
html = await fetch(session, 'https://www.example.com')
print(html)
asyncio.run(main())
POST 和 JSON 也一样直接:
import aiohttp
import asyncio
import json
async def post_json(session, url, data):
async with session.post(url, json=data) as response:
return await response.text()
async def main():
async with aiohttp.ClientSession() as session:
payload = {'name': '张三', 'age': 25}
response = await post_json(session, 'https://httpbin.org/post', payload)
print(response)
asyncio.run(main())
两个常见例子
异步 HTTP 客户端
批量抓多个站点时,异步的优势比较明显:单个请求还是要等,但总耗时通常会比串行方式好看得多。
import aiohttp
import asyncio
import time
async def fetch(session, url):
start_time = time.time()
async with session.get(url) as response:
text = await response.text()
elapsed_time = time.time() - start_time
return url, len(text), elapsed_time
async def main():
urls = [
'https://www.example.com',
'https://www.google.com',
'https://www.github.com',
'https://www.python.org'
]
async with aiohttp.ClientSession() as session:
tasks = [asyncio.create_task(fetch(session, url)) for url in urls]
results = await asyncio.gather(*tasks)
for url, length, elapsed_time in results:
print(f'URL: {url}, Length: {length}, Time: {elapsed_time:.2f}s')
if __name__ == '__main__':
start_time = time.time()
asyncio.run(main())
print(f'Total Time: {time.time() - start_time:.2f}s')
异步 Web 服务器
aiohttp.web 可以很快搭一个 API 服务。功能不复杂的时候,它上手很省事;如果项目大了,是否继续用它就要看团队习惯了。
from aiohttp import web
import asyncio
async def handle_root(request):
return web.Response(text='Hello, World!')
async def handle_api(request):
data = {'name': '张三', 'age': 25}
return web.json_response(data)
async def create_app():
app = web.Application()
app.add_routes([
web.get('/', handle_root),
web.get('/api', handle_api),
])
return app
if __name__ == '__main__':
# 启动服务监听 localhost:8080
asyncio.run(web.run_app(create_app(), host='localhost', port=8080))
结尾
异步编程最该先弄明白的,不是某个库的 API,而是事件循环怎么调度、await 到底让出了什么。把这层想清楚后,gather、create_task、wait_for 这些工具就很好用;aiohttp 也只是把常见场景封装得顺手一些。真要上手,还是得自己写几段代码跑一遍,看到阻塞变成并发,理解才算落地。


