跳到主要内容
极客日志极客日志面向AI+效率的开发者社区
首页博客GitHub 精选镜像工具UI配色美学隐私政策关于联系
搜索内容 / 工具 / 仓库 / 镜像...⌘K搜索
注册
博客列表
C++算法

Linux 进程池实战:基于管道通信的任务分发系统

基于 Linux 匿名管道实现的进程池模型。通过父进程创建子进程并建立单向通信,利用轮询策略分发任务码,子进程循环读取执行。重点涵盖管道 IO 机制、进程生命周期管理、以及优雅退出时的资源回收策略,提供完整 C++ 代码与编译运行指南。

清酒独酌发布于 2026/3/21更新于 2026/5/218 浏览
Linux 进程池实战:基于管道通信的任务分发系统

Linux 进程池实战:基于管道通信的任务分发系统

在 Linux 环境下,进程池是一种高效的并发编程模型。它通过预先创建一组子进程来处理任务,避免了频繁创建和销毁进程的开销。本文将拆解一个基于管道通信的进程池实现代码,带你理解进程池的核心设计思路、管道通信原理和任务分发机制。

一、核心设计思路

本进程池实现的核心逻辑如下:

  • 父进程创建指定数量的子进程,通过匿名管道与每个子进程建立单向通信(父写子读);
  • 父进程采用轮询策略将任务分发给不同子进程,实现简单的负载均衡;
  • 子进程循环读取管道中的任务码,执行对应任务;
  • 父进程通过关闭管道写端通知子进程退出,并回收所有子进程资源。

二、代码模块拆解

2.1 任务定义与随机任务生成

这部分是测试用的任务层,定义了进程池要执行的具体任务,以及随机生成任务的工具函数。

#include <iostream>
#include <string>
#include <vector>
#include <memory>
#include <functional>
#include <ctime>
#include <cstdlib>
#include <unistd.h>
#include <sys/wait.h>

#define __MAIN__

/////////////////////////////////////////////
// 任务测试代码
/////////////////////////////////////////////

// 定义任务类型:无参数无返回值的函数对象
using task_t = std::function<void()>;

// 具体任务 1:打印日志(带进程 ID 标识)
void  {
    std::cout <<  << () << std::endl;
}


{
    std::cout <<  << () << std::endl;
}


{
    std::cout <<  << () << std::endl;
}


{
    std::cout <<  << () << std::endl;
}


std::vector<> gtasks;


{
    gtasks.(PrintLog);
    gtasks.(DownLoad);
    gtasks.(ReadMysql);
    gtasks.(WriteRedis);
}



{
     ( i = ; i < ; i++) {
        
         code = () % gtasks.();
        (); 
        out->(code);
    }
}








{
     (code) {
         LOG_TASK:  ;
         DOWNLOAD_TASK:  ;
         MYSQL_TASK:  ;
         REDIES_TASK:  ;
        :  ;
    }
}
PrintLog
()
"我是一个打印日志的任务,pid"
getpid
// 具体任务 2:模拟下载
void DownLoad()
"我是一个下载任务,pid"
getpid
// 具体任务 3:模拟访问 MySQL
void ReadMysql()
"我是一个访问数据库的任务,pid"
getpid
// 具体任务 4:模拟访问 Redis
void WriteRedis()
"我是一个访问 Redis 的任务,pid"
getpid
// 全局任务列表:存储所有可执行的任务
task_t
// 加载所有任务到全局列表
void LoadTask()
push_back
push_back
push_back
push_back
// 随机生成 50 个任务码(输出型参数 out 存储结果)
// 作用:模拟业务中随机产生的任务请求
void RandomTask(std::vector<int>* out)
for
int
0
50
// 随机选择任务(0~3)
int
rand
size
usleep
23223
// 模拟任务产生的时间间隔
push_back
// 任务码枚举(增强可读性)
#define LOG_TASK 0
#define DOWNLOAD_TASK 1
#define MYSQL_TASK 2
#define REDIES_TASK 3
// 任务码转字符串:方便日志打印
std::string TaskToString(int code)
switch
case
return
"PrintLog"
case
return
"DownLoad"
case
return
"ReadMysql"
case
return
"WriteRedis"
default
return
"Unknown"

