手搓简易 Linux 进程池:从 0 到 1 实现基于管道的任务分发系统

手搓简易 Linux 进程池:从 0 到 1 实现基于管道的任务分发系统
在这里插入图片描述

🔥草莓熊Lotso:个人主页
❄️个人专栏: 《C++知识分享》《Linux 入门到实践:零基础也能懂》
✨生活是默默的坚持,毅力是永久的享受!


🎬 博主简介:

在这里插入图片描述

文章目录


前言:

在 Linux 环境下,进程池是一种高效的并发编程模型,它通过预先创建一组子进程来处理任务,避免了频繁创建 / 销毁进程的开销。本文将拆解一个基于管道通信的进程池实现代码,带你理解进程池的核心设计思路、管道通信原理和任务分发机制。

一. 核心设计思路

本进程池实现的核心逻辑:

  • 父进程创建指定数量的子进程,通过匿名管道与每个子进程建立单向通信(父写子读);
  • 父进程采用轮询策略将任务分发给不同子进程,实现简单的负载均衡;
  • 子进程循环读取管道中的任务码,执行对应任务;
  • 父进程通过关闭管道写端通知子进程退出,并回收所有子进程资源。
在这里插入图片描述

二. 代码模块拆解

2.1 任务定义与随机任务生成

这部分是测试用的任务层,定义了进程池要执行的具体任务,以及随机生成任务的工具函数。

#include<iostream>#include<string>#include<vector>#include<memory>#include<functional>#include<ctime>#include<cstdlib>#include<unistd.h>#include<sys/wait.h>#define__MAIN__/////////////////////////////任务测试代码///////////////////////////////////////// 定义任务类型:无参数无返回值的函数对象using task_t = std::function<void()>;// 具体任务1:打印日志(带进程ID标识)voidPrintLog(){ std::cout <<"我是一个打印日志的任务, pid"<<getpid()<< std::endl;}// 具体任务2:模拟下载voidDownLoad(){ std::cout <<"我是一个下载任务, pid"<<getpid()<< std::endl;}// 具体任务3:模拟访问MySQLvoidReadMysql(){ std::cout <<"我是一个访问数据库的任务, pid"<<getpid()<< std::endl;}// 具体任务4:模拟访问RedisvoidWriteRedies(){ std::cout <<"我是一个访问redies的任务, pid"<<getpid()<< std::endl;}// 全局任务列表:存储所有可执行的任务 std::vector<task_t> gtasks;// 加载所有任务到全局列表voidLoadTask(){ gtasks.push_back(PrintLog); gtasks.push_back(DownLoad); gtasks.push_back(ReadMysql); gtasks.push_back(WriteRedies);}// 随机生成50个任务码(输出型参数out存储结果)// 作用:模拟业务中随机产生的任务请求// *: 输出型参数// const &: 输入型参数// &: 输入输出型voidRandomTask(std::vector<int>* out){for(int i =0; i <50; i++){// 随机选择任务(0~3)int code =rand()% gtasks.size();usleep(23223);// 模拟任务产生的时间间隔 out->push_back(code);}}// 任务码枚举(增强可读性)#defineLOG_TASK0#defineDOWNLOAD_TASK1#defineMYSQL_TASK2#defineREDIES_TASK3// 任务码转字符串:方便日志打印 std::string TaskToString(int code){switch(code){case LOG_TASK:return"PrintLog";case DOWNLOAD_TASK:return"DownLoad";case MYSQL_TASK:return"ReadMysql";case REDIES_TASK:return"WriteRedies";default:return"Unknown";}}

2.2 子进程任务处理逻辑

Work函数是子进程的核心执行逻辑,负责从管道读取任务码并执行对应任务:

/////////////////////////进程池核心代码////////////////////////// 子进程工作函数:循环读取管道中的任务码并执行// rfd:管道读端文件描述符voidWork(int rfd){while(true){int code =0;// 从管道读取任务码(阻塞读) ssize_t n =read(rfd,&code,sizeof(code));// 读取成功且长度正确:执行任务if(n >0&& n ==sizeof(int)){if(code >=0&& code < gtasks.size()){ gtasks[code]();// 执行对应任务}}// 读取到0:表示管道写端关闭(父进程通知退出)elseif(n ==0){break;// 子进程退出循环}// 读取错误:直接退出else{break;}}}

