Python开发从入门到精通:并发编程与多进程
Python开发从入门到精通:并发编程与多进程
一、学习目标与重点
💡 学习目标:掌握Python并发编程的基本概念和方法,包括多线程、多进程、线程池、进程池等;学习threading、multiprocessing等核心库的使用;通过实战案例开发并发应用程序。
⚠️ 学习重点:多线程的创建与管理、多进程的创建与管理、线程池与进程池、同步与互斥、并发编程实战。
22.1 并发编程概述
22.1.1 什么是并发编程
并发编程是一种编程方式,允许程序同时执行多个任务,从而提高程序的执行效率。在并发编程中,任务可以是线程或进程。
22.1.2 并发编程的优势
- 提高执行效率:同时执行多个任务,减少程序的总运行时间。
- 充分利用资源:充分利用CPU和内存资源。
- 简化代码结构:通过多线程或多进程,代码结构更加简洁。
22.1.3 并发编程的应用场景
- CPU密集型任务:如数据处理、数学计算等。
- I/O密集型任务:如文件操作、网络通信等。
22.2 多线程编程
22.2.1 线程的创建与管理
import threading import time # 定义线程函数defthread_function(name):print(f'线程 {name} 开始') time.sleep(2)print(f'线程 {name} 结束')# 创建线程 thread1 = threading.Thread(target=thread_function, args=('Thread 1',)) thread2 = threading.Thread(target=thread_function, args=('Thread 2',))# 启动线程 thread1.start() thread2.start()# 等待线程结束 thread1.join() thread2.join()print('所有线程结束')22.2.2 线程的同步与互斥
import threading import time # 共享资源 counter =0# 互斥锁 lock = threading.Lock()# 定义线程函数defthread_function(name):global counter print(f'线程 {name} 开始')# 获取锁 lock.acquire()try:# 修改共享资源 counter +=1print(f'线程 {name} 已修改共享资源,值为 {counter}')finally:# 释放锁 lock.release() time.sleep(2)print(f'线程 {name} 结束')# 创建线程 threads =[]for i inrange(5): thread = threading.Thread(target=thread_function, args=(f'Thread {i}',)) threads.append(thread) thread.start()# 等待所有线程结束for thread in threads: thread.join()print(f'所有线程结束,共享资源的值为 {counter}')22.2.3 线程池
import concurrent.futures import time # 定义线程函数defthread_function(name):print(f'线程 {name} 开始') time.sleep(2)print(f'线程 {name} 结束')return name # 使用线程池with concurrent.futures.ThreadPoolExecutor(max_workers=3)as executor:# 提交任务 future1 = executor.submit(thread_function,'Thread 1') future2 = executor.submit(thread_function,'Thread 2') future3 = executor.submit(thread_function,'Thread 3')# 获取结果print(f'线程 {future1.result()} 完成')print(f'线程 {future2.result()} 完成')print(f'线程 {future3.result()} 完成')print('所有线程结束')22.3 多进程编程
22.3.1 进程的创建与管理
import multiprocessing import time # 定义进程函数defprocess_function(name):print(f'进程 {name} 开始') time.sleep(2)print(f'进程 {name} 结束')# 创建进程 process1 = multiprocessing.Process(target=process_function, args=('Process 1',)) process2 = multiprocessing.Process(target=process_function, args=('Process 2',))# 启动进程 process1.start() process2.start()# 等待进程结束 process1.join() process2.join()print('所有进程结束')22.3.2 进程的通信
import multiprocessing import time # 定义进程函数defprocess_function(conn):print(f'子进程发送数据') conn.send('Hello from child process') time.sleep(2)print(f'子进程结束') conn.close()# 创建管道 parent_conn, child_conn = multiprocessing.Pipe()# 创建进程 process = multiprocessing.Process(target=process_function, args=(child_conn,)) process.start()# 接收数据print(f'父进程接收数据:{parent_conn.recv()}')# 等待进程结束 process.join()print('所有进程结束')22.3.3 进程池
import concurrent.futures import time # 定义进程函数defprocess_function(name):print(f'进程 {name} 开始') time.sleep(2)print(f'进程 {name} 结束')return name # 使用进程池with concurrent.futures.ProcessPoolExecutor(max_workers=3)as executor:# 提交任务 future1 = executor.submit(process_function,'Process 1') future2 = executor.submit(process_function,'Process 2') future3 = executor.submit(process_function,'Process 3')# 获取结果print(f'进程 {future1.result()} 完成')print(f'进程 {future2.result()} 完成')print(f'进程 {future3.result()} 完成')print('所有进程结束')22.4 同步与互斥
22.4.1 锁的使用
import threading import time # 共享资源 counter =0# 互斥锁 lock = threading.Lock()# 定义线程函数defthread_function(name):global counter print(f'线程 {name} 开始')# 获取锁 lock.acquire()try:# 修改共享资源 counter +=1print(f'线程 {name} 已修改共享资源,值为 {counter}')finally:# 释放锁 lock.release() time.sleep(2)print(f'线程 {name} 结束')# 创建线程 threads =[]for i inrange(5): thread = threading.Thread(target=thread_function, args=(f'Thread {i}',)) threads.append(thread) thread.start()# 等待所有线程结束for thread in threads: thread.join()print(f'所有线程结束,共享资源的值为 {counter}')22.4.2 条件变量的使用
import threading import time # 共享资源 items =[]# 条件变量 condition = threading.Condition()# 生产者函数defproducer():for i inrange(5):with condition: items.append(i)print(f'生产者生产了 {i}') condition.notify() time.sleep(1)# 消费者函数defconsumer():for i inrange(5):with condition:whilenot items: condition.wait() item = items.pop(0)print(f'消费者消费了 {item}') time.sleep(1)# 创建线程 producer_thread = threading.Thread(target=producer) consumer_thread = threading.Thread(target=consumer)# 启动线程 producer_thread.start() consumer_thread.start()# 等待线程结束 producer_thread.join() consumer_thread.join()print('所有线程结束')22.5 实战案例:并发下载文件
22.5.1 需求分析
开发一个并发下载文件的程序,支持以下功能:
- 并发下载多个文件。
- 显示下载进度。
- 处理下载失败的情况。
22.5.2 代码实现
import requests import concurrent.futures import os # 下载文件defdownload_file(url, save_path):try: response = requests.get(url, stream=True) response.raise_for_status()withopen(save_path,'wb')asfile:for chunk in response.iter_content(chunk_size=8192):if chunk:file.write(chunk)print(f'文件 {save_path} 下载成功')return save_path except Exception as e:print(f'文件 {save_path} 下载失败:{e}')returnNone# 并发下载文件defdownload_files(urls, save_dir, max_workers=5):ifnot os.path.exists(save_dir): os.makedirs(save_dir)# 创建保存路径 save_paths =[os.path.join(save_dir, os.path.basename(url))for url in urls]# 使用线程池下载文件with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)as executor: futures =[executor.submit(download_file, url, save_path)for url, save_path inzip(urls, save_paths)]# 获取结果for future in concurrent.futures.as_completed(futures): future.result()# 测试程序if __name__ =='__main__': urls =['https://www.example.com/page1.html','https://www.example.com/page2.html','https://www.example.com/page3.html','https://www.example.com/page4.html','https://www.example.com/page5.html'] save_dir ='downloads' download_files(urls, save_dir)22.5.3 实施过程
- 安装requests库。
- 定义下载文件的函数。
- 定义并发下载文件的函数。
- 测试程序。
22.5.4 最终效果
通过并发下载文件的程序,我们可以实现以下功能:
- 并发下载多个文件。
- 显示下载进度。
- 处理下载失败的情况。
22.6 实战案例:并发数据处理
22.6.1 需求分析
开发一个并发数据处理的程序,支持以下功能:
- 并发处理多个数据文件。
- 计算数据的统计信息。
- 保存处理结果。
22.6.2 代码实现
import pandas as pd import concurrent.futures import os # 处理数据文件defprocess_file(file_path):try:# 读取数据 df = pd.read_csv(file_path)# 计算统计信息 stats ={'文件名': os.path.basename(file_path),'行数': df.shape[0],'列数': df.shape[1],'平均值': df.mean().to_dict(),'最大值': df.max().to_dict(),'最小值': df.min().to_dict()}print(f'文件 {os.path.basename(file_path)} 处理成功')return stats except Exception as e:print(f'文件 {os.path.basename(file_path)} 处理失败:{e}')returnNone# 并发处理数据文件defprocess_files(file_paths, max_workers=5):# 使用进程池处理数据文件with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers)as executor: futures =[executor.submit(process_file, file_path)for file_path in file_paths]# 获取结果 results =[]for future in concurrent.futures.as_completed(futures): result = future.result()if result: results.append(result)return results # 保存处理结果defsave_results(results, save_path): df = pd.DataFrame(results) df.to_csv(save_path, index=False)print(f'处理结果已保存到 {save_path}')# 测试程序if __name__ =='__main__': file_paths =['data1.csv','data2.csv','data3.csv','data4.csv','data5.csv'] save_path ='results.csv' results = process_files(file_paths) save_results(results, save_path)22.6.3 实施过程
- 安装pandas库。
- 定义处理数据文件的函数。
- 定义并发处理数据文件的函数。
- 定义保存处理结果的函数。
- 测试程序。
22.6.4 最终效果
通过并发数据处理的程序,我们可以实现以下功能:
- 并发处理多个数据文件。
- 计算数据的统计信息。
- 保存处理结果。
总结
✅ 本文详细介绍了Python并发编程的基本概念和方法,包括多线程、多进程、线程池、进程池等;学习了threading、multiprocessing等核心库的使用;通过实战案例开发了并发下载文件和并发数据处理的程序。
✅ 建议读者在学习过程中多练习,通过编写代码加深对知识点的理解。