2.2 子进程任务处理逻辑

Work 函数是子进程的核心执行逻辑,负责从管道读取任务码并执行对应任务。

/////////////////////////进程池核心代码/////////////////////////

// 子进程工作函数:循环读取管道中的任务码并执行
// rfd:管道读端文件描述符
void Work(int rfd) {
    while (true) {
        int code = 0;
        // 从管道读取任务码(阻塞读)
        ssize_t n = read(rfd, &code, sizeof(code));
        
        // 读取成功且长度正确:执行任务
        if (n > 0 && n == sizeof(int)) {
            if (code >= 0 && code < gtasks.size()) {
                gtasks[code](); // 执行对应任务
            }
        }
        // 读取到 0:表示管道写端关闭(父进程通知退出)
        else if (n == 0) {
            break; // 子进程退出循环
        }
        // 读取错误:直接退出
        else {
            break;
        }
    }
}

2.3 通道(Channel)类:封装父子进程通信

Channel 类封装了'管道写端 + 子进程 ID'的关联关系,简化父进程对单个子进程的管理(发任务、关管道、回收进程)。

// 通道类:管理单个子进程的通信管道和进程 ID
class Channel {
public:
    // 构造函数:初始化管道写端、子进程 ID,生成通道名称
    Channel(int wfd, pid_t who) : _wfd(wfd), _sub_process_id(who) {
        _name = "Channel-" + std::to_string(_sub_process_id) + "-" + std::to_string(_wfd);
    }

    int Fd() { return _wfd; }
    pid_t SubId() { return _sub_process_id; }
    std::string Name() { return _name; }

    // 关闭管道写端
    void Close() {
        if (_wfd >= 0) close(_wfd);
    }

    // 等待子进程退出(回收资源)
    void Wait() {
        pid_t rid = waitpid(_sub_process_id, nullptr, 0);
        (void)rid; // 屏蔽未使用变量警告
    }

    // 向子进程发送任务码(写管道)
    void SendTask(int taskcode) {
        ssize_t n = write(_wfd, &taskcode, sizeof(taskcode));
        (void)n; // 屏蔽未使用变量警告(实际场景应检查写操作是否成功)
    }

private:
    int _wfd;              // 管道写端文件描述符
    pid_t _sub_process_id; // 对应子进程 ID
    std::string _name;     // 通道名称(调试用)
};

2.4 进程池(ProcessPool)类:核心管理逻辑

ProcessPool 类是进程池的核心,负责创建子进程、管理通道、分发任务、停止进程池。

class ProcessPool {
private:
    // 轮询选择下一个子进程(负载均衡策略)
    int Next() {
        int choice = _next_choice;
        _next_choice++;
        _next_choice %= _channels.size(); // 取模实现循环
        return choice;
    }

public:
    // 构造函数:初始化进程池大小、轮询索引
    ProcessPool(int number) : _number(number), _next_choice(0) {
        std::cout << "number: " << _number << std::endl;
    }

    // 启动进程池(父进程执行):创建指定数量的子进程和管道
    void Start() {
        for (int i = 0; i < _number; i++) {
            // 1. 创建匿名管道
            int pipefd[2];
            int n = pipe(pipefd);
            if (n < 0) {
                perror("pipe");
                exit(2);
            }

            // 2. 创建子进程
            pid_t id = fork();
            if (id < 0) {
                perror("fork");
                exit(3);
            } else if (id == 0) {
                // 子进程逻辑
                close(pipefd[1]); // 子进程关闭写端(只读)
                Work(pipefd[0]);  // 执行工作函数
                close(pipefd[0]); // 任务完成后关闭读端
                exit(0);          // 子进程退出
            } else {
                // 父进程逻辑
                close(pipefd[0]); // 父进程关闭读端(只写)
                // 创建通道对象并加入管理列表
                _channels.emplace_back(pipefd[1], id);
            }
        }
    }

    // 推送任务:选择子进程并发送任务码
    void PushTask(int taskcode) {
        // 轮询选择一个子进程
        int who = Next();
        _channels[who].SendTask(taskcode);
        // 打印任务分发日志(调试用)
        std::cout << "发送任务:" << TaskToString(taskcode) << "[" << taskcode << "]" 
                  << "给:" << _channels[who].Name() << std::endl;
    }

