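Python Concurrency in Practice: Multithreading, Multiprocessing, and Thread Pools
Making full use of system resources is key to building high-performance applications. Concurrency in Python mainly comes in two forms, multithreading and multiprocessing; understanding where each applies and how each is implemented can noticeably improve program efficiency.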
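Concurrency Basics
Concurrent programming lets a program make progress on several tasks at once, which can shorten total running time and improve resource utilization. The right strategy depends on the kind of task:
- CPU-bound (e.g. numerical computation): in CPython, the GIL (Global Interpreter Lock) lets only one thread execute Python bytecode at a time, so multiprocessing is usually the better fit.
- I/O-bound (e.g. network requests, file reads and writes): threads release the GIL while they wait on I/O and are cheaper to create and switch between than processes, so multithreading is usually more efficient.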
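To make the I/O-bound case concrete, here is a minimal sketch that compares running four waits one after another with running them in four threads. The function names are hypothetical, and time.sleep is only an assumed stand-in for a blocking I/O call; real network or disk timings will differ.

```python
import threading
import time


def simulated_io():
    """Hypothetical I/O wait; time.sleep releases the GIL while it blocks."""
    time.sleep(0.2)


def run_sequential(n):
    """Run n simulated I/O calls one after another and return elapsed seconds."""
    start = time.perf_counter()
    for _ in range(n):
        simulated_io()
    return time.perf_counter() - start


def run_threaded(n):
    """Run n simulated I/O calls in n threads and return elapsed seconds."""
    start = time.perf_counter()
    threads = [threading.Thread(target=simulated_io) for _ in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


sequential_seconds = run_sequential(4)
threaded_seconds = run_threaded(4)
print(f'sequential: {sequential_seconds:.2f}s, threaded: {threaded_seconds:.2f}s')
```

Because the waits overlap, the threaded run should take roughly as long as a single wait, while the sequential run takes about four times that.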
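Multithreading in Practice
Creating and Managing Threads
The threading module makes it easy to create threads. The simple example below starts two threads and waits for them to finish: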
import threading
import time


def thread_function(name):
    print(f'{name} started')
    time.sleep(2)
    print(f'{name} finished')


# Create the threads
thread1 = threading.Thread(target=thread_function, args=('Thread 1',))
thread2 = threading.Thread(target=thread_function, args=('Thread 2',))

# Start the threads
thread1.start()
thread2.start()

# Wait for both threads to finish
thread1.join()
thread2.join()

print('All threads finished')
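Synchronization and Mutual Exclusion
When several threads access a shared resource, data races become a concern. A mutex (Lock) guarantees that only one thread modifies the resource at any given moment.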
import threading
import time

counter = 0
lock = threading.Lock()


def thread_function(name):
    global counter
    print(f'{name} started')
    # Acquire the lock
    lock.acquire()
    try:
        counter += 1
        print(f'{name} updated the shared resource, value is now {counter}')
    finally:
        # Release the lock, even if the code above raised
        lock.release()
    time.sleep(2)
    print(f'{name} finished')


threads = []
for i in range(5):
    thread = threading.Thread(target=thread_function, args=(f'Thread {i}',))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print(f'All threads finished, shared resource value is {counter}')
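In real code, prefer a context manager (the with statement) to acquire and release the lock automatically. The lock is then released even if the protected code raises, so it cannot be left held by mistake and block every other thread.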
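As a minimal sketch of that recommendation, the counter update can be guarded with `with lock:` instead of explicit acquire and release calls. The `increment` function and the iteration counts are illustrative assumptions, not part of the example above.

```python
import threading

counter = 0
lock = threading.Lock()


def increment(times):
    """Add 1 to the shared counter `times` times, locking each update."""
    global counter
    for _ in range(times):
        # `with lock:` acquires on entry and releases on exit,
        # even if the body raises an exception.
        with lock:
            counter += 1


threads = [threading.Thread(target=increment, args=(10_000,)) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f'Final counter value: {counter}')
```

Keeping the locked region as small as the single update means threads spend little time waiting on each other.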
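Thread Pools
Creating and destroying threads repeatedly costs performance; a thread pool (ThreadPoolExecutor) reuses a fixed set of worker threads instead.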
import concurrent.futures
import time


def thread_function(name):
    print(f'{name} started')
    time.sleep(2)
    print(f'{name} finished')
    return name


with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    future1 = executor.submit(thread_function, 'Thread 1')
    future2 = executor.submit(thread_function, 'Thread 2')
    future3 = executor.submit(thread_function, 'Thread 3')
    # result() blocks until the corresponding task has finished
    print(f'{future1.result()} completed')
    print(f'{future2.result()} completed')
    print(f'{future3.result()} completed')

print('All threads finished')
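When the same function is applied to many inputs, executor.map can replace the individual submit calls. The sketch below is one possible shape for that; `simulated_io` is a hypothetical task that sleeps briefly in place of real I/O.

```python
from concurrent.futures import ThreadPoolExecutor
import time


def simulated_io(task_id):
    """Hypothetical stand-in for an I/O call: sleep briefly, then return a label."""
    time.sleep(0.1)
    return f'task-{task_id}'


with ThreadPoolExecutor(max_workers=3) as executor:
    # map() schedules one call per input and yields results in input order,
    # regardless of which task happens to finish first.
    results = list(executor.map(simulated_io, range(5)))

print(results)
```

Five tasks share three worker threads here, so the pool reuses threads rather than creating one per task.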
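Multiprocessing in Practice
For CPU-bound tasks, multiprocessing sidesteps the GIL: each process runs its own interpreter with its own GIL, so the work can be spread across multiple CPU cores.
Creating Processes and Communicating Between Them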
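import multiprocessing
import time


def process_function(conn):
    print('Child process sending data')
    conn.send('Hello from child process')
    time.sleep(2)
    print('Child process finished')
    conn.close()


# The guard is required where child processes are started with "spawn"
# (the default on Windows and macOS), because the child re-imports this module.
if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()
    process = multiprocessing.Process(target=process_function, args=(child_conn,))
    process.start()
    print(f'Parent process received: {parent_conn.recv()}')
    process.join()
    print('All processes finished')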
Process Pools
Like a thread pool, ProcessPoolExecutor manages a pool of worker processes and suits batches of independent tasks.
import concurrent.futures
import time


def process_function(name):
    print(f'{name} started')
    time.sleep(2)
    print(f'{name} finished')
    return name


# Guard the entry point so worker processes can import this module safely
if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        future1 = executor.submit(process_function, 'Process 1')
        future2 = executor.submit(process_function, 'Process 2')
        future3 = executor.submit(process_function, 'Process 3')
        # result() blocks until the corresponding task has finished
        print(f'{future1.result()} completed')
        print(f'{future2.result()} completed')
        print(f'{future3.result()} completed')
    print('All processes finished')
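The pool example above only sleeps, so it does not really exercise the CPU. As a rough sketch of a CPU-bound workload, the code below counts primes by trial division in separate worker processes. `count_primes`, `count_primes_parallel`, and the chosen limits are all hypothetical names and values picked for illustration.

```python
from concurrent.futures import ProcessPoolExecutor


def count_primes(limit):
    """Count primes below `limit` by trial division (deliberately CPU-heavy)."""
    count = 0
    for n in range(2, limit):
        # n is prime if no d in [2, sqrt(n)] divides it evenly
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


def count_primes_parallel(limits, max_workers=2):
    """Run count_primes once per limit, each call in a worker process."""
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(count_primes, limits))


if __name__ == '__main__':
    print(count_primes_parallel([10_000, 20_000, 30_000]))
```

The worker function is defined at module level because arguments and functions sent to a process pool must be picklable.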
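Case Study: Downloading Files Concurrently
Suppose we need to download files from several URLs and handle possible failures. This example uses a thread pool together with the requests library.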
import requests
import concurrent.futures
import os


def download_file(url, save_path):
    try:
        # A timeout keeps a stalled server from blocking a worker forever;
        # 10 seconds is an arbitrary choice.
        with requests.get(url, stream=True, timeout=10) as response:
            response.raise_for_status()
            with open(save_path, 'wb') as file:
                # Stream the body in chunks instead of loading it all into memory
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        file.write(chunk)
        print(f'{save_path} downloaded successfully')
        return save_path
    except Exception as e:
        print(f'{save_path} failed to download: {e}')
        return None


def download_files(urls, save_dir, max_workers=5):
    os.makedirs(save_dir, exist_ok=True)
    save_paths = [os.path.join(save_dir, os.path.basename(url)) for url in urls]
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(download_file, url, save_path)
                   for url, save_path in zip(urls, save_paths)]
        # Handle each download as soon as it finishes
        for future in concurrent.futures.as_completed(futures):
            future.result()


if __name__ == '__main__':
    urls = [
        'https://www.example.com/page1.html',
        'https://www.example.com/page2.html',
        'https://www.example.com/page3.html',
        'https://www.example.com/page4.html',
        'https://www.example.com/page5.html'
    ]
    save_dir = 'downloads'
    download_files(urls, save_dir)
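If the caller needs to know which URLs failed, one common approach is to map each future back to its URL and catch exceptions when collecting results. The sketch below illustrates the idea without touching the network: `fake_fetch` is a hypothetical stand-in that fails for any URL containing 'bad', and `fetch_all` is likewise an assumed helper name.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_fetch(url):
    """Hypothetical stand-in for a download: fails for URLs containing 'bad'."""
    if 'bad' in url:
        raise ValueError(f'cannot fetch {url}')
    return len(url)


def fetch_all(urls, max_workers=3):
    """Return (succeeded, failed) dicts keyed by URL."""
    succeeded, failed = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Remember which URL each future belongs to
        future_to_url = {executor.submit(fake_fetch, url): url for url in urls}
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                # result() re-raises any exception raised inside the worker
                succeeded[url] = future.result()
            except Exception as exc:
                failed[url] = exc
    return succeeded, failed


succeeded, failed = fetch_all(['https://example.com/a', 'https://example.com/bad'])
print(f'{len(succeeded)} succeeded, {len(failed)} failed')
```

Letting the worker raise and catching at the collection point keeps the failure details available to the caller, who could then retry or report the failed URLs.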
Case Study: Processing Data Concurrently
For CPU-bound statistical computation, switch to a process pool; this example uses pandas to process the data.
import pandas as pd
import concurrent.futures
import os


def process_file(file_path):
    try:
        df = pd.read_csv(file_path)
        # numeric_only=True skips text columns, which would otherwise
        # make mean() raise a TypeError in pandas 2.x
        stats = {
            'file': os.path.basename(file_path),
            'rows': df.shape[0],
            'columns': df.shape[1],
            'mean': df.mean(numeric_only=True).to_dict(),
            'max': df.max(numeric_only=True).to_dict(),
            'min': df.min(numeric_only=True).to_dict()
        }
        print(f'{os.path.basename(file_path)} processed successfully')
        return stats
    except Exception as e:
        print(f'{os.path.basename(file_path)} failed to process: {e}')
        return None


def process_files(file_paths, max_workers=5):
    with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(process_file, file_path) for file_path in file_paths]
        results = []
        # Collect results as they finish, skipping files that failed
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            if result:
                results.append(result)
    return results


def save_results(results, save_path):
    df = pd.DataFrame(results)
    df.to_csv(save_path, index=False)
    print(f'Results saved to {save_path}')


if __name__ == '__main__':
    file_paths = ['data1.csv', 'data2.csv', 'data3.csv', 'data4.csv', 'data5.csv']
    save_path = 'results.csv'
    results = process_files(file_paths)
    save_results(results, save_path)
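Summary
The key to concurrent programming is choosing a model that matches the task: threads for I/O-bound work, processes for CPU-bound work. The concurrent.futures module offers a single interface for both thread pools and process pools, which keeps code concise and easier to maintain. In real projects, load-test against the actual workload to find the best concurrency level.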
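One simple way to start such a measurement is to time the same batch of tasks under several pool sizes. The sketch below assumes a hypothetical `simulated_task` that sleeps for 50 ms in place of a real request; a real load test would substitute the actual workload and repeat each run several times.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def simulated_task(_):
    """Hypothetical workload: stands in for one I/O-bound request."""
    time.sleep(0.05)


def time_with_workers(max_workers, n_tasks=8):
    """Return wall-clock seconds needed to run n_tasks with max_workers threads."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        list(executor.map(simulated_task, range(n_tasks)))
    return time.perf_counter() - start


# Try a few pool sizes and compare the elapsed time for each
timings = {workers: time_with_workers(workers) for workers in (1, 2, 4, 8)}
for workers, seconds in timings.items():
    print(f'max_workers={workers}: {seconds:.2f}s')
```

With a real workload the curve usually flattens at some point, and the smallest pool size near that point is a reasonable starting configuration.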


