前言
在 Linux 环境下,进程池是一种高效的并发编程模型,它通过预先创建一组子进程来处理任务,避免了频繁创建/销毁进程的开销。本文将拆解一个基于管道通信的进程池实现代码,带你理解进程池的核心设计思路、管道通信原理和任务分发机制。
Linux 进程池通过预创建子进程避免频繁创建销毁开销。基于匿名管道和轮询策略,使用 C++ 实现简易进程池。父进程分发任务码,子进程读取执行。解析管道通信原理、负载均衡机制及进程回收死锁问题。提供完整代码示例及 Makefile 编译方法,探讨错误处理、动态扩容等优化方向。

在 Linux 环境下,进程池是一种高效的并发编程模型,它通过预先创建一组子进程来处理任务,避免了频繁创建/销毁进程的开销。本文将拆解一个基于管道通信的进程池实现代码,带你理解进程池的核心设计思路、管道通信原理和任务分发机制。
本进程池实现的核心逻辑:
这部分是测试用的任务层,定义了进程池要执行的具体任务,以及随机生成任务的工具函数。
#include <iostream>
#include <string>
#include <vector>
#include <memory>
#include <functional>
#include <ctime>
#include <cstdlib>
#include <unistd.h>
#include <sys/wait.h>
#define __MAIN__
//////////////////////////////////////////任务测试代码/////////////////////////////////////////
// 定义任务类型:无参数无返回值的函数对象
using task_t = std::function<void()>;
// 具体任务 1:打印日志(带进程 ID 标识)
void PrintLog() {
std::cout << "我是一个打印日志的任务,pid" << getpid() << std::endl;
}
// 具体任务 2:模拟下载
void DownLoad() {
std::cout << "我是一个下载任务,pid" << getpid() << std::endl;
}
// 具体任务 3:模拟访问 MySQL
void ReadMysql() {
std::cout << "我是一个访问数据库的任务,pid" << getpid() << std::endl;
}
// 具体任务 4:模拟访问 Redis
void WriteRedis() {
std::cout << "我是一个访问 Redis 的任务,pid" << getpid() << std::endl;
}
// 全局任务列表:存储所有可执行的任务
std::vector<task_t> gtasks;
// 加载所有任务到全局列表
void LoadTask() {
gtasks.push_back(PrintLog);
gtasks.push_back(DownLoad);
gtasks.push_back(ReadMysql);
gtasks.push_back(WriteRedis);
}
// 随机生成 50 个任务码(输出型参数 out 存储结果)
// 作用:模拟业务中随机产生的任务请求
// *: 输出型参数
// const &: 输入型参数
// &: 输入输出型参数
void RandomTask(std::vector<int>* out) {
for (int i = 0; i < 50; i++) {
// 随机选择任务(0~3)
int code = rand() % gtasks.size();
usleep(23223); // 模拟任务产生的时间间隔
out->push_back(code);
}
}
// 任务码枚举(增强可读性)
#define LOG_TASK 0
#define DOWNLOAD_TASK 1
#define MYSQL_TASK 2
#define REDIES_TASK 3
// 任务码转字符串:方便日志打印
std::string TaskToString(int code) {
switch (code) {
case LOG_TASK: return "PrintLog";
case DOWNLOAD_TASK: return "DownLoad";
case MYSQL_TASK: return "ReadMysql";
case REDIES_TASK: return "WriteRedies";
default: return "Unknown";
}
}
Work 函数是子进程的核心执行逻辑,负责从管道读取任务码并执行对应任务:
/////////////////////////进程池核心代码/////////////////////////
// 子进程工作函数:循环读取管道中的任务码并执行
// rfd:管道读端文件描述符
void Work(int rfd) {
while (true) {
int code = 0;
// 从管道读取任务码(阻塞读)
ssize_t n = read(rfd, &code, sizeof(code));
// 读取成功且长度正确:执行任务
if (n > 0 && n == sizeof(int)) {
if (code >= 0 && code < gtasks.size()) {
gtasks[code](); // 执行对应任务
}
}
// 读取到 0:表示管道写端关闭(父进程通知退出)
else if (n == 0) {
break; // 子进程退出循环
}
// 读取错误:直接退出
else {
break;
}
}
}
Channel 类封装了'管道写端 + 子进程 ID'的关联关系,简化父进程对单个子进程的管理(发任务、关管道、回收进程):
// 通道类:管理单个子进程的通信管道和进程 ID
class Channel {
public:
// 构造函数:初始化管道写端、子进程 ID,生成通道名称
Channel(int wfd, pid_t who) : _wfd(wfd), _sub_process_id(who) {
_name = "Channel-" + std::to_string(_sub_process_id) + "-" + std::to_string(_wfd);
}
int Fd() { return _wfd; }
pid_t SubId() { return _sub_process_id; }
std::string Name() { return _name; }
// 关闭管道写端
void Close() {
if (_wfd >= 0) close(_wfd);
}
// 等待子进程退出(回收资源)
void Wait() {
pid_t rid = waitpid(_sub_process_id, nullptr, 0);
(void)rid; // 屏蔽未使用变量警告
}
// 向子进程发送任务码(写管道)
void SendTask(int taskcode) {
ssize_t n = write(_wfd, &taskcode, sizeof(taskcode));
(void)n; // 屏蔽未使用变量警告(实际场景应检查写操作是否成功)
}
~Channel() {}
private:
int _wfd; // 管道写端文件描述符
pid_t _sub_process_id; // 对应子进程 ID
std::string _name; // 通道名称(调试用)
};
ProcessPool 类是进程池的核心,负责创建子进程、管理通道、分发任务、停止进程池:
class ProcessPool {
private:
// 轮询选择下一个子进程(负载均衡策略)
int Next() {
int choice = _next_choice;
_next_choice++;
_next_choice %= _channels.size(); // 取模实现循环
return choice;
}
public:
// 构造函数:初始化进程池大小、轮询索引
ProcessPool(int number) : _number(number), _next_choice(0) {
std::cout << "number: " << _number << std::endl;
}
// 启动进程池(父进程执行):创建指定数量的子进程和管道
void Start() {
for (int i = 0; i < _number; i++) {
// 1. 创建匿名管道
int pipefd[2];
int n = pipe(pipefd);
if (n < 0) {
perror("pipe");
exit(2);
}
// 2. 创建子进程
pid_t id = fork();
if (id < 0) {
perror("fork");
exit(3);
} else if (id == 0) {
// 子进程逻辑
close(pipefd[1]); // 子进程关闭写端(只读)
Work(pipefd[0]); // 执行工作函数
close(pipefd[0]); // 任务完成后关闭读端
exit(0); // 子进程退出
} else {
// 父进程逻辑
close(pipefd[0]); // 父进程关闭读端(只写)
// 创建通道对象并加入管理列表
_channels.emplace_back(pipefd[1], id);
}
}
}
// 推送任务:选择子进程并发送任务码
void PushTask(int taskcode) {
// 轮询选择一个子进程
int who = Next();
_channels[who].SendTask(taskcode);
// 打印任务分发日志(调试用)
std::cout << "发送任务:" << TaskToString(taskcode) << "[" << taskcode << "]" << "给:" << _channels[who].Name() << std::endl;
}
// 停止进程池:关闭所有管道,回收子进程
void Stop() {
// version1 -- 可以成功
// 1. 批量关闭所有管道写端(通知子进程退出)
for (auto& channel : _channels) {
channel.Close();
std::cout << channel.Name() << " close success!" << std::endl;
}
sleep(3); // 等待子进程处理完最后任务并退出
// 2. 批量回收子进程资源
for (auto& channel : _channels) {
channel.Wait();
std::cout << channel.Name() << " wait success!" << std::endl;
}
// version2 -- 不能成功(原因:关闭管道后立即 wait,子进程可能还未处理完读操作,导致阻塞)
// for(auto& channel: _channels)
// {
// channel.Close();
// channel.Wait();
// std::cout << channel.Name() << " close and wait success!" << std::endl;
// }
// version3 -- 可以成功(逆序关闭 + 回收,避免资源竞争)
// int end = _channels.size() - 1;
// while(end >= 0)
// {
// _channels[end].Close();
// _channels[end].Wait();
// std::cout << channel.Name() << " close and wait success!" << std::endl;
// end--;
// }
}
// 调试打印:输出所有通道信息
void DebugPrint() {
std::cout << "------------------------------------" << std::endl;
for (auto& channel : _channels) {
std::cout << channel.Fd() << std::endl;
std::cout << channel.SubId() << std::endl;
std::cout << channel.Name() << std::endl;
}
std::cout << "------------------------------------" << std::endl;
}
~ProcessPool() {}
private:
std::vector<Channel> _channels; // 管理所有子进程的通道
int _number; // 进程池大小(子进程数量)
int _next_choice; // 轮询索引(下一个要分发任务的子进程)
};
主函数是进程池的入口,负责初始化、启动、分发任务、停止进程池:
#ifdef __MAIN__
// 用法提示函数
static void Usage(const std::string &proc) {
std::cout << "Usage:\n\t" << proc << " process_number" << std::endl;
}
// 程序入口:./process_pool 5(5 为子进程数量)
int main(int argc, char* argv[]) {
// 检查命令行参数
if (argc != 2) {
Usage(argv[0]);
exit(1);
}
int number = std::stoi(argv[1]);
// 0. 初始化:加载任务、随机生成 50 个任务码
srand(time(nullptr) ^ getpid()); // 设置随机数种子(结合时间 + 进程 ID)
LoadTask();
std::vector<int> task_codes;
RandomTask(&task_codes);
// 1. 创建进程池对象(智能指针自动管理内存)
std::unique_ptr<ProcessPool> pp = std::make_unique<ProcessPool>(number);
// 2. 启动进程池(创建子进程和管道)
pp->Start();
sleep(2); // 等待所有子进程初始化完成
// 3. 分发所有随机任务
for (auto task : task_codes) {
pp->PushTask(task);
usleep(500000); // 模拟任务分发间隔(500ms)
}
// 注释部分:交互式输入任务码(调试用)
// while(true)
// {
// int code = 0;
// std::cout << "Please Enter Your Task# ";
// std::cin >> code;
// if(code < 0 || code > gtasks.size())
// {
// std::cout << "任务码错误,请重新输入" << std::endl;
// continue;
// }
// pp->PushTask(code);
// }
// 4. 停止进程池(回收资源)
pp->Stop();
return 0;
}
#endif
pipe() 创建的文件描述符对 pipefd[0](读)、pipefd[1](写)是单向的;read() 会返回 0,子进程通过这个信号判断退出。Next() 函数通过递增取模的方式,循环选择子进程,确保任务均匀分发给所有子进程,避免单个子进程过载。
Stop() 函数中 version2 失败的原因:父进程关闭管道后立即 waitpid(),子进程可能还在阻塞读管道,此时父进程 waitpid() 会阻塞,而子进程读取到管道关闭后退出,但若所有子进程都处于这种状态,会导致死锁。version1 先批量关闭所有管道,等待 3 秒让子进程全部退出后再回收,避免了这个问题。
总结:
版本 2 失败的根本原因是父进程在等待一个子进程时,其他子进程的写端并未关闭 (因为都是继承了父进程,关闭了一个,但是后面的子进程关闭一个进行读还是不可避免的继承了上次之前的),导致它们无法退出,从而形成串行阻塞。版本 1 通过先关闭所有写端,让子进程并发退出,避免了这一风险。因此,在实际开发中,应当采用版本 1 或类似策略来确保进程池能够优雅地停止。
#include <iostream>
#include <string>
#include <vector>
#include <memory>
#include <functional>
#include <ctime>
#include <cstdlib>
#include <unistd.h>
#include <sys/wait.h>
#define __MAIN__
//////////////////////////////////////////任务测试代码///////////////////////////////////////
using task_t = std::function<void()>;
void PrintLog() {
std::cout << "我是一个打印日志的任务,pid" << getpid() << std::endl;
}
void DownLoad() {
std::cout << "我是一个下载任务,pid" << getpid() << std::endl;
}
void ReadMysql() {
std::cout << "我是一个访问数据库的任务,pid" << getpid() << std::endl;
}
void WriteRedis() {
std::cout << "我是一个访问 Redis 的任务,pid" << getpid() << std::endl;
}
std::vector<task_t> gtasks;
void LoadTask() {
gtasks.push_back(PrintLog);
gtasks.push_back(DownLoad);
gtasks.push_back(ReadMysql);
gtasks.push_back(WriteRedis);
}
// *: 输出型参数
// const &: 输入型参数
// &: 输入输出型参数
void RandomTask(std::vector<int>* out) {
for (int i = 0; i < 50; i++) {
int code = rand() % gtasks.size();
usleep(23223);
out->push_back(code);
}
}
#define LOG_TASK 0
#define DOWNLOAD_TASK 1
#define MYSQL_TASK 2
#define REDIES_TASK 3
std::string TaskToString(int code) {
switch (code) {
case LOG_TASK: return "PrintLog";
case DOWNLOAD_TASK: return "DownLoad";
case MYSQL_TASK: return "ReadMysql";
case REDIES_TASK: return "WriteRedies";
default: return "Unknown";
}
}
/////////////////////////进程池代码/////////////////////////
void Work(int rfd) {
while (true) {
int code = 0;
ssize_t n = read(rfd, &code, sizeof(code));
if (n > 0 && n == sizeof(int)) {
if (code >= 0 && code < gtasks.size()) {
gtasks[code]();
}
} else if (n == 0) {
break; // 子进程只要读到返回值为 0,表明父进程让我退出
} else {
break;
}
}
}
class Channel {
public:
Channel(int wfd, pid_t who) : _wfd(wfd), _sub_process_id(who) {
_name = "Channel-" + std::to_string(_sub_process_id) + "-" + std::to_string(_wfd);
}
int Fd() { return _wfd; }
pid_t SubId() { return _sub_process_id; }
std::string Name() { return _name; }
void Close() {
if (_wfd >= 0) close(_wfd);
}
void Wait() {
pid_t rid = waitpid(_sub_process_id, nullptr, 0);
(void)rid;
}
void SendTask(int taskcode) {
ssize_t n = write(_wfd, &taskcode, sizeof(taskcode));
(void)n;
}
~Channel() {}
private:
int _wfd;
pid_t _sub_process_id;
std::string _name;
};
class ProcessPool {
private:
int Next() {
int choice = _next_choice;
_next_choice++;
_next_choice %= _channels.size();
return choice;
}
public:
ProcessPool(int number) : _number(number), _next_choice(0) {
std::cout << "number: " << _number << std::endl;
}
// 父进程
void Start() {
for (int i = 0; i < _number; i++) {
// 1. 创建管道
int pipefd[2];
int n = pipe(pipefd);
if (n < 0) {
perror("pipe");
exit(2);
}
// 2. 创建子进程
pid_t id = fork();
if (id < 0) {
perror("fork");
exit(3);
} else if (id == 0) {
// 子进程
// 关闭父进程历史的 wfd!
for (auto& channel : _channels) channel.Close();
close(pipefd[1]);
Work(pipefd[0]);
close(pipefd[0]);
exit(0);
} else {
// 父进程
close(pipefd[0]);
_channels.emplace_back(pipefd[1], id);
}
}
}
// 1. 什么任务?任务码决定
// 2. 任务给谁?属于进程池内部操作,负载均衡 (我这里是用的轮询的机制)
void PushTask(int taskcode) {
// 选择一个子进程
int who = Next();
_channels[who].SendTask(taskcode);
std::cout << "发送任务:" << TaskToString(taskcode) << "[" << taskcode << "]" << "给:" << _channels[who].Name() << std::endl;
}
// 有版本存在一些问题,后续会说为什么
void Stop() {
// version1 -- 可以成功
// 1. 关闭 wfd
for (auto& channel : _channels) {
channel.Close();
std::cout << channel.Name() << " close success!" << std::endl;
}
sleep(3);
// 2. 回收子进程
for (auto& channel : _channels) {
channel.Wait();
std::cout << channel.Name() << " wait success!" << std::endl;
}
// version2 -- 不能成功???
// for(auto& channel: _channels)
// {
// channel.Close();
// channel.Wait();
// std::cout << channel.Name() << " close and wait success!" << std::endl;
// }
// version3 -- 可以成功
// int end = _channels.size() - 1;
// while(end >= 0)
// {
// _channels[end].Close();
// _channels[end].Wait();
// std::cout << channel.Name() << " close and wait success!" << std::endl;
// end--;
// }
}
void DebugPrint() {
std::cout << "------------------------------------" << std::endl;
for (auto& channel : _channels) {
std::cout << channel.Fd() << std::endl;
std::cout << channel.SubId() << std::endl;
std::cout << channel.Name() << std::endl;
}
std::cout << "------------------------------------" << std::endl;
}
~ProcessPool() {}
private:
std::vector<Channel> _channels;
int _number;
int _next_choice;
};
// 父进程
#ifdef __MAIN__
static void Usage(const std::string &proc) {
std::cout << "Usage:\n\t" << proc << " process_number" << std::endl;
}
// ./process_pool 5
int main(int argc, char* argv[]) {
if (argc != 2) {
Usage(argv[0]);
exit(1);
}
int number = std::stoi(argv[1]);
// 0. 加载任务并随机生成任务
srand(time(nullptr) ^ getpid());
LoadTask();
std::vector<int> task_codes;
RandomTask(&task_codes);
// 1. 创建进程池对象
std::unique_ptr<ProcessPool> pp = std::make_unique<ProcessPool>(number);
// 2. 启动进程池
pp->Start();
sleep(2);
for (auto task : task_codes) {
pp->PushTask(task);
usleep(500000);
}
// 自己输入发送任务
// while(true)
// {
// int code = 0;
// std::cout << "Please Enter Your Task# ";
// std::cin >> code;
// if(code < 0 || code > gtasks.size())
// {
// std::cout << "任务码错误,请重新输入" << std::endl;
// continue;
// }
// pp->PushTask(code);
// }
pp->Stop();
return 0;
}
#endif
process_pool: process_pool.cc
g++ -o$@ $^ -std=c++14
.PHONY: clean
clean:
rm -f process_pool
./process_pool 5(5 为子进程数量,可自定义);write()/read() 的错误返回值,实际场景应增加重试、日志记录;SIGCHLD 信号处理,异步回收子进程,避免僵尸进程。
微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online
将JSON字符串修饰为友好的可读格式。 在线工具,JSON美化和格式化在线工具,online