手搓简易 Linux 进程池:从 0 到 1 实现基于管道的任务分发系统

手搓简易 Linux 进程池:从 0 到 1 实现基于管道的任务分发系统
在这里插入图片描述

🔥草莓熊Lotso:个人主页
❄️个人专栏: 《C++知识分享》《Linux 入门到实践:零基础也能懂》
✨生活是默默的坚持,毅力是永久的享受!


🎬 博主简介:

在这里插入图片描述

文章目录


前言:

在 Linux 环境下,进程池是一种高效的并发编程模型,它通过预先创建一组子进程来处理任务,避免了频繁创建 / 销毁进程的开销。本文将拆解一个基于管道通信的进程池实现代码,带你理解进程池的核心设计思路、管道通信原理和任务分发机制。

一. 核心设计思路

本进程池实现的核心逻辑:

  • 父进程创建指定数量的子进程,通过匿名管道与每个子进程建立单向通信(父写子读);
  • 父进程采用轮询策略将任务分发给不同子进程,实现简单的负载均衡;
  • 子进程循环读取管道中的任务码,执行对应任务;
  • 父进程通过关闭管道写端通知子进程退出,并回收所有子进程资源。
在这里插入图片描述

二. 代码模块拆解

2.1 任务定义与随机任务生成

这部分是测试用的任务层,定义了进程池要执行的具体任务,以及随机生成任务的工具函数。

#include<iostream>#include<string>#include<vector>#include<memory>#include<functional>#include<ctime>#include<cstdlib>#include<unistd.h>#include<sys/wait.h>#define__MAIN__/////////////////////////////任务测试代码///////////////////////////////////////// 定义任务类型:无参数无返回值的函数对象using task_t = std::function<void()>;// 具体任务1:打印日志(带进程ID标识)voidPrintLog(){ std::cout <<"我是一个打印日志的任务, pid"<<getpid()<< std::endl;}// 具体任务2:模拟下载voidDownLoad(){ std::cout <<"我是一个下载任务, pid"<<getpid()<< std::endl;}// 具体任务3:模拟访问MySQLvoidReadMysql(){ std::cout <<"我是一个访问数据库的任务, pid"<<getpid()<< std::endl;}// 具体任务4:模拟访问RedisvoidWriteRedies(){ std::cout <<"我是一个访问redies的任务, pid"<<getpid()<< std::endl;}// 全局任务列表:存储所有可执行的任务 std::vector<task_t> gtasks;// 加载所有任务到全局列表voidLoadTask(){ gtasks.push_back(PrintLog); gtasks.push_back(DownLoad); gtasks.push_back(ReadMysql); gtasks.push_back(WriteRedies);}// 随机生成50个任务码(输出型参数out存储结果)// 作用:模拟业务中随机产生的任务请求// *: 输出型参数// const &: 输入型参数// &: 输入输出型voidRandomTask(std::vector<int>* out){for(int i =0; i <50; i++){// 随机选择任务(0~3)int code =rand()% gtasks.size();usleep(23223);// 模拟任务产生的时间间隔 out->push_back(code);}}// 任务码枚举(增强可读性)#defineLOG_TASK0#defineDOWNLOAD_TASK1#defineMYSQL_TASK2#defineREDIES_TASK3// 任务码转字符串:方便日志打印 std::string TaskToString(int code){switch(code){case LOG_TASK:return"PrintLog";case DOWNLOAD_TASK:return"DownLoad";case MYSQL_TASK:return"ReadMysql";case REDIES_TASK:return"WriteRedies";default:return"Unknown";}}

2.2 子进程任务处理逻辑

Work函数是子进程的核心执行逻辑,负责从管道读取任务码并执行对应任务:

/////////////////////////进程池核心代码////////////////////////// 子进程工作函数:循环读取管道中的任务码并执行// rfd:管道读端文件描述符voidWork(int rfd){while(true){int code =0;// 从管道读取任务码(阻塞读) ssize_t n =read(rfd,&code,sizeof(code));// 读取成功且长度正确:执行任务if(n >0&& n ==sizeof(int)){if(code >=0&& code < gtasks.size()){ gtasks[code]();// 执行对应任务}}// 读取到0:表示管道写端关闭(父进程通知退出)elseif(n ==0){break;// 子进程退出循环}// 读取错误:直接退出else{break;}}}

