跳到主要内容
极客日志极客日志面向AI+效率的开发者社区
首页博客GitHub 精选镜像工具UI配色美学隐私政策关于联系
搜索内容 / 工具 / 仓库 / 镜像...⌘K搜索
注册
博客列表
Python

Python asyncio 异步编程教程

综述由AI生成Python asyncio 是 Python 3.4 引入的标准库,用于编写并发代码。它通过单线程事件循环实现高并发 I/O 操作,避免阻塞。档涵盖 asyncio 的核心概念,包括事件循环、协程、任务和 Future。详细讲解了基础功能如 asyncio.run、create_task、gather、wait_for 和 sleep。进阶主题涉及 Python 3.11+ 的 TaskGroup 结构化并发、TCP/UDP 服务器创建、异步上下文管理器、多线程结合使用以及高级错误处理和重试机制。适合处理网络请求等 I/O 密集型任务。

BigDataPan发布于 2026/3/27更新于 2026/5/2426 浏览

Python asyncio 完整教程

什么是 asyncio

asyncio 是 Python 3.4 引入的标准库,用于编写并发代码。它使用单线程中的事件循环来实现并发,特别适合处理 I/O 密集型任务。

核心特点
特性描述
单线程并发使用事件循环在一个线程中处理多个任务
协程支持通过 async/await 语法实现异步编程
非阻塞 I/O避免在等待 I/O 操作时阻塞整个程序
高并发性轻松处理数千个并发连接
标准库集成Python 3.7+ 中与标准库深度集成

核心概念

1. 事件循环

定义:事件循环是 asyncio 的核心,负责运行异步任务和处理 I/O 操作。

工作原理:事件循环 continuously 检查任务是否准备好执行,如果没有准备好(比如在等待网络响应),它会切换到其他已准备好的任务执行。

示例:

import asyncio

async def main():
    print("开始")
    # 获取当前事件循环
    loop = asyncio.get_running_loop()
    print(f"当前事件循环:{loop}")
    await asyncio.sleep(1)
    print("结束")

asyncio.run(main())
2. 协程

定义:使用 async def 定义的函数,返回协程对象。

工作原理:协程执行到 await 表达式时,会暂停执行并返回控制权给事件循环,等待异步操作完成后继续执行。

示例:

import asyncio

async def fetch_data(url):
    print(f"开始获取数据:{url}")
    await asyncio.sleep(2)  # 模拟网络请求
    print(f"获取完成:{url}")
    return f"数据来自 {url}"

async def main():
    result = await fetch_data("https://api.example.com")
    print(result)

asyncio.run(main())
3. 任务

定义:任务的协程的包装器,让协程可以并发执行。

工作原理:任务创建后会被放入事件循环,事件循环会在适当时机执行任务。

示例:

import asyncio

async def task(name, delay):
    print(f"任务 {name} 开始")
    await asyncio.sleep(delay)
    print(f"任务 {name} 完成")
    return f"任务 {name} 的结果"

async def main():
    # 创建多个任务
    task1 = asyncio.create_task(task("A", 2))
    task2 = asyncio.create_task(task("B", 1))
    # 等待所有任务完成
    results = await asyncio.gather(task1, task2)
    print(f"所有结果:{results}")

asyncio.run(main())
4. Future

定义:Future 代表一个最终会结果的异步操作的占位符。

工作原理:Future 可以被等待,当它完成时,等待它的协程会继续执行。

示例:

import asyncio

async def set_future_value(future):
    await asyncio.sleep(2)
    future.set_result("Future 的值已设置")

async def main():
    # 创建 Future
    future = asyncio.Future()
    # 创建任务设置 Future 的值
    task = asyncio.create_task(set_future_value(future))
    # 等待 Future 完成
    result = await future
    print(f"Future 结果:{result}")
    await task

asyncio.run(main())
概念关系图

事件循环 协程 任务 Future I/O 操作 定时器 await 表达式

基础功能详解

1. asyncio.run()

用途:运行协程的主入口,创建并运行事件循环。

基本语法:

asyncio.run(coro, *, debug=None)

简单示例:

import asyncio

async def hello():
    print("Hello, asyncio!")
    await asyncio.sleep(1)
    print("Done!")

asyncio.run(hello())

最佳实践:

  • 每个程序只调用一次 asyncio.run()
  • 在程序的主入口使用它
  • 使用 debug=True 开启调试模式帮助发现潜在问题
2. asyncio.create_task()

用途:将协程包装成任务,实现并发执行。

基本语法:

asyncio.create_task(coro, *, name=None, context=None)

简单示例:

import asyncio

async def count(name, delay):
    for i in range(3):
        print(f"{name}: {i}")
    await asyncio.sleep(delay)

async def main():
    # 创建两个并发任务
    task1 = asyncio.create_task(count("A", 0.5))
    task2 = asyncio.create_task(count("B", 1))
    # 等待任务完成
    await asyncio.gather(task1, task2)

asyncio.run(main())
3. asyncio.gather()

用途:并发运行多个任务,收集所有结果。

基本语法:

asyncio.gather(*coros_or_futures, return_exceptions=False)

简单示例:

import asyncio

async def task1():
    await asyncio.sleep(1)
    return "任务 1 完成"

async def task2():
    await asyncio.sleep(2)
    return "任务 2 完成"

async def main():
    # 并发运行两个任务
    results = await asyncio.gather(task1(), task2())
    print(f"结果:{results}")

asyncio.run(main())

异常处理:

# 使用 return_exceptions=True 获取所有结果,包括异常
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
    if isinstance(result, Exception):
        print(f"异常:{result}")
    else:
        print(f"正常结果:{result}")
4. asyncio.wait_for()

用途:为异步操作设置超时时间。

简单示例:

import asyncio

async def slow_operation():
    await asyncio.sleep(3)
    return "操作完成"

async def main():
    try:
        # 设置 2 秒超时
        result = await asyncio.wait_for(slow_operation(), timeout=2)
        print(result)
    except asyncio.TimeoutError:
        print("操作超时!")

asyncio.run(main())
5. asyncio.sleep()

用途:非阻塞地暂停协程执行指定时间。

简单示例:

import asyncio

async def countdown():
    for i in range(3, 0, -1):
        print(f"{i}...")
    await asyncio.sleep(1)
    print("Go!")

asyncio.run(countdown())

重要提醒:使用 asyncio.sleep() 而不是 time.sleep() 来避免阻塞事件循环。

进阶主题

1. TaskGroup 和结构化并发(Python 3.11+)

TaskGroup 提供了结构化并发,它确保所有任务都成功完成,或者任何一个任务失败时自动取消其他所有任务。

示例:

import asyncio

async def fetch_data(url):
    print(f"开始获取:{url}")
    await asyncio.sleep(1)
    if url == "error":
        raise ValueError(f"无法获取 {url}")
    return f"数据来自 {url}"

async def main():
    # Python 3.11+ 使用 TaskGroup
    try:
        async with asyncio.TaskGroup() as tg:
            # 创建多个相关任务
            task1 = tg.create_task(fetch_data("url1"))
            task2 = tg.create_task(fetch_data("url2"))
            task3 = tg.create_task(fetch_data("error"))
            # 这个会失败
            # 只有当所有任务都成功时才会执行到这里
            print(f"所有结果:{[task1.result(), task2.result(), task3.result()]}")
    except ValueError as e:
        print(f"捕获到错误:{e}")
        # TaskGroup 自动取消了其他任务

asyncio.run(main())
2. 使用 asyncio 创建服务器

asyncio 提供了创建 TCP 和 UDP 服务器的能力,可以轻松构建高性能的网络服务。

示例:

import asyncio

async def handle_echo(reader, writer):
    """处理客户端连接"""
    addr = writer.get_extra_info('peername')
    print(f"连接来自:{addr}")
    try:
        while True:
            # 读取客户端数据
            data = await reader.read(100)
            if not data:
                break
            message = data.decode()
            print(f"收到来自 {addr}: {message}")
            # 发送响应
            response = f"Echo: {message}"
            writer.write(response.encode())
            await writer.drain()
    except ConnectionResetError:
        print(f"客户端 {addr} 断开连接")
    finally:
        print(f"关闭连接:{addr}")
        writer.close()
        await writer.wait_closed()

