Python开发从入门到精通:并发编程与多进程

Python开发从入门到精通:并发编程与多进程

Python开发从入门到精通:并发编程与多进程

在这里插入图片描述

一、学习目标与重点

💡 学习目标:掌握Python并发编程的基本概念和方法,包括多线程、多进程、线程池、进程池等;学习threading、multiprocessing等核心库的使用;通过实战案例开发并发应用程序。
⚠️ 学习重点:多线程的创建与管理、多进程的创建与管理、线程池与进程池、同步与互斥、并发编程实战。

22.1 并发编程概述

22.1.1 什么是并发编程

并发编程是一种编程方式,允许程序同时执行多个任务,从而提高程序的执行效率。在并发编程中,任务可以是线程或进程。

22.1.2 并发编程的优势

  • 提高执行效率:同时执行多个任务,减少程序的总运行时间。
  • 充分利用资源:充分利用CPU和内存资源。
  • 简化代码结构:通过多线程或多进程,代码结构更加简洁。

22.1.3 并发编程的应用场景

  • CPU密集型任务:如数据处理、数学计算等。
  • I/O密集型任务:如文件操作、网络通信等。

22.2 多线程编程

22.2.1 线程的创建与管理

import threading import time # 定义线程函数defthread_function(name):print(f'线程 {name} 开始') time.sleep(2)print(f'线程 {name} 结束')# 创建线程 thread1 = threading.Thread(target=thread_function, args=('Thread 1',)) thread2 = threading.Thread(target=thread_function, args=('Thread 2',))# 启动线程 thread1.start() thread2.start()# 等待线程结束 thread1.join() thread2.join()print('所有线程结束')

22.2.2 线程的同步与互斥

import threading import time # 共享资源 counter =0# 互斥锁 lock = threading.Lock()# 定义线程函数defthread_function(name):global counter print(f'线程 {name} 开始')# 获取锁 lock.acquire()try:# 修改共享资源 counter +=1print(f'线程 {name} 已修改共享资源,值为 {counter}')finally:# 释放锁 lock.release() time.sleep(2)print(f'线程 {name} 结束')# 创建线程 threads =[]for i inrange(5): thread = threading.Thread(target=thread_function, args=(f'Thread {i}',)) threads.append(thread) thread.start()# 等待所有线程结束for thread in threads: thread.join()print(f'所有线程结束,共享资源的值为 {counter}')

22.2.3 线程池

import concurrent.futures import time # 定义线程函数defthread_function(name):print(f'线程 {name} 开始') time.sleep(2)print(f'线程 {name} 结束')return name # 使用线程池with concurrent.futures.ThreadPoolExecutor(max_workers=3)as executor:# 提交任务 future1 = executor.submit(thread_function,'Thread 1') future2 = executor.submit(thread_function,'Thread 2') future3 = executor.submit(thread_function,'Thread 3')# 获取结果print(f'线程 {future1.result()} 完成')print(f'线程 {future2.result()} 完成')print(f'线程 {future3.result()} 完成')print('所有线程结束')

22.3 多进程编程

22.3.1 进程的创建与管理

import multiprocessing import time # 定义进程函数defprocess_function(name):print(f'进程 {name} 开始') time.sleep(2)print(f'进程 {name} 结束')# 创建进程 process1 = multiprocessing.Process(target=process_function, args=('Process 1',)) process2 = multiprocessing.Process(target=process_function, args=('Process 2',))# 启动进程 process1.start() process2.start()# 等待进程结束 process1.join() process2.join()print('所有进程结束')

22.3.2 进程的通信

import multiprocessing import time # 定义进程函数defprocess_function(conn):print(f'子进程发送数据') conn.send('Hello from child process') time.sleep(2)print(f'子进程结束') conn.close()# 创建管道 parent_conn, child_conn = multiprocessing.Pipe()# 创建进程 process = multiprocessing.Process(target=process_function, args=(child_conn,)) process.start()# 接收数据print(f'父进程接收数据:{parent_conn.recv()}')# 等待进程结束 process.join()print('所有进程结束')

22.3.3 进程池

import concurrent.futures import time # 定义进程函数defprocess_function(name):print(f'进程 {name} 开始') time.sleep(2)print(f'进程 {name} 结束')return name # 使用进程池with concurrent.futures.ProcessPoolExecutor(max_workers=3)as executor:# 提交任务 future1 = executor.submit(process_function,'Process 1') future2 = executor.submit(process_function,'Process 2') future3 = executor.submit(process_function,'Process 3')# 获取结果print(f'进程 {future1.result()} 完成')print(f'进程 {future2.result()} 完成')print(f'进程 {future3.result()} 完成')print('所有进程结束')

22.4 同步与互斥

22.4.1 锁的使用

