Python 并发编程:多线程与多进程实战
介绍 Python 并发编程基础,涵盖多线程、多进程、线程池及进程池的使用。通过 threading 和 multiprocessing 库演示创建、同步、互斥机制,并结合 requests 下载文件和 pandas 数据处理两个实战案例,展示如何提升程序执行效率与资源利用率。

介绍 Python 并发编程基础,涵盖多线程、多进程、线程池及进程池的使用。通过 threading 和 multiprocessing 库演示创建、同步、互斥机制,并结合 requests 下载文件和 pandas 数据处理两个实战案例,展示如何提升程序执行效率与资源利用率。

学习目标:掌握 Python 并发编程的基本概念和方法,包括多线程、多进程、线程池、进程池等;学习 threading、multiprocessing 等核心库的使用;通过实战案例开发并发应用程序。
学习重点:多线程的创建与管理、多进程的创建与管理、线程池与进程池、同步与互斥、并发编程实战。
并发编程是一种编程方式,允许程序同时执行多个任务,从而提高程序的执行效率。在并发编程中,任务可以是线程或进程。
import threading
import time
# 定义线程函数
def thread_function(name):
print(f'线程 {name} 开始')
time.sleep(2)
print(f'线程 {name} 结束')
# 创建线程
thread1 = threading.Thread(target=thread_function, args=('Thread 1',))
thread2 = threading.Thread(target=thread_function, args=('Thread 2',))
# 启动线程
thread1.start()
thread2.start()
# 等待线程结束
thread1.join()
thread2.join()
print('所有线程结束')
import threading
import time
# 共享资源
counter = 0
# 互斥锁
lock = threading.Lock()
# 定义线程函数
def thread_function(name):
global counter
print(f'线程 {name} 开始')
# 获取锁
lock.acquire()
try:
# 修改共享资源
counter += 1
print(f'线程 {name} 已修改共享资源,值为 {counter}')
finally:
# 释放锁
lock.release()
time.sleep(2)
print(f'线程 {name} 结束')
# 创建线程
threads = []
for i in range(5):
thread = threading.Thread(target=thread_function, args=(f'Thread {i}',))
threads.append(thread)
thread.start()
# 等待所有线程结束
for thread in threads:
thread.join()
print(f'所有线程结束,共享资源的值为 {counter}')
import concurrent.futures
import time
# 定义线程函数
def thread_function(name):
print(f'线程 {name} 开始')
time.sleep(2)
print(f'线程 {name} 结束')
return name
# 使用线程池
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# 提交任务
future1 = executor.submit(thread_function, 'Thread 1')
future2 = executor.submit(thread_function, 'Thread 2')
future3 = executor.submit(thread_function, 'Thread 3')
# 获取结果
print(f'线程 {future1.result()} 完成')
print(f'线程 {future2.result()} 完成')
print(f'线程 {future3.result()} 完成')
print('所有线程结束')
import multiprocessing
import time
# 定义进程函数
def process_function(name):
print(f'进程 {name} 开始')
time.sleep(2)
print(f'进程 {name} 结束')
# 创建进程
process1 = multiprocessing.Process(target=process_function, args=('Process 1',))
process2 = multiprocessing.Process(target=process_function, args=('Process 2',))
# 启动进程
process1.start()
process2.start()
# 等待进程结束
process1.join()
process2.join()
print('所有进程结束')
import multiprocessing
import time
# 定义进程函数
def process_function(conn):
print(f'子进程发送数据')
conn.send('Hello from child process')
time.sleep(2)
print(f'子进程结束')
conn.close()
# 创建管道
parent_conn, child_conn = multiprocessing.Pipe()
# 创建进程
process = multiprocessing.Process(target=process_function, args=(child_conn,))
process.start()
# 接收数据
print(f'父进程接收数据:{parent_conn.recv()}')
# 等待进程结束
process.join()
print('所有进程结束')
import concurrent.futures
import time
# 定义进程函数
def process_function(name):
print(f'进程 {name} 开始')
time.sleep(2)
print(f'进程 {name} 结束')
return name
# 使用进程池
with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
# 提交任务
future1 = executor.submit(process_function, 'Process 1')
future2 = executor.submit(process_function, 'Process 2')
future3 = executor.submit(process_function, 'Process 3')
# 获取结果
print(f'进程 {future1.result()} 完成')
print(f'进程 {future2.result()} 完成')
print(f'进程 {future3.result()} 完成')
print('所有进程结束')
import threading
import time
# 共享资源
counter = 0
# 互斥锁
lock = threading.Lock()
# 定义线程函数
def thread_function(name):
global counter
print(f'线程 {name} 开始')
# 获取锁
lock.acquire()
try:
# 修改共享资源
counter += 1
print(f'线程 {name} 已修改共享资源,值为 {counter}')
finally:
# 释放锁
lock.release()
time.sleep(2)
print(f'线程 {name} 结束')
# 创建线程
threads = []
for i in range(5):
thread = threading.Thread(target=thread_function, args=(f'Thread {i}',))
threads.append(thread)
thread.start()
# 等待所有线程结束
for thread in threads:
thread.join()
print(f'所有线程结束,共享资源的值为 {counter}')
import threading
import time
# 共享资源
items = []
# 条件变量
condition = threading.Condition()
# 生产者函数
def producer():
for i in range(5):
with condition:
items.append(i)
print(f'生产者生产了 {i}')
condition.notify()
time.sleep(1)
# 消费者函数
def consumer():
for i in range(5):
with condition:
while not items:
condition.wait()
item = items.pop(0)
print(f'消费者消费了 {item}')
time.sleep(1)
# 创建线程
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
# 启动线程
producer_thread.start()
consumer_thread.start()
# 等待线程结束
producer_thread.join()
consumer_thread.join()
print('所有线程结束')
开发一个并发下载文件的程序,支持以下功能:
import requests
import concurrent.futures
import os
# 下载文件
def download_file(url, save_path):
try:
response = requests.get(url, stream=True)
response.raise_for_status()
with open(save_path, 'wb') as file:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
file.write(chunk)
print(f'文件 {save_path} 下载成功')
return save_path
except Exception as e:
print(f'文件 {save_path} 下载失败:{e}')
return None
# 并发下载文件
def download_files(urls, save_dir, max_workers=5):
if not os.path.exists(save_dir):
os.makedirs(save_dir)
# 创建保存路径
save_paths = [os.path.join(save_dir, os.path.basename(url)) for url in urls]
# 使用线程池下载文件
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(download_file, url, save_path) for url, save_path in zip(urls, save_paths)]
# 获取结果
for future in concurrent.futures.as_completed(futures):
future.result()
# 测试程序
if __name__ == '__main__':
urls = [
'https://www.example.com/page1.html',
'https://www.example.com/page2.html',
'https://www.example.com/page3.html',
'https://www.example.com/page4.html',
'https://www.example.com/page5.html'
]
save_dir = 'downloads'
download_files(urls, save_dir)
通过并发下载文件的程序,我们可以实现以下功能:
开发一个并发数据处理的程序,支持以下功能:
import pandas as pd
import concurrent.futures
import os
# 处理数据文件
def process_file(file_path):
try:
# 读取数据
df = pd.read_csv(file_path)
# 计算统计信息
stats = {
'文件名': os.path.basename(file_path),
'行数': df.shape[0],
'列数': df.shape[1],
'平均值': df.mean().to_dict(),
'最大值': df.max().to_dict(),
'最小值': df.min().to_dict()
}
print(f'文件 {os.path.basename(file_path)} 处理成功')
return stats
except Exception as e:
print(f'文件 {os.path.basename(file_path)} 处理失败:{e}')
return None
# 并发处理数据文件
def process_files(file_paths, max_workers=5):
# 使用进程池处理数据文件
with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(process_file, file_path) for file_path in file_paths]
# 获取结果
results = []
for future in concurrent.futures.as_completed(futures):
result = future.result()
if result:
results.append(result)
return results
# 保存处理结果
def save_results(results, save_path):
df = pd.DataFrame(results)
df.to_csv(save_path, index=False)
print(f'处理结果已保存到 {save_path}')
# 测试程序
if __name__ == '__main__':
file_paths = ['data1.csv', 'data2.csv', 'data3.csv', 'data4.csv', 'data5.csv']
save_path = 'results.csv'
results = process_files(file_paths)
save_results(results, save_path)
通过并发数据处理的程序,我们可以实现以下功能:
本文详细介绍了 Python 并发编程的基本概念和方法,包括多线程、多进程、线程池、进程池等;学习了 threading、multiprocessing 等核心库的使用;通过实战案例开发了并发下载文件和并发数据处理的程序。
建议读者在学习过程中多练习,通过编写代码加深对知识点的理解。

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML 转 Markdown 互为补充。 在线工具,Markdown 转 HTML在线工具,online
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML 转 Markdown在线工具,online