跳到主要内容
极客日志极客日志面向AI+效率的开发者社区
首页博客GitHub 精选镜像工具UI配色美学隐私政策关于联系
搜索内容 / 工具 / 仓库 / 镜像...⌘K搜索
注册
博客列表
Python算法

Python 并发编程实战:多线程、多进程与线程池应用

Python 并发编程涉及多线程、多进程及线程池等核心机制。文章基于 threading 与 multiprocessing 库详解任务创建、同步互斥及进程间通信,结合 concurrent.futures 实现高效并发。通过文件下载与数据处理实战,对比 CPU 与 I/O 密集型场景的资源分配策略,助力开发者构建高性能异步应用。

beaabea发布于 2026/3/25更新于 2026/6/1117 浏览
Python 并发编程实战:多线程、多进程与线程池应用

Python 并发编程实战:多线程、多进程与线程池应用

在开发高性能应用时,如何充分利用系统资源是关键。Python 的并发编程主要涉及多线程和多进程两种模式,理解它们的适用场景及实现细节,能显著提升程序效率。

并发编程基础

并发编程允许程序同时执行多个任务,从而减少总运行时间并提高资源利用率。根据任务类型不同,选择策略也有所区别:

  • CPU 密集型(如数学计算):受限于 GIL(全局解释器锁),更适合使用多进程。
  • I/O 密集型(如网络请求、文件读写):线程切换开销小,多线程通常更高效。

多线程编程实践

创建与管理线程

使用 threading 模块可以方便地创建线程。下面是一个简单的示例,展示了如何启动并等待线程结束:

import threading
import time

def thread_function(name):
    print(f'线程 {name} 开始')
    time.sleep(2)
    print(f'线程 {name} 结束')

# 创建线程
thread1 = threading.Thread(target=thread_function, args=('Thread 1',))
thread2 = threading.Thread(target=thread_function, args=('Thread 2',))

# 启动线程
thread1.start()
thread2.start()

# 等待线程结束
thread1.join()
thread2.join()
print('所有线程结束')

同步与互斥

当多个线程访问共享资源时,必须注意数据竞争问题。通过互斥锁(Lock)可以保证同一时刻只有一个线程修改资源。

import threading
import time

counter = 0
lock = threading.Lock()

def thread_function(name):
    global counter
    print(f'线程 {name} 开始')
    
    # 获取锁
    lock.acquire()
    try:
        counter += 1
        print(f'线程 {name} 已修改共享资源,值为 {counter}')
    finally:
        # 释放锁
        lock.release()
    
    time.sleep(2)
    print(f'线程 {name} 结束')

threads = []
for i in range(5):
    thread = threading.Thread(target=thread_function, args=(f'Thread {i}',))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print(f'所有线程结束,共享资源的值为 {counter}')

在实际开发中,推荐使用上下文管理器(with 语句)来自动处理锁的获取与释放,避免死锁风险。

线程池管理

频繁创建和销毁线程会消耗性能,使用线程池(ThreadPoolExecutor)可以复用线程资源。

import concurrent.futures
import time

def thread_function(name):
    print(f'线程 {name} 开始')
    time.sleep(2)
    print(f'线程 {name} 结束')
    return name

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    future1 = executor.submit(thread_function, 'Thread 1')
    future2 = executor.submit(thread_function, 'Thread 2')
    future3 = executor.submit(thread_function, 'Thread 3')

    print(f'线程 {future1.result()} 完成')
    print(f'线程 {future2.result()} 完成')
    print(f'线程 {future3.result()} 完成')
    print('所有线程结束')

多进程编程实践

对于 CPU 密集型任务,多进程可以绕过 GIL 限制,利用多核 CPU 优势。

进程创建与通信

import multiprocessing
import time

def process_function(conn):
    print(f'子进程发送数据')
    conn.send('Hello from child process')
    time.sleep(2)
    print(f'子进程结束')
    conn.close()

parent_conn, child_conn = multiprocessing.Pipe()
process = multiprocessing.Process(target=process_function, args=(child_conn,))
process.start()

print(f'父进程接收数据:{parent_conn.recv()}')
process.join()
print('所有进程结束')

进程池

类似线程池,ProcessPoolExecutor 用于管理进程池,适合批量处理独立任务。

import concurrent.futures
import time

def process_function(name):
    print(f'进程 {name} 开始')
    time.sleep(2)
    print(f'进程 {name} 结束')
    return name

with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
    future1 = executor.submit(process_function, 'Process 1')
    future2 = executor.submit(process_function, 'Process 2')
    future3 = executor.submit(process_function, 'Process 3')

    print(f'进程 {future1.result()} 完成')
    print(f'进程 {future2.result()} 完成')
    print(f'进程 {future3.result()} 完成')
    print('所有进程结束')

实战案例:并发下载文件

假设我们需要从多个 URL 下载文件,并处理可能的失败情况。这里使用线程池配合 requests 库。

import requests
import concurrent.futures
import os

def download_file(url, save_path):
    try:
        response = requests.get(url, stream=True)
        response.raise_for_status()
        with open(save_path, 'wb') as file:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    file.write(chunk)
        print(f'文件 {save_path} 下载成功')
        return save_path
    except Exception as e:
        print(f'文件 {save_path} 下载失败:{e}')
        return None

