深入理解 Python 异步编程:async、await 与同步函数详解
目录
引言
在现代 Python 开发中,异步编程已经成为处理 I/O 密集型任务的标准方式。理解 async、await 和同步函数的区别,对于编写高效、可扩展的应用程序至关重要。
本文将通过理论解释、代码示例和实际场景,帮助你全面理解 Python 异步编程的核心概念。
同步函数 vs 异步函数
什么是同步函数?
同步函数是传统的函数调用方式,代码按顺序执行,每一步都必须等待前一步完成。
import time defsync_function():"""同步函数示例"""print("开始执行") time.sleep(2)# 阻塞 2 秒print("执行完成")return"结果"# 调用同步函数 result = sync_function()print(result)特点:
- ✅ 简单直观,易于理解
- ✅ 代码执行顺序清晰
- ❌ 阻塞执行,无法并发
- ❌ 处理大量 I/O 操作时效率低下
什么是异步函数?
异步函数使用 async def 定义,可以在等待 I/O 操作时让出控制权,允许其他任务并发执行。
import asyncio asyncdefasync_function():"""异步函数示例"""print("开始执行")await asyncio.sleep(2)# 异步等待 2 秒print("执行完成")return"结果"# 调用异步函数asyncdefmain(): result =await async_function()print(result)# 运行异步函数 asyncio.run(main())特点:
- ✅ 非阻塞执行,可以并发处理多个任务
- ✅ 适合 I/O 密集型操作(网络请求、文件读写等)
- ✅ 提高资源利用率
- ❌ 代码复杂度较高
- ❌ 需要理解事件循环机制
对比示例:处理 100 个网络请求
import time import asyncio import aiohttp # ========== 同步方式 ==========defsync_fetch(url):"""同步获取网页内容""" time.sleep(1)# 模拟网络延迟returnf"Response from {url}"defsync_process_100_requests():"""同步处理 100 个请求""" start = time.time() results =[]for i inrange(100): result = sync_fetch(f"http://example.com/{i}") results.append(result) end = time.time()print(f"同步方式耗时: {end - start:.2f} 秒")return results # ========== 异步方式 ==========asyncdefasync_fetch(session, url):"""异步获取网页内容"""await asyncio.sleep(1)# 模拟网络延迟returnf"Response from {url}"asyncdefasync_process_100_requests():"""异步处理 100 个请求""" start = time.time()asyncwith aiohttp.ClientSession()as session: tasks =[async_fetch(session,f"http://example.com/{i}")for i inrange(100)] results =await asyncio.gather(*tasks) end = time.time()print(f"异步方式耗时: {end - start:.2f} 秒")return results # 运行对比if __name__ =="__main__":# 同步方式:约 100 秒 sync_process_100_requests()# 异步方式:约 1 秒(并发执行) asyncio.run(async_process_100_requests())结果对比:
- 同步方式:~100 秒(顺序执行)
- 异步方式:~1 秒(并发执行)
async 关键字详解
async 的作用
async 关键字用于定义一个协程函数(coroutine function),它返回一个协程对象(coroutine object)。
asyncdefmy_async_function():return"Hello"# 调用异步函数 coro = my_async_function()print(type(coro))# <class 'coroutine'># ❌ 错误:直接调用不会执行 result = my_async_function()# 这只是创建了协程对象,没有执行# ✅ 正确:需要使用 await 或 asyncio.run() result =await my_async_function()# 在异步上下文中# 或 result = asyncio.run(my_async_function())# 在同步上下文中async 函数的特征
可以包含 await 表达式
asyncdeffetch_data():# 可以 await 其他异步函数 data =await some_async_operation()return data 必须使用 await 或事件循环执行
# 方式 1:使用 await(在异步函数中)asyncdefmain(): result =await func()print(result)# 42# 方式 2:使用 asyncio.run() result = asyncio.run(func())print(result)# 42返回协程对象
asyncdeffunc():return42 coro = func()# 协程对象,尚未执行常见错误示例
# ❌ 错误 1:忘记 awaitasyncdefget_data():return"data"defmain(): result = get_data()# 错误!result 是协程对象,不是字符串print(result)# <coroutine object get_data at 0x...># ✅ 正确asyncdefmain(): result =await get_data()# 正确print(result)# "data"# ❌ 错误 2:在同步函数中直接调用异步函数defsync_func(): result =await async_func()# 错误!await 只能在 async 函数中使用# ✅ 正确asyncdefasync_func(): result =await async_func()# 正确# 或在同步函数中使用 asyncio.run()defsync_func(): result = asyncio.run(async_func())# 正确await 关键字详解
await 的作用机制
await 关键字用于暂停当前协程的执行,等待异步操作完成,同时让出控制权给事件循环,允许其他协程运行。
asyncdeftask1():print("Task 1 开始")await asyncio.sleep(2)# 暂停,让出控制权print("Task 1 完成")asyncdeftask2():print("Task 2 开始")await asyncio.sleep(1)# 暂停,让出控制权print("Task 2 完成")asyncdefmain():# 并发执行两个任务await asyncio.gather(task1(), task2()) asyncio.run(main())# 输出:# Task 1 开始# Task 2 开始# Task 2 完成 (1 秒后)# Task 1 完成 (2 秒后)# 总耗时:约 2 秒(不是 3 秒)await 的执行流程
1. 遇到 await 表达式 ↓ 2. 暂停当前协程 ↓ 3. 将控制权返回给事件循环 ↓ 4. 事件循环执行其他就绪的协程 ↓ 5. await 的表达式完成后 ↓ 6. 恢复当前协程的执行 ↓ 7. 返回结果 await vs 同步阻塞
这是最容易混淆的概念!
import time import asyncio # ========== 同步阻塞 ==========defsync_blocking():print("开始阻塞") time.sleep(2)# 阻塞整个线程print("阻塞结束")# 在这 2 秒内,整个线程无法做任何其他事情# ========== 异步等待 ==========asyncdefasync_waiting():print("开始等待")await asyncio.sleep(2)# 暂停协程,但不阻塞线程print("等待结束")# 在这 2 秒内,其他协程可以运行# 对比演示asyncdefdemo():print("=== 同步阻塞演示 ===") start = time.time() sync_blocking() sync_blocking()print(f"耗时: {time.time()- start:.2f} 秒\n")# 约 4 秒print("=== 异步等待演示 ===") start = time.time()await asyncio.gather(async_waiting(), async_waiting())print(f"耗时: {time.time()- start:.2f} 秒")# 约 2 秒 asyncio.run(demo())关键区别:
time.sleep():阻塞整个线程,其他代码无法执行await asyncio.sleep():暂停当前协程,其他协程可以执行
await 可以等待什么?
# 1. 其他协程对象asyncdeffunc1():return"result"asyncdeffunc2(): result =await func1()# ✅ 可以return result # 2. 可等待对象(Awaitable)# - 协程(Coroutine)# - 任务(Task)# - Future 对象# 3. 异步上下文管理器asyncdefexample():asyncwith aiofiles.open('file.txt')as f: content =await f.read()# ✅ 可以# 4. 异步迭代器asyncdefexample():asyncfor line in async_file_reader():print(line)# ✅ 可以asyncio.create_task 详解
什么是 create_task?
asyncio.create_task() 是 Python 3.7+ 引入的函数,用于将协程包装成 Task 对象并立即调度执行。它是并发执行多个异步任务的关键工具。
import asyncio asyncdefmy_coroutine():await asyncio.sleep(1)return"完成"# create_task 将协程包装成 Task task = asyncio.create_task(my_coroutine())# Task 立即开始执行(不等待完成)create_task 的核心作用
- 立即调度执行:创建 Task 后,协程立即开始执行,不需要等待
- 返回 Task 对象:可以跟踪任务状态、取消任务、获取结果
- 并发执行:多个 Task 可以并发运行
import asyncio import time asyncdeftask(name:str, duration:int):print(f"[{time.time():.2f}] 任务 {name} 开始")await asyncio.sleep(duration)print(f"[{time.time():.2f}] 任务 {name} 完成")returnf"任务 {name} 的结果"asyncdefmain(): start = time.time()# 创建多个 Task,它们会并发执行 task1 = asyncio.create_task(task("A",2)) task2 = asyncio.create_task(task("B",1)) task3 = asyncio.create_task(task("C",3))# 等待所有任务完成 results =await asyncio.gather(task1, task2, task3) end = time.time()print(f"\n所有任务完成,耗时: {end - start:.2f} 秒")print(f"结果: {results}") asyncio.run(main())# 输出示例:# [12345.67] 任务 A 开始# [12345.67] 任务 B 开始# [12345.67] 任务 C 开始# [12346.67] 任务 B 完成# [12347.67] 任务 A 完成# [12348.67] 任务 C 完成# # 所有任务完成,耗时: 3.00 秒(不是 6 秒!)# 结果: ['任务 A 的结果', '任务 B 的结果', '任务 C 的结果']create_task vs await
这是最容易混淆的地方!
import asyncio asyncdefslow_task(name:str):print(f"{name} 开始")await asyncio.sleep(2)print(f"{name} 完成")returnf"{name} 的结果"# ========== 方式 1:顺序执行(使用 await)==========asyncdefsequential():print("=== 顺序执行 ===") result1 =await slow_task("任务1")# 等待完成 result2 =await slow_task("任务2")# 等待完成 result3 =await slow_task("任务3")# 等待完成# 总耗时:约 6 秒return[result1, result2, result3]# ========== 方式 2:并发执行(使用 create_task)==========asyncdefconcurrent():print("=== 并发执行 ===") task1 = asyncio.create_task(slow_task("任务1"))# 立即开始 task2 = asyncio.create_task(slow_task("任务2"))# 立即开始 task3 = asyncio.create_task(slow_task("任务3"))# 立即开始# 等待所有任务完成 result1 =await task1 result2 =await task2 result3 =await task3 # 总耗时:约 2 秒(并发执行)return[result1, result2, result3]# ========== 方式 3:使用 gather(推荐)==========asyncdefusing_gather():print("=== 使用 gather ===") tasks =[ asyncio.create_task(slow_task("任务1")), asyncio.create_task(slow_task("任务2")), asyncio.create_task(slow_task("任务3"))] results =await asyncio.gather(*tasks)# 总耗时:约 2 秒return results 关键区别:
| 方式 | 执行方式 | 耗时 | 适用场景 |
|---|---|---|---|
await coro() | 顺序执行 | 累加 | 需要顺序执行 |
create_task() + await | 并发执行 | 最大值 | 需要并发,且需要单独处理每个任务 |
asyncio.gather() | 并发执行 | 最大值 | 需要并发,且统一处理结果(推荐) |
create_task vs asyncio.gather
两者都可以实现并发,但使用场景不同:
import asyncio asyncdeffetch_data(url:str):await asyncio.sleep(1)# 模拟网络请求returnf"数据来自 {url}"# ========== 使用 create_task:需要单独控制每个任务 ==========asyncdefwith_create_task(): task1 = asyncio.create_task(fetch_data("url1")) task2 = asyncio.create_task(fetch_data("url2"))# 可以单独等待某个任务 result1 =await task1 print(f"第一个结果: {result1}")# 可以取消任务 task2.cancel()try: result2 =await task2 except asyncio.CancelledError:print("任务2被取消")# 可以检查任务状态print(f"任务1完成: {task1.done()}")print(f"任务2完成: {task2.done()}")# ========== 使用 gather:统一处理所有任务 ==========asyncdefwith_gather():# 更简洁,统一处理结果 results =await asyncio.gather( fetch_data("url1"), fetch_data("url2"), fetch_data("url3"))# results 是一个列表,包含所有结果return results # ========== gather 的错误处理 ==========asyncdefwith_gather_error_handling():asyncdeffetch_with_error(url:str):if url =="error":raise ValueError("模拟错误")await asyncio.sleep(1)returnf"数据来自 {url}"# return_exceptions=True:不抛出异常,返回异常对象 results =await asyncio.gather( fetch_with_error("url1"), fetch_with_error("error"), fetch_with_error("url3"), return_exceptions=True)for i, result inenumerate(results):ifisinstance(result, Exception):print(f"任务 {i} 失败: {result}")else:print(f"任务 {i} 成功: {result}")选择建议:
- 使用
create_task:需要单独控制任务(取消、检查状态、单独等待) - 使用
gather:统一处理多个任务,代码更简洁(推荐)
create_task 的实际应用场景
场景 1:后台任务(Fire-and-Forget)
import asyncio asyncdefsend_notification(user_id:int, message:str):"""发送通知(不需要等待完成)"""await asyncio.sleep(1)# 模拟发送print(f"通知已发送给用户 {user_id}: {message}")asyncdefprocess_order(order_id:int):"""处理订单"""print(f"开始处理订单 {order_id}")await asyncio.sleep(2)# 模拟处理print(f"订单 {order_id} 处理完成")# 发送通知(后台任务,不阻塞主流程) asyncio.create_task(send_notification(order_id,"订单处理完成"))# 注意:不 await,立即返回returnf"订单 {order_id} 已处理"asyncdefmain(): order =await process_order(123)print(f"主流程完成: {order}")# 等待一下,让后台任务完成await asyncio.sleep(2) asyncio.run(main())# 输出:# 开始处理订单 123# 订单 123 处理完成# 主流程完成: 订单 123 已处理# 通知已发送给用户 123: 订单处理完成场景 2:超时控制
import asyncio asyncdeffetch_with_timeout(url:str, timeout:float):"""带超时的请求""" task = asyncio.create_task(fetch_data(url))try: result =await asyncio.wait_for(task, timeout=timeout)return result except asyncio.TimeoutError: task.cancel()# 取消任务returnf"请求 {url} 超时"asyncdeffetch_data(url:str):await asyncio.sleep(5)# 模拟慢请求returnf"数据来自 {url}"asyncdefmain(): result =await fetch_with_timeout("http://example.com", timeout=2.0)print(result)# 请求 http://example.com 超时 asyncio.run(main())场景 3:任务取消和状态检查
import asyncio asyncdeflong_running_task(name:str, duration:int):try:print(f"任务 {name} 开始")await asyncio.sleep(duration)print(f"任务 {name} 完成")returnf"任务 {name} 的结果"except asyncio.CancelledError:print(f"任务 {name} 被取消")raiseasyncdefmain():# 创建任务 task1 = asyncio.create_task(long_running_task("A",5)) task2 = asyncio.create_task(long_running_task("B",3))# 等待 2 秒await asyncio.sleep(2)# 检查任务状态print(f"任务A完成: {task1.done()}")print(f"任务B完成: {task2.done()}")# 取消任务A task1.cancel()# 等待任务完成(或取消)try: result1 =await task1 except asyncio.CancelledError:print("任务A已被取消") result2 =await task2 print(f"任务B结果: {result2}") asyncio.run(main())create_task 的常见错误
错误 1:忘记 await Task
# ❌ 错误:创建任务后不等待asyncdefmain(): task = asyncio.create_task(some_async_function())# 没有 await,主函数可能提前结束return"完成"# ✅ 正确:确保任务完成asyncdefmain(): task = asyncio.create_task(some_async_function()) result =await task # 等待任务完成return result # 或者使用 gatherasyncdefmain():await asyncio.gather(some_async_function())return"完成"错误 2:在事件循环外使用
# ❌ 错误:没有运行事件循环defsync_function(): task = asyncio.create_task(async_function())# 错误!# ✅ 正确:在异步上下文中使用asyncdefasync_function(): task = asyncio.create_task(another_async_function())await task # 或者在同步函数中使用 asyncio.run()defsync_function(): asyncio.run(async_function())错误 3:混用 create_task 和直接 await
# ❌ 错误:混用导致顺序执行asyncdefmain():await slow_task("A")# 等待完成 task = asyncio.create_task(slow_task("B"))# 创建任务await slow_task("C")# 等待完成await task # 等待任务B# 实际执行顺序:A -> C -> B(不是并发)# ✅ 正确:统一使用 create_taskasyncdefmain(): task1 = asyncio.create_task(slow_task("A")) task2 = asyncio.create_task(slow_task("B")) task3 = asyncio.create_task(slow_task("C"))await asyncio.gather(task1, task2, task3)create_task 最佳实践
1. 后台任务管理
import asyncio from typing import List classBackgroundTaskManager:"""后台任务管理器"""def__init__(self): self.tasks: List[asyncio.Task]=[]defadd_task(self, coro):"""添加后台任务""" task = asyncio.create_task(coro) self.tasks.append(task)return task asyncdefwait_all(self):"""等待所有任务完成"""if self.tasks:await asyncio.gather(*self.tasks, return_exceptions=True)defcancel_all(self):"""取消所有任务"""for task in self.tasks:ifnot task.done(): task.cancel()# 使用示例asyncdefmain(): manager = BackgroundTaskManager()# 添加多个后台任务 manager.add_task(send_notification(1,"消息1")) manager.add_task(send_notification(2,"消息2"))# 主流程继续print("主流程执行中...")await asyncio.sleep(1)# 等待所有后台任务完成await manager.wait_all()print("所有任务完成")2. 任务包装器
import asyncio import logging asyncdefsafe_task(coro, task_name:str):"""安全的任务包装器,自动处理异常"""try: result =await coro logging.info(f"任务 {task_name} 完成")return result except Exception as e: logging.error(f"任务 {task_name} 失败: {e}")returnNoneasyncdefmain():# 创建安全的任务 task1 = asyncio.create_task(safe_task(risky_operation(),"任务1")) task2 = asyncio.create_task(safe_task(another_operation(),"任务2")) results =await asyncio.gather(task1, task2)return results 3. 限制并发数量
import asyncio classTaskLimiter:"""限制并发任务数量"""def__init__(self, max_concurrent:int): self.semaphore = asyncio.Semaphore(max_concurrent) self.tasks =[]asyncdefadd_task(self, coro):"""添加任务(受并发限制)"""asyncwith self.semaphore:returnawait coro asyncdefrun_tasks(self, coros):"""运行多个任务""" tasks =[asyncio.create_task(self.add_task(coro))for coro in coros]returnawait asyncio.gather(*tasks)# 使用示例:最多同时运行 3 个任务asyncdefmain(): limiter = TaskLimiter(max_concurrent=3) coros =[fetch_data(f"url{i}")for i inrange(10)] results =await limiter.run_tasks(coros)return results 总结:create_task 的核心要点
- 立即调度:
create_task()创建 Task 后立即开始执行 - 并发执行:多个 Task 可以并发运行
- 任务控制:可以取消、检查状态、获取结果
- 与 await 的区别:
await会等待完成,create_task()立即返回 - 与 gather 的关系:
gather()内部使用create_task(),但更简洁 - 适用场景:后台任务、超时控制、任务管理
实际应用场景
场景 1:Web API 请求处理
from fastapi import FastAPI import asyncio app = FastAPI()# ❌ 同步方式:阻塞服务器@app.get("/sync")defsync_endpoint():# 模拟数据库查询 time.sleep(1)# 阻塞!其他请求无法处理return{"status":"ok"}# ✅ 异步方式:非阻塞@app.get("/async")asyncdefasync_endpoint():# 模拟异步数据库查询await asyncio.sleep(1)# 不阻塞,其他请求可以处理return{"status":"ok"}效果:
- 同步方式:服务器每秒只能处理 1 个请求
- 异步方式:服务器可以并发处理多个请求
场景 2:批量文件处理
import aiofiles import asyncio from pathlib import Path # ❌ 同步方式defsync_process_files(file_paths): results =[]for path in file_paths:withopen(path,'r')as f: content = f.read()# 阻塞 results.append(process(content))return results # ✅ 异步方式asyncdefasync_process_files(file_paths):asyncdefprocess_file(path):asyncwith aiofiles.open(path,'r')as f: content =await f.read()# 非阻塞return process(content) tasks =[process_file(path)for path in file_paths] results =await asyncio.gather(*tasks)# 并发处理return results 场景 3:数据库操作
import asyncpg import asyncio # ✅ 异步数据库操作asyncdeffetch_users(): conn =await asyncpg.connect('postgresql://...')try: users =await conn.fetch('SELECT * FROM users')return users finally:await conn.close()# 并发查询多个表asyncdeffetch_all_data(): users, posts, comments =await asyncio.gather( fetch_users(), fetch_posts(), fetch_comments())return users, posts, comments 场景 4:任务调度与后台任务(使用 create_task)
import asyncio asyncdefbackground_task(task_id:str):"""后台任务:不需要等待完成"""print(f"任务 {task_id} 开始")await asyncio.sleep(5)# 模拟耗时操作print(f"任务 {task_id} 完成")asyncdefcritical_operation():"""关键操作:需要等待完成"""print("关键操作开始")await asyncio.sleep(1)print("关键操作完成")return"结果"asyncdefmain():# 关键操作:必须等待 result =await critical_operation()print(f"得到结果: {result}")# 后台任务:使用 create_task,不需要等待,立即返回 task1 = asyncio.create_task(background_task("task-1")) task2 = asyncio.create_task(background_task("task-2"))print("主流程继续执行...")# 后台任务在后台运行,不影响主流程# 如果需要,可以稍后等待任务完成await asyncio.sleep(6)# 等待后台任务完成print("所有任务完成") asyncio.run(main())常见误区
误区 1:await 就是同步等待
错误理解:
# 认为 await 就是同步阻塞await some_async_function()# 认为这会阻塞整个程序正确理解:
# await 是异步等待,会暂停当前协程,但允许其他协程运行asyncdeftask1():await asyncio.sleep(2)# 暂停 task1asyncdeftask2():await asyncio.sleep(1)# 暂停 task2# 两个任务并发执行,总耗时约 2 秒(不是 3 秒)await asyncio.gather(task1(), task2())误区 2:异步函数会自动并发
错误理解:
asyncdefmain():# 认为这样会自动并发await func1()await func2()await func3()实际情况:
# ❌ 顺序执行(不是并发)asyncdefmain():await func1()# 等待完成await func2()# 等待完成await func3()# 等待完成# 总耗时 = func1 + func2 + func3# ✅ 并发执行方式 1:使用 create_taskasyncdefmain(): task1 = asyncio.create_task(func1()) task2 = asyncio.create_task(func2()) task3 = asyncio.create_task(func3())await asyncio.gather(task1, task2, task3)# 总耗时 = max(func1, func2, func3)# ✅ 并发执行方式 2:使用 gather(推荐)asyncdefmain():await asyncio.gather(func1(), func2(), func3())# 总耗时 = max(func1, func2, func3)误区 3:所有函数都应该用 async
错误理解:
# 认为所有函数都应该改成 asyncasyncdefadd(a, b):return a + b # CPU 密集型操作,不需要 async正确做法:
# CPU 密集型:使用同步函数defadd(a, b):return a + b # I/O 密集型:使用异步函数asyncdeffetch_url(url):asyncwith aiohttp.ClientSession()as session:asyncwith session.get(url)as response:returnawait response.text()误区 4:忘记 await 异步函数
错误代码:
asyncdefget_data():return"data"defmain(): result = get_data()# ❌ 错误!result 是协程对象print(result)# <coroutine object ...>正确代码:
asyncdefget_data():return"data"asyncdefmain(): result =await get_data()# ✅ 正确print(result)# "data"误区 5:create_task 后忘记等待
错误代码:
asyncdefmain():# ❌ 错误:创建任务后不等待,主函数可能提前结束 asyncio.create_task(long_running_task())return"完成"# 任务可能还没完成就返回了正确代码:
asyncdefmain():# ✅ 正确方式 1:等待任务完成 task = asyncio.create_task(long_running_task()) result =await task return result # ✅ 正确方式 2:使用 gatherawait asyncio.gather(long_running_task())return"完成"# ✅ 正确方式 3:后台任务(明确不需要等待) task = asyncio.create_task(background_task())# 确保事件循环运行足够长时间await asyncio.sleep(10)# 等待后台任务完成return"完成"最佳实践
1. 何时使用异步?
✅ 适合异步的场景:
- 网络请求(HTTP API、WebSocket)
- 文件 I/O(读写文件)
- 数据库操作
- 等待外部服务响应
- 处理大量并发连接
❌ 不适合异步的场景:
- CPU 密集型计算(图像处理、数值计算)
- 简单的同步操作(不需要并发)
- 已经优化的同步代码(不要为了异步而异步)
2. 异步函数设计原则
# ✅ 好的异步函数设计asyncdeffetch_user_data(user_id:int):"""清晰的异步函数"""asyncwith aiohttp.ClientSession()as session:asyncwith session.get(f'/api/users/{user_id}')as response:returnawait response.json()# ❌ 不好的设计:混合同步和异步asyncdefbad_function(): time.sleep(1)# ❌ 同步阻塞,破坏了异步的优势await asyncio.sleep(1)# ✅ 异步等待3. 错误处理
asyncdefrobust_async_function():try: result =await some_async_operation()return result except Exception as e: logger.error(f"操作失败: {e}")raise# 批量操作时的错误处理asyncdefbatch_operations(): tasks =[operation(i)for i inrange(10)] results =await asyncio.gather(*tasks, return_exceptions=True)for i, result inenumerate(results):ifisinstance(result, Exception):print(f"任务 {i} 失败: {result}")else:print(f"任务 {i} 成功: {result}")4. 资源管理
# ✅ 使用异步上下文管理器asyncdefprocess_file():asyncwith aiofiles.open('file.txt','r')as f: content =await f.read()# 文件自动关闭# ✅ 数据库连接管理asyncdefquery_database():asyncwith asyncpg.create_pool('postgresql://...')as pool:asyncwith pool.acquire()as conn: result =await conn.fetch('SELECT * FROM users')# 连接自动释放5. 性能优化技巧
# ✅ 使用 asyncio.gather 并发执行asyncdeffetch_multiple_urls(urls):asyncwith aiohttp.ClientSession()as session: tasks =[fetch_url(session, url)for url in urls] results =await asyncio.gather(*tasks)return results # ✅ 使用 create_task 实现更细粒度的控制asyncdeffetch_with_control(urls):asyncwith aiohttp.ClientSession()as session: tasks =[asyncio.create_task(fetch_url(session, url))for url in urls]# 可以单独处理每个任务 results =[]for task in tasks:try: result =await task results.append(result)except Exception as e:print(f"任务失败: {e}") results.append(None)return results # ✅ 限制并发数量(避免过多并发)asyncdeffetch_with_limit(urls, limit=10): semaphore = asyncio.Semaphore(limit)asyncdeffetch_with_semaphore(url):asyncwith semaphore:returnawait fetch_url(url) tasks =[asyncio.create_task(fetch_with_semaphore(url))for url in urls]returnawait asyncio.gather(*tasks)总结
核心概念回顾
- 同步函数:顺序执行,阻塞等待
- 简单直观
- 适合 CPU 密集型任务
- 不适合大量 I/O 操作
- 异步函数(async):定义协程函数
- 返回协程对象
- 必须用 await 或事件循环执行
- 适合 I/O 密集型任务
- await 关键字:异步等待
- 暂停当前协程
- 让出控制权给事件循环
- 不阻塞整个线程
- 允许其他协程并发执行
- asyncio.create_task():创建并调度任务
- 将协程包装成 Task 对象
- 立即开始执行(不等待完成)
- 实现并发执行多个任务
- 可以取消、检查状态、获取结果
关键区别总结
| 特性 | 同步函数 | 异步函数 + await | create_task |
|---|---|---|---|
| 执行方式 | 顺序执行 | 顺序执行(单个协程) | 并发执行(多个任务) |
| 阻塞性 | 阻塞线程 | 暂停协程,不阻塞线程 | 不阻塞,立即返回 |
| 适用场景 | CPU 密集型 | I/O 密集型(单个) | I/O 密集型(多个并发) |
| 并发能力 | 无 | 无(单个) | 高(多个) |
| 代码复杂度 | 低 | 中 | 中高 |
| 返回类型 | 直接返回值 | 协程对象 | Task 对象 |
记忆要点
- async def = 定义异步函数,返回协程对象
- await = 异步等待,暂停协程但不阻塞线程
- 异步函数必须用 await,否则不会执行
- await 不等于同步阻塞,它允许并发执行
- I/O 操作用异步,CPU 计算用同步
- create_task() = 创建任务并立即调度,实现并发执行
- 多个 await 顺序执行,多个 create_task 并发执行
- gather() 内部使用 create_task(),但更简洁
实践建议
- 🎯 从简单的异步函数开始练习
- 🎯 理解事件循环的工作原理
- 🎯 在实际项目中逐步应用
- 🎯 注意错误处理和资源管理
- 🎯 不要过度使用异步(适合的场景才用)