深入理解 Python 异步编程:async、await 与同步函数详解

目录

  1. 引言
  2. 同步函数 vs 异步函数
  3. async 关键字详解
  4. await 关键字详解
  5. asyncio.create_task 详解
  6. 实际应用场景
  7. 常见误区
  8. 最佳实践
  9. 总结

引言

在现代 Python 开发中,异步编程已经成为处理 I/O 密集型任务的标准方式。理解 asyncawait 和同步函数的区别,对于编写高效、可扩展的应用程序至关重要。

本文将通过理论解释、代码示例和实际场景,帮助你全面理解 Python 异步编程的核心概念。


同步函数 vs 异步函数

什么是同步函数?

同步函数是传统的函数调用方式,代码按顺序执行,每一步都必须等待前一步完成。

import time defsync_function():"""同步函数示例"""print("开始执行") time.sleep(2)# 阻塞 2 秒print("执行完成")return"结果"# 调用同步函数 result = sync_function()print(result)

特点:

  • ✅ 简单直观,易于理解
  • ✅ 代码执行顺序清晰
  • ❌ 阻塞执行,无法并发
  • ❌ 处理大量 I/O 操作时效率低下

什么是异步函数?

异步函数使用 async def 定义,可以在等待 I/O 操作时让出控制权,允许其他任务并发执行。

import asyncio asyncdefasync_function():"""异步函数示例"""print("开始执行")await asyncio.sleep(2)# 异步等待 2 秒print("执行完成")return"结果"# 调用异步函数asyncdefmain(): result =await async_function()print(result)# 运行异步函数 asyncio.run(main())

特点:

  • ✅ 非阻塞执行,可以并发处理多个任务
  • ✅ 适合 I/O 密集型操作(网络请求、文件读写等)
  • ✅ 提高资源利用率
  • ❌ 代码复杂度较高
  • ❌ 需要理解事件循环机制

对比示例:处理 100 个网络请求

import time import asyncio import aiohttp # ========== 同步方式 ==========defsync_fetch(url):"""同步获取网页内容""" time.sleep(1)# 模拟网络延迟returnf"Response from {url}"defsync_process_100_requests():"""同步处理 100 个请求""" start = time.time() results =[]for i inrange(100): result = sync_fetch(f"http://example.com/{i}") results.append(result) end = time.time()print(f"同步方式耗时: {end - start:.2f} 秒")return results # ========== 异步方式 ==========asyncdefasync_fetch(session, url):"""异步获取网页内容"""await asyncio.sleep(1)# 模拟网络延迟returnf"Response from {url}"asyncdefasync_process_100_requests():"""异步处理 100 个请求""" start = time.time()asyncwith aiohttp.ClientSession()as session: tasks =[async_fetch(session,f"http://example.com/{i}")for i inrange(100)] results =await asyncio.gather(*tasks) end = time.time()print(f"异步方式耗时: {end - start:.2f} 秒")return results # 运行对比if __name__ =="__main__":# 同步方式:约 100 秒 sync_process_100_requests()# 异步方式:约 1 秒(并发执行) asyncio.run(async_process_100_requests())

结果对比:

  • 同步方式:~100 秒(顺序执行)
  • 异步方式:~1 秒(并发执行)

async 关键字详解

async 的作用

async 关键字用于定义一个协程函数(coroutine function),它返回一个协程对象(coroutine object)

asyncdefmy_async_function():return"Hello"# 调用异步函数 coro = my_async_function()print(type(coro))# <class 'coroutine'># ❌ 错误:直接调用不会执行 result = my_async_function()# 这只是创建了协程对象,没有执行# ✅ 正确:需要使用 await 或 asyncio.run() result =await my_async_function()# 在异步上下文中# 或 result = asyncio.run(my_async_function())# 在同步上下文中

async 函数的特征

可以包含 await 表达式

asyncdeffetch_data():# 可以 await 其他异步函数 data =await some_async_operation()return data 

必须使用 await 或事件循环执行

# 方式 1:使用 await(在异步函数中)asyncdefmain(): result =await func()print(result)# 42# 方式 2:使用 asyncio.run() result = asyncio.run(func())print(result)# 42

返回协程对象

asyncdeffunc():return42 coro = func()# 协程对象,尚未执行

常见错误示例