async def main():
    # 创建 TCP 服务器
    server = await asyncio.start_server(
        handle_echo,
        '127.0.0.1',  # 监听地址
        8888          # 监听端口
    )
    addr = server.sockets[0].getsockname()
    print(f"服务器启动在 {addr}")
    async with server:
        # 保持服务器运行
        await server.serve_forever()

asyncio.run(main())
3. 异步上下文管理器

异步上下文管理器使用 async with 语法,确保异步资源的正确管理。

示例:

import asyncio
import aiofiles

class AsyncDatabaseConnection:
    def __init__(self, db_url):
        self.db_url = db_url
        self.connection = None

    async def __aenter__(self):
        """进入上下文时连接数据库"""
        print("连接数据库...")
        # 模拟数据库连接
        await asyncio.sleep(0.1)
        self.connection = f"Connection to {self.db_url}"
        print("数据库连接成功")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """退出上下文时关闭连接"""
        print("关闭数据库连接...")
        # 模拟关闭连接
        await asyncio.sleep(0.1)
        self.connection = None
        print("数据库连接已关闭")

    async def query(self, sql):
        """执行查询"""
        print(f"执行查询:{sql}")
        await asyncio.sleep(0.5)  # 模拟查询耗时
        return f"Query result for {sql}"

async def use_database():
    async with AsyncDatabaseConnection("postgresql://localhost/mydb") as db:
        result1 = await db.query("SELECT * FROM users")
        result2 = await db.query("SELECT * FROM orders")
        print(f"查询结果 1: {result1}")
        print(f"查询结果 2: {result2}")

asyncio.run(use_database())
4. 多线程和 asyncio 的结合

当遇到 CPU 密集型任务或必须使用的同步代码时,可以使用 loop.run_in_executor() 将它们放到线程池中执行。

示例:

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

# CPU 密集型函数
def cpu_intensive_task(n):
    """计算斐波那契数列"""
    if n <= 1:
        return n
    return cpu_intensive_task(n-1) + cpu_intensive_task(n-2)

def blocking_io_task(filename):
    """模拟阻塞的 I/O 操作"""
    time.sleep(2)  # 模拟文件 I/O
    return f"文件 {filename} 读取完成"

async def main():
    # 创建线程池
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=4) as executor:
        # 运行 CPU 密集型任务
        print("开始执行 CPU 密集型任务...")
        future1 = loop.run_in_executor(
            executor, cpu_intensive_task, 35
        )
        # 运行阻塞 I/O 任务
        print("开始执行阻塞 I/O 任务...")
        future2 = loop.run_in_executor(
            executor, blocking_io_task, "example.txt"
        )
        # 同时执行多个任务
        print("执行其他异步任务...")
        for i in range(3):
            print(f"异步任务 {i}")
        await asyncio.sleep(0.5)
        # 获取结果
        cpu_result = await future1
        io_result = await future2
        print(f"CPU 任务结果:{cpu_result}")
        print(f"I/O 任务结果:{io_result}")

asyncio.run(main())
5. 高级错误处理和重试机制

在分布式系统中,网络请求可能因为各种原因失败,实现自动重试机制可以提高系统的可靠性。

示例:

import asyncio
import random
from typing import Callable, Any

class RetryError(Exception):
    """重试次数耗尽异常"""
    pass

async def retry(
    coro: Callable,
    max_retries: int = 3,
    delay: float = 1.0,
    backoff_factor: float = 2.0,
    exceptions: tuple = (Exception,)
):
    """带指数退避的重试装饰器"""
    last_exception = None
    for attempt in range(max_retries + 1):
        try:
            return await coro()
        except exceptions as e:
            last_exception = e
            if attempt == max_retries:
                break
            # 计算延迟时间(指数退避)
            wait_time = delay * (backoff_factor ** attempt)
            print(f"第 {attempt + 1} 次尝试失败,{wait_time:.1f} 秒后重试...")
            await asyncio.sleep(wait_time)
    raise RetryError(f"重试 {max_retries} 次后仍然失败") from last_exception