    // 停止进程池:关闭所有管道,回收子进程
    void Stop() {
        // version1 -- 可以成功
        // 1. 批量关闭所有管道写端(通知子进程退出)
        for (auto& channel : _channels) {
            channel.Close();
            std::cout << channel.Name() << " close success!" << std::endl;
        }
        sleep(3); // 等待子进程处理完最后任务并退出
        
        // 2. 批量回收子进程资源
        for (auto& channel : _channels) {
            channel.Wait();
            std::cout << channel.Name() << " wait success!" << std::endl;
        }
    }

    // 调试打印:输出所有通道信息
    void DebugPrint() {
        std::cout << "------------------------------------" << std::endl;
        for (auto& channel : _channels) {
            std::cout << channel.Fd() << std::endl;
            std::cout << channel.SubId() << std::endl;
            std::cout << channel.Name() << std::endl;
        }
        std::cout << "------------------------------------" << std::endl;
    }

private:
    std::vector<Channel> _channels; // 管理所有子进程的通道
    int _number;                    // 进程池大小(子进程数量)
    int _next_choice;               // 轮询索引(下一个要分发任务的子进程)
};

2.5 主函数:进程池使用示例

主函数是进程池的入口,负责初始化、启动、分发任务、停止进程池。

#ifdef __MAIN__
// 用法提示函数
static void Usage(const std::string &proc) {
    std::cout << "Usage:\n\t" << proc << " proceess_number" << std::endl;
}

// 程序入口:./process_pool 5(5 为子进程数量)
int main(int argc, char* argv[]) {
    // 检查命令行参数
    if (argc != 2) {
        Usage(argv[0]);
        exit(1);
    }
    int number = std::stoi(argv[1]);

    // 0. 初始化:加载任务、随机生成 50 个任务码
    srand(time(nullptr) ^ getpid()); // 设置随机数种子(结合时间 + 进程 ID)
    LoadTask();
    std::vector<int> task_codes;
    RandomTask(&task_codes);

    // 1. 创建进程池对象(智能指针自动管理内存)
    std::unique_ptr<ProcessPool> pp = std::make_unique<ProcessPool>(number);

    // 2. 启动进程池(创建子进程和管道)
    pp->Start();
    sleep(2); // 等待所有子进程初始化完成

    // 3. 分发所有随机任务
    for (auto task : task_codes) {
        pp->PushTask(task);
        usleep(500000); // 模拟任务分发间隔(500ms)
    }

    // 4. 停止进程池(回收资源)
    pp->Stop();
    return 0;
}
#endif

三、关键知识点解析

3.1 管道通信原理

  • 匿名管道 pipe() 创建的文件描述符对 pipefd[0](读)、pipefd[1](写)是单向的;
  • 父子进程继承管道文件描述符,通过关闭不需要的端实现'父写子读';
  • 当写端关闭后,读端 read() 会返回 0,子进程通过这个信号判断退出。

3.2 轮询负载均衡

Next() 函数通过递增取模的方式,循环选择子进程,确保任务均匀分发给所有子进程,避免单个子进程过载。

3.3 进程回收的坑

在 Stop() 函数中,如果像 Version 2 那样关闭管道后立即 waitpid(),子进程可能还在阻塞读管道。此时父进程 waitpid() 会阻塞,而子进程读取到管道关闭后退出,若所有子进程都处于这种状态,会导致死锁。

Version 1 先批量关闭所有管道,等待 3 秒让子进程全部退出后再回收,避免了这个问题。因此,在实际开发中,应当采用版本 1 或类似策略来确保进程池能够优雅地停止。

四、完整代码展示

为了方便编译运行,这里整合了上述所有模块的完整代码:

#include <iostream>
#include <string>
#include <vector>
#include <memory>
#include <functional>
#include <ctime>
#include <cstdlib>
#include <unistd.h>
#include <sys/wait.h>

#define __MAIN__

using task_t = std::function<void()>;

void PrintLog() {
    std::cout << "我是一个打印日志的任务,pid" << getpid() << std::endl;
}

void DownLoad() {
    std::cout << "我是一个下载任务,pid" << getpid() << std::endl;
}

void ReadMysql() {
    std::cout << "我是一个访问数据库的任务,pid" << getpid() << std::endl;
}