# ❌ 错误 1:忘记 awaitasyncdefget_data():return"data"defmain(): result = get_data()# 错误!result 是协程对象,不是字符串print(result)# <coroutine object get_data at 0x...># ✅ 正确asyncdefmain(): result =await get_data()# 正确print(result)# "data"# ❌ 错误 2:在同步函数中直接调用异步函数defsync_func(): result =await async_func()# 错误!await 只能在 async 函数中使用# ✅ 正确asyncdefasync_func(): result =await async_func()# 正确# 或在同步函数中使用 asyncio.run()defsync_func(): result = asyncio.run(async_func())# 正确

await 关键字详解

await 的作用机制

await 关键字用于暂停当前协程的执行,等待异步操作完成,同时让出控制权给事件循环,允许其他协程运行。

asyncdeftask1():print("Task 1 开始")await asyncio.sleep(2)# 暂停,让出控制权print("Task 1 完成")asyncdeftask2():print("Task 2 开始")await asyncio.sleep(1)# 暂停,让出控制权print("Task 2 完成")asyncdefmain():# 并发执行两个任务await asyncio.gather(task1(), task2()) asyncio.run(main())# 输出:# Task 1 开始# Task 2 开始# Task 2 完成 (1 秒后)# Task 1 完成 (2 秒后)# 总耗时:约 2 秒(不是 3 秒)

await 的执行流程

1. 遇到 await 表达式 ↓ 2. 暂停当前协程 ↓ 3. 将控制权返回给事件循环 ↓ 4. 事件循环执行其他就绪的协程 ↓ 5. await 的表达式完成后 ↓ 6. 恢复当前协程的执行 ↓ 7. 返回结果 

await vs 同步阻塞

这是最容易混淆的概念!

import time import asyncio # ========== 同步阻塞 ==========defsync_blocking():print("开始阻塞") time.sleep(2)# 阻塞整个线程print("阻塞结束")# 在这 2 秒内,整个线程无法做任何其他事情# ========== 异步等待 ==========asyncdefasync_waiting():print("开始等待")await asyncio.sleep(2)# 暂停协程,但不阻塞线程print("等待结束")# 在这 2 秒内,其他协程可以运行# 对比演示asyncdefdemo():print("=== 同步阻塞演示 ===") start = time.time() sync_blocking() sync_blocking()print(f"耗时: {time.time()- start:.2f} 秒\n")# 约 4 秒print("=== 异步等待演示 ===") start = time.time()await asyncio.gather(async_waiting(), async_waiting())print(f"耗时: {time.time()- start:.2f} 秒")# 约 2 秒 asyncio.run(demo())

关键区别:

  • time.sleep():阻塞整个线程,其他代码无法执行
  • await asyncio.sleep():暂停当前协程,其他协程可以执行

await 可以等待什么?

# 1. 其他协程对象asyncdeffunc1():return"result"asyncdeffunc2(): result =await func1()# ✅ 可以return result # 2. 可等待对象(Awaitable)# - 协程(Coroutine)# - 任务(Task)# - Future 对象# 3. 异步上下文管理器asyncdefexample():asyncwith aiofiles.open('file.txt')as f: content =await f.read()# ✅ 可以# 4. 异步迭代器asyncdefexample():asyncfor line in async_file_reader():print(line)# ✅ 可以

asyncio.create_task 详解

什么是 create_task?

asyncio.create_task() 是 Python 3.7+ 引入的函数,用于将协程包装成 Task 对象并立即调度执行。它是并发执行多个异步任务的关键工具。

import asyncio asyncdefmy_coroutine():await asyncio.sleep(1)return"完成"# create_task 将协程包装成 Task task = asyncio.create_task(my_coroutine())# Task 立即开始执行(不等待完成)

create_task 的核心作用

  1. 立即调度执行:创建 Task 后,协程立即开始执行,不需要等待
  2. 返回 Task 对象:可以跟踪任务状态、取消任务、获取结果
  3. 并发执行:多个 Task 可以并发运行