import threading import time # 共享资源 counter =0# 互斥锁 lock = threading.Lock()# 定义线程函数defthread_function(name):global counter print(f'线程 {name} 开始')# 获取锁 lock.acquire()try:# 修改共享资源 counter +=1print(f'线程 {name} 已修改共享资源,值为 {counter}')finally:# 释放锁 lock.release() time.sleep(2)print(f'线程 {name} 结束')# 创建线程 threads =[]for i inrange(5): thread = threading.Thread(target=thread_function, args=(f'Thread {i}',)) threads.append(thread) thread.start()# 等待所有线程结束for thread in threads: thread.join()print(f'所有线程结束,共享资源的值为 {counter}')

22.4.2 条件变量的使用

import threading import time # 共享资源 items =[]# 条件变量 condition = threading.Condition()# 生产者函数defproducer():for i inrange(5):with condition: items.append(i)print(f'生产者生产了 {i}') condition.notify() time.sleep(1)# 消费者函数defconsumer():for i inrange(5):with condition:whilenot items: condition.wait() item = items.pop(0)print(f'消费者消费了 {item}') time.sleep(1)# 创建线程 producer_thread = threading.Thread(target=producer) consumer_thread = threading.Thread(target=consumer)# 启动线程 producer_thread.start() consumer_thread.start()# 等待线程结束 producer_thread.join() consumer_thread.join()print('所有线程结束')

22.5 实战案例:并发下载文件

22.5.1 需求分析

开发一个并发下载文件的程序,支持以下功能:

  • 并发下载多个文件。
  • 显示下载进度。
  • 处理下载失败的情况。

22.5.2 代码实现

import requests import concurrent.futures import os # 下载文件defdownload_file(url, save_path):try: response = requests.get(url, stream=True) response.raise_for_status()withopen(save_path,'wb')asfile:for chunk in response.iter_content(chunk_size=8192):if chunk:file.write(chunk)print(f'文件 {save_path} 下载成功')return save_path except Exception as e:print(f'文件 {save_path} 下载失败:{e}')returnNone# 并发下载文件defdownload_files(urls, save_dir, max_workers=5):ifnot os.path.exists(save_dir): os.makedirs(save_dir)# 创建保存路径 save_paths =[os.path.join(save_dir, os.path.basename(url))for url in urls]# 使用线程池下载文件with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)as executor: futures =[executor.submit(download_file, url, save_path)for url, save_path inzip(urls, save_paths)]# 获取结果for future in concurrent.futures.as_completed(futures): future.result()# 测试程序if __name__ =='__main__': urls =['https://www.example.com/page1.html','https://www.example.com/page2.html','https://www.example.com/page3.html','https://www.example.com/page4.html','https://www.example.com/page5.html'] save_dir ='downloads' download_files(urls, save_dir)

22.5.3 实施过程

  1. 安装requests库。
  2. 定义下载文件的函数。
  3. 定义并发下载文件的函数。
  4. 测试程序。

22.5.4 最终效果

通过并发下载文件的程序,我们可以实现以下功能:

  • 并发下载多个文件。
  • 显示下载进度。
  • 处理下载失败的情况。

22.6 实战案例:并发数据处理

22.6.1 需求分析

开发一个并发数据处理的程序,支持以下功能:

  • 并发处理多个数据文件。
  • 计算数据的统计信息。
  • 保存处理结果。

22.6.2 代码实现

import pandas as pd import concurrent.futures import os # 处理数据文件defprocess_file(file_path):try:# 读取数据 df = pd.read_csv(file_path)# 计算统计信息 stats ={'文件名': os.path.basename(file_path),'行数': df.shape[0],'列数': df.shape[1],'平均值': df.mean().to_dict(),'最大值': df.max().to_dict(),'最小值': df.min().to_dict()}print(f'文件 {os.path.basename(file_path)} 处理成功')return stats except Exception as e:print(f'文件 {os.path.basename(file_path)} 处理失败:{e}')returnNone# 并发处理数据文件defprocess_files(file_paths, max_workers=5):# 使用进程池处理数据文件with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers)as executor: futures =[executor.submit(process_file, file_path)for file_path in file_paths]# 获取结果 results =[]for future in concurrent.futures.as_completed(futures): result = future.result()if result: results.append(result)return results # 保存处理结果defsave_results(results, save_path): df = pd.DataFrame(results) df.to_csv(save_path, index=False)print(f'处理结果已保存到 {save_path}')# 测试程序if __name__ =='__main__': file_paths =['data1.csv','data2.csv','data3.csv','data4.csv','data5.csv'] save_path ='results.csv' results = process_files(file_paths) save_results(results, save_path)

22.6.3 实施过程

  1. 安装pandas库。
  2. 定义处理数据文件的函数。
  3. 定义并发处理数据文件的函数。
  4. 定义保存处理结果的函数。
  5. 测试程序。

