前言
在日常开发中,让程序同时处理多个任务是提升效率的核心需求。Python 提供了进程、线程、协程三种多任务实现方式,它们各有适用场景与优劣。本文从基础概念出发,结合代码实例拆解这三种技术的用法、注意事项与实战场景。
本文详细解析了 Python 中的多任务编程技术,涵盖进程、线程与协程三大核心概念。文章阐述了多任务的基本原理及并发与并行的区别,深入讲解了多进程的实现方式、PID 获取及注意事项(如全局变量隔离)。随后介绍了多线程的创建、共享变量机制及线程安全问题(锁的使用)。最后重点分析了协程的优势、适用场景及基于 asyncio 和 aiohttp 的异步爬虫实战。通过代码对比与案例演示,帮助开发者理解不同多任务方案的特性,以便在实际开发中选择最优方案。

在日常开发中,让程序同时处理多个任务是提升效率的核心需求。Python 提供了进程、线程、协程三种多任务实现方式,它们各有适用场景与优劣。本文从基础概念出发,结合代码实例拆解这三种技术的用法、注意事项与实战场景。
多任务是指在同一时间内,程序能够处理多个独立的任务。从用户视角来看,这些任务似乎是'同时进行'的;从技术视角看,多任务的实现依赖于操作系统对 CPU 等资源的调度。对于单 CPU 核心来说,真正的'同时执行'是不存在的,操作系统会快速切换多任务的执行权,由于切换速度极快,用户会产生'多个任务同时运行'的错觉。
简单总结:并发是'交替执行',并行是'同时执行'。
在 Python 中,实现多任务的核心有三种:进程、线程、协程。其中,进程是操作系统进行资源分配和调度的基本单位。
进程可以理解为'正在运行的程序实例'。一个程序可以对应多个进程,每个进程拥有独立的内存空间,互不干扰。进程之间的资源不共享,通信需要通过特定的机制(如管道、队列、共享内存等)。
多进程的核心作用是让程序能够并行处理多个任务,提升执行效率。
import time
def download_file(file_name, cost_time):
print(f"开始下载{file_name}...")
time.sleep(cost_time)
print(f"{file_name}下载完成!")
if __name__ == "__main__":
start_time = time.time()
download_file("文件 A", 3)
download_file("文件 B", 4)
end_time = time.time()
print(f"总耗时:{end_time - start_time:.2f}秒")
单进程顺序执行,总耗时为 7 秒。
import time
from multiprocessing import Process
def download_file(file_name, cost_time):
print(f"开始下载{file_name}...")
time.sleep(cost_time)
print(f"{file_name}下载完成!")
if __name__ == "__main__":
start_time = time.time()
p1 = Process(target=download_file, args=("文件 A", 3))
p2 = Process(target=download_file, args=("文件 B", 4))
p1.start()
p2.start()
p1.join()
p2.join()
end_time = time.time()
print(f"总耗时:{end_time - start_time:.2f}秒")
多进程并行执行,总耗时约 4 秒,效率大幅提升。
Process 类的核心参数:
import time
from multiprocessing import Process
def print_numbers(name, count):
for i in range(count):
print(f"{name}: {i}")
time.sleep(0.5)
def print_letters(name, count):
for i in range(count):
print(f"{name}: {chr(65 + i)}")
time.sleep(0.5)
if __name__ == "__main__":
p1 = Process(name="数字进程", target=print_numbers, args=("数字进程", 3))
p2 = Process(name="字母进程 1", target=print_letters, args=("字母进程 1", 3))
p3 = Process(name="字母进程 2", target=print_letters, args=("字母进程 2", 3))
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()
print("所有任务执行完成!")
import time
from multiprocessing import Process
def download(user, file_name, file_size):
print(f"{user}开始下载{file_name}(大小:{file_size}MB)...")
time.sleep(file_size)
print(f"{user}的{file_name}下载完成!")
if __name__ == "__main__":
tasks = [("用户 A", "电影.mp4", 5), ("用户 B", "文档.pdf", 2),
("用户 C", "图片.png", 1), ("用户 D", "音乐.mp3", 3)]
processes = []
for task in tasks:
user, file_name, size = task
p = Process(target=download, args=(user, file_name, size))
processes.append(p)
p.start()
for p in processes:
p.join()
print("所有用户下载任务完成!")
进程编号(PID)是操作系统为每个进程分配的唯一标识,用于区分和管理进程。
import os
from multiprocessing import Process, current_process
def task(name):
pid = os.getpid()
ppid = os.getppid()
print(f"任务{name} - PID:{pid},PPID:{ppid}")
current_proc = current_process()
print(f"任务{name} - 进程名:{current_proc.name},PID:{current_proc.pid}")
if __name__ == "__main__":
main_pid = os.getpid()
print(f"主进程 - PID:{main_pid}")
p1 = Process(name="子进程 1", target=task, args=("A",))
p2 = Process(name="子进程 2", target=task, args=("B",))
p1.start()
p2.start()
p1.join()
p2.join()
print("主进程结束!")
每个进程都有独立的内存空间,因此进程之间的全局变量是不共享的。如果需要数据共享,需要使用 Queue、Pipe、Manager 等机制。
默认情况下,主进程会等待所有子进程执行完成后才会结束。若需主进程结束时子进程随之结束,可设置守护进程或主动销毁。
import time
from multiprocessing import Process
def long_task():
while True:
print("子进程正在运行...")
time.sleep(1)
if __name__ == "__main__":
p = Process(target=long_task)
p.daemon = True
p.start()
time.sleep(3)
print("主进程结束,守护子进程被强制终止!")
import time
from multiprocessing import Process
def task():
i = 0
while True:
print(f"子进程运行中,i = {i+1}")
i += 1
time.sleep(1)
if __name__ == "__main__":
p = Process(target=task)
p.start()
time.sleep(3)
p.terminate()
print("主进程主动销毁子进程!")
线程是进程内部的一个'执行单元',是操作系统进行任务调度的最小单位。一个进程可以包含多个线程,这些线程共享进程的内存空间和系统资源。
适用于 IO 密集型任务。
对比单线程和多线程执行网络请求,多线程可显著减少总耗时。
import time
from threading import Thread
def request_url(url, cost_time):
print(f"开始请求{url}...")
time.sleep(cost_time)
print(f"{url}请求完成!")
if __name__ == "__main__":
start_time = time.time()
t1 = Thread(target=request_url, args=("https://www.baidu.com", 2))
t2 = Thread(target=request_url, args=("https://www.qq.com", 3))
t3 = Thread(target=request_url, args=("https://www.163.com", 1))
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
end_time = time.time()
print(f"总耗时:{end_time - start_time:.2f}秒")
步骤与多进程类似:定义任务 -> 创建 Thread 对象 -> start() -> join()。
import time
from threading import Thread
def print_time(name, interval):
while True:
print(f"{name}:{time.strftime('%H:%M:%S', time.localtime())}")
time.sleep(interval)
def count_numbers(name, max_count):
for i in range(max_count):
print(f"{name}:{i}")
time.sleep(1)
if __name__ == "__main__":
t1 = Thread(name="时间线程", target=print_time, args=("时间线程", 2))
t2 = Thread(name="计数线程", target=count_numbers, args=("计数线程", 5))
t1.start()
t2.start()
t2.join()
print("计数线程完成,程序结束!")
import time
from threading import Thread
def download_files(file_type, count, cost_per):
for i in range(1, count + 1):
print(f"开始下载{file_type}{i}...")
time.sleep(cost_per)
print(f"{file_type}{i}下载完成!")
if __name__ == "__main__":
tasks = [("图片", 3, 1), ("视频", 2, 3), ("文档", 4, 0.5)]
threads = []
for task in tasks:
file_type, count, cost = task
t = Thread(target=download_files, args=(file_type, count, cost))
threads.append(t)
t.start()
for t in threads:
t.join()
print("所有文件下载完成!")
可通过守护线程或全局变量控制。
import time
from threading import Thread
def task():
while True:
print("子线程运行中...")
time.sleep(1)
if __name__ == "__main__":
t = Thread(target=task)
t.daemon = True
t.start()
time.sleep(3)
print("主线程结束,守护子线程被终止!")
import time
from threading import Thread
running = True
def task():
while running:
print("子线程运行中...")
time.sleep(1)
print("子线程检测到退出信号,正在收尾...")
time.sleep(2)
print("子线程收尾完成,退出!")
if __name__ == "__main__":
t = Thread(target=task)
t.start()
time.sleep(3)
running = False
print("主线程发出退出信号!")
t.join()
print("主线程结束!")
线程的执行顺序由操作系统调度器决定,是不确定的。如需特定顺序,需使用同步机制(锁、条件变量等)。
线程之间共享进程的内存空间,全局变量是共享的。但需注意线程安全问题(竞态条件),可使用 Lock 保护。
import time
from threading import Thread, Lock
global_num = 0
lock = Lock()
def add_num():
global global_num
for i in range(1000000):
lock.acquire()
global_num += 1
lock.release()
if __name__ == "__main__":
t1 = Thread(target=add_num)
t2 = Thread(target=add_num)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"最终 global_num 的值:{global_num}")
| 对比维度 | 进程 | 线程 |
|---|---|---|
| 资源分配 | 独立内存空间、系统资源 | 共享进程的资源 |
| 通信方式 | 复杂(队列、管道) | 简单(共享变量) |
| 创建/切换开销 | 大 | 小 |
| 线程安全 | 无问题 | 存在安全问题 |
| 稳定性 | 高 | 低 |
协程也叫微线程,是一种在单个线程里让多个任务交替执行的编程方式。核心优势包括切换成本低、占用资源少、无需考虑数据安全问题、处理等待类任务效率极高。
最适合有大量等待操作的 IO 密集型任务,如批量爬取网页、高并发 Web 服务、数据库操作等。
import asyncio
async def my_coroutine():
print("Start")
await asyncio.sleep(1)
print("End")
asyncio.run(my_coroutine())
import asyncio
async def task(name, delay):
await asyncio.sleep(delay)
print(f"{name} completed")
async def main():
tasks = [
asyncio.create_task(task("A", 2)),
asyncio.create_task(task("B", 1))
]
await asyncio.gather(*tasks)
asyncio.run(main())
asyncio.run():启动事件循环。asyncio.create_task():将协程包装为任务。asyncio.gather():并发执行多个协程。asyncio.sleep():非阻塞式等待。import aiohttp
import asyncio
import time
async def async_crawl(url):
print(f"开始爬取:{url}")
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
html = await response.text()
print(f"爬取完成:{url},状态码:{response.status}")
return len(html)
async def main():
urls = ["https://www.baidu.com", "https://www.qq.com", "https://www.163.com"]
coros = [async_crawl(url) for url in urls]
results = await asyncio.gather(*coros)
return results
if __name__ == "__main__":
start_time = time.time()
results = asyncio.run(main())
print(f"各网站源码长度:{results}")
print(f"协程爬虫总耗时:{time.time() - start_time:.2f}秒")
注意:Headers 和 Cookies 需根据实际环境配置,避免泄露敏感信息。
import aiohttp
import asyncio
async def aio_main():
headers = {
"User-Agent": "Mozilla/5.0..."
}
# 注意:实际使用时请替换为有效的 Cookie,不要硬编码敏感信息
cookies = {}
url = "https://image.baidu.com/search/index"
params = {"word": "示例关键词"}
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers, params=params) as response:
text = await response.text()
print(f"获取到页面内容,长度:{len(text)}")
if __name__ == "__main__":
asyncio.run(aio_main())
进程、线程、协程是 Python 应对不同场景的多任务工具:进程适合 CPU 密集型任务,线程适配 IO 密集型场景,协程则是单线程下的异步效率利器。掌握它们的特性与适用场景,能让程序更高效。

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML 转 Markdown 互为补充。 在线工具,Markdown 转 HTML在线工具,online
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML 转 Markdown在线工具,online
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online