手搓简易 Linux 进程池:从 0 到 1 实现基于管道的任务分发系统

手搓简易 Linux 进程池:从 0 到 1 实现基于管道的任务分发系统
在这里插入图片描述

🔥草莓熊Lotso:个人主页
❄️个人专栏: 《C++知识分享》《Linux 入门到实践:零基础也能懂》
✨生活是默默的坚持,毅力是永久的享受!


🎬 博主简介:

在这里插入图片描述

文章目录


前言:

在 Linux 环境下,进程池是一种高效的并发编程模型,它通过预先创建一组子进程来处理任务,避免了频繁创建 / 销毁进程的开销。本文将拆解一个基于管道通信的进程池实现代码,带你理解进程池的核心设计思路、管道通信原理和任务分发机制。

一. 核心设计思路

本进程池实现的核心逻辑:

  • 父进程创建指定数量的子进程,通过匿名管道与每个子进程建立单向通信(父写子读);
  • 父进程采用轮询策略将任务分发给不同子进程,实现简单的负载均衡;
  • 子进程循环读取管道中的任务码,执行对应任务;
  • 父进程通过关闭管道写端通知子进程退出,并回收所有子进程资源。
在这里插入图片描述

二. 代码模块拆解

2.1 任务定义与随机任务生成

这部分是测试用的任务层,定义了进程池要执行的具体任务,以及随机生成任务的工具函数。

#include<iostream>#include<string>#include<vector>#include<memory>#include<functional>#include<ctime>#include<cstdlib>#include<unistd.h>#include<sys/wait.h>#define__MAIN__/////////////////////////////任务测试代码///////////////////////////////////////// 定义任务类型:无参数无返回值的函数对象using task_t = std::function<void()>;// 具体任务1:打印日志(带进程ID标识)voidPrintLog(){ std::cout <<"我是一个打印日志的任务, pid"<<getpid()<< std::endl;}// 具体任务2:模拟下载voidDownLoad(){ std::cout <<"我是一个下载任务, pid"<<getpid()<< std::endl;}// 具体任务3:模拟访问MySQLvoidReadMysql(){ std::cout <<"我是一个访问数据库的任务, pid"<<getpid()<< std::endl;}// 具体任务4:模拟访问RedisvoidWriteRedies(){ std::cout <<"我是一个访问redies的任务, pid"<<getpid()<< std::endl;}// 全局任务列表:存储所有可执行的任务 std::vector<task_t> gtasks;// 加载所有任务到全局列表voidLoadTask(){ gtasks.push_back(PrintLog); gtasks.push_back(DownLoad); gtasks.push_back(ReadMysql); gtasks.push_back(WriteRedies);}// 随机生成50个任务码(输出型参数out存储结果)// 作用:模拟业务中随机产生的任务请求// *: 输出型参数// const &: 输入型参数// &: 输入输出型voidRandomTask(std::vector<int>* out){for(int i =0; i <50; i++){// 随机选择任务(0~3)int code =rand()% gtasks.size();usleep(23223);// 模拟任务产生的时间间隔 out->push_back(code);}}// 任务码枚举(增强可读性)#defineLOG_TASK0#defineDOWNLOAD_TASK1#defineMYSQL_TASK2#defineREDIES_TASK3// 任务码转字符串:方便日志打印 std::string TaskToString(int code){switch(code){case LOG_TASK:return"PrintLog";case DOWNLOAD_TASK:return"DownLoad";case MYSQL_TASK:return"ReadMysql";case REDIES_TASK:return"WriteRedies";default:return"Unknown";}}

2.2 子进程任务处理逻辑

Work函数是子进程的核心执行逻辑,负责从管道读取任务码并执行对应任务:

/////////////////////////进程池核心代码////////////////////////// 子进程工作函数:循环读取管道中的任务码并执行// rfd:管道读端文件描述符voidWork(int rfd){while(true){int code =0;// 从管道读取任务码(阻塞读) ssize_t n =read(rfd,&code,sizeof(code));// 读取成功且长度正确:执行任务if(n >0&& n ==sizeof(int)){if(code >=0&& code < gtasks.size()){ gtasks[code]();// 执行对应任务}}// 读取到0:表示管道写端关闭(父进程通知退出)elseif(n ==0){break;// 子进程退出循环}// 读取错误:直接退出else{break;}}}

