手搓简易 Linux 进程池:从 0 到 1 实现基于管道的任务分发系统

手搓简易 Linux 进程池:从 0 到 1 实现基于管道的任务分发系统
在这里插入图片描述

🔥草莓熊Lotso:个人主页
❄️个人专栏: 《C++知识分享》《Linux 入门到实践:零基础也能懂》
✨生活是默默的坚持,毅力是永久的享受!


🎬 博主简介:

在这里插入图片描述

文章目录


前言:

在 Linux 环境下,进程池是一种高效的并发编程模型,它通过预先创建一组子进程来处理任务,避免了频繁创建 / 销毁进程的开销。本文将拆解一个基于管道通信的进程池实现代码,带你理解进程池的核心设计思路、管道通信原理和任务分发机制。

一. 核心设计思路

本进程池实现的核心逻辑:

  • 父进程创建指定数量的子进程,通过匿名管道与每个子进程建立单向通信(父写子读);
  • 父进程采用轮询策略将任务分发给不同子进程,实现简单的负载均衡;
  • 子进程循环读取管道中的任务码,执行对应任务;
  • 父进程通过关闭管道写端通知子进程退出,并回收所有子进程资源。
在这里插入图片描述

二. 代码模块拆解

2.1 任务定义与随机任务生成

这部分是测试用的任务层,定义了进程池要执行的具体任务,以及随机生成任务的工具函数。

#include<iostream>#include<string>#include<vector>#include<memory>#include<functional>#include<ctime>#include<cstdlib>#include<unistd.h>#include<sys/wait.h>#define__MAIN__/////////////////////////////任务测试代码///////////////////////////////////////// 定义任务类型:无参数无返回值的函数对象using task_t = std::function<void()>;// 具体任务1:打印日志(带进程ID标识)voidPrintLog(){ std::cout <<"我是一个打印日志的任务, pid"<<getpid()<< std::endl;}// 具体任务2:模拟下载voidDownLoad(){ std::cout <<"我是一个下载任务, pid"<<getpid()<< std::endl;}// 具体任务3:模拟访问MySQLvoidReadMysql(){ std::cout <<"我是一个访问数据库的任务, pid"<<getpid()<< std::endl;}// 具体任务4:模拟访问RedisvoidWriteRedies(){ std::cout <<"我是一个访问redies的任务, pid"<<getpid()<< std::endl;}// 全局任务列表:存储所有可执行的任务 std::vector<task_t> gtasks;// 加载所有任务到全局列表voidLoadTask(){ gtasks.push_back(PrintLog); gtasks.push_back(DownLoad); gtasks.push_back(ReadMysql); gtasks.push_back(WriteRedies);}// 随机生成50个任务码(输出型参数out存储结果)// 作用:模拟业务中随机产生的任务请求// *: 输出型参数// const &: 输入型参数// &: 输入输出型voidRandomTask(std::vector<int>* out){for(int i =0; i <50; i++){// 随机选择任务(0~3)int code =rand()% gtasks.size();usleep(23223);// 模拟任务产生的时间间隔 out->push_back(code);}}// 任务码枚举(增强可读性)#defineLOG_TASK0#defineDOWNLOAD_TASK1#defineMYSQL_TASK2#defineREDIES_TASK3// 任务码转字符串:方便日志打印 std::string TaskToString(int code){switch(code){case LOG_TASK:return"PrintLog";case DOWNLOAD_TASK:return"DownLoad";case MYSQL_TASK:return"ReadMysql";case REDIES_TASK:return"WriteRedies";default:return"Unknown";}}

2.2 子进程任务处理逻辑

Work函数是子进程的核心执行逻辑,负责从管道读取任务码并执行对应任务:

/////////////////////////进程池核心代码////////////////////////// 子进程工作函数:循环读取管道中的任务码并执行// rfd:管道读端文件描述符voidWork(int rfd){while(true){int code =0;// 从管道读取任务码(阻塞读) ssize_t n =read(rfd,&code,sizeof(code));// 读取成功且长度正确:执行任务if(n >0&& n ==sizeof(int)){if(code >=0&& code < gtasks.size()){ gtasks[code]();// 执行对应任务}}// 读取到0:表示管道写端关闭(父进程通知退出)elseif(n ==0){break;// 子进程退出循环}// 读取错误:直接退出else{break;}}}

