跳到主要内容
极客日志极客日志面向AI+效率的开发者社区
首页博客GitHub 精选镜像工具UI配色美学隐私政策关于联系
搜索内容 / 工具 / 仓库 / 镜像...⌘K搜索
注册
博客列表
Python

Python 进程池 ProcessPoolExecutor 使用指南

Python 进程池 ProcessPoolExecutor 是处理 CPU 密集型任务的强大工具。介绍其基本用法,包括任务提交(submit/map)、结果获取(result/as_completed)、异常处理及回调机制。通过实际案例展示批量图片处理场景,并提供性能优化技巧如合理设置 max_workers 和减少数据传输。对比线程池选择建议,强调根据任务类型(CPU 或 I/O 密集型)选择合适的并发模型,确保资源正确释放并解决共享数据问题。

PhpPioneer发布于 2026/3/16更新于 2026/5/2220 浏览
Python 进程池 ProcessPoolExecutor 使用指南

一、进程池概述

进程池(ProcessPoolExecutor)是 Python 中用于并行执行任务的强大工具,尤其适合 CPU 密集型操作。与传统的多进程编程相比,它提供了更简单、更高级的接口。

适用场景

  1. CPU 密集型任务(数学计算、图像处理等)
  2. 需要并行处理独立任务的情况
  3. 需要限制并发进程数量的场景
  4. 需要获取任务执行结果的场景

二、基本使用

from concurrent.futures import ProcessPoolExecutor
import time

# CPU 密集型计算函数
def calculate_square(n):
    print(f"计算 {n} 的平方...")
    time.sleep(1)  # 模拟耗时计算
    return n * n

# 使用进程池
with ProcessPoolExecutor(max_workers=4) as executor:
    # 提交任务到进程池
    future1 = executor.submit(calculate_square, 5)
    future2 = executor.submit(calculate_square, 8)
    
    # 获取任务结果
    print(f"5 的平方 = {future1.result()}")
    print(f"8 的平方 = {future2.result()}")

三、核心方法详解

1. 任务提交

map(): 批量提交任务

results = executor.map(func, iterable, timeout=None)

submit(): 提交单个任务

future = executor.submit(func, *args, **kwargs)

2. 结果处理

as_completed(): 按照完成顺序获取结果

from concurrent.futures import as_completed

futures = [executor.submit(calculate_square, i) for i in range(1, 6)]
for future in as_completed(futures):
    print(f"结果:{future.result()}")

future.result(timeout=None): 获取任务结果(阻塞)

result = future.result()  # 阻塞直到结果返回

四、高级用法

1. 限制并发进程数

# 最多同时运行 2 个进程
with ProcessPoolExecutor(max_workers=2) as executor:
    results = list(executor.map(calculate_square, range(1, 5)))
    print(results)

2. 获取任务状态

future = executor.submit(calculate_square, 10)
if future.running():
    print("任务正在运行...")
elif future.done():
    print("任务已完成!")

3. 回调处理结果

def result_callback(future):
    print(f"收到结果:{future.result()}")

with ProcessPoolExecutor() as executor:
    future = executor.submit(calculate_square, 15)
    future.add_done_callback(result_callback)

4. 处理异常

def divide(a, b):
    return a / b

try:
    future = executor.submit(divide, 10, 0)
    result = future.result()
except ZeroDivisionError as e:
    print(f"出现错误:{e}")

五、实际应用案例

案例:批量图片处理

from PIL import Image
import os
from concurrent.futures import ProcessPoolExecutor

# 图片处理函数
def process_image(image_path):
    try:
        img = Image.open(image_path)
        # 图片处理操作
        img = img.resize((800, 600))
        img = img.convert('L')  # 转为灰度图
        # 保存处理后的图片
        new_path = os.path.splitext(image_path)[0] + "_processed.jpg"
        img.save(new_path)
        return f"已处理:{image_path}"
    except Exception as e:
        return f"处理失败:{image_path} - {str(e)}"

# 获取图片目录中的所有图片
image_dir = "images"
image_files = [
    os.path.join(image_dir, f)
    for f in os.listdir(image_dir)
    if f.endswith(('.jpg', '.png'))
]

# 使用进程池处理
with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
    # 提交所有任务
    futures = {
        executor.submit(process_image, img): img 
        for img in image_files
    }
    # 获取结果
    for future in as_completed(futures):
        result = future.result()
        print(result)

六、性能优化技巧

  1. 选择合适的 max_workers:
    • 对于 CPU 密集型任务:max_workers=os.cpu_count()
    • 对于 I/O 密集型任务:max_workers=(os.cpu_count() * 2)
  2. 减少数据传输:
    • 避免在进程间传递大对象
    • 使用共享内存 (SharedMemory) 或服务器进程 (Manager) 优化数据共享

任务分块:

# 减少小任务的数量
def process_chunk(chunk):
    return [calculate_square(n) for n in chunk]

chunks = [range(i, i+1000) for i in range(0, 10000, 1000)]
results = executor.map(process_chunk, chunks)

预加载数据:

# 使用 initializer 预加载共享数据
def init_worker():
    global shared_data
    shared_data = load_big_data()