void WriteRedis() {
    std::cout << "我是一个访问 Redis 的任务,pid" << getpid() << std::endl;
}

std::vector<task_t> gtasks;

void LoadTask() {
    gtasks.push_back(PrintLog);
    gtasks.push_back(DownLoad);
    gtasks.push_back(ReadMysql);
    gtasks.push_back(WriteRedis);
}

void RandomTask(std::vector<int>* out) {
    for (int i = 0; i < 50; i++) {
        int code = rand() % gtasks.size();
        usleep(23223);
        out->push_back(code);
    }
}

#define LOG_TASK 0
#define DOWNLOAD_TASK 1
#define MYSQL_TASK 2
#define REDIES_TASK 3

std::string TaskToString(int code) {
    switch (code) {
        case LOG_TASK: return "PrintLog";
        case DOWNLOAD_TASK: return "DownLoad";
        case MYSQL_TASK: return "ReadMysql";
        case REDIES_TASK: return "WriteRedis";
        default: return "Unknown";
    }
}

void Work(int rfd) {
    while (true) {
        int code = 0;
        ssize_t n = read(rfd, &code, sizeof(code));
        if (n > 0 && n == sizeof(int)) {
            if (code >= 0 && code < gtasks.size()) {
                gtasks[code]();
            }
        } else if (n == 0) {
            break;
        } else {
            break;
        }
    }
}

class Channel {
public:
    Channel(int wfd, pid_t who) : _wfd(wfd), _sub_process_id(who) {
        _name = "Channel-" + std::to_string(_sub_process_id) + "-" + std::to_string(_wfd);
    }
    int Fd() { return _wfd; }
    pid_t SubId() { return _sub_process_id; }
    std::string Name() { return _name; }
    void Close() { if (_wfd >= 0) close(_wfd); }
    void Wait() { pid_t rid = waitpid(_sub_process_id, nullptr, 0); (void)rid; }
    void SendTask(int taskcode) { ssize_t n = write(_wfd, &taskcode, sizeof(taskcode)); (void)n; }
private:
    int _wfd;
    pid_t _sub_process_id;
    std::string _name;
};

class ProcessPool {
private:
    int Next() {
        int choice = _next_choice;
        _next_choice++;
        _next_choice %= _channels.size();
        return choice;
    }
public:
    ProcessPool(int number) : _number(number), _next_choice(0) {
        std::cout << "number: " << _number << std::endl;
    }
    void Start() {
        for (int i = 0; i < _number; i++) {
            int pipefd[2];
            int n = pipe(pipefd);
            if (n < 0) { perror("pipe"); exit(2); }
            pid_t id = fork();
            if (id < 0) { perror("fork"); exit(3); }
            else if (id == 0) {
                close(pipefd[1]);
                Work(pipefd[0]);
                close(pipefd[0]);
                exit(0);
            } else {
                close(pipefd[0]);
                _channels.emplace_back(pipefd[1], id);
            }
        }
    }
    void PushTask(int taskcode) {
        int who = Next();
        _channels[who].SendTask(taskcode);
        std::cout << "发送任务:" << TaskToString(taskcode) << "[" << taskcode << "]" 
                  << "给:" << _channels[who].Name() << std::endl;
    }
    void Stop() {
        for (auto& channel : _channels) {
            channel.Close();
            std::cout << channel.Name() << " close success!" << std::endl;
        }
        sleep(3);
        for (auto& channel : _channels) {
            channel.Wait();
            std::cout << channel.Name() << " wait success!" << std::endl;
        }
    }
private:
    std::vector<Channel> _channels;
    int _number;
    int _next_choice;
};

#ifdef __MAIN__
static void Usage(const std::string &proc) {
    std::cout << "Usage:\n\t" << proc << " proceess_number" << std::endl;
}

int main(int argc, char* argv[]) {
    if (argc != 2) { Usage(argv[0]); exit(1); }
    int number = std::stoi(argv[1]);
    srand(time(nullptr) ^ getpid());
    LoadTask();
    std::vector<int> task_codes;
    RandomTask(&task_codes);
    std::unique_ptr<ProcessPool> pp = std::make_unique<ProcessPool>(number);
    pp->Start();
    sleep(2);
    for (auto task : task_codes) {
        pp->PushTask(task);
        usleep(500000);
    }
    pp->Stop();
    return 0;
}
#endif