2.3 通道(Channel)类:封装父子进程通信

Channel类封装了 “管道写端 + 子进程 ID” 的关联关系,简化父进程对单个子进程的管理(发任务、关管道、回收进程):

// 通道类:管理单个子进程的通信管道和进程IDclassChannel{public:// 构造函数:初始化管道写端、子进程ID,生成通道名称Channel(int wfd, pid_t who):_wfd(wfd),_sub_process_id(who){ _name ="Channel-"+ std::to_string(_sub_process_id)+"-"+ std::to_string(_wfd);}intFd(){return _wfd;}// 获取管道写端 pid_t SubId(){return _sub_process_id;}// 获取子进程ID std::string Name(){return _name;}// 获取通道名称(调试用)// 关闭管道写端voidClose(){if(_wfd >=0)close(_wfd);}// 等待子进程退出(回收资源)voidWait(){ pid_t rid =waitpid(_sub_process_id,nullptr,0);(void)rid;// 屏蔽未使用变量警告}// 向子进程发送任务码(写管道)voidSendTask(int taskcode){ ssize_t n =write(_wfd,&taskcode,sizeof(taskcode));(void)n;// 屏蔽未使用变量警告(实际场景应检查写操作是否成功)}~Channel(){}private:int _wfd;// 管道写端文件描述符 pid_t _sub_process_id;// 对应子进程ID std::string _name;// 通道名称(调试用)};

2.4 进程池(ProcesspPool)类:核心管理逻辑

ProcesspPool类是进程池的核心,负责创建子进程、管理通道、分发任务、停止进程池:

classProcesspPool{private:// 轮询选择下一个子进程(负载均衡策略)intNext(){int choice = _next_choice; _next_choice++; _next_choice %= _channels.size();// 取模实现循环return choice;}public:// 构造函数:初始化进程池大小、轮询索引ProcesspPool(int number):_number(number),_next_choice(0){ std::cout <<"number: "<< _number << std::endl;}// 启动进程池(父进程执行):创建指定数量的子进程和管道voidStart(){for(int i =0; i < _number; i++){// 1. 创建匿名管道int pipefd[2];int n =pipe(pipefd);if(n <0){perror("pipe");exit(2);}// 2. 创建子进程 pid_t id =fork();if(id <0){perror("fork");exit(3);}elseif(id ==0)// 子进程逻辑{// 这里后面还有些变化,为了解决下面那个version2close(pipefd[1]);// 子进程关闭写端(只读)Work(pipefd[0]);// 执行工作函数close(pipefd[0]);// 任务完成后关闭读端exit(0);// 子进程退出}else// 父进程逻辑{close(pipefd[0]);// 父进程关闭读端(只写)// 创建通道对象并加入管理列表 _channels.emplace_back(pipefd[1], id);}}}// 推送任务:选择子进程并发送任务码voidPushTask(int taskcode){// 轮询选择一个子进程int who =Next(); _channels[who].SendTask(taskcode);// 打印任务分发日志(调试用) std::cout <<"发送任务: "<<TaskToString(taskcode)<<"["<< taskcode <<"]"<<"给: "<< _channels[who].Name()<< std::endl;}// 停止进程池:关闭所有管道,回收子进程voidStop(){// version1 -- 可以成功// 1. 批量关闭所有管道写端(通知子进程退出)for(auto& channel: _channels){ channel.Close(); std::cout << channel.Name()<<" close success!"<< std::endl;}sleep(3);// 等待子进程处理完最后任务并退出// 2. 批量回收子进程资源for(auto& channel: _channels){ channel.Wait(); std::cout << channel.Name()<<" wait success!"<< std::endl;}// // version2 -- 不能成功(原因:关闭管道后立即wait,子进程可能还未处理完读操作,导致阻塞)// for(auto& channel: _channels)// {// channel.Close();// channel.Wait();// std::cout << channel.Name() << " close and wait success!" << std::endl;// }// // version3 -- 可以成功(逆序关闭+回收,避免资源竞争)// int end = _channels.size() - 1;// while(end >= 0)// {// _channels[end].Close();// _channels[end].Wait();// std::cout << channel.Name() << " close and wait success!" << std::endl;// end--;// }}// 调试打印:输出所有通道信息voidDebugPrint(){ std::cout <<"------------------------------------"<< std::endl;for(auto& channel : _channels){ std::cout << channel.Fd()<< std::endl; std::cout << channel.SubId()<< std::endl; std::cout << channel.Name()<< std::endl;} std::cout <<"------------------------------------"<< std::endl;}~ProcesspPool(){}private: std::vector<Channel> _channels;// 管理所有子进程的通道int _number;// 进程池大小(子进程数量)int _next_choice;// 轮询索引(下一个要分发任务的子进程)};

2.5 主函数:进程池使用示例

主函数是进程池的入口,负责初始化、启动、分发任务、停止进程池:

#ifdef__MAIN__// 用法提示函数staticvoidUsage(const std::string &proc){ std::cout <<"Usage:\n\t"<< proc <<" proceess_number"<< std::endl;}// 程序入口:./process_pool 5(5为子进程数量)intmain(int argc,char* argv[]){// 检查命令行参数if(argc !=2){Usage(argv[0]);exit(1);}int number = std::stoi(argv[1]);// 0. 初始化:加载任务、随机生成50个任务码srand(time(nullptr)^getpid());// 设置随机数种子(结合时间+进程ID)LoadTask(); std::vector<int> task_codes;RandomTask(&task_codes);// 1. 创建进程池对象(智能指针自动管理内存) std::unique_ptr<ProcesspPool> pp = std::make_unique<ProcesspPool>(number);// 2. 启动进程池(创建子进程和管道) pp->Start();sleep(2);// 等待所有子进程初始化完成// 3. 分发所有随机任务for(auto task : task_codes){ pp->PushTask(task);usleep(500000);// 模拟任务分发间隔(500ms)}// // 注释部分:交互式输入任务码(调试用)// while(true)// {// int code = 0;// std::cout << "Please Enter Your Task# ";// std::cin >> code;// if(code < 0 || code > gtasks.size())// {// std::cout << "任务码错误, 请重新输入" << std::endl;// continue;// }// pp->PushTask(code);// }// 4. 停止进程池(回收资源) pp->Stop();return0;}#endif

三. 关键知识点解析

3.1 管道通信原理

  • 匿名管道pipe()创建的文件描述符对pipefd[0](读)、pipefd[1](写)是单向的;
  • 父子进程继承管道文件描述符,通过关闭不需要的端实现 “父写子读”;
  • 当写端关闭后,读端read()会返回 0,子进程通过这个信号判断退出。

3.2 轮询负载均衡

Next()函数通过递增取模的方式,循环选择子进程,确保任务均匀分发给所有子进程,避免单个子进程过载。

3.3 进程回收的坑

Stop()函数中 version2 失败的原因:父进程关闭管道后立即waitpid(),子进程可能还在阻塞读管道,此时父进程waitpid()会阻塞,而子进程读取到管道关闭后退出,但若所有子进程都处于这种状态,会导致死锁。version1 先批量关闭所有管道,等待 3 秒让子进程全部退出后再回收,避免了这个问题。

总结
版本2失败的根本原因是父进程在等待一个子进程时,其他子进程的写端并未关闭(因为都是继承了父进程,关闭了一个,但是后面的子进程关闭一个进行读还是不可避免的继承了上次之前的),导致它们无法退出,从而形成串行阻塞。版本1通过先关闭所有写端,让子进程并发退出,避免了这一风险。因此,在实际开发中,应当采用版本1或类似策略来确保进程池能够优雅地停止。
在这里插入图片描述
在这里插入图片描述

我们可以怎么样去修改使version2变成可行的方案?

在这里插入图片描述


在这里插入图片描述

四. 完整代码展示

#include<iostream>#include<string>#include<vector>#include<memory>#include<functional>#include<ctime>#include<cstdlib>#include<unistd.h>#include<sys/wait.h>#define__MAIN__/////////////////////////////任务测试代码///////////////////////////////////////using task_t = std::function<void()>;voidPrintLog(){ std::cout <<"我是一个打印日志的任务, pid"<<getpid()<< std::endl;}voidDownLoad(){ std::cout <<"我是一个下载任务, pid"<<getpid()<< std::endl;}voidReadMysql(){ std::cout <<"我是一个访问数据库的任务, pid"<<getpid()<< std::endl;}voidWriteRedies(){ std::cout <<"我是一个访问redies的任务, pid"<<getpid()<< std::endl;} std::vector<task_t> gtasks;voidLoadTask(){ gtasks.push_back(PrintLog); gtasks.push_back(DownLoad); gtasks.push_back(ReadMysql); gtasks.push_back(WriteRedies);}// *: 输出型参数// const &: 输入型参数// &: 输入输出型voidRandomTask(std::vector<int>* out){for(int i =0; i <50; i++){int code =rand()% gtasks.size();usleep(23223); out->push_back(code);}}#defineLOG_TASK0#defineDOWNLOAD_TASK1#defineMYSQL_TASK2#defineREDIES_TASK3 std::string TaskToString(int code){switch(code){case LOG_TASK:return"PrintLog";case DOWNLOAD_TASK:return"DownLoad";case MYSQL_TASK:return"ReadMysql";case REDIES_TASK:return"WriteRedies";default:return"Unknown";}}/////////////////////////进程池代码////////////////////////voidWork(int rfd){while(true){int code =0; ssize_t n =read(rfd,&code,sizeof(code));if(n >0&& n ==sizeof(int)){if(code >=0&& code < gtasks.size()){ gtasks[code]();}}elseif(n ==0){break;// 子进程只要读到返回值为0, 表明父进程让我退出}else{break;}}}classChannel{public:Channel(int wfd, pid_t who):_wfd(wfd),_sub_process_id(who){ _name ="Channel-"+ std::to_string(_sub_process_id)+"-"+ std::to_string(_wfd);}intFd(){return _wfd;} pid_t SubId(){return _sub_process_id;} std::string Name(){return _name;}voidClose(){if(_wfd >=0)close(_wfd);}voidWait(){ pid_t rid =waitpid(_sub_process_id,nullptr,0);(void)rid;}voidSendTask(int taskcode){ ssize_t n =write(_wfd,&taskcode,sizeof(taskcode));(void)n;}~Channel(){}private:int _wfd; pid_t _sub_process_id; std::string _name;};classProcesspPool{private:intNext(){int choice = _next_choice; _next_choice++; _next_choice %= _channels.size();return choice;}public:ProcesspPool(int number):_number(number),_next_choice(0){ std::cout <<"number: "<< _number << std::endl;}// 父进程voidStart(){for(int i =0; i < _number; i++){// 1. 创建管道int pipefd[2];int n =pipe(pipefd);if(n <0){perror("pipe");exit(2);}// 2. 创建子进程 pid_t id =fork();if(id <0){perror("fork");exit(3);}elseif(id ==0)// 子进程{// 关闭父进程历史的wfd!for(auto& channel : _channels) channel.Close();close(pipefd[1]);Work(pipefd[0]);close(pipefd[0]);exit(0);}else// 父进程{close(pipefd[0]);// _channels c(pipefd[1], fd);// _channels.push_back(c); _channels.emplace_back(pipefd[1], id);// 内部会直接构造}}}// 1. 什么任务? 任务码决定// 2. 任务给谁? 属于进程池内部操作,负载均衡(我这里是用的轮询的机制)voidPushTask(int taskcode){// 选择一个子进程int who =Next(); _channels[who].SendTask(taskcode); std::cout <<"发送任务: "<<TaskToString(taskcode)<<"["<< taskcode <<"]"<<"给: "<< _channels[who].Name()<< std::endl;}// 有版本存在一些问题, 后续会说为什么voidStop(){// version1 -- 可以成功// 1. 关闭wfdfor(auto& channel: _channels){ channel.Close(); std::cout << channel.Name()<<" close success!"<< std::endl;}sleep(3);// 2. 回收子进程for(auto& channel: _channels){ channel.Wait(); std::cout << channel.Name()<<" wait success!"<< std::endl;}// // version2 -- 不能成功???// for(auto& channel: _channels)// {// channel.Close();// channel.Wait();// std::cout << channel.Name() << " close and wait success!" << std::endl;// }// version3 -- 可以成功// int end = _channels.size() - 1;// while(end >= 0)// {// _channels[end].Close();// _channels[end].Wait();// std::cout << channel.Name() << " close and wait success!" << std::endl;// end--;// }}voidDebugPrint(){ std::cout <<"------------------------------------"<< std::endl;for(auto& channel : _channels){ std::cout << channel.Fd()<< std::endl; std::cout << channel.SubId()<< std::endl; std::cout << channel.Name()<< std::endl;} std::cout <<"------------------------------------"<< std::endl;}~ProcesspPool(){}private: std::vector<Channel> _channels;int _number;int _next_choice;};// 父进程#ifdef__MAIN__staticvoidUsage(const std::string &proc){ std::cout <<"Usage:\n\t"<< proc <<" proceess_number"<< std::endl;}// ./process_pool 5intmain(int argc,char* argv[]){if(argc !=2){Usage(argv[0]);exit(1);}int number = std::stoi(argv[1]);// 0. 加载任务并随机生成任务srand(time(nullptr)^getpid());LoadTask(); std::vector<int> task_codes;RandomTask(&task_codes);// 1. 创建进程池对象 std::unique_ptr<ProcesspPool> pp = std::make_unique<ProcesspPool>(number);// 2. 启动进程池 pp->Start();sleep(2);for(auto task : task_codes){ pp->PushTask(task);usleep(500000);}// // 自己输入发送任务// while(true)// {// int code = 0;// std::cout << "Please Enter Your Task# ";// std::cin >> code;// if(code < 0 || code > gtasks.size())// {// std::cout << "任务码错误, 请重新输入" << std::endl;// continue;// }// pp->PushTask(code);// } pp->Stop();return0;}#endif

五. 编译与运行(附 Makefile)

process_pool:process_pool.cc g++ -o$@ $^ -std=c++14 .PHONY:clean clean: rm-f process_pool 
  • 编译:直接 make;
  • 运行./process_pool 5(5 为子进程数量,可自定义);
  • 输出:可以看到任务被轮询分发给不同子进程,每个任务打印对应的进程 ID,最后进程池正常停止并回收资源。

六. 扩展与优化方向

  • 错误处理:当前代码未处理write()/read()的错误返回值,实际场景应增加重试、日志记录;
  • 动态扩容:支持运行时增加 / 减少子进程数量;
  • 更优的负载均衡:基于子进程当前任务数、CPU 使用率等动态分发;
  • 任务队列:父进程增加任务队列,避免任务分发过快导致管道阻塞;
  • 信号处理:增加SIGCHLD信号处理,异步回收子进程,避免僵尸进程。

结尾:

🍓 我是草莓熊 Lotso!若这篇技术干货帮你打通了学习中的卡点: 👀 【关注】跟我一起深耕技术领域,从基础到进阶,见证每一次成长 ❤️ 【点赞】让优质内容被更多人看见,让知识传递更有力量 ⭐ 【收藏】把核心知识点、实战技巧存好,需要时直接查、随时用 💬 【评论】分享你的经验或疑问(比如曾踩过的技术坑?),一起交流避坑 🗳️ 【投票】用你的选择助力社区内容方向,告诉大家哪个技术点最该重点拆解 技术之路难免有困惑,但同行的人会让前进更有方向~愿我们都能在自己专注的领域里,一步步靠近心中的技术目标! 

结语:本文通过拆解一个极简的进程池实现,带你理解了 Linux 进程间通信、进程管理的核心知识点。这个进程池虽然简单,但涵盖了进程池的核心设计思想,是学习 Linux 并发编程的绝佳案例。

✨把这些内容吃透超牛的!放松下吧✨ʕ˘ᴥ˘ʔづきらど

Read more

【DeepSeek微调实践】DeepSeek-R1大模型基于MS-Swift框架部署/推理/微调实践大全

【DeepSeek微调实践】DeepSeek-R1大模型基于MS-Swift框架部署/推理/微调实践大全

系列篇章💥 No.文章01【DeepSeek应用实践】DeepSeek接入Word、WPS方法详解:无需代码,轻松实现智能办公助手功能02【DeepSeek应用实践】通义灵码 + DeepSeek:AI 编程助手的实战指南03【DeepSeek应用实践】Cline集成DeepSeek:开源AI编程助手,终端与Web开发的超强助力04【DeepSeek开发入门】DeepSeek API 开发初体验05【DeepSeek开发入门】DeepSeek API高级开发指南(推理与多轮对话机器人实践)06【DeepSeek开发入门】Function Calling 函数功能应用实战指南07【DeepSeek部署实战】DeepSeek-R1-Distill-Qwen-7B:本地部署与API服务快速上手08【DeepSeek部署实战】DeepSeek-R1-Distill-Qwen-7B:Web聊天机器人部署指南09【DeepSeek部署实战】DeepSeek-R1-Distill-Qwen-7B:基于vLLM 搭建高性能推理服务器10【DeepSeek部署实战】基于Ollama快速部署Dee

By Ne0inhk

DeepSeek各版本说明与优缺点分析_deepseek各版本区别

DeepSeek各版本说明与优缺点分析 DeepSeek是最近人工智能领域备受瞩目的一个语言模型系列,其在不同版本的发布过程中,逐步加强了对多种任务的处理能力。本文将详细介绍DeepSeek的各版本,从版本的发布时间、特点、优势以及不足之处,为广大AI技术爱好者和开发者提供一份参考指南。 1. DeepSeek-V1:起步与编码强劲 DeepSeek-V1是DeepSeek的起步版本,这里不过多赘述,主要分析它的优缺点。 发布时间: 2024年1月 特点: DeepSeek-V1是DeepSeek系列的首个版本,预训练于2TB的标记数据,主打自然语言处理和编码任务。它支持多种编程语言,具有强大的编码能力,适合程序开发人员和技术研究人员使用。 优势: * 强大编码能力:支持多种编程语言,能够理解和生成代码,适合开发者进行自动化代码生成与调试。 * 高上下文窗口:支持高达128K标记的上下文窗口,能够处理较为复杂的文本理解和生成任务。 缺点: * 多模态能力有限:该版本主要集中在文本处理上,缺少对图像、语音等多模态任务的支持。 * 推理能力较弱:尽管在自然语言

By Ne0inhk

用DeepSeek和Cursor从零打造智能代码审查工具:我的AI编程实践

💂 个人网站:【 摸鱼游戏】【神级代码资源网站】【星海网址导航】摸鱼、技术交流群👉 点此查看详情 引言:AI编程革命下的机遇与挑战 GitHub统计显示,使用AI编程工具的开发者平均效率提升55%,但仅有23%的开发者能充分发挥这些工具的潜力。作为一名全栈工程师,我曾对AI编程持怀疑态度,直到一次紧急项目让我彻底改变了看法。客户要求在72小时内交付一个能自动检测代码漏洞、优化性能的智能审查系统,传统开发方式根本不可能完成。正是这次挑战,让我探索出DeepSeek和Cursor这对"黄金组合"的惊人潜力。 一、工具选型:深入比较主流AI编程工具 1.1 为什么最终选择DeepSeek+Cursor? 经过两周的对比测试,我们发现不同工具在代码审查场景的表现差异显著: 工具代码理解深度响应速度定制灵活性多语言支持GitHub Copilot★★★☆★★★★★★☆★★★★Amazon CodeWhisperer★★☆★★★☆★★★★★★☆DeepSeek★★★★☆★★★★★★★☆★★★★☆Cursor★★★☆★★★★☆★★★★★★★★ 关键发现: * Dee

By Ne0inhk
【DeepSeek应用】100个 DeepSeek 官方推荐的工具箱

【DeepSeek应用】100个 DeepSeek 官方推荐的工具箱

【DeepSeek应用】Deepseek R1 本地部署(Ollama+Docker+OpenWebUI) 【DeepSeek应用】DeepSeek 搭建个人知识库(Ollama+CherryStudio) 【DeepSeek应用】100个 DeepSeek 官方推荐的工具箱 【DeepSeek应用】Zotero+Deepseek 阅读与分析文献 【DeepSeek应用】100个 DeepSeek 官方推荐的工具箱 * 1. DeepSeek 工具箱:应用程序 * 2. DeepSeek 工具箱:AI Agent 框架 * 3. DeepSeek 工具箱:RAG 框架 * 4. DeepSeek 工具箱:即时通讯软件 * 5. DeepSeek 工具箱:浏览器插件 * 6. DeepSeek 工具箱:

By Ne0inhk