Python开发从入门到精通:并发编程与多进程

Python开发从入门到精通:并发编程与多进程

Python开发从入门到精通:并发编程与多进程

在这里插入图片描述

一、学习目标与重点

💡 学习目标:掌握Python并发编程的基本概念和方法,包括多线程、多进程、线程池、进程池等;学习threading、multiprocessing等核心库的使用;通过实战案例开发并发应用程序。
⚠️ 学习重点:多线程的创建与管理、多进程的创建与管理、线程池与进程池、同步与互斥、并发编程实战。

22.1 并发编程概述

22.1.1 什么是并发编程

并发编程是一种编程方式,允许程序同时执行多个任务,从而提高程序的执行效率。在并发编程中,任务可以是线程或进程。

22.1.2 并发编程的优势

  • 提高执行效率:同时执行多个任务,减少程序的总运行时间。
  • 充分利用资源:充分利用CPU和内存资源。
  • 简化代码结构:通过多线程或多进程,代码结构更加简洁。

22.1.3 并发编程的应用场景

  • CPU密集型任务:如数据处理、数学计算等。
  • I/O密集型任务:如文件操作、网络通信等。

22.2 多线程编程

22.2.1 线程的创建与管理

import threading import time # 定义线程函数defthread_function(name):print(f'线程 {name} 开始') time.sleep(2)print(f'线程 {name} 结束')# 创建线程 thread1 = threading.Thread(target=thread_function, args=('Thread 1',)) thread2 = threading.Thread(target=thread_function, args=('Thread 2',))# 启动线程 thread1.start() thread2.start()# 等待线程结束 thread1.join() thread2.join()print('所有线程结束')

22.2.2 线程的同步与互斥

import threading import time # 共享资源 counter =0# 互斥锁 lock = threading.Lock()# 定义线程函数defthread_function(name):global counter print(f'线程 {name} 开始')# 获取锁 lock.acquire()try:# 修改共享资源 counter +=1print(f'线程 {name} 已修改共享资源,值为 {counter}')finally:# 释放锁 lock.release() time.sleep(2)print(f'线程 {name} 结束')# 创建线程 threads =[]for i inrange(5): thread = threading.Thread(target=thread_function, args=(f'Thread {i}',)) threads.append(thread) thread.start()# 等待所有线程结束for thread in threads: thread.join()print(f'所有线程结束,共享资源的值为 {counter}')

22.2.3 线程池

import concurrent.futures import time # 定义线程函数defthread_function(name):print(f'线程 {name} 开始') time.sleep(2)print(f'线程 {name} 结束')return name # 使用线程池with concurrent.futures.ThreadPoolExecutor(max_workers=3)as executor:# 提交任务 future1 = executor.submit(thread_function,'Thread 1') future2 = executor.submit(thread_function,'Thread 2') future3 = executor.submit(thread_function,'Thread 3')# 获取结果print(f'线程 {future1.result()} 完成')print(f'线程 {future2.result()} 完成')print(f'线程 {future3.result()} 完成')print('所有线程结束')

22.3 多进程编程

22.3.1 进程的创建与管理

import multiprocessing import time # 定义进程函数defprocess_function(name):print(f'进程 {name} 开始') time.sleep(2)print(f'进程 {name} 结束')# 创建进程 process1 = multiprocessing.Process(target=process_function, args=('Process 1',)) process2 = multiprocessing.Process(target=process_function, args=('Process 2',))# 启动进程 process1.start() process2.start()# 等待进程结束 process1.join() process2.join()print('所有进程结束')

22.3.2 进程的通信

import multiprocessing import time # 定义进程函数defprocess_function(conn):print(f'子进程发送数据') conn.send('Hello from child process') time.sleep(2)print(f'子进程结束') conn.close()# 创建管道 parent_conn, child_conn = multiprocessing.Pipe()# 创建进程 process = multiprocessing.Process(target=process_function, args=(child_conn,)) process.start()# 接收数据print(f'父进程接收数据:{parent_conn.recv()}')# 等待进程结束 process.join()print('所有进程结束')

22.3.3 进程池

import concurrent.futures import time # 定义进程函数defprocess_function(name):print(f'进程 {name} 开始') time.sleep(2)print(f'进程 {name} 结束')return name # 使用进程池with concurrent.futures.ProcessPoolExecutor(max_workers=3)as executor:# 提交任务 future1 = executor.submit(process_function,'Process 1') future2 = executor.submit(process_function,'Process 2') future3 = executor.submit(process_function,'Process 3')# 获取结果print(f'进程 {future1.result()} 完成')print(f'进程 {future2.result()} 完成')print(f'进程 {future3.result()} 完成')print('所有进程结束')

22.4 同步与互斥

22.4.1 锁的使用