五、编译与运行

使用以下 Makefile 进行编译:

process_pool: process_pool.cc
	g++ -o $@ $^ -std=c++14

.PHONY: clean
clean:
	rm -f process_pool
  • 编译:直接执行 make;
  • 运行:./process_pool 5(5 为子进程数量,可自定义);
  • 输出:可以看到任务被轮询分发给不同子进程,每个任务打印对应的进程 ID,最后进程池正常停止并回收资源。

六、扩展与优化方向

  • 错误处理:当前代码未处理 write()/read() 的错误返回值,实际场景应增加重试、日志记录;
  • 动态扩容:支持运行时增加 / 减少子进程数量;
  • 更优的负载均衡:基于子进程当前任务数、CPU 使用率等动态分发;
  • 任务队列:父进程增加任务队列,避免任务分发过快导致管道阻塞;
  • 信号处理:增加 SIGCHLD 信号处理,异步回收子进程,避免僵尸进程。

目录

  1. Linux 进程池实战:基于管道通信的任务分发系统
  2. 一、核心设计思路
  3. 二、代码模块拆解
  4. 2.1 任务定义与随机任务生成
  5. 2.2 子进程任务处理逻辑
  6. 2.3 通道(Channel)类:封装父子进程通信
  7. 2.4 进程池(ProcessPool)类:核心管理逻辑
  8. 2.5 主函数:进程池使用示例
  9. 三、关键知识点解析
  10. 3.1 管道通信原理
  11. 3.2 轮询负载均衡
  12. 3.3 进程回收的坑
  13. 四、完整代码展示
  14. 五、编译与运行
  15. 六、扩展与优化方向
  • 💰 8折买阿里云服务器限时8折了解详情
  • Magick API 一键接入全球大模型注册送1000万token查看
  • 🤖 一键搭建Deepseek满血版了解详情
  • 一键打造专属AI 智能体了解详情
极客日志微信公众号二维码

微信扫一扫,关注极客日志

微信公众号「极客日志V2」,在微信中扫描左侧二维码关注。展示文案:极客日志V2 zeeklog

更多推荐文章

查看全部
  • 多模态大模型 Llama 3.2 正式发布,支持视觉推理与边缘部署
  • 深度解析Python.NET:C#调用Python函数的5个关键陷阱
  • AI Agent 基础架构与核心组件详解
  • C++ 继承详解:面向对象代码复用的核心机制
  • Python 属性描述符:原理剖析与 ORM 实战
  • C/C++依赖管理:Conan 深度解析与实战
  • C++ 动态规划:第 N 个泰波那契数与三步问题
  • 大模型浪潮:是泡沫还是技术革命?
  • K8s 集群外通过路由直连 Pod 实战
  • LeetCode 链表专题:分割、相交及环形链表 C++ 解法
  • Java Lambda 和匿名内部类为何不能修改外部变量?final 与等效 final 解析
  • Docker 安装与配置 Neo4j 教程
  • Agent 反思工作流框架 Reflexion 中篇:ReactAgent 实现与原理详解
  • yshopmall 开源电商 SaaS 解决方案解析
  • 内容创作模式解析:UGC、PGC、PUGC、OGC、MGC、BGC 与 AIGC
  • OpenClaw.ai:Agentic AI 时代的 Spring Framework 时刻
  • WebCode 与 Clawdbot:AI 助手框架的技术架构深度对比
  • 华为 OD 机试真题:部门人力分配算法题解
  • 研究发现思维链(CoT)在某些任务中会降低大模型准确率
  • AI 生成海贼王漫画、苹果限制员工用 ChatGPT、李彦宏谈大模型与就业

相关免费在线工具

  • 加密/解密文本

    使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online

  • Gemini 图片去水印

    基于开源反向 Alpha 混合算法去除 Gemini/Nano Banana 图片水印,支持批量处理与下载。 在线工具,Gemini 图片去水印在线工具,online

  • Base64 字符串编码/解码

    将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online

  • Base64 文件转换器

    将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online

  • Markdown转HTML

    将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online

  • HTML转Markdown

    将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online