Python开发从入门到精通:并发编程与多进程

Python开发从入门到精通:并发编程与多进程

Python开发从入门到精通:并发编程与多进程

在这里插入图片描述

一、学习目标与重点

💡 学习目标:掌握Python并发编程的基本概念和方法,包括多线程、多进程、线程池、进程池等;学习threading、multiprocessing等核心库的使用;通过实战案例开发并发应用程序。
⚠️ 学习重点:多线程的创建与管理、多进程的创建与管理、线程池与进程池、同步与互斥、并发编程实战。

22.1 并发编程概述

22.1.1 什么是并发编程

并发编程是一种编程方式,允许程序同时执行多个任务,从而提高程序的执行效率。在并发编程中,任务可以是线程或进程。

22.1.2 并发编程的优势

  • 提高执行效率:同时执行多个任务,减少程序的总运行时间。
  • 充分利用资源:充分利用CPU和内存资源。
  • 简化代码结构:通过多线程或多进程,代码结构更加简洁。

22.1.3 并发编程的应用场景

  • CPU密集型任务:如数据处理、数学计算等。
  • I/O密集型任务:如文件操作、网络通信等。

22.2 多线程编程

22.2.1 线程的创建与管理

import threading import time # 定义线程函数defthread_function(name):print(f'线程 {name} 开始') time.sleep(2)print(f'线程 {name} 结束')# 创建线程 thread1 = threading.Thread(target=thread_function, args=('Thread 1',)) thread2 = threading.Thread(target=thread_function, args=('Thread 2',))# 启动线程 thread1.start() thread2.start()# 等待线程结束 thread1.join() thread2.join()print('所有线程结束')

22.2.2 线程的同步与互斥

import threading import time # 共享资源 counter =0# 互斥锁 lock = threading.Lock()# 定义线程函数defthread_function(name):global counter print(f'线程 {name} 开始')# 获取锁 lock.acquire()try:# 修改共享资源 counter +=1print(f'线程 {name} 已修改共享资源,值为 {counter}')finally:# 释放锁 lock.release() time.sleep(2)print(f'线程 {name} 结束')# 创建线程 threads =[]for i inrange(5): thread = threading.Thread(target=thread_function, args=(f'Thread {i}',)) threads.append(thread) thread.start()# 等待所有线程结束for thread in threads: thread.join()print(f'所有线程结束,共享资源的值为 {counter}')

22.2.3 线程池

import concurrent.futures import time # 定义线程函数defthread_function(name):print(f'线程 {name} 开始') time.sleep(2)print(f'线程 {name} 结束')return name # 使用线程池with concurrent.futures.ThreadPoolExecutor(max_workers=3)as executor:# 提交任务 future1 = executor.submit(thread_function,'Thread 1') future2 = executor.submit(thread_function,'Thread 2') future3 = executor.submit(thread_function,'Thread 3')# 获取结果print(f'线程 {future1.result()} 完成')print(f'线程 {future2.result()} 完成')print(f'线程 {future3.result()} 完成')print('所有线程结束')

22.3 多进程编程

22.3.1 进程的创建与管理

import multiprocessing import time # 定义进程函数defprocess_function(name):print(f'进程 {name} 开始') time.sleep(2)print(f'进程 {name} 结束')# 创建进程 process1 = multiprocessing.Process(target=process_function, args=('Process 1',)) process2 = multiprocessing.Process(target=process_function, args=('Process 2',))# 启动进程 process1.start() process2.start()# 等待进程结束 process1.join() process2.join()print('所有进程结束')

22.3.2 进程的通信

import multiprocessing import time # 定义进程函数defprocess_function(conn):print(f'子进程发送数据') conn.send('Hello from child process') time.sleep(2)print(f'子进程结束') conn.close()# 创建管道 parent_conn, child_conn = multiprocessing.Pipe()# 创建进程 process = multiprocessing.Process(target=process_function, args=(child_conn,)) process.start()# 接收数据print(f'父进程接收数据:{parent_conn.recv()}')# 等待进程结束 process.join()print('所有进程结束')

22.3.3 进程池

import concurrent.futures import time # 定义进程函数defprocess_function(name):print(f'进程 {name} 开始') time.sleep(2)print(f'进程 {name} 结束')return name # 使用进程池with concurrent.futures.ProcessPoolExecutor(max_workers=3)as executor:# 提交任务 future1 = executor.submit(process_function,'Process 1') future2 = executor.submit(process_function,'Process 2') future3 = executor.submit(process_function,'Process 3')# 获取结果print(f'进程 {future1.result()} 完成')print(f'进程 {future2.result()} 完成')print(f'进程 {future3.result()} 完成')print('所有进程结束')

22.4 同步与互斥

22.4.1 锁的使用