import threading import time # 共享资源 counter =0# 互斥锁 lock = threading.Lock()# 定义线程函数defthread_function(name):global counter print(f'线程 {name} 开始')# 获取锁 lock.acquire()try:# 修改共享资源 counter +=1print(f'线程 {name} 已修改共享资源,值为 {counter}')finally:# 释放锁 lock.release() time.sleep(2)print(f'线程 {name} 结束')# 创建线程 threads =[]for i inrange(5): thread = threading.Thread(target=thread_function, args=(f'Thread {i}',)) threads.append(thread) thread.start()# 等待所有线程结束for thread in threads: thread.join()print(f'所有线程结束,共享资源的值为 {counter}')

22.4.2 条件变量的使用

import threading import time # 共享资源 items =[]# 条件变量 condition = threading.Condition()# 生产者函数defproducer():for i inrange(5):with condition: items.append(i)print(f'生产者生产了 {i}') condition.notify() time.sleep(1)# 消费者函数defconsumer():for i inrange(5):with condition:whilenot items: condition.wait() item = items.pop(0)print(f'消费者消费了 {item}') time.sleep(1)# 创建线程 producer_thread = threading.Thread(target=producer) consumer_thread = threading.Thread(target=consumer)# 启动线程 producer_thread.start() consumer_thread.start()# 等待线程结束 producer_thread.join() consumer_thread.join()print('所有线程结束')

22.5 实战案例:并发下载文件

22.5.1 需求分析

开发一个并发下载文件的程序,支持以下功能:

  • 并发下载多个文件。
  • 显示下载进度。
  • 处理下载失败的情况。

22.5.2 代码实现

import requests import concurrent.futures import os # 下载文件defdownload_file(url, save_path):try: response = requests.get(url, stream=True) response.raise_for_status()withopen(save_path,'wb')asfile:for chunk in response.iter_content(chunk_size=8192):if chunk:file.write(chunk)print(f'文件 {save_path} 下载成功')return save_path except Exception as e:print(f'文件 {save_path} 下载失败:{e}')returnNone# 并发下载文件defdownload_files(urls, save_dir, max_workers=5):ifnot os.path.exists(save_dir): os.makedirs(save_dir)# 创建保存路径 save_paths =[os.path.join(save_dir, os.path.basename(url))for url in urls]# 使用线程池下载文件with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)as executor: futures =[executor.submit(download_file, url, save_path)for url, save_path inzip(urls, save_paths)]# 获取结果for future in concurrent.futures.as_completed(futures): future.result()# 测试程序if __name__ =='__main__': urls =['https://www.example.com/page1.html','https://www.example.com/page2.html','https://www.example.com/page3.html','https://www.example.com/page4.html','https://www.example.com/page5.html'] save_dir ='downloads' download_files(urls, save_dir)

22.5.3 实施过程

  1. 安装requests库。
  2. 定义下载文件的函数。
  3. 定义并发下载文件的函数。
  4. 测试程序。

22.5.4 最终效果

通过并发下载文件的程序,我们可以实现以下功能:

  • 并发下载多个文件。
  • 显示下载进度。
  • 处理下载失败的情况。

22.6 实战案例:并发数据处理

22.6.1 需求分析

开发一个并发数据处理的程序,支持以下功能:

  • 并发处理多个数据文件。
  • 计算数据的统计信息。
  • 保存处理结果。

22.6.2 代码实现

import pandas as pd import concurrent.futures import os # 处理数据文件defprocess_file(file_path):try:# 读取数据 df = pd.read_csv(file_path)# 计算统计信息 stats ={'文件名': os.path.basename(file_path),'行数': df.shape[0],'列数': df.shape[1],'平均值': df.mean().to_dict(),'最大值': df.max().to_dict(),'最小值': df.min().to_dict()}print(f'文件 {os.path.basename(file_path)} 处理成功')return stats except Exception as e:print(f'文件 {os.path.basename(file_path)} 处理失败:{e}')returnNone# 并发处理数据文件defprocess_files(file_paths, max_workers=5):# 使用进程池处理数据文件with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers)as executor: futures =[executor.submit(process_file, file_path)for file_path in file_paths]# 获取结果 results =[]for future in concurrent.futures.as_completed(futures): result = future.result()if result: results.append(result)return results # 保存处理结果defsave_results(results, save_path): df = pd.DataFrame(results) df.to_csv(save_path, index=False)print(f'处理结果已保存到 {save_path}')# 测试程序if __name__ =='__main__': file_paths =['data1.csv','data2.csv','data3.csv','data4.csv','data5.csv'] save_path ='results.csv' results = process_files(file_paths) save_results(results, save_path)

22.6.3 实施过程

  1. 安装pandas库。
  2. 定义处理数据文件的函数。
  3. 定义并发处理数据文件的函数。
  4. 定义保存处理结果的函数。
  5. 测试程序。

22.6.4 最终效果