import asyncio import time asyncdeftask(name:str, duration:int):print(f"[{time.time():.2f}] 任务 {name} 开始")await asyncio.sleep(duration)print(f"[{time.time():.2f}] 任务 {name} 完成")returnf"任务 {name} 的结果"asyncdefmain(): start = time.time()# 创建多个 Task,它们会并发执行 task1 = asyncio.create_task(task("A",2)) task2 = asyncio.create_task(task("B",1)) task3 = asyncio.create_task(task("C",3))# 等待所有任务完成 results =await asyncio.gather(task1, task2, task3) end = time.time()print(f"\n所有任务完成,耗时: {end - start:.2f} 秒")print(f"结果: {results}") asyncio.run(main())# 输出示例:# [12345.67] 任务 A 开始# [12345.67] 任务 B 开始# [12345.67] 任务 C 开始# [12346.67] 任务 B 完成# [12347.67] 任务 A 完成# [12348.67] 任务 C 完成# # 所有任务完成,耗时: 3.00 秒(不是 6 秒!)# 结果: ['任务 A 的结果', '任务 B 的结果', '任务 C 的结果']

create_task vs await

这是最容易混淆的地方!

import asyncio asyncdefslow_task(name:str):print(f"{name} 开始")await asyncio.sleep(2)print(f"{name} 完成")returnf"{name} 的结果"# ========== 方式 1:顺序执行(使用 await)==========asyncdefsequential():print("=== 顺序执行 ===") result1 =await slow_task("任务1")# 等待完成 result2 =await slow_task("任务2")# 等待完成 result3 =await slow_task("任务3")# 等待完成# 总耗时:约 6 秒return[result1, result2, result3]# ========== 方式 2:并发执行(使用 create_task)==========asyncdefconcurrent():print("=== 并发执行 ===") task1 = asyncio.create_task(slow_task("任务1"))# 立即开始 task2 = asyncio.create_task(slow_task("任务2"))# 立即开始 task3 = asyncio.create_task(slow_task("任务3"))# 立即开始# 等待所有任务完成 result1 =await task1 result2 =await task2 result3 =await task3 # 总耗时:约 2 秒(并发执行)return[result1, result2, result3]# ========== 方式 3:使用 gather(推荐)==========asyncdefusing_gather():print("=== 使用 gather ===") tasks =[ asyncio.create_task(slow_task("任务1")), asyncio.create_task(slow_task("任务2")), asyncio.create_task(slow_task("任务3"))] results =await asyncio.gather(*tasks)# 总耗时:约 2 秒return results 

关键区别:

方式执行方式耗时适用场景
await coro()顺序执行累加需要顺序执行
create_task() + await并发执行最大值需要并发,且需要单独处理每个任务
asyncio.gather()并发执行最大值需要并发,且统一处理结果(推荐)

create_task vs asyncio.gather

两者都可以实现并发,但使用场景不同:

import asyncio asyncdeffetch_data(url:str):await asyncio.sleep(1)# 模拟网络请求returnf"数据来自 {url}"# ========== 使用 create_task:需要单独控制每个任务 ==========asyncdefwith_create_task(): task1 = asyncio.create_task(fetch_data("url1")) task2 = asyncio.create_task(fetch_data("url2"))# 可以单独等待某个任务 result1 =await task1 print(f"第一个结果: {result1}")# 可以取消任务 task2.cancel()try: result2 =await task2 except asyncio.CancelledError:print("任务2被取消")# 可以检查任务状态print(f"任务1完成: {task1.done()}")print(f"任务2完成: {task2.done()}")# ========== 使用 gather:统一处理所有任务 ==========asyncdefwith_gather():# 更简洁,统一处理结果 results =await asyncio.gather( fetch_data("url1"), fetch_data("url2"), fetch_data("url3"))# results 是一个列表,包含所有结果return results # ========== gather 的错误处理 ==========asyncdefwith_gather_error_handling():asyncdeffetch_with_error(url:str):if url =="error":raise ValueError("模拟错误")await asyncio.sleep(1)returnf"数据来自 {url}"# return_exceptions=True:不抛出异常,返回异常对象 results =await asyncio.gather( fetch_with_error("url1"), fetch_with_error("error"), fetch_with_error("url3"), return_exceptions=True)for i, result inenumerate(results):ifisinstance(result, Exception):print(f"任务 {i} 失败: {result}")else:print(f"任务 {i} 成功: {result}")

选择建议:

  • 使用 create_task:需要单独控制任务(取消、检查状态、单独等待)
  • 使用 gather:统一处理多个任务,代码更简洁(推荐)