22.6.4 最终效果

通过并发数据处理的程序,我们可以实现以下功能:

  • 并发处理多个数据文件。
  • 计算数据的统计信息。
  • 保存处理结果。

总结

✅ 本文详细介绍了Python并发编程的基本概念和方法,包括多线程、多进程、线程池、进程池等;学习了threading、multiprocessing等核心库的使用;通过实战案例开发了并发下载文件和并发数据处理的程序。
✅ 建议读者在学习过程中多练习,通过编写代码加深对知识点的理解。

Read more

开源还是商用?大模型选型终极指南与实战搭配

一、开源大模型 vs 商用大模型:该怎么选? 1. 概念和许可证上的差异 开源 / 开放权重大模型 模型权重(weights)公开,可下载、本地部署、二次训练。 多数采用 Apache 2.0、MIT 等宽松开源许可(如 Mistral 7B、Mixtral、Gemma、Falcon 等都是 Apache 2.0 或相近许可)。 也有“开放但非真正开源”的,如 Llama 3 / Llama 2:权重可下载,但许可证不是 OSI 认可的开源协议,商业使用有附加条款,需要阅读 Meta 的 Llama License。

By Ne0inhk

VS Code中实时显示代码作者的终极方案:7个被90%开发者忽略的Git插件配置技巧

第一章:VS Code中实时显示代码作者的核心价值 在现代软件开发中,团队协作日益频繁,多人共同维护同一代码库已成为常态。VS Code 通过集成 Git 和丰富的扩展生态,提供了实时显示代码作者的功能,极大提升了代码可追溯性与协作效率。 提升代码责任归属的透明度 当开发者在文件中编辑某一行代码时,VS Code 的“行内作者视图”(如 GitLens 扩展)会实时标注该行最近一次修改的提交者、提交时间及关联的提交哈希。这种可视化信息帮助团队成员快速识别代码责任人,便于在出现疑问时精准沟通。 加速代码审查与知识传递 通过查看每行代码的贡献者,新加入项目的成员可以更快理解不同模块的设计思路和历史变更背景。例如,GitLens 支持点击作者标签查看完整提交记录: { "commit": "a1b2c3d", // 最近一次提交的哈希值 "author": "zhangsan", // 提交者姓名 "date": "

By Ne0inhk
AI的提示词专栏:通过 “Logit Bias” 精细调控词汇生成

AI的提示词专栏:通过 “Logit Bias” 精细调控词汇生成

AI的提示词专栏:通过 “Logit Bias” 精细调控词汇生成 本文围绕 “Logit Bias(对数几率偏移)” 展开全面解析,先阐释其核心概念,说明它通过干预模型词汇对数几率实现精准调控,区别于 Temperature 等全局参数;接着介绍其在敏感信息管控、核心信息强化、输出格式固定等场景的应用,如电商客服合规话术生成、产品卖点突出等;随后给出实操指南,包括配置流程、Bias 值设定及常见问题解决方案,还探讨其与结构化 Prompt、RAG 技术的结合应用;最后展望多模态扩展、动态调控等未来趋势,强调 Logit Bias 对提升文本生成质量的重要性,为相关从业者提供系统参考。 人工智能专栏介绍     人工智能学习合集专栏是 AI 学习者的实用工具。它像一个全面的 AI 知识库,把提示词设计、AI 创作、智能绘图等多个细分领域的知识整合起来。无论你是刚接触 AI 的新手,还是有一定基础想提升的人,

By Ne0inhk
从零到一:本地项目上传Gitee完整指南(新手避坑版)

从零到一:本地项目上传Gitee完整指南(新手避坑版)

从零到一:本地项目上传Gitee完整指南(新手避坑版) 在日常开发或毕业设计中,将本地项目上传到代码托管平台是必备技能。Gitee(码云)作为国内优质的Git托管服务,不仅访问速度快,还支持私有仓库、协作者管理等实用功能,特别适合国内开发者和学生使用。本文将以「ESP32车载声浪模拟控制系统」毕设项目为例,从环境准备到代码推送,一步步教你完成本地项目上传,同时揭秘新手常踩的坑及解决方案。 一、前置知识:核心概念快速理解 在开始操作前,先搞懂3个关键概念,避免后续 confusion: * Git:本地代码版本控制工具,负责跟踪文件修改、管理提交记录,是连接本地项目和Gitee的桥梁; * Gitee:远程代码托管平台,相当于“云端代码仓库”,用于存储、共享你的项目; * 分支(Branch):项目的“并行开发线”,默认主分支为main(新版Git)或master(旧版Git),推送时需保证本地分支与远程分支名称一致。 二、准备工作(3步必做) 2.1

By Ne0inhk