import threading import time # 共享资源 counter =0# 互斥锁 lock = threading.Lock()# 定义线程函数defthread_function(name):global counter print(f'线程 {name} 开始')# 获取锁 lock.acquire()try:# 修改共享资源 counter +=1print(f'线程 {name} 已修改共享资源,值为 {counter}')finally:# 释放锁 lock.release() time.sleep(2)print(f'线程 {name} 结束')# 创建线程 threads =[]for i inrange(5): thread = threading.Thread(target=thread_function, args=(f'Thread {i}',)) threads.append(thread) thread.start()# 等待所有线程结束for thread in threads: thread.join()print(f'所有线程结束,共享资源的值为 {counter}')

22.4.2 条件变量的使用

import threading import time # 共享资源 items =[]# 条件变量 condition = threading.Condition()# 生产者函数defproducer():for i inrange(5):with condition: items.append(i)print(f'生产者生产了 {i}') condition.notify() time.sleep(1)# 消费者函数defconsumer():for i inrange(5):with condition:whilenot items: condition.wait() item = items.pop(0)print(f'消费者消费了 {item}') time.sleep(1)# 创建线程 producer_thread = threading.Thread(target=producer) consumer_thread = threading.Thread(target=consumer)# 启动线程 producer_thread.start() consumer_thread.start()# 等待线程结束 producer_thread.join() consumer_thread.join()print('所有线程结束')

22.5 实战案例:并发下载文件

22.5.1 需求分析

开发一个并发下载文件的程序,支持以下功能:

  • 并发下载多个文件。
  • 显示下载进度。
  • 处理下载失败的情况。

22.5.2 代码实现

import requests import concurrent.futures import os # 下载文件defdownload_file(url, save_path):try: response = requests.get(url, stream=True) response.raise_for_status()withopen(save_path,'wb')asfile:for chunk in response.iter_content(chunk_size=8192):if chunk:file.write(chunk)print(f'文件 {save_path} 下载成功')return save_path except Exception as e:print(f'文件 {save_path} 下载失败:{e}')returnNone# 并发下载文件defdownload_files(urls, save_dir, max_workers=5):ifnot os.path.exists(save_dir): os.makedirs(save_dir)# 创建保存路径 save_paths =[os.path.join(save_dir, os.path.basename(url))for url in urls]# 使用线程池下载文件with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)as executor: futures =[executor.submit(download_file, url, save_path)for url, save_path inzip(urls, save_paths)]# 获取结果for future in concurrent.futures.as_completed(futures): future.result()# 测试程序if __name__ =='__main__': urls =['https://www.example.com/page1.html','https://www.example.com/page2.html','https://www.example.com/page3.html','https://www.example.com/page4.html','https://www.example.com/page5.html'] save_dir ='downloads' download_files(urls, save_dir)

22.5.3 实施过程

  1. 安装requests库。
  2. 定义下载文件的函数。
  3. 定义并发下载文件的函数。
  4. 测试程序。

22.5.4 最终效果

通过并发下载文件的程序,我们可以实现以下功能:

  • 并发下载多个文件。
  • 显示下载进度。
  • 处理下载失败的情况。

22.6 实战案例:并发数据处理

22.6.1 需求分析

开发一个并发数据处理的程序,支持以下功能:

  • 并发处理多个数据文件。
  • 计算数据的统计信息。
  • 保存处理结果。

22.6.2 代码实现

import pandas as pd import concurrent.futures import os # 处理数据文件defprocess_file(file_path):try:# 读取数据 df = pd.read_csv(file_path)# 计算统计信息 stats ={'文件名': os.path.basename(file_path),'行数': df.shape[0],'列数': df.shape[1],'平均值': df.mean().to_dict(),'最大值': df.max().to_dict(),'最小值': df.min().to_dict()}print(f'文件 {os.path.basename(file_path)} 处理成功')return stats except Exception as e:print(f'文件 {os.path.basename(file_path)} 处理失败:{e}')returnNone# 并发处理数据文件defprocess_files(file_paths, max_workers=5):# 使用进程池处理数据文件with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers)as executor: futures =[executor.submit(process_file, file_path)for file_path in file_paths]# 获取结果 results =[]for future in concurrent.futures.as_completed(futures): result = future.result()if result: results.append(result)return results # 保存处理结果defsave_results(results, save_path): df = pd.DataFrame(results) df.to_csv(save_path, index=False)print(f'处理结果已保存到 {save_path}')# 测试程序if __name__ =='__main__': file_paths =['data1.csv','data2.csv','data3.csv','data4.csv','data5.csv'] save_path ='results.csv' results = process_files(file_paths) save_results(results, save_path)

22.6.3 实施过程

  1. 安装pandas库。
  2. 定义处理数据文件的函数。
  3. 定义并发处理数据文件的函数。
  4. 定义保存处理结果的函数。
  5. 测试程序。

22.6.4 最终效果

通过并发数据处理的程序,我们可以实现以下功能:

  • 并发处理多个数据文件。
  • 计算数据的统计信息。
  • 保存处理结果。

总结