create_task 的实际应用场景

场景 1:后台任务(Fire-and-Forget)
import asyncio asyncdefsend_notification(user_id:int, message:str):"""发送通知(不需要等待完成)"""await asyncio.sleep(1)# 模拟发送print(f"通知已发送给用户 {user_id}: {message}")asyncdefprocess_order(order_id:int):"""处理订单"""print(f"开始处理订单 {order_id}")await asyncio.sleep(2)# 模拟处理print(f"订单 {order_id} 处理完成")# 发送通知(后台任务,不阻塞主流程) asyncio.create_task(send_notification(order_id,"订单处理完成"))# 注意:不 await,立即返回returnf"订单 {order_id} 已处理"asyncdefmain(): order =await process_order(123)print(f"主流程完成: {order}")# 等待一下,让后台任务完成await asyncio.sleep(2) asyncio.run(main())# 输出:# 开始处理订单 123# 订单 123 处理完成# 主流程完成: 订单 123 已处理# 通知已发送给用户 123: 订单处理完成
场景 2:超时控制
import asyncio asyncdeffetch_with_timeout(url:str, timeout:float):"""带超时的请求""" task = asyncio.create_task(fetch_data(url))try: result =await asyncio.wait_for(task, timeout=timeout)return result except asyncio.TimeoutError: task.cancel()# 取消任务returnf"请求 {url} 超时"asyncdeffetch_data(url:str):await asyncio.sleep(5)# 模拟慢请求returnf"数据来自 {url}"asyncdefmain(): result =await fetch_with_timeout("http://example.com", timeout=2.0)print(result)# 请求 http://example.com 超时 asyncio.run(main())
场景 3:任务取消和状态检查
import asyncio asyncdeflong_running_task(name:str, duration:int):try:print(f"任务 {name} 开始")await asyncio.sleep(duration)print(f"任务 {name} 完成")returnf"任务 {name} 的结果"except asyncio.CancelledError:print(f"任务 {name} 被取消")raiseasyncdefmain():# 创建任务 task1 = asyncio.create_task(long_running_task("A",5)) task2 = asyncio.create_task(long_running_task("B",3))# 等待 2 秒await asyncio.sleep(2)# 检查任务状态print(f"任务A完成: {task1.done()}")print(f"任务B完成: {task2.done()}")# 取消任务A task1.cancel()# 等待任务完成(或取消)try: result1 =await task1 except asyncio.CancelledError:print("任务A已被取消") result2 =await task2 print(f"任务B结果: {result2}") asyncio.run(main())

create_task 的常见错误

错误 1:忘记 await Task
# ❌ 错误:创建任务后不等待asyncdefmain(): task = asyncio.create_task(some_async_function())# 没有 await,主函数可能提前结束return"完成"# ✅ 正确:确保任务完成asyncdefmain(): task = asyncio.create_task(some_async_function()) result =await task # 等待任务完成return result # 或者使用 gatherasyncdefmain():await asyncio.gather(some_async_function())return"完成"
错误 2:在事件循环外使用
# ❌ 错误:没有运行事件循环defsync_function(): task = asyncio.create_task(async_function())# 错误!# ✅ 正确:在异步上下文中使用asyncdefasync_function(): task = asyncio.create_task(another_async_function())await task # 或者在同步函数中使用 asyncio.run()defsync_function(): asyncio.run(async_function())
错误 3:混用 create_task 和直接 await
# ❌ 错误:混用导致顺序执行asyncdefmain():await slow_task("A")# 等待完成 task = asyncio.create_task(slow_task("B"))# 创建任务await slow_task("C")# 等待完成await task # 等待任务B# 实际执行顺序:A -> C -> B(不是并发)# ✅ 正确:统一使用 create_taskasyncdefmain(): task1 = asyncio.create_task(slow_task("A")) task2 = asyncio.create_task(slow_task("B")) task3 = asyncio.create_task(slow_task("C"))await asyncio.gather(task1, task2, task3)

create_task 最佳实践