def download_files(urls, save_dir, max_workers=5):
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)
    
    save_paths = [os.path.join(save_dir, os.path.basename(url)) for url in urls]
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(download_file, url, save_path) for url, save_path in zip(urls, save_paths)]
        
        for future in concurrent.futures.as_completed(futures):
            future.result()

if __name__ == '__main__':
    urls = [
        'https://www.example.com/page1.html',
        'https://www.example.com/page2.html',
        'https://www.example.com/page3.html',
        'https://www.example.com/page4.html',
        'https://www.example.com/page5.html'
    ]
    save_dir = 'downloads'
    download_files(urls, save_dir)

实战案例:并发数据处理

如果是 CPU 密集型的统计计算,则应切换到进程池,并使用 pandas 处理数据。

import pandas as pd
import concurrent.futures
import os

def process_file(file_path):
    try:
        df = pd.read_csv(file_path)
        stats = {
            '文件名': os.path.basename(file_path),
            '行数': df.shape[0],
            '列数': df.shape[1],
            '平均值': df.mean().to_dict(),
            '最大值': df.max().to_dict(),
            '最小值': df.min().to_dict()
        }
        print(f'文件 {os.path.basename(file_path)} 处理成功')
        return stats
    except Exception as e:
        print(f'文件 {os.path.basename(file_path)} 处理失败:{e}')
        return None

def process_files(file_paths, max_workers=5):
    with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(process_file, file_path) for file_path in file_paths]
        results = []
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            if result:
                results.append(result)
    return results

def save_results(results, save_path):
    df = pd.DataFrame(results)
    df.to_csv(save_path, index=False)
    print(f'处理结果已保存到 {save_path}')

if __name__ == '__main__':
    file_paths = ['data1.csv', 'data2.csv', 'data3.csv', 'data4.csv', 'data5.csv']
    save_path = 'results.csv'
    results = process_files(file_paths)
    save_results(results, save_path)

总结

掌握并发编程的核心在于根据任务特性选择合适的模型。I/O 操作多用线程,计算密集型多用进程。通过 concurrent.futures 统一接口管理线程池和进程池,能让代码更简洁且易于维护。实际项目中,建议结合具体业务场景进行压测,找到最佳的并发度配置。

目录

  1. Python 并发编程实战:多线程、多进程与线程池应用
  2. 并发编程基础
  3. 多线程编程实践
  4. 创建与管理线程
  5. 创建线程
  6. 启动线程
  7. 等待线程结束
  8. 同步与互斥
  9. 线程池管理
  10. 多进程编程实践
  11. 进程创建与通信
  12. 进程池
  13. 实战案例:并发下载文件
  14. 实战案例:并发数据处理
  15. 总结
  • 💰 8折买阿里云服务器限时8折了解详情
  • Magick API 一键接入全球大模型注册送1000万token查看
  • 🤖 一键搭建Deepseek满血版了解详情
  • 一键打造专属AI 智能体了解详情
极客日志微信公众号二维码

微信扫一扫,关注极客日志

微信公众号「极客日志V2」,在微信中扫描左侧二维码关注。展示文案:极客日志V2 zeeklog

更多推荐文章

查看全部
  • F5 刷新时浏览器前端发生了什么
  • 零基础 Python 爬虫学习指南:安装、应用方向与就业薪资
  • Neo4j 数据库 Windows 环境安装与运行指南
  • Python 编程入门指南:基础语法与核心应用
  • PyTorch 中 LSTM 模型参数详解
  • Telegram Bot 与 Mini-App 开发实践:获取 window.Telegram.WebApp 对象及解析
  • MyLesson 小程序前台前端开发(一)
  • GLM-5 大模型代码生成能力深度评测与实战
  • Dubbo 服务降级:Mock 机制原理与实战
  • OpenClaw 技术解析:让大模型从对话走向执行
  • 利用 AI 工具组合高效完成文献综述的方法与实践
  • Spring MVC 请求参数处理详解
  • OpenClaw 完整搭建指南:从零打造 AI 助手
  • Kubernetes Python 客户端实战教程
  • Python 3.14 安装教程:Windows/macOS/Linux 全平台指南
  • AI 编辑器收费模式变革:Token 计费下的开发者生存逻辑
  • 网络安全工程师学习指南:从零开始掌握 Web 安全与渗透测试技术
  • 本地部署 Web-Check 并通过内网穿透实现远程协作
  • 宇树 Unitree 机器人 ROS 2 环境部署指南 (Go2/B2/H1 + Humble)
  • Faster-Whisper-GUI 日语语音识别异常问题解析与实战方案

相关免费在线工具

  • 加密/解密文本

    使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online

  • Gemini 图片去水印

    基于开源反向 Alpha 混合算法去除 Gemini/Nano Banana 图片水印,支持批量处理与下载。 在线工具,Gemini 图片去水印在线工具,online

  • curl 转代码

    解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online

  • Base64 字符串编码/解码

    将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online

  • Base64 文件转换器

    将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online

  • Markdown转HTML

    将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online