✅ 本文详细介绍了Python并发编程的基本概念和方法,包括多线程、多进程、线程池、进程池等;学习了threading、multiprocessing等核心库的使用;通过实战案例开发了并发下载文件和并发数据处理的程序。
✅ 建议读者在学习过程中多练习,通过编写代码加深对知识点的理解。

Read more

LangChain、Dify、Coze…盘点当下最热门的23个开源智能体框架

LangChain、Dify、Coze…盘点当下最热门的23个开源智能体框架

随着人工智能技术的快速发展和广泛应用,智能体(AI Agent)技术已成为最具前景的研究方向之一。智能体通过将LLM与工具调用、记忆系统、推理规划等能力相结合,实现了从被动的问答系统向主动的任务执行系统的重要转变。 当前智能体框架的发展呈现出百花齐放的态势。从最早期的AutoGPT展示自主任务执行的可能性,到LangChain建立起完整的LLM应用开发生态,再到国内开源的Dify、Coze等,每个框架都在尝试解决智能体开发中的不同痛点。这些框架在架构设计、技术路线、应用场景等方面各具特色,形成了从学术研究到企业应用的完整生态链条。本文旨在对当前主流的开源智能体框架进行深度技术分析,为读者提供有价值的参考依据。 1.LangChain LangChain 由开发者 Harrison Chase 于 2022 年 10 月 发布,是最早专门为大语言模型构建应用开发框架的开源项目。目前由 LangChain 社区维护,核心代码托管在 GitHub,并衍生出 Python 与 JavaScript 两个主要分支。 LangChain 的初衷是让大语言模型具备调用外部知识和工具的能力。随

By Ne0inhk

llama-cpp-python上下文窗口扩展:突破长度限制技巧

llama-cpp-python上下文窗口扩展:突破长度限制技巧 【免费下载链接】llama-cpp-pythonPython bindings for llama.cpp 项目地址: https://gitcode.com/gh_mirrors/ll/llama-cpp-python 在处理长文档、多轮对话或复杂任务时,你是否经常遇到模型上下文窗口不足的问题?本文将介绍三种实用方法,帮助你突破llama-cpp-python的长度限制,轻松处理超长文本。读完本文,你将掌握:基础参数调优、滑动窗口实现和智能文本分块的完整解决方案。 核心参数解析:n_ctx与RoPE缩放 llama-cpp-python的上下文窗口大小主要由n_ctx参数控制,默认值为512 tokens。通过修改这个参数,可以直接调整模型能处理的最大上下文长度。以下是关键参数说明: 参数名类型描述默认值n_ctxint上下文窗口大小(tokens)512rope_scaling_typeintRoPE缩放类型LLAMA_ROPE_SCALING_TYPE_UNSPECIFIEDrope_freq_ba

By Ne0inhk
Copilot、Codeium 软件开发领域的代表性工具背后的技术

Copilot、Codeium 软件开发领域的代表性工具背后的技术

早期, Claude、Copilot、Codeium新兴的AI代码助手,模型的温度、切片的效果、检索方式、提示词的约束、AI 回复的约束、最终数据处理;整个环节,任何一个地方都可能造成最终效果不理想。 旨在通过代码生成、代码补全、代码解释和调试等多种功能,帮助开发者减少重复劳动,提高开发效率。尽管Codeium已经取得了显著的成果,但在处理复杂的代码任务、跨文件的修改以及支持定制化库和框架方面仍面临一定的局限性。 2020 年,OpenAI发布的GPT-3模型使AI生成代码的能力得以广泛应用,标志着AI代码助手的转型。2021年,GitHub 推出基于OpenAI Codex的 Copilot,提供实时代码补全和生成能力,提升开发效率,支持跨文件复杂任务。 其痛点,在大规模代码生成、跨文件任务处理以及定制化框架支持方面的局限性仍然限制了其在复杂项目中的应用。 2023年,Claude 3.5等新一代大型语言模型陆续出世,有效提升了自然语言理解与代码生成的能力。这类模型集成了代码生成、调试和文档自动生成等多项功能,能够帮助开发者快速编写高质量代码、优化程序性能并自动修复错误。随着

By Ne0inhk

innoextract高级技巧:自定义编译选项与静态链接配置

innoextract高级技巧:自定义编译选项与静态链接配置 【免费下载链接】innoextractA tool to unpack installers created by Inno Setup 项目地址: https://gitcode.com/gh_mirrors/in/innoextract innoextract是一款强大的Inno Setup安装包提取工具,通过自定义编译选项和静态链接配置,你可以打造更适合特定环境的高效版本。本文将深入探讨如何优化编译参数、实现静态链接以及解决常见编译问题,帮助你充分发挥innoextract的潜力。 一、编译环境准备 在开始自定义编译前,请确保系统已安装必要的构建工具。克隆项目仓库的命令如下: git clone https://gitcode.com/gh_mirrors/in/innoextract cd innoextract 项目使用CMake作为构建系统,核心配置文件位于CMakeLists.txt,编译选项定义主要集中在cmake/目录下的各类模块中。 二、核心编译选项配置

By Ne0inhk