1. 后台任务管理
import asyncio from typing import List classBackgroundTaskManager:"""后台任务管理器"""def__init__(self): self.tasks: List[asyncio.Task]=[]defadd_task(self, coro):"""添加后台任务""" task = asyncio.create_task(coro) self.tasks.append(task)return task asyncdefwait_all(self):"""等待所有任务完成"""if self.tasks:await asyncio.gather(*self.tasks, return_exceptions=True)defcancel_all(self):"""取消所有任务"""for task in self.tasks:ifnot task.done(): task.cancel()# 使用示例asyncdefmain(): manager = BackgroundTaskManager()# 添加多个后台任务 manager.add_task(send_notification(1,"消息1")) manager.add_task(send_notification(2,"消息2"))# 主流程继续print("主流程执行中...")await asyncio.sleep(1)# 等待所有后台任务完成await manager.wait_all()print("所有任务完成")
2. 任务包装器
import asyncio import logging asyncdefsafe_task(coro, task_name:str):"""安全的任务包装器,自动处理异常"""try: result =await coro logging.info(f"任务 {task_name} 完成")return result except Exception as e: logging.error(f"任务 {task_name} 失败: {e}")returnNoneasyncdefmain():# 创建安全的任务 task1 = asyncio.create_task(safe_task(risky_operation(),"任务1")) task2 = asyncio.create_task(safe_task(another_operation(),"任务2")) results =await asyncio.gather(task1, task2)return results 
3. 限制并发数量
import asyncio classTaskLimiter:"""限制并发任务数量"""def__init__(self, max_concurrent:int): self.semaphore = asyncio.Semaphore(max_concurrent) self.tasks =[]asyncdefadd_task(self, coro):"""添加任务(受并发限制)"""asyncwith self.semaphore:returnawait coro asyncdefrun_tasks(self, coros):"""运行多个任务""" tasks =[asyncio.create_task(self.add_task(coro))for coro in coros]returnawait asyncio.gather(*tasks)# 使用示例:最多同时运行 3 个任务asyncdefmain(): limiter = TaskLimiter(max_concurrent=3) coros =[fetch_data(f"url{i}")for i inrange(10)] results =await limiter.run_tasks(coros)return results 

总结:create_task 的核心要点

  1. 立即调度create_task() 创建 Task 后立即开始执行
  2. 并发执行:多个 Task 可以并发运行
  3. 任务控制:可以取消、检查状态、获取结果
  4. 与 await 的区别await 会等待完成,create_task() 立即返回
  5. 与 gather 的关系gather() 内部使用 create_task(),但更简洁
  6. 适用场景:后台任务、超时控制、任务管理

实际应用场景

场景 1:Web API 请求处理

from fastapi import FastAPI import asyncio app = FastAPI()# ❌ 同步方式:阻塞服务器@app.get("/sync")defsync_endpoint():# 模拟数据库查询 time.sleep(1)# 阻塞!其他请求无法处理return{"status":"ok"}# ✅ 异步方式:非阻塞@app.get("/async")asyncdefasync_endpoint():# 模拟异步数据库查询await asyncio.sleep(1)# 不阻塞,其他请求可以处理return{"status":"ok"}

效果:

  • 同步方式:服务器每秒只能处理 1 个请求
  • 异步方式:服务器可以并发处理多个请求

场景 2:批量文件处理

import aiofiles import asyncio from pathlib import Path # ❌ 同步方式defsync_process_files(file_paths): results =[]for path in file_paths:withopen(path,'r')as f: content = f.read()# 阻塞 results.append(process(content))return results # ✅ 异步方式asyncdefasync_process_files(file_paths):asyncdefprocess_file(path):asyncwith aiofiles.open(path,'r')as f: content =await f.read()# 非阻塞return process(content) tasks =[process_file(path)for path in file_paths] results =await asyncio.gather(*tasks)# 并发处理return results 

场景 3:数据库操作

import asyncpg import asyncio # ✅ 异步数据库操作asyncdeffetch_users(): conn =await asyncpg.connect('postgresql://...')try: users =await conn.fetch('SELECT * FROM users')return users finally:await conn.close()# 并发查询多个表asyncdeffetch_all_data(): users, posts, comments =await asyncio.gather( fetch_users(), fetch_posts(), fetch_comments())return users, posts, comments 

场景 4:任务调度与后台任务(使用 create_task)