2.3 通道(Channel)类:封装父子进程通信

Channel类封装了 “管道写端 + 子进程 ID” 的关联关系,简化父进程对单个子进程的管理(发任务、关管道、回收进程):

// 通道类:管理单个子进程的通信管道和进程IDclassChannel{public:// 构造函数:初始化管道写端、子进程ID,生成通道名称Channel(int wfd, pid_t who):_wfd(wfd),_sub_process_id(who){ _name ="Channel-"+ std::to_string(_sub_process_id)+"-"+ std::to_string(_wfd);}intFd(){return _wfd;}// 获取管道写端 pid_t SubId(){return _sub_process_id;}// 获取子进程ID std::string Name(){return _name;}// 获取通道名称(调试用)// 关闭管道写端voidClose(){if(_wfd >=0)close(_wfd);}// 等待子进程退出(回收资源)voidWait(){ pid_t rid =waitpid(_sub_process_id,nullptr,0);(void)rid;// 屏蔽未使用变量警告}// 向子进程发送任务码(写管道)voidSendTask(int taskcode){ ssize_t n =write(_wfd,&taskcode,sizeof(taskcode));(void)n;// 屏蔽未使用变量警告(实际场景应检查写操作是否成功)}~Channel(){}private:int _wfd;// 管道写端文件描述符 pid_t _sub_process_id;// 对应子进程ID std::string _name;// 通道名称(调试用)};

2.4 进程池(ProcesspPool)类:核心管理逻辑

ProcesspPool类是进程池的核心,负责创建子进程、管理通道、分发任务、停止进程池:

classProcesspPool{private:// 轮询选择下一个子进程(负载均衡策略)intNext(){int choice = _next_choice; _next_choice++; _next_choice %= _channels.size();// 取模实现循环return choice;}public:// 构造函数:初始化进程池大小、轮询索引ProcesspPool(int number):_number(number),_next_choice(0){ std::cout <<"number: "<< _number << std::endl;}// 启动进程池(父进程执行):创建指定数量的子进程和管道voidStart(){for(int i =0; i < _number; i++){// 1. 创建匿名管道int pipefd[2];int n =pipe(pipefd);if(n <0){perror("pipe");exit(2);}// 2. 创建子进程 pid_t id =fork();if(id <0){perror("fork");exit(3);}elseif(id ==0)// 子进程逻辑{// 这里后面还有些变化,为了解决下面那个version2close(pipefd[1]);// 子进程关闭写端(只读)Work(pipefd[0]);// 执行工作函数close(pipefd[0]);// 任务完成后关闭读端exit(0);// 子进程退出}else// 父进程逻辑{close(pipefd[0]);// 父进程关闭读端(只写)// 创建通道对象并加入管理列表 _channels.emplace_back(pipefd[1], id);}}}// 推送任务:选择子进程并发送任务码voidPushTask(int taskcode){// 轮询选择一个子进程int who =Next(); _channels[who].SendTask(taskcode);// 打印任务分发日志(调试用) std::cout <<"发送任务: "<<TaskToString(taskcode)<<"["<< taskcode <<"]"<<"给: "<< _channels[who].Name()<< std::endl;}// 停止进程池:关闭所有管道,回收子进程voidStop(){// version1 -- 可以成功// 1. 批量关闭所有管道写端(通知子进程退出)for(auto& channel: _channels){ channel.Close(); std::cout << channel.Name()<<" close success!"<< std::endl;}sleep(3);// 等待子进程处理完最后任务并退出// 2. 批量回收子进程资源for(auto& channel: _channels){ channel.Wait(); std::cout << channel.Name()<<" wait success!"<< std::endl;}// // version2 -- 不能成功(原因:关闭管道后立即wait,子进程可能还未处理完读操作,导致阻塞)// for(auto& channel: _channels)// {// channel.Close();// channel.Wait();// std::cout << channel.Name() << " close and wait success!" << std::endl;// }// // version3 -- 可以成功(逆序关闭+回收,避免资源竞争)// int end = _channels.size() - 1;// while(end >= 0)// {// _channels[end].Close();// _channels[end].Wait();// std::cout << channel.Name() << " close and wait success!" << std::endl;// end--;// }}// 调试打印:输出所有通道信息voidDebugPrint(){ std::cout <<"------------------------------------"<< std::endl;for(auto& channel : _channels){ std::cout << channel.Fd()<< std::endl; std::cout << channel.SubId()<< std::endl; std::cout << channel.Name()<< std::endl;} std::cout <<"------------------------------------"<< std::endl;}~ProcesspPool(){}private: std::vector<Channel> _channels;// 管理所有子进程的通道int _number;// 进程池大小(子进程数量)int _next_choice;// 轮询索引(下一个要分发任务的子进程)};

2.5 主函数:进程池使用示例

主函数是进程池的入口,负责初始化、启动、分发任务、停止进程池:

#ifdef__MAIN__// 用法提示函数staticvoidUsage(const std::string &proc){ std::cout <<"Usage:\n\t"<< proc <<" proceess_number"<< std::endl;}// 程序入口:./process_pool 5(5为子进程数量)intmain(int argc,char* argv[]){// 检查命令行参数if(argc !=2){Usage(argv[0]);exit(1);}int number = std::stoi(argv[1]);// 0. 初始化:加载任务、随机生成50个任务码srand(time(nullptr)^getpid());// 设置随机数种子(结合时间+进程ID)LoadTask(); std::vector<int> task_codes;RandomTask(&task_codes);// 1. 创建进程池对象(智能指针自动管理内存) std::unique_ptr<ProcesspPool> pp = std::make_unique<ProcesspPool>(number);// 2. 启动进程池(创建子进程和管道) pp->Start();sleep(2);// 等待所有子进程初始化完成// 3. 分发所有随机任务for(auto task : task_codes){ pp->PushTask(task);usleep(500000);// 模拟任务分发间隔(500ms)}// // 注释部分:交互式输入任务码(调试用)// while(true)// {// int code = 0;// std::cout << "Please Enter Your Task# ";// std::cin >> code;// if(code < 0 || code > gtasks.size())// {// std::cout << "任务码错误, 请重新输入" << std::endl;// continue;// }// pp->PushTask(code);// }// 4. 停止进程池(回收资源) pp->Stop();return0;}#endif

三. 关键知识点解析

3.1 管道通信原理

  • 匿名管道pipe()创建的文件描述符对pipefd[0](读)、pipefd[1](写)是单向的;
  • 父子进程继承管道文件描述符,通过关闭不需要的端实现 “父写子读”;
  • 当写端关闭后,读端read()会返回 0,子进程通过这个信号判断退出。

3.2 轮询负载均衡

Next()函数通过递增取模的方式,循环选择子进程,确保任务均匀分发给所有子进程,避免单个子进程过载。

3.3 进程回收的坑

Stop()函数中 version2 失败的原因:父进程关闭管道后立即waitpid(),子进程可能还在阻塞读管道,此时父进程waitpid()会阻塞,而子进程读取到管道关闭后退出,但若所有子进程都处于这种状态,会导致死锁。version1 先批量关闭所有管道,等待 3 秒让子进程全部退出后再回收,避免了这个问题。

总结
版本2失败的根本原因是父进程在等待一个子进程时,其他子进程的写端并未关闭(因为都是继承了父进程,关闭了一个,但是后面的子进程关闭一个进行读还是不可避免的继承了上次之前的),导致它们无法退出,从而形成串行阻塞。版本1通过先关闭所有写端,让子进程并发退出,避免了这一风险。因此,在实际开发中,应当采用版本1或类似策略来确保进程池能够优雅地停止。
在这里插入图片描述
在这里插入图片描述

我们可以怎么样去修改使version2变成可行的方案?

在这里插入图片描述


在这里插入图片描述

四. 完整代码展示

#include<iostream>#include<string>#include<vector>#include<memory>#include<functional>#include<ctime>#include<cstdlib>#include<unistd.h>#include<sys/wait.h>#define__MAIN__/////////////////////////////任务测试代码///////////////////////////////////////using task_t = std::function<void()>;voidPrintLog(){ std::cout <<"我是一个打印日志的任务, pid"<<getpid()<< std::endl;}voidDownLoad(){ std::cout <<"我是一个下载任务, pid"<<getpid()<< std::endl;}voidReadMysql(){ std::cout <<"我是一个访问数据库的任务, pid"<<getpid()<< std::endl;}voidWriteRedies(){ std::cout <<"我是一个访问redies的任务, pid"<<getpid()<< std::endl;} std::vector<task_t> gtasks;voidLoadTask(){ gtasks.push_back(PrintLog); gtasks.push_back(DownLoad); gtasks.push_back(ReadMysql); gtasks.push_back(WriteRedies);}// *: 输出型参数// const &: 输入型参数// &: 输入输出型voidRandomTask(std::vector<int>* out){for(int i =0; i <50; i++){int code =rand()% gtasks.size();usleep(23223); out->push_back(code);}}#defineLOG_TASK0#defineDOWNLOAD_TASK1#defineMYSQL_TASK2#defineREDIES_TASK3 std::string TaskToString(int code){switch(code){case LOG_TASK:return"PrintLog";case DOWNLOAD_TASK:return"DownLoad";case MYSQL_TASK:return"ReadMysql";case REDIES_TASK:return"WriteRedies";default:return"Unknown";}}/////////////////////////进程池代码////////////////////////voidWork(int rfd){while(true){int code =0; ssize_t n =read(rfd,&code,sizeof(code));if(n >0&& n ==sizeof(int)){if(code >=0&& code < gtasks.size()){ gtasks[code]();}}elseif(n ==0){break;// 子进程只要读到返回值为0, 表明父进程让我退出}else{break;}}}classChannel{public:Channel(int wfd, pid_t who):_wfd(wfd),_sub_process_id(who){ _name ="Channel-"+ std::to_string(_sub_process_id)+"-"+ std::to_string(_wfd);}intFd(){return _wfd;} pid_t SubId(){return _sub_process_id;} std::string Name(){return _name;}voidClose(){if(_wfd >=0)close(_wfd);}voidWait(){ pid_t rid =waitpid(_sub_process_id,nullptr,0);(void)rid;}voidSendTask(int taskcode){ ssize_t n =write(_wfd,&taskcode,sizeof(taskcode));(void)n;}~Channel(){}private:int _wfd; pid_t _sub_process_id; std::string _name;};classProcesspPool{private:intNext(){int choice = _next_choice; _next_choice++; _next_choice %= _channels.size();return choice;}public:ProcesspPool(int number):_number(number),_next_choice(0){ std::cout <<"number: "<< _number << std::endl;}// 父进程voidStart(){for(int i =0; i < _number; i++){// 1. 创建管道int pipefd[2];int n =pipe(pipefd);if(n <0){perror("pipe");exit(2);}// 2. 创建子进程 pid_t id =fork();if(id <0){perror("fork");exit(3);}elseif(id ==0)// 子进程{// 关闭父进程历史的wfd!for(auto& channel : _channels) channel.Close();close(pipefd[1]);Work(pipefd[0]);close(pipefd[0]);exit(0);}else// 父进程{close(pipefd[0]);// _channels c(pipefd[1], fd);// _channels.push_back(c); _channels.emplace_back(pipefd[1], id);// 内部会直接构造}}}// 1. 什么任务? 任务码决定// 2. 任务给谁? 属于进程池内部操作,负载均衡(我这里是用的轮询的机制)voidPushTask(int taskcode){// 选择一个子进程int who =Next(); _channels[who].SendTask(taskcode); std::cout <<"发送任务: "<<TaskToString(taskcode)<<"["<< taskcode <<"]"<<"给: "<< _channels[who].Name()<< std::endl;}// 有版本存在一些问题, 后续会说为什么voidStop(){// version1 -- 可以成功// 1. 关闭wfdfor(auto& channel: _channels){ channel.Close(); std::cout << channel.Name()<<" close success!"<< std::endl;}sleep(3);// 2. 回收子进程for(auto& channel: _channels){ channel.Wait(); std::cout << channel.Name()<<" wait success!"<< std::endl;}// // version2 -- 不能成功???// for(auto& channel: _channels)// {// channel.Close();// channel.Wait();// std::cout << channel.Name() << " close and wait success!" << std::endl;// }// version3 -- 可以成功// int end = _channels.size() - 1;// while(end >= 0)// {// _channels[end].Close();// _channels[end].Wait();// std::cout << channel.Name() << " close and wait success!" << std::endl;// end--;// }}voidDebugPrint(){ std::cout <<"------------------------------------"<< std::endl;for(auto& channel : _channels){ std::cout << channel.Fd()<< std::endl; std::cout << channel.SubId()<< std::endl; std::cout << channel.Name()<< std::endl;} std::cout <<"------------------------------------"<< std::endl;}~ProcesspPool(){}private: std::vector<Channel> _channels;int _number;int _next_choice;};// 父进程#ifdef__MAIN__staticvoidUsage(const std::string &proc){ std::cout <<"Usage:\n\t"<< proc <<" proceess_number"<< std::endl;}// ./process_pool 5intmain(int argc,char* argv[]){if(argc !=2){Usage(argv[0]);exit(1);}int number = std::stoi(argv[1]);// 0. 加载任务并随机生成任务srand(time(nullptr)^getpid());LoadTask(); std::vector<int> task_codes;RandomTask(&task_codes);// 1. 创建进程池对象 std::unique_ptr<ProcesspPool> pp = std::make_unique<ProcesspPool>(number);// 2. 启动进程池 pp->Start();sleep(2);for(auto task : task_codes){ pp->PushTask(task);usleep(500000);}// // 自己输入发送任务// while(true)// {// int code = 0;// std::cout << "Please Enter Your Task# ";// std::cin >> code;// if(code < 0 || code > gtasks.size())// {// std::cout << "任务码错误, 请重新输入" << std::endl;// continue;// }// pp->PushTask(code);// } pp->Stop();return0;}#endif

五. 编译与运行(附 Makefile)

process_pool:process_pool.cc g++ -o$@ $^ -std=c++14 .PHONY:clean clean: rm-f process_pool 
  • 编译:直接 make;
  • 运行./process_pool 5(5 为子进程数量,可自定义);
  • 输出:可以看到任务被轮询分发给不同子进程,每个任务打印对应的进程 ID,最后进程池正常停止并回收资源。

六. 扩展与优化方向

  • 错误处理:当前代码未处理write()/read()的错误返回值,实际场景应增加重试、日志记录;
  • 动态扩容:支持运行时增加 / 减少子进程数量;
  • 更优的负载均衡:基于子进程当前任务数、CPU 使用率等动态分发;
  • 任务队列:父进程增加任务队列,避免任务分发过快导致管道阻塞;
  • 信号处理:增加SIGCHLD信号处理,异步回收子进程,避免僵尸进程。

结尾:

🍓 我是草莓熊 Lotso!若这篇技术干货帮你打通了学习中的卡点: 👀 【关注】跟我一起深耕技术领域,从基础到进阶,见证每一次成长 ❤️ 【点赞】让优质内容被更多人看见,让知识传递更有力量 ⭐ 【收藏】把核心知识点、实战技巧存好,需要时直接查、随时用 💬 【评论】分享你的经验或疑问(比如曾踩过的技术坑?),一起交流避坑 🗳️ 【投票】用你的选择助力社区内容方向,告诉大家哪个技术点最该重点拆解 技术之路难免有困惑,但同行的人会让前进更有方向~愿我们都能在自己专注的领域里,一步步靠近心中的技术目标! 

结语:本文通过拆解一个极简的进程池实现,带你理解了 Linux 进程间通信、进程管理的核心知识点。这个进程池虽然简单,但涵盖了进程池的核心设计思想,是学习 Linux 并发编程的绝佳案例。

✨把这些内容吃透超牛的!放松下吧✨ʕ˘ᴥ˘ʔづきらど

Read more

Java调用YOLOv26全解析:从ONNX模型部署到工业质检实战

Java调用YOLOv26全解析:从ONNX模型部署到工业质检实战

我做工业自动化领域的上位机开发快5年了,前两年用OpenCV+传统算法做零件缺陷检测,误检率、漏检率一直卡着瓶颈——去年底换了YOLOv26,配合Java的ONNX Runtime部署,误检率从12%降到了2.1%,漏检率从8%降到了0.8%,完全满足客户的工业级要求。 今天就把这套完整的Java调用YOLOv26的方案分享出来,从ONNX模型的导出、预处理、推理、后处理,到工业质检场景的落地优化,全程都是我踩过坑、验证过的成熟方案,新手跟着步骤操作就能跑通,资深开发者也可以直接复用其中的性能优化、多线程推理、缺陷分类逻辑。 先说明:本文以YOLOv26s(小模型,工业场景部署首选) 为核心,推理引擎用ONNX Runtime Java 1.18.0(跨平台、性能好、官方支持完善),工业质检场景以汽车零件表面划痕、毛刺、缺角检测为例,所有代码都是基于真实项目简化的,没有冗余设计。 一、先搞懂:为什么选YOLOv26+ONNX

By Ne0inhk
【Java 开发日记】我们来说一下无锁队列 Disruptor 的原理

【Java 开发日记】我们来说一下无锁队列 Disruptor 的原理

目录 一、为什么需要 Disruptor?—— 背景与问题 二、核心设计思想 三、核心组件与原理 1. 环形缓冲区(Ring Buffer) 2. 序列(Sequence) 3. 序列屏障(Sequence Barrier) 4. 等待策略(Wait Strategy) 5. 事件处理器(EventProcessor) 6. 生产者(Producer) 四、工作流程示例(单生产者 -> 单消费者) 五、多消费者与依赖关系 六、总结:Disruptor 高性能的秘诀 一、为什么需要 Disruptor?—— 背景与问题 在高并发编程中,传统的队列(如 java.

By Ne0inhk

骑士一百天下载安装 MC JAVA

一、先搞清楚:到底下的是啥? 名称说明骑士一百天B 站 UP 主“M 仔”等发布的剧情向生存整合包(含任务书、假面骑士系模组、100 天倒计时)。核心 Mod假面骑士(KamenRider)、CraftTweaker、GameStage、CustomNPC、倒计时插件等。运行环境Java 版 1.16.5( Forge 36.2+ 实测可启动)。 二、获取地址(官方源) 直达链接:【整合包】MC假面骑士100天整合包(完结) 三、两种安装姿势:懒人一键包 vs 手动拼装 ✅ 推荐:一键整合(小白专用) 1. 下整合包 得到 骑士一百天v0.95.

By Ne0inhk
【JAVA探索之路】简单聊聊Kafka

【JAVA探索之路】简单聊聊Kafka

目录 一、Kafka核心概念与架构 核心概念解析 集群架构一览 二、Kafka核心特性与工作原理 顺序I/O与零拷贝 生产者可靠性保证 精确一次语义 三、Kafka关键API与生态系统 四、Kafka运维管理 五、Kafka典型应用场景 一、Kafka核心概念与架构 要掌握 Kafka,必须从理解其精心设计的基本模型开始。 核心概念解析 * 消息与批次:Kafka 的基本数据单元称为“记录”,包含键、值和时间戳。为提高效率,多条记录会组合成“批次”进行传输。 * 主题与分区:消息按“主题”进行分类,类似于数据库的表。每个主题可被分割为多个“分区”,这是 Kafka 实现并行处理和横向扩展的基石。消息在分区内按追加顺序存储,并分配一个单调递增的偏移量,从而保证了消息的顺序性。 * 生产与消费:生产者将消息发布到指定主题的特定分区;消费者则以“拉”

By Ne0inhk