def process_item(item):
    return process(shared_data, item)

with ProcessPoolExecutor(initializer=init_worker) as executor:
    ...

七、常见问题解决方案

问题 1:子进程异常导致无限等待

解决方案:

# 设置超时时间
try:
    result = future.result(timeout=60)  # 最多等待 60 秒
except TimeoutError:
    print("任务超时")

问题 2:子进程不被回收

解决方案:

# 使用上下文管理器确保资源回收
with ProcessPoolExecutor() as executor:
    # 执行代码
    # 离开 with 块后自动关闭进程池

问题 3:共享数据问题

解决方案:

from multiprocessing import Manager

def worker(shared_list, data):
    shared_list.append(process(data))

with Manager() as manager:
    shared_list = manager.list()
    with ProcessPoolExecutor() as executor:
        executor.map(worker, [shared_list]*len(data), data)
    print(list(shared_list))

八、与线程池的选择建议

特性进程池 (ProcessPoolExecutor)线程池 (ThreadPoolExecutor)
适用任务CPU 密集型I/O 密集型
内存使用高 (每个进程独立内存空间)低 (共享内存)
上下文切换开销高低
GIL 限制避免 GIL 影响受 GIL 限制
数据共享复杂 (需要专门机制)简单 (直接共享)
通信开销高 (需要序列化)低 (直接内存访问)

选择建议:

  • 优先考虑线程池处理 I/O 密集型任务
  • 仅当任务受 GIL 限制时使用进程池
  • 混合使用:I/O 密集型任务使用线程池,CPU 密集型任务使用进程池

九、结语

ProcessPoolExecutor 是 Python 并发编程的核心组件之一,熟练掌握它可以显著提升程序性能。关键要点:

  1. 使用上下文管理器 (with语句) 确保资源正确释放
  2. 根据任务类型选择合理的 max_workers 数量
  3. 优先使用 map() 和 as_completed() 管理批量任务
  4. 处理好任务间的数据共享问题
  5. 针对不同任务特点优化参数配置

目录

  1. 一、进程池概述
  2. 适用场景
  3. 二、基本使用
  4. CPU 密集型计算函数
  5. 使用进程池
  6. 三、核心方法详解
  7. 1. 任务提交
  8. 2. 结果处理
  9. 四、高级用法
  10. 1. 限制并发进程数
  11. 最多同时运行 2 个进程
  12. 2. 获取任务状态
  13. 3. 回调处理结果
  14. 4. 处理异常
  15. 五、实际应用案例
  16. 案例:批量图片处理
  17. 图片处理函数
  18. 获取图片目录中的所有图片
  19. 使用进程池处理
  20. 六、性能优化技巧
  21. 减少小任务的数量
  22. 使用 initializer 预加载共享数据
  23. 七、常见问题解决方案
  24. 问题 1:子进程异常导致无限等待
  25. 设置超时时间
  26. 问题 2:子进程不被回收
  27. 使用上下文管理器确保资源回收
  28. 问题 3:共享数据问题
  29. 八、与线程池的选择建议
  30. 九、结语
  • 💰 8折买阿里云服务器限时8折了解详情
  • Magick API 一键接入全球大模型注册送1000万token查看
  • 🤖 一键搭建Deepseek满血版了解详情
  • 一键打造专属AI 智能体了解详情
极客日志微信公众号二维码

微信扫一扫,关注极客日志

微信公众号「极客日志V2」,在微信中扫描左侧二维码关注。展示文案:极客日志V2 zeeklog

更多推荐文章

查看全部
  • YOLOv8.3 无人机航拍小目标检测实战与优化策略
  • AI 时代数据库的融合与智能化演进
  • Less 预处理器实战:三种运行模式与核心语法解析
  • 6 款主流 AI 模型评测:国产 Agent 第一梯队是谁?
  • 2024 大模型与编译器技术秋招面试题解析
  • 从普通产品经理到AI产品经理:转型准备与技能升级指南
  • 融合 YOLO 与大语言模型的无人机河道智能巡检系统
  • ARINC 825 航空电子通信总线标准详解
  • MySQL 索引及其底层数据结构详解
  • Ubuntu 22.04 网络配置实战:静态 IP 与 DNS 设置
  • Vivado 安装包精简配置方案:高效启动 FPGA 开发
  • uv 精准指定 Python 版本管理技巧
  • 昇腾 NPU 部署 Llama 模型:环境搭建、性能测试与问题排查
  • Python 基础语法详解
  • 基于 LLaMA-Factory 的 LLM DPO 训练实战
  • 基于 Docker 与内网穿透实现 Nginx 远程访问
  • Windows 系统提示找不到文件 javaw.exe 的解决方案
  • Android 中高级开发技术面试真题与解析
  • B-树模拟实现详解
  • 基于 Q-Learning 的无人机三维动态避障路径规划 (Matlab 实现)

相关免费在线工具

  • curl 转代码

    解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online

  • Base64 字符串编码/解码

    将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online

  • Base64 文件转换器

    将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online

  • Markdown转HTML

    将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online

  • HTML转Markdown

    将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online

  • JSON 压缩

    通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online