import asyncio asyncdefbackground_task(task_id:str):"""后台任务:不需要等待完成"""print(f"任务 {task_id} 开始")await asyncio.sleep(5)# 模拟耗时操作print(f"任务 {task_id} 完成")asyncdefcritical_operation():"""关键操作:需要等待完成"""print("关键操作开始")await asyncio.sleep(1)print("关键操作完成")return"结果"asyncdefmain():# 关键操作:必须等待 result =await critical_operation()print(f"得到结果: {result}")# 后台任务:使用 create_task,不需要等待,立即返回 task1 = asyncio.create_task(background_task("task-1")) task2 = asyncio.create_task(background_task("task-2"))print("主流程继续执行...")# 后台任务在后台运行,不影响主流程# 如果需要,可以稍后等待任务完成await asyncio.sleep(6)# 等待后台任务完成print("所有任务完成") asyncio.run(main())

常见误区

误区 1:await 就是同步等待

错误理解:

# 认为 await 就是同步阻塞await some_async_function()# 认为这会阻塞整个程序

正确理解:

# await 是异步等待,会暂停当前协程,但允许其他协程运行asyncdeftask1():await asyncio.sleep(2)# 暂停 task1asyncdeftask2():await asyncio.sleep(1)# 暂停 task2# 两个任务并发执行,总耗时约 2 秒(不是 3 秒)await asyncio.gather(task1(), task2())

误区 2:异步函数会自动并发

错误理解:

asyncdefmain():# 认为这样会自动并发await func1()await func2()await func3()

实际情况:

# ❌ 顺序执行(不是并发)asyncdefmain():await func1()# 等待完成await func2()# 等待完成await func3()# 等待完成# 总耗时 = func1 + func2 + func3# ✅ 并发执行方式 1:使用 create_taskasyncdefmain(): task1 = asyncio.create_task(func1()) task2 = asyncio.create_task(func2()) task3 = asyncio.create_task(func3())await asyncio.gather(task1, task2, task3)# 总耗时 = max(func1, func2, func3)# ✅ 并发执行方式 2:使用 gather(推荐)asyncdefmain():await asyncio.gather(func1(), func2(), func3())# 总耗时 = max(func1, func2, func3)

误区 3:所有函数都应该用 async

错误理解:

# 认为所有函数都应该改成 asyncasyncdefadd(a, b):return a + b # CPU 密集型操作,不需要 async

正确做法:

# CPU 密集型:使用同步函数defadd(a, b):return a + b # I/O 密集型:使用异步函数asyncdeffetch_url(url):asyncwith aiohttp.ClientSession()as session:asyncwith session.get(url)as response:returnawait response.text()

误区 4:忘记 await 异步函数

错误代码:

asyncdefget_data():return"data"defmain(): result = get_data()# ❌ 错误!result 是协程对象print(result)# <coroutine object ...>

正确代码:

asyncdefget_data():return"data"asyncdefmain(): result =await get_data()# ✅ 正确print(result)# "data"

误区 5:create_task 后忘记等待

错误代码:

asyncdefmain():# ❌ 错误:创建任务后不等待,主函数可能提前结束 asyncio.create_task(long_running_task())return"完成"# 任务可能还没完成就返回了

正确代码:

asyncdefmain():# ✅ 正确方式 1:等待任务完成 task = asyncio.create_task(long_running_task()) result =await task return result # ✅ 正确方式 2:使用 gatherawait asyncio.gather(long_running_task())return"完成"# ✅ 正确方式 3:后台任务(明确不需要等待) task = asyncio.create_task(background_task())# 确保事件循环运行足够长时间await asyncio.sleep(10)# 等待后台任务完成return"完成"

最佳实践

1. 何时使用异步?

适合异步的场景:

  • 网络请求(HTTP API、WebSocket)
  • 文件 I/O(读写文件)
  • 数据库操作
  • 等待外部服务响应
  • 处理大量并发连接

不适合异步的场景:

  • CPU 密集型计算(图像处理、数值计算)
  • 简单的同步操作(不需要并发)
  • 已经优化的同步代码(不要为了异步而异步)

2. 异步函数设计原则

# ✅ 好的异步函数设计asyncdeffetch_user_data(user_id:int):"""清晰的异步函数"""asyncwith aiohttp.ClientSession()as session:asyncwith session.get(f'/api/users/{user_id}')as response:returnawait response.json()# ❌ 不好的设计:混合同步和异步asyncdefbad_function(): time.sleep(1)# ❌ 同步阻塞,破坏了异步的优势await asyncio.sleep(1)# ✅ 异步等待