async def unreliable_request(url):
    """模拟可能失败的请求"""
    await asyncio.sleep(0.5)
    if random.random() < 0.7:  # 70% 概率失败
        raise ConnectionError(f"请求 {url} 失败")
    return f"成功请求 {url}"

async def main():
    urls = ["url1", "url2", "url3", "url4", "url5"]
    for url in urls:
        try:
            # 使用重试机制
            result = await retry(
                lambda: unreliable_request(url),
                max_retries=3,
                delay=1,
                backoff_factor=1.5,
                exceptions=(ConnectionError, TimeoutError)
            )
            print(f"✓ {result}")
        except RetryError as e:
            print(f"✗ {url}: {str(e)}")

asyncio.run(main())

目录

  1. Python asyncio 完整教程
  2. 什么是 asyncio
  3. 核心特点
  4. 核心概念
  5. 1. 事件循环
  6. 2. 协程
  7. 3. 任务
  8. 4. Future
  9. 概念关系图
  10. 基础功能详解
  11. 1. asyncio.run()
  12. 2. asyncio.create_task()
  13. 3. asyncio.gather()
  14. 使用 return_exceptions=True 获取所有结果,包括异常
  15. 4. asyncio.wait_for()
  16. 5. asyncio.sleep()
  17. 进阶主题
  18. 1. TaskGroup 和结构化并发(Python 3.11+)
  19. 2. 使用 asyncio 创建服务器
  20. 3. 异步上下文管理器
  21. 4. 多线程和 asyncio 的结合
  22. CPU 密集型函数
  23. 5. 高级错误处理和重试机制
  • 💰 8折买阿里云服务器限时8折了解详情
  • Magick API 一键接入全球大模型注册送1000万token查看
  • 🤖 一键搭建Deepseek满血版了解详情
  • 一键打造专属AI 智能体了解详情
极客日志微信公众号二维码

微信扫一扫,关注极客日志

微信公众号「极客日志V2」,在微信中扫描左侧二维码关注。展示文案:极客日志V2 zeeklog

更多推荐文章

查看全部
  • OpenViking 部署与应用:字节跳动开源 AI 代理上下文数据库
  • Python 为何成为 AI 开发的首选语言?
  • OpenClaw 新手入门:环境搭建、模型配置与 WebUI 远程访问
  • Python 环境变量配置与验证指南
  • OpenClaw 的 SOUL.md:用自然语言定义 AI 代理身份与行为边界
  • VSCode + GitHub Copilot AI 编程实战指南
  • OpenClaw:开源 AI 智能体框架的技术解析与实战部署
  • 2026 年高校 AIGC 检测政策汇总
  • Python 集成开发环境(IDE)优缺点对比与推荐
  • 从零构建可扩展 Flutter 应用:v1.0 到 v2.0 全代码详解
  • MixAIHub 镜像站:支持 ChatGPT、Claude 等主流 AI 模型访问
  • 从零构建可扩展 Flutter 应用:v1.0 到 v2.0 架构演进详解
  • Python 3.14 在 Windows 上的安装指南
  • Linux TCP 服务器开发:从 Echo 到远程命令执行的并发与安全
  • 国产时序数据库云原生实践:Apache IoTDB 与 TimechoDB 深度应用
  • libwebkit2gtk-4.1-0 在 Ubuntu 22.04 下的依赖问题与解决
  • 基于无人机搭载摄像头网络的交互式监控分布式方法
  • Apache Tomcat 高危路径遍历漏洞 CVE-2025-55752 修复指南
  • libONVIF 开源 ONVIF 库解析与使用指南
  • Linux 进程间通信详解:管道、共享内存与 System V IPC 机制

相关免费在线工具

  • curl 转代码

    解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online

  • Base64 字符串编码/解码

    将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online

  • Base64 文件转换器

    将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online

  • Markdown转HTML

    将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online

  • HTML转Markdown

    将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online

  • JSON 压缩

    通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online