2.3 通道(Channel)类:封装父子进程通信

Channel类封装了 “管道写端 + 子进程 ID” 的关联关系,简化父进程对单个子进程的管理(发任务、关管道、回收进程):

// 通道类:管理单个子进程的通信管道和进程IDclassChannel{public:// 构造函数:初始化管道写端、子进程ID,生成通道名称Channel(int wfd, pid_t who):_wfd(wfd),_sub_process_id(who){ _name ="Channel-"+ std::to_string(_sub_process_id)+"-"+ std::to_string(_wfd);}intFd(){return _wfd;}// 获取管道写端 pid_t SubId(){return _sub_process_id;}// 获取子进程ID std::string Name(){return _name;}// 获取通道名称(调试用)// 关闭管道写端voidClose(){if(_wfd >=0)close(_wfd);}// 等待子进程退出(回收资源)voidWait(){ pid_t rid =waitpid(_sub_process_id,nullptr,0);(void)rid;// 屏蔽未使用变量警告}// 向子进程发送任务码(写管道)voidSendTask(int taskcode){ ssize_t n =write(_wfd,&taskcode,sizeof(taskcode));(void)n;// 屏蔽未使用变量警告(实际场景应检查写操作是否成功)}~Channel(){}private:int _wfd;// 管道写端文件描述符 pid_t _sub_process_id;// 对应子进程ID std::string _name;// 通道名称(调试用)};

2.4 进程池(ProcesspPool)类:核心管理逻辑

ProcesspPool类是进程池的核心,负责创建子进程、管理通道、分发任务、停止进程池:

classProcesspPool{private:// 轮询选择下一个子进程(负载均衡策略)intNext(){int choice = _next_choice; _next_choice++; _next_choice %= _channels.size();// 取模实现循环return choice;}public:// 构造函数:初始化进程池大小、轮询索引ProcesspPool(int number):_number(number),_next_choice(0){ std::cout <<"number: "<< _number << std::endl;}// 启动进程池(父进程执行):创建指定数量的子进程和管道voidStart(){for(int i =0; i < _number; i++){// 1. 创建匿名管道int pipefd[2];int n =pipe(pipefd);if(n <0){perror("pipe");exit(2);}// 2. 创建子进程 pid_t id =fork();if(id <0){perror("fork");exit(3);}elseif(id ==0)// 子进程逻辑{// 这里后面还有些变化,为了解决下面那个version2close(pipefd[1]);// 子进程关闭写端(只读)Work(pipefd[0]);// 执行工作函数close(pipefd[0]);// 任务完成后关闭读端exit(0);// 子进程退出}else// 父进程逻辑{close(pipefd[0]);// 父进程关闭读端(只写)// 创建通道对象并加入管理列表 _channels.emplace_back(pipefd[1], id);}}}// 推送任务:选择子进程并发送任务码voidPushTask(int taskcode){// 轮询选择一个子进程int who =Next(); _channels[who].SendTask(taskcode);// 打印任务分发日志(调试用) std::cout <<"发送任务: "<<TaskToString(taskcode)<<"["<< taskcode <<"]"<<"给: "<< _channels[who].Name()<< std::endl;}// 停止进程池:关闭所有管道,回收子进程voidStop(){// version1 -- 可以成功// 1. 批量关闭所有管道写端(通知子进程退出)for(auto& channel: _channels){ channel.Close(); std::cout << channel.Name()<<" close success!"<< std::endl;}sleep(3);// 等待子进程处理完最后任务并退出// 2. 批量回收子进程资源for(auto& channel: _channels){ channel.Wait(); std::cout << channel.Name()<<" wait success!"<< std::endl;}// // version2 -- 不能成功(原因:关闭管道后立即wait,子进程可能还未处理完读操作,导致阻塞)// for(auto& channel: _channels)// {// channel.Close();// channel.Wait();// std::cout << channel.Name() << " close and wait success!" << std::endl;// }// // version3 -- 可以成功(逆序关闭+回收,避免资源竞争)// int end = _channels.size() - 1;// while(end >= 0)// {// _channels[end].Close();// _channels[end].Wait();// std::cout << channel.Name() << " close and wait success!" << std::endl;// end--;// }}// 调试打印:输出所有通道信息voidDebugPrint(){ std::cout <<"------------------------------------"<< std::endl;for(auto& channel : _channels){ std::cout << channel.Fd()<< std::endl; std::cout << channel.SubId()<< std::endl; std::cout << channel.Name()<< std::endl;} std::cout <<"------------------------------------"<< std::endl;}~ProcesspPool(){}private: std::vector<Channel> _channels;// 管理所有子进程的通道int _number;// 进程池大小(子进程数量)int _next_choice;// 轮询索引(下一个要分发任务的子进程)};

2.5 主函数:进程池使用示例

主函数是进程池的入口,负责初始化、启动、分发任务、停止进程池:

#ifdef__MAIN__// 用法提示函数staticvoidUsage(const std::string &proc){ std::cout <<"Usage:\n\t"<< proc <<" proceess_number"<< std::endl;}// 程序入口:./process_pool 5(5为子进程数量)intmain(int argc,char* argv[]){// 检查命令行参数if(argc !=2){Usage(argv[0]);exit(1);}int number = std::stoi(argv[1]);// 0. 初始化:加载任务、随机生成50个任务码srand(time(nullptr)^getpid());// 设置随机数种子(结合时间+进程ID)LoadTask(); std::vector<int> task_codes;RandomTask(&task_codes);// 1. 创建进程池对象(智能指针自动管理内存) std::unique_ptr<ProcesspPool> pp = std::make_unique<ProcesspPool>(number);// 2. 启动进程池(创建子进程和管道) pp->Start();sleep(2);// 等待所有子进程初始化完成// 3. 分发所有随机任务for(auto task : task_codes){ pp->PushTask(task);usleep(500000);// 模拟任务分发间隔(500ms)}// // 注释部分:交互式输入任务码(调试用)// while(true)// {// int code = 0;// std::cout << "Please Enter Your Task# ";// std::cin >> code;// if(code < 0 || code > gtasks.size())// {// std::cout << "任务码错误, 请重新输入" << std::endl;// continue;// }// pp->PushTask(code);// }// 4. 停止进程池(回收资源) pp->Stop();return0;}#endif

三. 关键知识点解析

3.1 管道通信原理

  • 匿名管道pipe()创建的文件描述符对pipefd[0](读)、pipefd[1](写)是单向的;
  • 父子进程继承管道文件描述符,通过关闭不需要的端实现 “父写子读”;
  • 当写端关闭后,读端read()会返回 0,子进程通过这个信号判断退出。

3.2 轮询负载均衡

Next()函数通过递增取模的方式,循环选择子进程,确保任务均匀分发给所有子进程,避免单个子进程过载。

3.3 进程回收的坑

Stop()函数中 version2 失败的原因:父进程关闭管道后立即waitpid(),子进程可能还在阻塞读管道,此时父进程waitpid()会阻塞,而子进程读取到管道关闭后退出,但若所有子进程都处于这种状态,会导致死锁。version1 先批量关闭所有管道,等待 3 秒让子进程全部退出后再回收,避免了这个问题。

总结
版本2失败的根本原因是父进程在等待一个子进程时,其他子进程的写端并未关闭(因为都是继承了父进程,关闭了一个,但是后面的子进程关闭一个进行读还是不可避免的继承了上次之前的),导致它们无法退出,从而形成串行阻塞。版本1通过先关闭所有写端,让子进程并发退出,避免了这一风险。因此,在实际开发中,应当采用版本1或类似策略来确保进程池能够优雅地停止。
在这里插入图片描述
在这里插入图片描述

我们可以怎么样去修改使version2变成可行的方案?

在这里插入图片描述


在这里插入图片描述

四. 完整代码展示

#include<iostream>#include<string>#include<vector>#include<memory>#include<functional>#include<ctime>#include<cstdlib>#include<unistd.h>#include<sys/wait.h>#define__MAIN__/////////////////////////////任务测试代码///////////////////////////////////////using task_t = std::function<void()>;voidPrintLog(){ std::cout <<"我是一个打印日志的任务, pid"<<getpid()<< std::endl;}voidDownLoad(){ std::cout <<"我是一个下载任务, pid"<<getpid()<< std::endl;}voidReadMysql(){ std::cout <<"我是一个访问数据库的任务, pid"<<getpid()<< std::endl;}voidWriteRedies(){ std::cout <<"我是一个访问redies的任务, pid"<<getpid()<< std::endl;} std::vector<task_t> gtasks;voidLoadTask(){ gtasks.push_back(PrintLog); gtasks.push_back(DownLoad); gtasks.push_back(ReadMysql); gtasks.push_back(WriteRedies);}// *: 输出型参数// const &: 输入型参数// &: 输入输出型voidRandomTask(std::vector<int>* out){for(int i =0; i <50; i++){int code =rand()% gtasks.size();usleep(23223); out->push_back(code);}}#defineLOG_TASK0#defineDOWNLOAD_TASK1#defineMYSQL_TASK2#defineREDIES_TASK3 std::string TaskToString(int code){switch(code){case LOG_TASK:return"PrintLog";case DOWNLOAD_TASK:return"DownLoad";case MYSQL_TASK:return"ReadMysql";case REDIES_TASK:return"WriteRedies";default:return"Unknown";}}/////////////////////////进程池代码////////////////////////voidWork(int rfd){while(true){int code =0; ssize_t n =read(rfd,&code,sizeof(code));if(n >0&& n ==sizeof(int)){if(code >=0&& code < gtasks.size()){ gtasks[code]();}}elseif(n ==0){break;// 子进程只要读到返回值为0, 表明父进程让我退出}else{break;}}}classChannel{public:Channel(int wfd, pid_t who):_wfd(wfd),_sub_process_id(who){ _name ="Channel-"+ std::to_string(_sub_process_id)+"-"+ std::to_string(_wfd);}intFd(){return _wfd;} pid_t SubId(){return _sub_process_id;} std::string Name(){return _name;}voidClose(){if(_wfd >=0)close(_wfd);}voidWait(){ pid_t rid =waitpid(_sub_process_id,nullptr,0);(void)rid;}voidSendTask(int taskcode){ ssize_t n =write(_wfd,&taskcode,sizeof(taskcode));(void)n;}~Channel(){}private:int _wfd; pid_t _sub_process_id; std::string _name;};classProcesspPool{private:intNext(){int choice = _next_choice; _next_choice++; _next_choice %= _channels.size();return choice;}public:ProcesspPool(int number):_number(number),_next_choice(0){ std::cout <<"number: "<< _number << std::endl;}// 父进程voidStart(){for(int i =0; i < _number; i++){// 1. 创建管道int pipefd[2];int n =pipe(pipefd);if(n <0){perror("pipe");exit(2);}// 2. 创建子进程 pid_t id =fork();if(id <0){perror("fork");exit(3);}elseif(id ==0)// 子进程{// 关闭父进程历史的wfd!for(auto& channel : _channels) channel.Close();close(pipefd[1]);Work(pipefd[0]);close(pipefd[0]);exit(0);}else// 父进程{close(pipefd[0]);// _channels c(pipefd[1], fd);// _channels.push_back(c); _channels.emplace_back(pipefd[1], id);// 内部会直接构造}}}// 1. 什么任务? 任务码决定// 2. 任务给谁? 属于进程池内部操作,负载均衡(我这里是用的轮询的机制)voidPushTask(int taskcode){// 选择一个子进程int who =Next(); _channels[who].SendTask(taskcode); std::cout <<"发送任务: "<<TaskToString(taskcode)<<"["<< taskcode <<"]"<<"给: "<< _channels[who].Name()<< std::endl;}// 有版本存在一些问题, 后续会说为什么voidStop(){// version1 -- 可以成功// 1. 关闭wfdfor(auto& channel: _channels){ channel.Close(); std::cout << channel.Name()<<" close success!"<< std::endl;}sleep(3);// 2. 回收子进程for(auto& channel: _channels){ channel.Wait(); std::cout << channel.Name()<<" wait success!"<< std::endl;}// // version2 -- 不能成功???// for(auto& channel: _channels)// {// channel.Close();// channel.Wait();// std::cout << channel.Name() << " close and wait success!" << std::endl;// }// version3 -- 可以成功// int end = _channels.size() - 1;// while(end >= 0)// {// _channels[end].Close();// _channels[end].Wait();// std::cout << channel.Name() << " close and wait success!" << std::endl;// end--;// }}voidDebugPrint(){ std::cout <<"------------------------------------"<< std::endl;for(auto& channel : _channels){ std::cout << channel.Fd()<< std::endl; std::cout << channel.SubId()<< std::endl; std::cout << channel.Name()<< std::endl;} std::cout <<"------------------------------------"<< std::endl;}~ProcesspPool(){}private: std::vector<Channel> _channels;int _number;int _next_choice;};// 父进程#ifdef__MAIN__staticvoidUsage(const std::string &proc){ std::cout <<"Usage:\n\t"<< proc <<" proceess_number"<< std::endl;}// ./process_pool 5intmain(int argc,char* argv[]){if(argc !=2){Usage(argv[0]);exit(1);}int number = std::stoi(argv[1]);// 0. 加载任务并随机生成任务srand(time(nullptr)^getpid());LoadTask(); std::vector<int> task_codes;RandomTask(&task_codes);// 1. 创建进程池对象 std::unique_ptr<ProcesspPool> pp = std::make_unique<ProcesspPool>(number);// 2. 启动进程池 pp->Start();sleep(2);for(auto task : task_codes){ pp->PushTask(task);usleep(500000);}// // 自己输入发送任务// while(true)// {// int code = 0;// std::cout << "Please Enter Your Task# ";// std::cin >> code;// if(code < 0 || code > gtasks.size())// {// std::cout << "任务码错误, 请重新输入" << std::endl;// continue;// }// pp->PushTask(code);// } pp->Stop();return0;}#endif

五. 编译与运行(附 Makefile)

process_pool:process_pool.cc g++ -o$@ $^ -std=c++14 .PHONY:clean clean: rm-f process_pool 
  • 编译:直接 make;
  • 运行./process_pool 5(5 为子进程数量,可自定义);
  • 输出:可以看到任务被轮询分发给不同子进程,每个任务打印对应的进程 ID,最后进程池正常停止并回收资源。

六. 扩展与优化方向

  • 错误处理:当前代码未处理write()/read()的错误返回值,实际场景应增加重试、日志记录;
  • 动态扩容:支持运行时增加 / 减少子进程数量;
  • 更优的负载均衡:基于子进程当前任务数、CPU 使用率等动态分发;
  • 任务队列:父进程增加任务队列,避免任务分发过快导致管道阻塞;
  • 信号处理:增加SIGCHLD信号处理,异步回收子进程,避免僵尸进程。

结尾:

🍓 我是草莓熊 Lotso!若这篇技术干货帮你打通了学习中的卡点: 👀 【关注】跟我一起深耕技术领域,从基础到进阶,见证每一次成长 ❤️ 【点赞】让优质内容被更多人看见,让知识传递更有力量 ⭐ 【收藏】把核心知识点、实战技巧存好,需要时直接查、随时用 💬 【评论】分享你的经验或疑问(比如曾踩过的技术坑?),一起交流避坑 🗳️ 【投票】用你的选择助力社区内容方向,告诉大家哪个技术点最该重点拆解 技术之路难免有困惑,但同行的人会让前进更有方向~愿我们都能在自己专注的领域里,一步步靠近心中的技术目标! 

结语:本文通过拆解一个极简的进程池实现,带你理解了 Linux 进程间通信、进程管理的核心知识点。这个进程池虽然简单,但涵盖了进程池的核心设计思想,是学习 Linux 并发编程的绝佳案例。

✨把这些内容吃透超牛的!放松下吧✨ʕ˘ᴥ˘ʔづきらど

Read more

深入理解 Linux 基础 IO:从 C 库到系统调用的完整剖析

深入理解 Linux 基础 IO:从 C 库到系统调用的完整剖析

🔥个人主页:Cx330🌸 ❄️个人专栏:《C语言》《LeetCode刷题集》《数据结构-初阶》《C++知识分享》 《优选算法指南-必刷经典100题》《Linux操作系统》:从入门到入魔 《Git深度解析》:版本管理实战全解 🌟心向往之行必能至 🎥Cx330🌸的简介: 目录 前言: 一、理解 “文件” 二、温故知新:C 标准库的文件 IO 操作  2.1 C语言文件操作常用函数 2.2 文件写入:fwrite(附加其他函数) 2.3 文件读取:fread(附加其他函数) 2.4 标准输入输出:stdin、stdout、stderr 三、走进内核:文件相关的系统调用接口 3.

By Ne0inhk
Flutter for OpenHarmony 实战:Injectable — 自动化依赖注入大师

Flutter for OpenHarmony 实战:Injectable — 自动化依赖注入大师

Flutter for OpenHarmony 实战:Injectable — 自动化依赖注入大师 前言 在维护 Flutter for OpenHarmony 商业级项目时,由于功能重叠与模块解耦的需求,代码库中会充斥着大量的 Service(业务服务)、Repository(数据中心)及 Bloc/ViewModel。如果采用手动实例化这些类并逐层透传,代码会迅速演变成不可维护的“意大利面条”。 依赖注入 (Dependency Injection, DI) 是解决该问题的业界公认方案。而 Injectable 配合 GetIt,则是 Dart 生态中实现 DI 的皇冠。它能通过极其简洁的注解,在编译期自动生成复杂的注册代码。本文将带你探索如何利用 Injectable 构建一个灵活适配鸿蒙多运行环境的高级架构。 一、为什么 Injectable 是鸿蒙项目的必选项? 1.1 依赖管理的解耦

By Ne0inhk
Flutter for OpenHarmony 实战:FFIGEN — 自动化打通鸿蒙 C 语言接口

Flutter for OpenHarmony 实战:FFIGEN — 自动化打通鸿蒙 C 语言接口

欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.ZEEKLOG.net 前言 在 Flutter for OpenHarmony 开发中,当我们需要调用鸿蒙系统提供的原生 C/C++ 能力(如:高性能图像处理、系统级的硬件通信、或者是复用现有的 C 语言算法库)时,dart:ffi 是必经之路。 然而,手动编写 C 语言结构体(struct)和函数指针的 Dart 映射代码不仅枯燥无味,还极度容易因为一个字节偏移的错误导致鸿蒙应用直接崩溃(Segment Fault)。ffigen 是 Dart 官方提供的终极工具,它可以通过解析 C 语言头文件(.h),全自动生成安全、高性能的 Dart 胶水代码。本文将教你如何自动化驱动鸿蒙应用的底层性能。 一、

By Ne0inhk
Flutter 组件 flutterw_sidekick_plugin 适配鸿蒙 HarmonyOS 实战:侧翼脚手架扩展,构建工程自动化与环境一致性治理架构

Flutter 组件 flutterw_sidekick_plugin 适配鸿蒙 HarmonyOS 实战:侧翼脚手架扩展,构建工程自动化与环境一致性治理架构

欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.ZEEKLOG.net Flutter 组件 flutterw_sidekick_plugin 适配鸿蒙 HarmonyOS 实战:侧翼脚手架扩展,构建工程自动化与环境一致性治理架构 前言 在鸿蒙(OpenHarmony)生态迈向大规模团队协作、涉及多分支并行开发及复杂的 SDK 版本管控的背景下,如何确保每一位开发者的本地构建环境(Flutter/Dart SDK)与生产基准完全对齐,已成为保障项目交付质量的“工程定海神针”。在鸿蒙设备这类强调定制化编译工具链与私有插件依赖的环境下,如果团队缺乏统一的脚手架工具,由于由于本地 SDK 版本的微小代差(如空安全检测差异),极易由于由于“环境不一致”导致代码在不同机器上产生不可预知的编译崩溃。 我们需要一种能够深度集成 Sidekick、支持自定义命令扩展且具备“强制版本锁死”能力的脚手架治理方案。 flutterw_sidekick_plugin 为 Flutter 开发者引入了基于 Sidekick

By Ne0inhk