3. 错误处理

asyncdefrobust_async_function():try: result =await some_async_operation()return result except Exception as e: logger.error(f"操作失败: {e}")raise# 批量操作时的错误处理asyncdefbatch_operations(): tasks =[operation(i)for i inrange(10)] results =await asyncio.gather(*tasks, return_exceptions=True)for i, result inenumerate(results):ifisinstance(result, Exception):print(f"任务 {i} 失败: {result}")else:print(f"任务 {i} 成功: {result}")

4. 资源管理

# ✅ 使用异步上下文管理器asyncdefprocess_file():asyncwith aiofiles.open('file.txt','r')as f: content =await f.read()# 文件自动关闭# ✅ 数据库连接管理asyncdefquery_database():asyncwith asyncpg.create_pool('postgresql://...')as pool:asyncwith pool.acquire()as conn: result =await conn.fetch('SELECT * FROM users')# 连接自动释放

5. 性能优化技巧

# ✅ 使用 asyncio.gather 并发执行asyncdeffetch_multiple_urls(urls):asyncwith aiohttp.ClientSession()as session: tasks =[fetch_url(session, url)for url in urls] results =await asyncio.gather(*tasks)return results # ✅ 使用 create_task 实现更细粒度的控制asyncdeffetch_with_control(urls):asyncwith aiohttp.ClientSession()as session: tasks =[asyncio.create_task(fetch_url(session, url))for url in urls]# 可以单独处理每个任务 results =[]for task in tasks:try: result =await task results.append(result)except Exception as e:print(f"任务失败: {e}") results.append(None)return results # ✅ 限制并发数量(避免过多并发)asyncdeffetch_with_limit(urls, limit=10): semaphore = asyncio.Semaphore(limit)asyncdeffetch_with_semaphore(url):asyncwith semaphore:returnawait fetch_url(url) tasks =[asyncio.create_task(fetch_with_semaphore(url))for url in urls]returnawait asyncio.gather(*tasks)

总结

核心概念回顾

  1. 同步函数:顺序执行,阻塞等待
    • 简单直观
    • 适合 CPU 密集型任务
    • 不适合大量 I/O 操作
  2. 异步函数(async):定义协程函数
    • 返回协程对象
    • 必须用 await 或事件循环执行
    • 适合 I/O 密集型任务
  3. await 关键字:异步等待
    • 暂停当前协程
    • 让出控制权给事件循环
    • 不阻塞整个线程
    • 允许其他协程并发执行
  4. asyncio.create_task():创建并调度任务
    • 将协程包装成 Task 对象
    • 立即开始执行(不等待完成)
    • 实现并发执行多个任务
    • 可以取消、检查状态、获取结果

关键区别总结

特性同步函数异步函数 + awaitcreate_task
执行方式顺序执行顺序执行(单个协程)并发执行(多个任务)
阻塞性阻塞线程暂停协程,不阻塞线程不阻塞,立即返回
适用场景CPU 密集型I/O 密集型(单个)I/O 密集型(多个并发)
并发能力无(单个)高(多个)
代码复杂度中高
返回类型直接返回值协程对象Task 对象

记忆要点

  1. async def = 定义异步函数,返回协程对象
  2. await = 异步等待,暂停协程但不阻塞线程
  3. 异步函数必须用 await,否则不会执行
  4. await 不等于同步阻塞,它允许并发执行
  5. I/O 操作用异步,CPU 计算用同步
  6. create_task() = 创建任务并立即调度,实现并发执行
  7. 多个 await 顺序执行,多个 create_task 并发执行
  8. gather() 内部使用 create_task(),但更简洁

实践建议

  • 🎯 从简单的异步函数开始练习
  • 🎯 理解事件循环的工作原理
  • 🎯 在实际项目中逐步应用
  • 🎯 注意错误处理和资源管理
  • 🎯 不要过度使用异步(适合的场景才用)

延伸阅读

Read more

用 DeepSeek 打造你的超强代码助手

用 DeepSeek 打造你的超强代码助手