通过并发数据处理的程序,我们可以实现以下功能:

  • 并发处理多个数据文件。
  • 计算数据的统计信息。
  • 保存处理结果。

总结

✅ 本文详细介绍了Python并发编程的基本概念和方法,包括多线程、多进程、线程池、进程池等;学习了threading、multiprocessing等核心库的使用;通过实战案例开发了并发下载文件和并发数据处理的程序。
✅ 建议读者在学习过程中多练习,通过编写代码加深对知识点的理解。

Read more

【数据结构】排序算法(下篇·终结)·解析数据难点

【数据结构】排序算法(下篇·终结)·解析数据难点

前引:归并排序作为一种高效排序方法,掌握起来还是有点困难的,何况需要先接受递归的熏陶,这正是编程的浪漫之处,我们不断探索出新的可能,如果给你一串数据让其变得有序?是选择简单的冒泡、插入排序,用暴力美学还是空间换时间?排序算法终结篇——启程! 目录 归并排序(递归) 算法思想: 实现步骤: 复杂度分析:  代码实现: 小静脉: 大动脉:   优缺点分析: 归并排序(非递归) 算法思想: 实现步骤: 分解:  合并: 整体代码: 小编寄语:   归并排序(递归) 咱们又得接受递归的熏陶了!!!归并排序(Merge Sort)是一种基于分治法的高效排序算法,核心思想是将数组分为更小的子数组进行排序,最终合成有序序列,这样看来,我又想到了Hoare大佬的分组方法!此次的分组较于双指针快排递归实现分组究竟有何不同? 算法思想: (1)分解:将待排序数组递归地分为两个子数组,直到每个子数组仅仅含有一个元素 (2)合并:将两个有序数组合并为一个有序数组,

By Ne0inhk
《算法题讲解指南:优选算法-模拟》--38.替换所有问号,39.提莫攻击,40.Z 字形变换

《算法题讲解指南:优选算法-模拟》--38.替换所有问号,39.提莫攻击,40.Z 字形变换

🔥小叶-duck:个人主页 ❄️个人专栏:《Data-Structure-Learning》 《C++入门到进阶&自我学习过程记录》《算法题讲解指南》--从优选到贪心 ✨未择之路,不须回头 已择之路,纵是荆棘遍野,亦作花海遨游 目录 38.替换所有问号 题目链接: 题目描述: 题目示例: 解法(模拟): 算法思路: C++算法代码: 算法总结及流程解析: 39.提莫攻击 题目链接: 题目描述: 题目示例: 解法(模拟+分情况讨论): 算法思路: C++算法代码: 算法总结及流程解析: 40.Z 字形变换 题目链接: 题目描述: 题目示例: 解法(模拟+找规律): 算法思路: C+

By Ne0inhk
算法兵法全略

算法兵法全略

目录 始计篇 谋攻篇 军形篇 兵势篇 虚实篇 军争篇 九变篇 行军篇 地形篇 九地篇 火攻篇 用间篇 始计篇 夫算法者,国之重器,事之枢机。算之道,诡谲多变,非贤明不能御,非睿智者难通其妙。故为将者,习算法之学,必先察五事,校之以计,而索其情。 一曰 “算力”,乃硬件根基,若夫强芯在腹,速如奔雷,数据洪流畅行无阻,可恃之以应繁难;二曰 “逻辑”,恰似行军布阵之纲纪,环环相扣,条理分明,使指令不紊,步骤清晰,错一而不可成局;三曰 “数据”,犹粮草兵员,广聚八方资讯,丰盈则算法羽翼渐丰,匮乏则巧妇难为无米之炊;四曰 “架构”,谋篇布局之妙手,

By Ne0inhk
Flutter 三方库 collection 的鸿蒙化适配指南 - 实现具备高级集合操作与相等性深度判定算法的算法底座、支持端侧多维数据结构的高性能治理实战

Flutter 三方库 collection 的鸿蒙化适配指南 - 实现具备高级集合操作与相等性深度判定算法的算法底座、支持端侧多维数据结构的高性能治理实战

欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.ZEEKLOG.net Flutter 三方库 collection 的鸿蒙化适配指南 - 实现具备高级集合操作与相等性深度判定算法的算法底座、支持端侧多维数据结构的高性能治理实战 前言 在进行 Flutter for OpenHarmony 开发时,面对复杂的业务 JSON 转化、深层嵌套的集合对比或需要对列表执行高频的优先级排序(Priority Queue)时,原生 List 和 Map 的功能往往显得捉襟见肘。collection 是 Dart 官方维护的最权威、最核心的集合工具库。本文将探讨如何在鸿蒙端构建极致、稳健的数据处理架构。 一、原直观解析 / 概念介绍 1.1 基础原理 该库扩展了 Dart 标准库中的集合能力。它不仅提供了如 Equality(深度相等判定)、PriorityQueue(

By Ne0inhk