手搓简易 Linux 进程池:基于管道实现任务分发
在 Linux 环境下,进程池是一种高效的并发编程模型。它通过预先创建一组子进程来处理任务,避免了频繁创建和销毁进程的开销。本文将拆解一个基于管道通信的进程池实现代码,带你理解其核心设计思路、通信原理和任务分发机制。
核心设计思路
本进程池的核心逻辑如下:
- 进程创建:父进程创建指定数量的子进程,通过匿名管道与每个子进程建立单向通信(父写子读);
- 任务分发:父进程采用轮询策略将任务分发给不同子进程,实现简单的负载均衡;
- 任务执行:子进程循环读取管道中的任务码,执行对应任务;
- 资源回收:父进程通过关闭管道写端通知子进程退出,并回收所有子进程资源。
代码模块拆解
1. 任务定义与生成
这部分定义了进程池要执行的具体任务,以及随机生成任务的工具函数,模拟业务场景。
#include <iostream>
#include <string>
#include <vector>
#include <memory>
#include <functional>
#include <ctime>
#include <cstdlib>
#include <unistd.h>
#include <sys/wait.h>
using task_t = std::function<void()>;
// 具体任务示例
void PrintLog() {
std::cout << "我是一个打印日志的任务,pid" << getpid() << std::endl;
}
void DownLoad() {
std::cout << "我是一个下载任务,pid" << getpid() << std::endl;
}
void ReadMysql() {
std::cout << "我是一个访问数据库的任务,pid" << getpid() << std::endl;
}
void WriteRedis() {
std::cout << "我是一个访问 Redis 的任务,pid" << getpid() << std::endl;
}
// 全局任务列表
std::vector<task_t> gtasks;
void LoadTask() {
gtasks.push_back(PrintLog);
gtasks.push_back(DownLoad);
gtasks.push_back(ReadMysql);
gtasks.push_back(WriteRedis);
}
// 随机生成任务码
void RandomTask(std::vector<int>* out) {
for (int i = 0; i < 50; i++) {
int code = rand() % gtasks.size();
usleep(23223); // 模拟任务产生的时间间隔
out->push_back(code);
}
}
#define LOG_TASK 0
#define DOWNLOAD_TASK 1
#define MYSQL_TASK 2
#define REDIS_TASK 3
std::string TaskToString(int code) {
switch (code) {
case LOG_TASK: return "PrintLog";
case DOWNLOAD_TASK: return "DownLoad";
case MYSQL_TASK: return "ReadMysql";
case REDIS_TASK: return "WriteRedis";
default: return "Unknown";
}
}
2. 子进程工作逻辑
Work 函数是子进程的核心,负责从管道读取任务码并执行。当读到管道关闭信号时退出。
void Work(int rfd) {
while (true) {
int code = 0;
// 阻塞读
ssize_t n = read(rfd, &code, sizeof(code));
if (n > 0 && n == sizeof(int)) {
if (code >= 0 && code < gtasks.size()) {
gtasks[code](); // 执行任务
}
} else if (n == 0) {
break; // 管道写端关闭,退出
} else {
break; // 错误退出
}
}
}
3. 通道类封装
Channel 类封装了管道写端和子进程 ID 的关联,简化父进程对单个子进程的管理。
class Channel {
public:
Channel(int wfd, pid_t who) : _wfd(wfd), _sub_process_id(who) {
_name = "Channel-" + std::to_string(_sub_process_id) + "-" + std::to_string(_wfd);
}
int Fd() { return _wfd; }
pid_t SubId() { return _sub_process_id; }
std::string Name() { return _name; }
void Close() {
if (_wfd >= 0) close(_wfd);
}
void Wait() {
pid_t rid = waitpid(_sub_process_id, nullptr, 0);
(void)rid;
}
void SendTask(int taskcode) {
ssize_t n = write(_wfd, &taskcode, sizeof(taskcode));
(void)n;
}
private:
int _wfd;
pid_t _sub_process_id;
std::string _name;
};
4. 进程池管理逻辑
ProcessPool 类负责创建子进程、管理通道、分发任务和停止进程池。
class ProcessPool {
private:
// 轮询选择下一个子进程
int Next() {
int choice = _next_choice;
_next_choice++;
_next_choice %= _channels.size();
return choice;
}
public:
ProcessPool(int number) : _number(number), _next_choice(0) {
std::cout << "number: " << _number << std::endl;
}
void Start() {
for (int i = 0; i < _number; i++) {
int pipefd[2];
if (pipe(pipefd) < 0) {
perror("pipe");
exit(2);
}
pid_t id = fork();
if (id < 0) {
perror("fork");
exit(3);
} else if (id == 0) {
// 子进程逻辑
close(pipefd[1]); // 关闭写端
Work(pipefd[0]); // 执行工作
close(pipefd[0]);
exit(0);
} else {
// 父进程逻辑
close(pipefd[0]); // 关闭读端
_channels.emplace_back(pipefd[1], id);
}
}
}
void PushTask(int taskcode) {
int who = Next();
_channels[who].SendTask(taskcode);
std::cout << "发送任务:" << TaskToString(taskcode) << "[" << taskcode << "]"
<< "给:" << _channels[who].Name() << std::endl;
}
void Stop() {
// 版本 1:先批量关闭所有管道,再统一回收
for (auto& channel : _channels) {
channel.Close();
std::cout << channel.Name() << " close success!" << std::endl;
}
sleep(3); // 等待子进程处理完最后任务并退出
for (auto& channel : _channels) {
channel.Wait();
std::cout << channel.Name() << " wait success!" << std::endl;
}
}
private:
std::vector<Channel> _channels;
int _number;
int _next_choice;
};
5. 主函数入口
#ifdef __MAIN__
static void Usage(const std::string &proc) {
std::cout << "Usage:\n\t" << proc << " process_number" << std::endl;
}
int main(int argc, char* argv[]) {
if (argc != 2) {
Usage(argv[0]);
exit(1);
}
int number = std::stoi(argv[1]);
srand(time(nullptr) ^ getpid());
LoadTask();
std::vector<int> task_codes;
RandomTask(&task_codes);
auto pp = std::make_unique<ProcessPool>(number);
pp->Start();
sleep(2); // 等待子进程初始化
for (auto task : task_codes) {
pp->PushTask(task);
usleep(500000); // 模拟分发间隔
}
pp->Stop();
return 0;
}
#endif
关键知识点解析
1. 管道通信原理
匿名管道 pipe() 创建的文件描述符对中,pipefd[0] 为读端,pipefd[1] 为写端。父子进程继承后,通过关闭不需要的端实现'父写子读'。当所有写端关闭后,读端 read() 会返回 0,子进程据此判断退出。
2. 轮询负载均衡
Next() 函数通过递增取模的方式循环选择子进程,确保任务均匀分发,避免单点过载。
3. 进程回收的坑
在 Stop() 阶段,如果父进程关闭某个管道的写端后立即调用 waitpid(),而该子进程可能还在阻塞读取其他未关闭的管道(因为 fork 继承),或者父进程在等待一个子进程时导致其他子进程无法收到退出信号,从而引发死锁或僵尸进程。
解决方案:采用'先批量关闭所有写端,等待一段时间,再统一回收'的策略(如上述代码中的版本 1)。这样能确保所有子进程同时感知到退出信号,避免串行阻塞。
编译与运行
使用 Makefile 进行编译:
process_pool: process_pool.cc
g++ -o $@ $^ -std=c++14
.PHONY: clean
clean:
rm -f process_pool
- 编译:直接执行
make; - 运行:
./process_pool 5(5 为子进程数量); - 输出:任务被轮询分发给不同子进程,每个任务打印对应的进程 ID,最后进程池正常停止并回收资源。
扩展与优化方向
- 错误处理:当前代码未处理
write()/read()的错误返回值,实际场景应增加重试或日志记录; - 动态扩容:支持运行时增加或减少子进程数量;
- 更优的负载均衡:基于子进程当前任务数、CPU 使用率等动态分发;
- 任务队列:父进程增加任务队列,避免任务分发过快导致管道阻塞;
- 信号处理:增加
SIGCHLD信号处理,异步回收子进程,彻底避免僵尸进程。
本文通过拆解一个极简的进程池实现,涵盖了 Linux 进程间通信、进程管理的核心知识点。这个进程池虽然简单,但包含了并发编程的关键设计思想,是学习 Linux 系统编程的绝佳案例。


