跳到主要内容Python 异步编程实战:基于 async/await 的高并发实现 | 极客日志Python
Python 异步编程实战:基于 async/await 的高并发实现
本文介绍了 Python 异步编程的核心原理,包括协程工作机制与事件循环调度。详细讲解了 asyncio 核心组件(协程、任务、Future、事件循环)及原生 async/await 语法优势。通过高并发 HTTP 请求实战,演示了 aiohttp 使用、信号量控制并发及生产者 - 消费者模式。此外还涵盖异步上下文管理器、迭代器应用、性能优化原则、常见陷阱排查及最佳实践总结,帮助开发者掌握高并发 IO 处理技能。
字节跳动1 浏览 一、异步编程的核心原理
1.1 什么是异步编程?
传统的同步编程中,代码按照顺序一行行执行,遇到 IO 操作(如网络请求、文件读写)时,程序会阻塞等待操作完成,导致 CPU 空闲浪费。而异步编程的核心思想是:遇到 IO 操作时自动切换,IO 操作完成后自动切回,在单线程内实现高并发。
1.2 协程的工作原理
协程是异步编程的基础单元,它的工作流程如下:
┌─────────────────────────────────────────────────────────┐
│ 事件循环 (Event )
├─────────────────────────────────────────────────────────┤
│
│ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ │ 任务 │ ──> │ 任务 │ ──> │ 任务 │
│ │ (协程 A) │ │ (协程 B) │ │ (协程 C) │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘
│ │ │ │
│ ▼ ▼ ▼
│ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ │ IO 操作 │ │ IO 操作 │ │ IO 操作 │
│ └──────────┘ └──────────┘ └──────────┘
│ │ │ │
│ └────────────────┼────────────────┘
│ ▼
│ IO 完成,回调通知
│
└─────────────────────────────────────────────────────────┘
Loop
1
2
3
await
await
await
1
2
3
- 遇到 IO 自动挂起:执行到
await 关键字时,协程主动让出 CPU
- 事件循环调度:事件循环负责管理所有协程,当某个协程挂起时,立即切换到下一个就绪协程
- IO 完成自动恢复:底层通过
select/epoll 等机制监听 IO 事件,完成后将对应协程放回就绪队列
二、Python 异步编程的演进
2.1 发展阶段对比
| 版本 | 核心特性 | 代表作 | 缺点 |
|---|
| Python 2.x | 无原生支持 | gevent(第三方) | 猴子补丁,魔法太重 |
| Python 3.3 | yield from | asyncio 雏形 | 生成器协程混淆 |
| Python 3.5 | async/await | 原生协程 | - |
| Python 3.7+ | asyncio.run() | 极致简化 | - |
2.2 原生协程的优势
Python 3.5 引入的 async/await 语法彻底改变了异步编程:
@asyncio.coroutine
def hello():
yield from asyncio.sleep(1)
print('Hello')
async def hello():
await asyncio.sleep(1)
print('Hello')
三、asyncio 核心组件详解
3.1 四大核心对象
import asyncio
async def coro_func():
return 42
coro = coro_func()
task = asyncio.create_task(coro())
loop = asyncio.get_event_loop()
loop.run_until_complete(coro())
asyncio.run(coro())
3.2 awaitable 对象的三种类型
async def foo():
return 123
async def main():
task = asyncio.create_task(foo())
result = await task
fut = asyncio.Future()
asyncio.ensure_future(set_after(fut, 1, 456))
result = await fut
四、实战:高并发 HTTP 请求
4.1 基础用法
import asyncio
import aiohttp
import time
async def fetch_one(session, url):
"""单个 HTTP 请求"""
async with session.get(url) as response:
return await response.text()
async def main_simple():
"""简单示例:请求单个 URL"""
async with aiohttp.ClientSession() as session:
html = await fetch_one(session, 'http://httpbin.org/get')
print(f"响应长度:{len(html)}")
asyncio.run(main_simple())
4.2 高并发批量请求
import asyncio
import aiohttp
import time
from typing import List, Dict
async def fetch_url(session: aiohttp.ClientSession, url: str) -> Dict:
"""单个 URL 请求,带错误处理"""
start = time.time()
try:
async with session.get(url, timeout=10) as response:
content = await response.text()
return {
'url': url,
'status': response.status,
'length': len(content),
'time': time.time() - start,
'success': True
}
except Exception as e:
return {
'url': url,
'error': str(e),
'time': time.time() - start,
'success': False
}
async def fetch_many(urls: List[str], max_concurrent: int = 10):
"""
高并发请求多个 URL
- 使用信号量控制并发数
- 收集所有结果
"""
semaphore = asyncio.Semaphore(max_concurrent)
async def bounded_fetch(url):
async with semaphore:
return await fetch_url(session, url)
async with aiohttp.ClientSession() as session:
tasks = [bounded_fetch(url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
async def main():
"""性能对比演示"""
urls = [
'http://httpbin.org/delay/1',
'http://httpbin.org/get',
'http://httpbin.org/json',
'http://httpbin.org/xml',
'http://httpbin.org/robots.txt',
'http://httpbin.org/anything',
'http://httpbin.org/uuid',
'http://httpbin.org/image',
'http://httpbin.org/headers',
'http://httpbin.org/ip'
] * 2
start = time.time()
results = await fetch_many(urls, max_concurrent=5)
async_time = time.time() - start
success_count = sum(1 for r in results if isinstance(r, dict) and r.get('success'))
print(f"异步并发请求 {len(urls)} 个 URL:")
print(f" 耗时:{async_time:.2f}秒")
print(f" 成功:{success_count}/{len(results)}")
if __name__ == "__main__":
asyncio.run(main())
4.3 高级模式:生产者 - 消费者
import asyncio
import aiohttp
from asyncio import Queue
async def producer(queue: Queue, urls: List[str]):
"""生产者:将 URL 放入队列"""
for url in urls:
await queue.put(url)
print(f"生产者放入:{url}")
for _ in range(3):
await queue.put(None)
async def consumer(queue: Queue, session: aiohttp.ClientSession, name: str):
"""消费者:从队列取 URL 并请求"""
while True:
url = await queue.get()
if url is None:
queue.task_done()
break
print(f"消费者{name} 处理:{url}")
try:
async with session.get(url) as resp:
text = await resp.text()
print(f"消费者{name} 完成:{url}, 大小:{len(text)}")
except Exception as e:
print(f"消费者{name} 失败:{url}, 错误:{e}")
finally:
queue.task_done()
async def producer_consumer_demo():
"""生产者 - 消费者模式示例"""
urls = ['http://httpbin.org/get'] * 20
queue = Queue(maxsize=5)
async with aiohttp.ClientSession() as session:
consumers = [
asyncio.create_task(consumer(queue, session, f"{i}"))
for i in range(3)
]
producer_task = asyncio.create_task(producer(queue, urls))
await asyncio.gather(producer_task, *consumers)
await queue.join()
asyncio.run(producer_consumer_demo())
五、异步上下文管理器与异步迭代器
5.1 异步上下文管理器(async with)
class AsyncResource:
"""模拟需要异步初始化和清理的资源"""
async def __aenter__(self):
print("正在获取资源...")
await asyncio.sleep(1)
print("资源已获取")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("正在释放资源...")
await asyncio.sleep(0.5)
print("资源已释放")
async def work(self):
return "资源使用中"
async def use_async_context():
async with AsyncResource() as resource:
result = await resource.work()
print(result)
class DatabaseConnection:
async def __aenter__(self):
self.conn = await create_db_connection()
return self.conn
async def __aexit__(self, *args):
await self.conn.close()
async def query_db():
async with DatabaseConnection() as conn:
return await conn.execute("SELECT * FROM users")
5.2 异步迭代器(async for)
import asyncio
class AsyncRange:
"""异步范围迭代器"""
def __init__(self, start, end, delay=0.1):
self.start = start
self.end = end
self.delay = delay
self.current = start
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.end:
raise StopAsyncIteration
await asyncio.sleep(self.delay)
self.current += 1
return self.current - 1
async def main():
async for num in AsyncRange(1, 5):
print(f"异步生成:{num}")
asyncio.run(main())
class PaginatedAPI:
async def __aiter__(self):
return self
async def __anext__(self):
page = await self.fetch_page()
if not page:
raise StopAsyncIteration
return page
async def fetch_page(self):
pass
async def fetch_all_pages():
async for page in PaginatedAPI():
await process_page(page)
六、性能优化与最佳实践
6.1 七项核心原则
async def good():
task = asyncio.create_task(coro())
await task
async def bad():
await coro()
await coro2()
async def fetch_all():
results = await asyncio.gather(
fetch_url(url1),
fetch_url(url2),
return_exceptions=True
)
sem = asyncio.Semaphore(10)
async def bounded_fetch(url):
async with sem:
return await fetch_url(url)
async def fetch_with_timeout(url):
try:
return await asyncio.wait_for(fetch_url(url), timeout=5.0)
except asyncio.TimeoutError:
return None
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
async def cancellable():
try:
await asyncio.sleep(10)
except asyncio.CancelledError:
print("任务被取消")
await cleanup()
raise
6.2 性能对比测试框架
import asyncio
import aiohttp
import requests
import time
from functools import wraps
def timeit(func):
"""性能计时装饰器"""
@wraps(func)
async def async_wrapper(*args, **kwargs):
start = time.perf_counter()
result = await func(*args, **kwargs)
cost = time.perf_counter() - start
print(f"{func.__name__} 耗时:{cost:.3f}秒")
return result
return async_wrapper
@timeit
async def async_benchmark():
"""异步版本性能测试"""
urls = ['http://httpbin.org/get'] * 20
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
return await asyncio.gather(*tasks)
def sync_benchmark():
"""同步版本性能测试(对比用)"""
urls = ['http://httpbin.org/get'] * 20
results = []
start = time.perf_counter()
for url in urls:
results.append(requests.get(url).text)
print(f"sync_benchmark 耗时:{time.perf_counter() - start:.3f}秒")
return results
asyncio.run(async_benchmark())
sync_benchmark()
七、常见陷阱与解决方案
7.1 常见错误排查
async def mistake1():
coro()
async def correct1():
await coro()
def mistake2():
asyncio.run(main())
asyncio.run(main())
if __name__ == "__main__":
asyncio.run(main())
async def mistake3():
time.sleep(1)
await asyncio.sleep(0)
async def correct3():
await asyncio.sleep(1)
async def mistake4():
asyncio.create_task(work())
async def correct4():
task = asyncio.create_task(work())
await task
7.2 调试技巧
import asyncio
import logging
logging.basicConfig(level=logging.DEBUG)
asyncio.run(main(), debug=True)
loop = asyncio.get_event_loop()
loop.set_debug(True)
pending = asyncio.all_tasks(loop)
for task in pending:
print(f"未完成任务:{task}")
八、总结与展望
8.1 异步编程的核心要点
- 原理理解:协程遇 IO 自动切换,事件循环统一调度
- 语法掌握:
async/await、async with、async for、asyncio.gather()
- 库选择:使用 aiohttp、asyncpg 等原生异步库
- 模式应用:信号量限流、生产者 - 消费者、超时控制
- 性能意识:避免阻塞调用,合理设置并发数
8.2 适用场景
| 场景 | 推荐度 | 原因 |
|---|
| 网络爬虫 | 五星 | 大量 IO 等待,异步收益明显 |
| Web 应用 | 五星 | FastAPI、Sanic 等框架原生支持 |
| 数据库访问 | 四星 | 连接池 + 异步驱动,吞吐量提升 |
| CPU 密集型 | 两星 | 多进程更合适 |
| 简单脚本 | 三星 | 视 IO 密集程度而定 |
| · | | |
8.3 未来演进
Python 3.11+ 引入了更高效的 asyncio 实现,性能进一步提升。异步编程已成为 Python 生态中处理高并发 IO 任务的标准方案,掌握它是现代 Python 开发者必备的核心技能。
微信扫一扫,关注极客日志
微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
相关免费在线工具
- curl 转代码
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
- Base64 文件转换器
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
- Markdown 转 HTML
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML 转 Markdown 互为补充。 在线工具,Markdown 转 HTML在线工具,online
- HTML 转 Markdown
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML 转 Markdown在线工具,online
- JSON 压缩
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online