DeepSeek Engineer 是啥? 简单来说,DeepSeek Engineer 是一个基于命令行的智能助手。它能帮你完成这些事: * 快速读文件内容:比如你有个配置文件,直接用命令把它加载进助手,后续所有操作都可以基于这个文件。 * 自动改文件:它不仅能提建议,还可以直接生成差异表(diff),甚至自动应用修改。 * 智能代码生成:比如你让它生成代码片段,它会按照指定格式和规则直接返回。 更重要的是,这一切都是通过 DeepSeek 的强大 API 来实现的。想象一下,你有个贴身助手,不仅能听懂你的代码需求,还能直接动手帮你写! 核心功能拆解 我们先来看 DeepSeek Engineer 的几个核心能力,让你更好地理解它的强大之处。 1. 自动配置 DeepSeek 客户端 启动这个工具时,你只需要准备一个 .env 文件,里面写上你的 API Key,比如: DEEPSEEK_API_

By Ne0inhk
10分钟打造专属AI助手!ToDesk云电脑/顺网云/海马云操作DeepSeek哪家强?

10分钟打造专属AI助手!ToDesk云电脑/顺网云/海马云操作DeepSeek哪家强?

文章目录 * 一、引言 * 云计算平台概览 * ToDesk云电脑:随时随地用上高性能电脑 * 二 .云电脑初体验 * DeekSeek介绍 * 版本参数与特点 * 任务类型表现 * 1、ToDesk云电脑 * 2、顺网云电脑 * 3、海马云电脑 * 三、DeekSeek本地化实操和AIGC应用 * 1. ToDesk云电脑 * 2. 海马云电脑 * 3、顺网云电脑 * 四、结语 * 总结:云电脑如何选择? 一、引言 DeepSeek这些大模型让 AI 开发变得越来越有趣,但真要跑起来,可没那么简单! * 本地配置太麻烦:显卡不够、驱动难装、环境冲突,光是折腾这些就让人心态崩了。 * 云端性能参差不齐:选错云电脑,可能卡到爆、加载慢,还容易掉线,搞得效率直线下降。 * 成本难控:有的平台按小时计费,价格一会儿一个样,

By Ne0inhk
解锁DeepSeek潜能:Docker+Ollama打造本地大模型部署新范式

解锁DeepSeek潜能:Docker+Ollama打造本地大模型部署新范式

🐇明明跟你说过:个人主页 🏅个人专栏:《深度探秘:AI界的007》 🏅 🔖行路有良友,便是天堂🔖 目录 一、引言 1、什么是Docker 2、什么是Ollama 二、准备工作 1、操作系统 2、镜像准备 三、安装 1、安装Docker 2、启动Ollama 3、拉取Deepseek大模型 4、启动Deepseek  一、引言 1、什么是Docker Docker:就像一个“打包好的App” 想象一下,你写了一个很棒的程序,在自己的电脑上运行得很好。但当你把它发给别人,可能会遇到各种问题: * “这个软件需要 Python 3.8,但我只有 Python 3.6!

By Ne0inhk
深挖 DeepSeek 隐藏玩法·智能炼金术2.0版本

深挖 DeepSeek 隐藏玩法·智能炼金术2.0版本

前引:屏幕前的你还在AI智能搜索框这样搜索吗?“这道题怎么写”“苹果为什么红”“怎么不被发现翘课” ,。看到此篇文章的小伙伴们!请准备好你的思维魔杖,开启【霍格沃茨模式】,看我如何更新秘密的【知识炼金术】,我们一起来解锁更加刺激的剧情!友情提醒:《《《前方高能》》》 目录 在哪使用DeepSeek 如何对提需求  隐藏玩法总结 几个高阶提示词 职场打工人 自媒体创作 电商实战 程序员开挂 非适用场地 “服务器繁忙”如何解决 (1)硅基流动平台 (2)Chatbox + API集成方案 (3)各大云平台 搭建个人知识库 前置准备 下载安装AnythingLLM 选择DeepSeek作为AI提供商 创作工作区 导入文档 编辑  编辑 小编寄语 ——————————————————————————————————————————— 在哪使用DeepSeek 我们解锁剧情前,肯定要知道在哪用DeepSeek!咯,为了照顾一些萌新朋友,它的下载方式我放在下面了,拿走不谢!  (1)

By Ne0inhk