跳到主要内容 理解 Python 异步编程原理与 asyncio 简单实现 | 极客日志
Python 算法
理解 Python 异步编程原理与 asyncio 简单实现 本文详细解析了 Python 异步编程的核心概念,包括阻塞与非阻塞、同步与异步、并发与并行的区别。通过对比同步阻塞、多进程、多线程及非阻塞 Socket 编程的演进过程,阐述了传统方式的局限性。重点介绍了 Python 标准库 asyncio 的工作原理,涵盖事件循环、协程、Future 和 Task 等关键组件,并通过代码示例展示了如何使用 asyncio 实现高效的并发网络请求。文章最后总结了不同并发模型的适用场景,帮助开发者根据实际需求选择最优技术方案。
星星泡饭 发布于 2025/2/6 更新于 2026/4/20 1 浏览
理解 Python 异步编程原理与 asyncio 简单实现
在开始说明异步编程之前,首先先了解几个相关的核心概念。这些概念是理解现代高性能网络编程的基础。
基本概念辨析
阻塞 (Blocking) 程序未得到所需计算资源时被挂起的状态。程序在等待某个操作完成期间,自身无法继续干别的事情,则称该程序在该操作上是阻塞的。
常见的阻塞形式有:网络 I/O 阻塞、磁盘 I/O 阻塞、用户输入阻塞等。
阻塞是无处不在的,包括 CPU 切换上下文时,所有的进程都无法真正干事情,它们也会被阻塞。(如果是多核 CPU 则正在执行上下文切换操作的核不可被利用。)
简单的理解的话,阻塞就是 A 调用 B,A 会被挂起,一直等待 B 的结果,什么都不能干 。
非阻塞 (Non-blocking) 程序在等待某操作过程中,自身不被阻塞,可以继续运行干别的事情,则称该程序在该操作上是非阻塞的。
非阻塞并不是在任何程序级别、任何情况下都可以存在的。
仅当程序封装的级别可以囊括独立的子程序单元时,它才可能存在非阻塞状态。
非阻塞的存在是因为阻塞存在,正因为某个操作阻塞导致的耗时与效率低下,我们才要把它变成非阻塞的。
简单理解的话,非阻塞就是 A 调用 B,A 自己不用被挂起来等待 B 的结果,A 可以去干其他的事情 。
同步 (Synchronous) 不同程序单元为了完成某个任务,在执行过程中需靠某种通信方式以协调一致,称这些程序单元是同步执行的。
例如购物系统中更新商品库存,需要用'行锁'作为通信信号,让不同的更新请求强制排队顺序执行,那更新库存的操作是同步的。
简单理解的话,同步就是A 调用 B,此时只有等 B 有了结果才返回 。
异步 (Asynchronous) 为完成某个任务,不同程序单元之间过程中无需通信协调,也能完成任务的方式。
不相关的程序单元之间可以是异步的。
例如,爬虫下载网页。调度程序调用下载程序后,即可调度其他任务,而无需与该下载任务保持通信以协调行为。不同网页的下载、保存等操作都是无关的,也无需相互通知协调。这些异步操作的完成时刻并不确定。
简单理解的话,异步就是A 调用 B,B 立即返回,无需等待。等 B 处理完之后再告诉 A 结果 。
并发 (Concurrency) vs 并行 (Parallelism)
并发 并发描述的是程序的组织结构。指程序要被设计成多个可独立执行的子任务。
以利用有限的计算机资源使多个任务可以被实时或近实时执行为目的。
并行 并行描述的是程序的执行状态。指多个任务同时被执行。
以利用富余计算资源(多核 CPU)加速完成多个任务为目的。
并发提供了一种程序组织结构方式,让问题的解决方案可以并行执行,但并行执行不是必须的。
总的来说,并行是为了利用多核加速多任务的完成;并发是为了让独立的子任务能够尽快完成;非阻塞是为了提高程序的整体运行效率,而异步是组织非阻塞任务的方式。
从同步到异步 I/O 的演进 以一个爬虫为例,下载 10 篇网页,用几个例子来展示从同步->异步的演进过程。
同步阻塞方式 以同步阻塞方式来写这个程序也是最容易想到的方式,即依次下载好 10 篇网页。
import socket
def blocking_way ():
sock = socket.socket()
sock.connect(('example.com' , 80 ))
request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
sock.send(request.encode('ascii' ))
response = b''
chunk = sock.recv(4096 )
while chunk:
response += chunk
chunk = sock.recv(4096 )
return response
def sync_way ():
res = []
for i in range (10 ):
res.append(blocking_way())
return len (res)
这段代码的执行时间大概为 4.5 秒。(取多次平均值)
上述代码中,blocking_way() 这个函数的作用主要是建立连接,发送 HTTP 请求,然后从 socket 读取 HTTP 响应数据并返回。
sync_way() 将 blocking_way() 执行了 10 次,也就是说,我们执行了 10 次访问 example.com。
由于网络情况和服务端的处理各不相同,所以服务端什么时候返回了响应数据并被客户端接收到可供程序读取,也是不可预测的。所以 sock.connect() 和 sock.recv() 这两个调用在默认情况下是阻塞的。
注:sock.send() 函数并不会阻塞太久,它只负责将请求数据拷贝到 TCP/IP 协议栈的系统缓冲区中就返回,并不等待服务端返回的应答确认。
如果是说网络环境很差的话,创建网络连接的 TCP/IP 握手需要 1 秒,那么 sock.connect() 就得阻塞 1 秒。这一秒时间对 CPU 来说就被浪费了。同理,sock.recv() 也一样的必须得等到服务端的响应数据已经被客户端接收,才能进行后续的程序。目前的例子上只需要下载一篇网页,阻塞 10 次看起来好像没有什么问题,可是如果需求是 1000w 篇的话,这种阻塞的方式就显得很蠢,效率也很低下。
改进:多进程 在一个程序中,依次执行 10 次好像有些耗时,那么我们使用多进程,开 10 个同样的程序一起处理的话,也许会好一些?于是第一个改进方式便出来了:多进程编程 。
import socket
from concurrent.futures import ProcessPoolExecutor
def blocking_way ():
sock = socket.socket()
sock.connect(('example.com' , 80 ))
request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
sock.send(request.encode('ascii' ))
response = b''
chunk = sock.recv(4096 )
while chunk:
response += chunk
chunk = sock.recv(4096 )
return response
def process_way ():
workers = 10
with ProcessPoolExecutor(workers) as executor:
futs = {executor.submit(blocking_way) for _ in range (10 )}
return len ([fut.result() for fut in futs])
按理说,使用 10 个相同的进程来执行这段程序,其执行时间应该是会缩短到原来的 1/10,然而并没有。这里面还有一些时间被进程的切换所消耗掉了。
CPU 从一个进程切换到另一个进程的时候,需要把旧进程运行时的寄存器状态,内存状态都保存好,然后再将另一个进程之前保存的数据恢复。当进程数量大于 CPU 核心数的时候,进程切换是必须的。
一般来说,服务器在能够稳定运行的前提下,可以同时处理的进程数在数十个到数百个规模。如果进程数量规模更大,系统运行将不稳定,而且可用内存资源往往也会不足。除了切换开销大,以及可支持的任务规模小之外,多进程还有其他缺点,如状态共享等问题。
改进:多线程 线程的数据结构比进程更加的轻量级,同一个进程可以容纳好几个线程。
后来的 OS 也把调度单位由进程转为线程,进程只作为线程的容器,用于管理进程所需的资源。而且 OS 级别的线程是可以被分配到不同的 CPU 核心同时运行的。
import socket
from concurrent.futures import ThreadPoolExecutor
def blocking_way ():
sock = socket.socket()
sock.connect(('example.com' , 80 ))
request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
sock.send(request.encode('ascii' ))
response = b''
chunk = sock.recv(4096 )
while chunk:
response += chunk
chunk = sock.recv(4096 )
return response
def thread_way ():
workers = 10
with ThreadPoolExecutor(workers) as executor:
futs = {executor.submit(blocking_way) for _ in range (10 )}
return len ([fut.result() for fut in futs])
从运行时间上来看,多线程好像已经解决了进程切换开销大的问题,而且可支持的任务数量规模,也变成了数百个到数千个。
但是由于 CPython 中的多线程因为 GIL(全局解释器锁)的存在,它们并不能利用 CPU 多核优势,一个 Python 进程中,只允许有一个线程处于运行状态。
在做阻塞的系统调用时,例如 sock.connect(), sock.recv() 时,当前线程会释放 GIL,让别的线程有执行机会。但是单个线程内,在阻塞调用上还是阻塞的。
Python 中 time.sleep 是阻塞的,都知道使用它要谨慎,但在多线程编程中,time.sleep 并不会阻塞其他线程。
除了 GIL 之外,所有的多线程还有通病。它们是被 OS 调度,调度策略是抢占式的,以保证同等优先级的线程都有均等的执行机会,那带来的问题是:并不知道下一时刻是哪个线程被运行,也不知道它正要执行的代码是什么。所以就可能存在竞态条件。如果在一个复杂的爬虫系统中,要抓取的 URL 由多个爬虫线程来拿,那么 URL 如何分配,这就需要用到'锁'或'同步队列'来保证下载任务不会被重复执行。多线程最主要的问题还是竞态条件。
非阻塞方式 import socket
def noblock_way ():
sock = socket.socket()
sock.setblocking(False )
try :
sock.connect(('example.com' , 80 ))
except BlockingIOError:
pass
request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
data = request.encode('ascii' )
while True :
try :
sock.send(data)
break
except OSError:
pass
response = b''
while True :
try :
chunk = sock.recv(4096 )
while chunk:
response += chunk
chunk = sock.recv(4096 )
break
except OSError:
pass
return response
def sync_way ():
res = []
for i in range (10 ):
res.append(noblock_way())
return len (res)
执行完这段代码的时候,感觉好像是被骗了,代码的执行时间和非阻塞方式差不多,而且程序更复杂了。要非阻塞何用?
代码 sock.setblocking(False) 告诉 OS,让 socket 上阻塞调用都改为非阻塞的方式。非阻塞就是在做一件事的时候,不阻碍调用它的程序做别的事情。上述代码在执行完 sock.connect() 和 sock.recv() 后的确不再阻塞,可以继续往下执行请求准备的代码或者是执行下一次读取。第 8 行要放在 try 语句内,是因为 socket 在发送非阻塞连接请求过程中,系统底层也会抛出异常。connect() 被调用之后,立即可以往下执行第 12 和 13 行的代码。
虽然 connect() 和 recv() 不再阻塞主程序,空出来的时间段 CPU 没有空闲着,但并没有利用好这空闲去做其他有意义的事情,而是在循环尝试读写 socket(不停判断非阻塞调用的状态是否就绪)。还得处理来自底层的可忽略的异常。也不能同时处理多个 socket。 所以总体执行时间和同步阻塞相当。
非阻塞改进:Epoll 其实判断非阻塞调用是否就绪可以交给 OS 来做,不用应用程序自己去等待和判断,可以用这个空闲时间去做其他的事情。
OS 将 O/I 的变化都封装成了事件,比如可读事件、可写事件。而且提供了相应的系统模块以供调用来接收事件通知。这个模块就是 select,让应用程序可以通过 select 注册文件描述符和回调函数。当文件描述符的状态发生变化时,select 就调用事先注册的回调函数。
select 因其算法效率比较低,后来改进成了 poll,再后来又有进一步改进,BSD 内核改进成了 kqueue 模块,而 Linux 内核改进成了 epoll 模块。这四个模块的作用都相同,暴露给程序员使用的 API 也几乎一致,区别在于性能和处理大规模连接的能力。
Select : 每次调用都需要遍历所有监听的文件描述符,时间复杂度为 O(N)。随着连接数增加,性能急剧下降。
Poll : 改进了 select 的线性扫描,但依然需要在用户空间和内核空间之间传递大量数据。
Epoll : 基于事件驱动机制,只关注活跃的连接。当文件描述符状态改变时,内核通过回调通知应用层,时间复杂度接近 O(1)。这使得 Epoll 非常适合高并发场景。
Python 异步编程:Asyncio 在了解了底层的 IO 模型后,我们可以看看 Python 是如何封装这些能力的。Python 3.4 引入了 asyncio 库,旨在提供原生支持异步编程的框架。
为什么需要 Asyncio? 尽管多线程和多进程可以解决部分并发问题,但在 Python 中,由于 GIL 的存在,对于 CPU 密集型任务,多线程无法利用多核优势。而对于 I/O 密集型任务,多线程虽然可行,但上下文切换开销依然存在,且管理复杂(锁、死锁等)。
asyncio 提供了一种单线程下的并发模型。它通过事件循环(Event Loop)来管理协程(Coroutine)的执行。当一个协程遇到 I/O 等待时,它会挂起自己,将控制权交还给事件循环,事件循环接着去执行下一个协程。这样,单个线程就可以高效地处理成千上万个并发连接。
核心概念
Event Loop (事件循环) : 无限循环,负责监控和管理所有的事件(如 I/O 就绪、定时器触发等)。
Coroutine (协程) : 使用 async def 定义的函数。它是一种特殊的生成器,可以在执行过程中暂停和恢复。
Future (未来对象) : 代表一个尚未完成的操作的结果。协程通常返回 Future 对象。
Task (任务) : 封装协程的对象,使其成为 Event Loop 的可调度实体。
简单实现示例 下面是一个使用 asyncio 模拟高并发下载的示例,对比之前的同步阻塞方式。
import asyncio
import aiohttp
async def fetch (session, url ):
async with session.get(url) as response:
return await response.text()
async def main ():
urls = ['http://example.com' ] * 10
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
print (f"Downloaded {len (results)} pages" )
if __name__ == '__main__' :
asyncio.run(main())
在这个例子中,aiohttp 是基于 asyncio 的 HTTP 库。asyncio.gather 会并发执行所有任务,直到所有任务完成。相比之前的同步方式,这里不需要手动管理线程池或进程池,代码更加简洁,且避免了 GIL 带来的限制(在 I/O 等待期间释放 GIL)。
手写简易 Event Loop 为了深入理解,我们可以尝试手写一个极简的事件循环逻辑(伪代码示意):
class SimpleLoop :
def __init__ (self ):
self .tasks = []
def add_task (self, coro ):
self .tasks.append(coro)
def run (self ):
while self .tasks:
task = self .tasks.pop(0 )
try :
next (task)
except StopIteration:
continue
pass
总结 异步编程的核心在于非阻塞 I/O 与事件驱动 。在 Python 中,asyncio 是标准库提供的最佳实践工具。
同步阻塞 : 简单直观,但效率低,不适合高并发。
多线程 : 适合 I/O 密集型,但受 GIL 限制,且管理复杂。
多进程 : 适合 CPU 密集型,但资源开销大。
Asyncio : 适合高并发 I/O 密集型,单线程模型,效率高,代码清晰。
选择合适的并发模型取决于具体的业务场景。对于网络爬虫、Web 服务器后端等 I/O 密集型任务,asyncio 通常是首选方案。
相关免费在线工具 加密/解密文本 使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
curl 转代码 解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
Base64 字符串编码/解码 将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
Base64 文件转换器 将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
Markdown转HTML 将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
HTML转Markdown 将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online