2.3 通道(Channel)类:封装父子进程通信

Channel类封装了 “管道写端 + 子进程 ID” 的关联关系,简化父进程对单个子进程的管理(发任务、关管道、回收进程):

// 通道类:管理单个子进程的通信管道和进程IDclassChannel{public:// 构造函数:初始化管道写端、子进程ID,生成通道名称Channel(int wfd, pid_t who):_wfd(wfd),_sub_process_id(who){ _name ="Channel-"+ std::to_string(_sub_process_id)+"-"+ std::to_string(_wfd);}intFd(){return _wfd;}// 获取管道写端 pid_t SubId(){return _sub_process_id;}// 获取子进程ID std::string Name(){return _name;}// 获取通道名称(调试用)// 关闭管道写端voidClose(){if(_wfd >=0)close(_wfd);}// 等待子进程退出(回收资源)voidWait(){ pid_t rid =waitpid(_sub_process_id,nullptr,0);(void)rid;// 屏蔽未使用变量警告}// 向子进程发送任务码(写管道)voidSendTask(int taskcode){ ssize_t n =write(_wfd,&taskcode,sizeof(taskcode));(void)n;// 屏蔽未使用变量警告(实际场景应检查写操作是否成功)}~Channel(){}private:int _wfd;// 管道写端文件描述符 pid_t _sub_process_id;// 对应子进程ID std::string _name;// 通道名称(调试用)};

2.4 进程池(ProcesspPool)类:核心管理逻辑

ProcesspPool类是进程池的核心,负责创建子进程、管理通道、分发任务、停止进程池:

classProcesspPool{private:// 轮询选择下一个子进程(负载均衡策略)intNext(){int choice = _next_choice; _next_choice++; _next_choice %= _channels.size();// 取模实现循环return choice;}public:// 构造函数:初始化进程池大小、轮询索引ProcesspPool(int number):_number(number),_next_choice(0){ std::cout <<"number: "<< _number << std::endl;}// 启动进程池(父进程执行):创建指定数量的子进程和管道voidStart(){for(int i =0; i < _number; i++){// 1. 创建匿名管道int pipefd[2];int n =pipe(pipefd);if(n <0){perror("pipe");exit(2);}// 2. 创建子进程 pid_t id =fork();if(id <0){perror("fork");exit(3);}elseif(id ==0)// 子进程逻辑{// 这里后面还有些变化,为了解决下面那个version2close(pipefd[1]);// 子进程关闭写端(只读)Work(pipefd[0]);// 执行工作函数close(pipefd[0]);// 任务完成后关闭读端exit(0);// 子进程退出}else// 父进程逻辑{close(pipefd[0]);// 父进程关闭读端(只写)// 创建通道对象并加入管理列表 _channels.emplace_back(pipefd[1], id);}}}// 推送任务:选择子进程并发送任务码voidPushTask(int taskcode){// 轮询选择一个子进程int who =Next(); _channels[who].SendTask(taskcode);// 打印任务分发日志(调试用) std::cout <<"发送任务: "<<TaskToString(taskcode)<<"["<< taskcode <<"]"<<"给: "<< _channels[who].Name()<< std::endl;}// 停止进程池:关闭所有管道,回收子进程voidStop(){// version1 -- 可以成功// 1. 批量关闭所有管道写端(通知子进程退出)for(auto& channel: _channels){ channel.Close(); std::cout << channel.Name()<<" close success!"<< std::endl;}sleep(3);// 等待子进程处理完最后任务并退出// 2. 批量回收子进程资源for(auto& channel: _channels){ channel.Wait(); std::cout << channel.Name()<<" wait success!"<< std::endl;}// // version2 -- 不能成功(原因:关闭管道后立即wait,子进程可能还未处理完读操作,导致阻塞)// for(auto& channel: _channels)// {// channel.Close();// channel.Wait();// std::cout << channel.Name() << " close and wait success!" << std::endl;// }// // version3 -- 可以成功(逆序关闭+回收,避免资源竞争)// int end = _channels.size() - 1;// while(end >= 0)// {// _channels[end].Close();// _channels[end].Wait();// std::cout << channel.Name() << " close and wait success!" << std::endl;// end--;// }}// 调试打印:输出所有通道信息voidDebugPrint(){ std::cout <<"------------------------------------"<< std::endl;for(auto& channel : _channels){ std::cout << channel.Fd()<< std::endl; std::cout << channel.SubId()<< std::endl; std::cout << channel.Name()<< std::endl;} std::cout <<"------------------------------------"<< std::endl;}~ProcesspPool(){}private: std::vector<Channel> _channels;// 管理所有子进程的通道int _number;// 进程池大小(子进程数量)int _next_choice;// 轮询索引(下一个要分发任务的子进程)};

2.5 主函数:进程池使用示例

主函数是进程池的入口,负责初始化、启动、分发任务、停止进程池:

#ifdef__MAIN__// 用法提示函数staticvoidUsage(const std::string &proc){ std::cout <<"Usage:\n\t"<< proc <<" proceess_number"<< std::endl;}// 程序入口:./process_pool 5(5为子进程数量)intmain(int argc,char* argv[]){// 检查命令行参数if(argc !=2){Usage(argv[0]);exit(1);}int number = std::stoi(argv[1]);// 0. 初始化:加载任务、随机生成50个任务码srand(time(nullptr)^getpid());// 设置随机数种子(结合时间+进程ID)LoadTask(); std::vector<int> task_codes;RandomTask(&task_codes);// 1. 创建进程池对象(智能指针自动管理内存) std::unique_ptr<ProcesspPool> pp = std::make_unique<ProcesspPool>(number);// 2. 启动进程池(创建子进程和管道) pp->Start();sleep(2);// 等待所有子进程初始化完成// 3. 分发所有随机任务for(auto task : task_codes){ pp->PushTask(task);usleep(500000);// 模拟任务分发间隔(500ms)}// // 注释部分:交互式输入任务码(调试用)// while(true)// {// int code = 0;// std::cout << "Please Enter Your Task# ";// std::cin >> code;// if(code < 0 || code > gtasks.size())// {// std::cout << "任务码错误, 请重新输入" << std::endl;// continue;// }// pp->PushTask(code);// }// 4. 停止进程池(回收资源) pp->Stop();return0;}#endif

三. 关键知识点解析

3.1 管道通信原理

  • 匿名管道pipe()创建的文件描述符对pipefd[0](读)、pipefd[1](写)是单向的;
  • 父子进程继承管道文件描述符,通过关闭不需要的端实现 “父写子读”;
  • 当写端关闭后,读端read()会返回 0,子进程通过这个信号判断退出。

3.2 轮询负载均衡

Next()函数通过递增取模的方式,循环选择子进程,确保任务均匀分发给所有子进程,避免单个子进程过载。

3.3 进程回收的坑

Stop()函数中 version2 失败的原因:父进程关闭管道后立即waitpid(),子进程可能还在阻塞读管道,此时父进程waitpid()会阻塞,而子进程读取到管道关闭后退出,但若所有子进程都处于这种状态,会导致死锁。version1 先批量关闭所有管道,等待 3 秒让子进程全部退出后再回收,避免了这个问题。

总结
版本2失败的根本原因是父进程在等待一个子进程时,其他子进程的写端并未关闭(因为都是继承了父进程,关闭了一个,但是后面的子进程关闭一个进行读还是不可避免的继承了上次之前的),导致它们无法退出,从而形成串行阻塞。版本1通过先关闭所有写端,让子进程并发退出,避免了这一风险。因此,在实际开发中,应当采用版本1或类似策略来确保进程池能够优雅地停止。
在这里插入图片描述
在这里插入图片描述

我们可以怎么样去修改使version2变成可行的方案?

在这里插入图片描述


在这里插入图片描述

四. 完整代码展示

#include<iostream>#include<string>#include<vector>#include<memory>#include<functional>#include<ctime>#include<cstdlib>#include<unistd.h>#include<sys/wait.h>#define__MAIN__/////////////////////////////任务测试代码///////////////////////////////////////using task_t = std::function<void()>;voidPrintLog(){ std::cout <<"我是一个打印日志的任务, pid"<<getpid()<< std::endl;}voidDownLoad(){ std::cout <<"我是一个下载任务, pid"<<getpid()<< std::endl;}voidReadMysql(){ std::cout <<"我是一个访问数据库的任务, pid"<<getpid()<< std::endl;}voidWriteRedies(){ std::cout <<"我是一个访问redies的任务, pid"<<getpid()<< std::endl;} std::vector<task_t> gtasks;voidLoadTask(){ gtasks.push_back(PrintLog); gtasks.push_back(DownLoad); gtasks.push_back(ReadMysql); gtasks.push_back(WriteRedies);}// *: 输出型参数// const &: 输入型参数// &: 输入输出型voidRandomTask(std::vector<int>* out){for(int i =0; i <50; i++){int code =rand()% gtasks.size();usleep(23223); out->push_back(code);}}#defineLOG_TASK0#defineDOWNLOAD_TASK1#defineMYSQL_TASK2#defineREDIES_TASK3 std::string TaskToString(int code){switch(code){case LOG_TASK:return"PrintLog";case DOWNLOAD_TASK:return"DownLoad";case MYSQL_TASK:return"ReadMysql";case REDIES_TASK:return"WriteRedies";default:return"Unknown";}}/////////////////////////进程池代码////////////////////////voidWork(int rfd){while(true){int code =0; ssize_t n =read(rfd,&code,sizeof(code));if(n >0&& n ==sizeof(int)){if(code >=0&& code < gtasks.size()){ gtasks[code]();}}elseif(n ==0){break;// 子进程只要读到返回值为0, 表明父进程让我退出}else{break;}}}classChannel{public:Channel(int wfd, pid_t who):_wfd(wfd),_sub_process_id(who){ _name ="Channel-"+ std::to_string(_sub_process_id)+"-"+ std::to_string(_wfd);}intFd(){return _wfd;} pid_t SubId(){return _sub_process_id;} std::string Name(){return _name;}voidClose(){if(_wfd >=0)close(_wfd);}voidWait(){ pid_t rid =waitpid(_sub_process_id,nullptr,0);(void)rid;}voidSendTask(int taskcode){ ssize_t n =write(_wfd,&taskcode,sizeof(taskcode));(void)n;}~Channel(){}private:int _wfd; pid_t _sub_process_id; std::string _name;};classProcesspPool{private:intNext(){int choice = _next_choice; _next_choice++; _next_choice %= _channels.size();return choice;}public:ProcesspPool(int number):_number(number),_next_choice(0){ std::cout <<"number: "<< _number << std::endl;}// 父进程voidStart(){for(int i =0; i < _number; i++){// 1. 创建管道int pipefd[2];int n =pipe(pipefd);if(n <0){perror("pipe");exit(2);}// 2. 创建子进程 pid_t id =fork();if(id <0){perror("fork");exit(3);}elseif(id ==0)// 子进程{// 关闭父进程历史的wfd!for(auto& channel : _channels) channel.Close();close(pipefd[1]);Work(pipefd[0]);close(pipefd[0]);exit(0);}else// 父进程{close(pipefd[0]);// _channels c(pipefd[1], fd);// _channels.push_back(c); _channels.emplace_back(pipefd[1], id);// 内部会直接构造}}}// 1. 什么任务? 任务码决定// 2. 任务给谁? 属于进程池内部操作,负载均衡(我这里是用的轮询的机制)voidPushTask(int taskcode){// 选择一个子进程int who =Next(); _channels[who].SendTask(taskcode); std::cout <<"发送任务: "<<TaskToString(taskcode)<<"["<< taskcode <<"]"<<"给: "<< _channels[who].Name()<< std::endl;}// 有版本存在一些问题, 后续会说为什么voidStop(){// version1 -- 可以成功// 1. 关闭wfdfor(auto& channel: _channels){ channel.Close(); std::cout << channel.Name()<<" close success!"<< std::endl;}sleep(3);// 2. 回收子进程for(auto& channel: _channels){ channel.Wait(); std::cout << channel.Name()<<" wait success!"<< std::endl;}// // version2 -- 不能成功???// for(auto& channel: _channels)// {// channel.Close();// channel.Wait();// std::cout << channel.Name() << " close and wait success!" << std::endl;// }// version3 -- 可以成功// int end = _channels.size() - 1;// while(end >= 0)// {// _channels[end].Close();// _channels[end].Wait();// std::cout << channel.Name() << " close and wait success!" << std::endl;// end--;// }}voidDebugPrint(){ std::cout <<"------------------------------------"<< std::endl;for(auto& channel : _channels){ std::cout << channel.Fd()<< std::endl; std::cout << channel.SubId()<< std::endl; std::cout << channel.Name()<< std::endl;} std::cout <<"------------------------------------"<< std::endl;}~ProcesspPool(){}private: std::vector<Channel> _channels;int _number;int _next_choice;};// 父进程#ifdef__MAIN__staticvoidUsage(const std::string &proc){ std::cout <<"Usage:\n\t"<< proc <<" proceess_number"<< std::endl;}// ./process_pool 5intmain(int argc,char* argv[]){if(argc !=2){Usage(argv[0]);exit(1);}int number = std::stoi(argv[1]);// 0. 加载任务并随机生成任务srand(time(nullptr)^getpid());LoadTask(); std::vector<int> task_codes;RandomTask(&task_codes);// 1. 创建进程池对象 std::unique_ptr<ProcesspPool> pp = std::make_unique<ProcesspPool>(number);// 2. 启动进程池 pp->Start();sleep(2);for(auto task : task_codes){ pp->PushTask(task);usleep(500000);}// // 自己输入发送任务// while(true)// {// int code = 0;// std::cout << "Please Enter Your Task# ";// std::cin >> code;// if(code < 0 || code > gtasks.size())// {// std::cout << "任务码错误, 请重新输入" << std::endl;// continue;// }// pp->PushTask(code);// } pp->Stop();return0;}#endif

五. 编译与运行(附 Makefile)

process_pool:process_pool.cc g++ -o$@ $^ -std=c++14 .PHONY:clean clean: rm-f process_pool 
  • 编译:直接 make;
  • 运行./process_pool 5(5 为子进程数量,可自定义);
  • 输出:可以看到任务被轮询分发给不同子进程,每个任务打印对应的进程 ID,最后进程池正常停止并回收资源。

六. 扩展与优化方向

  • 错误处理:当前代码未处理write()/read()的错误返回值,实际场景应增加重试、日志记录;
  • 动态扩容:支持运行时增加 / 减少子进程数量;
  • 更优的负载均衡:基于子进程当前任务数、CPU 使用率等动态分发;
  • 任务队列:父进程增加任务队列,避免任务分发过快导致管道阻塞;
  • 信号处理:增加SIGCHLD信号处理,异步回收子进程,避免僵尸进程。

结尾:

🍓 我是草莓熊 Lotso!若这篇技术干货帮你打通了学习中的卡点: 👀 【关注】跟我一起深耕技术领域,从基础到进阶,见证每一次成长 ❤️ 【点赞】让优质内容被更多人看见,让知识传递更有力量 ⭐ 【收藏】把核心知识点、实战技巧存好,需要时直接查、随时用 💬 【评论】分享你的经验或疑问(比如曾踩过的技术坑?),一起交流避坑 🗳️ 【投票】用你的选择助力社区内容方向,告诉大家哪个技术点最该重点拆解 技术之路难免有困惑,但同行的人会让前进更有方向~愿我们都能在自己专注的领域里,一步步靠近心中的技术目标! 

结语:本文通过拆解一个极简的进程池实现,带你理解了 Linux 进程间通信、进程管理的核心知识点。这个进程池虽然简单,但涵盖了进程池的核心设计思想,是学习 Linux 并发编程的绝佳案例。

✨把这些内容吃透超牛的!放松下吧✨ʕ˘ᴥ˘ʔづきらど

Read more

曝Windows 12将于今年发布?以AI为核心、NPU成「硬件门槛」,网友吐槽:“不想要的全塞进来了”

曝Windows 12将于今年发布?以AI为核心、NPU成「硬件门槛」,网友吐槽:“不想要的全塞进来了”

整理 | 郑丽媛 出品 | ZEEKLOG(ID:ZEEKLOGnews) 当年,微软一句“Windows 10 将是最后一个版本”的表态,让不少用户以为 Windows 进入了“只更新、不换代”的时代。但几年过去,现实却完全不同。 在 Windows 11 发布之后,如今关于 Windows 12 的传闻再次密集出现。从内部代号、代码片段,到硬件厂商的暗示与 OEM 预热标签,种种线索拼在一起,勾勒出一个明显的趋势——这不会只是一次常规升级,而更像是一次围绕 AI 的平台级重构。 更关键的是,这次争议,可能远比当年 TPM 2.0 更大。 精准卡位 Windows 10 退场的时间?

By Ne0inhk
Python热度下滑、AI能取代搜索引擎?TIOBE最新榜单揭晓!

Python热度下滑、AI能取代搜索引擎?TIOBE最新榜单揭晓!

整理 | 屠敏 出品 | ZEEKLOG(ID:ZEEKLOGnews) 日前,TIOBE 发布了最新的 3 月编程语言榜单。整体来看,本月排名变化不算大,但榜单中仍然出现了一些值得关注的小波动。  AI 工具能帮大家秒懂最新编程语言趋势? 由于 2 月天数较少,3 月的榜单整体变化有限。借着这次发布,TIOBE CEO Paul Jansen 也回应了一个最近被频繁讨论的问题:为什么 TIOBE 指数仍然依赖搜索引擎统计结果?在大语言模型流行的今天,直接询问 AI 哪些编程语言最流行,是不是更简单? 对此,Jansen 的回答是否定的。 他解释称,TIOBE 指数本质上统计的是互联网上关于某种编程语言的网页数量。而大语言模型的训练数据同样来自这些网页内容,因此从信息来源来看,两者并没有本质区别。换句话说,LLM 的判断,本质上也是建立在这些网页数据之上的。 Python 活跃度仍在下降

By Ne0inhk
“裸奔龙虾”数量已达27万只,业内人士警告;AI浪潮下,中传“砍掉”翻译等16个专业;薪资谈判破裂,三星电子8.9万人要罢工 | 极客头条

“裸奔龙虾”数量已达27万只,业内人士警告;AI浪潮下,中传“砍掉”翻译等16个专业;薪资谈判破裂,三星电子8.9万人要罢工 | 极客头条

「极客头条」—— 技术人员的新闻圈! ZEEKLOG 的读者朋友们好,「极客头条」来啦,快来看今天都有哪些值得我们技术人关注的重要新闻吧。(投稿或寻求报道:[email protected]) 整理 | 郑丽媛 出品 | ZEEKLOG(ID:ZEEKLOGnews) 一分钟速览新闻点! * “裸奔龙虾”已高达27万只!业内人士警告:一旦黑客入侵,敏感信息一秒搬空 * 阿里云 CTO 周靖人代管千问模型一号位,刘大一恒管理更多团队 * 中国传媒大学砍掉翻译、摄影等 16 个本科专业,直言教育要面向人机分工时代 * 雷军放话:小米将很快推出 L3、L4 的驾驶 * 消息称原理想汽车智驾一号位郎咸朋具身智能赛道创业 * vivo 前产品经理宋紫薇创业,瞄准 AI 时尚Agent,获亿元融资 * MiniMax 发布龙虾新技能,股价暴涨超 23% * 薪资谈判破裂,三星电子

By Ne0inhk
一天开13个会、一个Bug要修200天!前亚马逊L7爆料:这轮大裁员,AI只是“背锅侠”

一天开13个会、一个Bug要修200天!前亚马逊L7爆料:这轮大裁员,AI只是“背锅侠”

整理 | 郑丽媛 出品 | ZEEKLOG(ID:ZEEKLOGnews) 过去一年,大型科技公司的裁员消息几乎从未停过。但当公司对外给出的理由越来越统一,“AI 让组织更高效”,也有越来越多内部员工开始提出另一种质疑:事情或许没那么简单。 最近,一段来自前亚马逊员工 Becky 的 YouTube 视频在开发者社区流传开来。她曾在亚马逊工作 7 年,其中 5 年担任 L7 级别的技术管理者,负责过团队年度规划(OP1)等核心管理工作——可去年,她主动离开了亚马逊。 就在最近,她的三位前同事接连被裁,其中两人还是 H-1B 签证员工,都背着房贷压力。其中一位同事忍不住给 Becky 发消息:“你去年离开的时候,是不是已经预料到会发生这些?” 对此,Becky 的回答很坦诚:她不知道具体什么时候会裁员,但她早就感觉情况不对劲了。 在她看来,这轮裁员被归因为

By Ne0inhk