跳到主要内容
极客日志极客日志面向AI+效率的开发者社区
首页博客GitHub 精选镜像工具UI配色美学隐私政策关于联系
搜索内容 / 工具 / 仓库 / 镜像...⌘K搜索
注册
博客列表
C++算法

Linux 进程池实现:基于管道的任务分发系统

基于 Linux 匿名管道实现的进程池模型,通过父进程创建子进程建立单向通信通道,采用轮询策略分发任务码至子进程执行。重点阐述管道读写端关闭机制、子进程退出判断及资源回收逻辑,对比不同停止策略下的死锁风险,提供完整 C++ 代码示例与编译运行方法,适用于学习进程间通信与并发编程基础。

dehua dong发布于 2026/3/26更新于 2026/6/923 浏览
Linux 进程池实现:基于管道的任务分发系统

Linux 进程池实现:基于管道的任务分发系统

前言

在 Linux 环境下,进程池是一种高效的并发编程模型,它通过预先创建一组子进程来处理任务,避免了频繁创建/销毁进程的开销。本文将拆解一个基于管道通信的进程池实现代码,带你理解进程池的核心设计思路、管道通信原理和任务分发机制。

一. 核心设计思路

本进程池实现的核心逻辑:

  • 父进程创建指定数量的子进程,通过匿名管道与每个子进程建立单向通信(父写子读);
  • 父进程采用轮询策略将任务分发给不同子进程,实现简单的负载均衡;
  • 子进程循环读取管道中的任务码,执行对应任务;
  • 父进程通过关闭管道写端通知子进程退出,并回收所有子进程资源。

二. 代码模块拆解

2.1 任务定义与随机任务生成

这部分是测试用的任务层,定义了进程池要执行的具体任务,以及随机生成任务的工具函数。

#include <iostream>
#include <string>
#include <vector>
#include <memory>
#include <functional>
#include <ctime>
#include <cstdlib>
#include <unistd.h>
#include <sys/wait.h>

#define __MAIN__

//////////////////////////////////////////////任务测试代码/////////////////////////////////////////
// 定义任务类型:无参数无返回值的函数对象
using task_t = std::function<void()>;

// 具体任务 1:打印日志(带进程 ID 标识)
void PrintLog() {
    std::cout <<  << () << std::endl;
}


{
    std::cout <<  << () << std::endl;
}


{
    std::cout <<  << () << std::endl;
}


{
    std::cout <<  << () << std::endl;
}


std::vector<> gtasks;


{
    gtasks.(PrintLog);
    gtasks.(DownLoad);
    gtasks.(ReadMysql);
    gtasks.(WriteRedis);
}






{
     ( i = ; i < ; i++) {
        
         code = () % gtasks.();
        (); 
        out->(code);
    }
}








{
     (code) {
     LOG_TASK:
         ;
     DOWNLOAD_TASK:
         ;
     MYSQL_TASK:
         ;
     REDIES_TASK:
         ;
    :
         ;
    }
}
"我是一个打印日志的任务,pid"
getpid
// 具体任务 2:模拟下载
void DownLoad()
"我是一个下载任务,pid"
getpid
// 具体任务 3:模拟访问 MySQL
void ReadMysql()
"我是一个访问数据库的任务,pid"
getpid
// 具体任务 4:模拟访问 Redis
void WriteRedis()
"我是一个访问 redis 的任务,pid"
getpid
// 全局任务列表:存储所有可执行的任务
task_t
// 加载所有任务到全局列表
void LoadTask()
push_back
push_back
push_back
push_back
// 随机生成 50 个任务码(输出型参数 out 存储结果)
// 作用:模拟业务中随机产生的任务请求
// *: 输出型参数
// const &: 输入型参数
// &: 输入输出型
void RandomTask(std::vector<int>* out)
for
int
0
50
// 随机选择任务(0~3)
int
rand
size
usleep
23223
// 模拟任务产生的时间间隔
push_back
// 任务码枚举(增强可读性)
#define LOG_TASK 0
#define DOWNLOAD_TASK 1
#define MYSQL_TASK 2
#define REDIES_TASK 3
// 任务码转字符串:方便日志打印
std::string TaskToString(int code)
switch
case
return
"PrintLog"
case
return
"DownLoad"
case
return
"ReadMysql"
case
return
"WriteRedies"
default
return
"Unknown"

2.2 子进程任务处理逻辑

Work 函数是子进程的核心执行逻辑,负责从管道读取任务码并执行对应任务:

/////////////////////////进程池核心代码//////////////////////////
// 子进程工作函数:循环读取管道中的任务码并执行
// rfd:管道读端文件描述符
void Work(int rfd) {
    while (true) {
        int code = 0;
        // 从管道读取任务码(阻塞读)
        ssize_t n = read(rfd, &code, sizeof(code));
        // 读取成功且长度正确:执行任务
        if (n > 0 && n == sizeof(int)) {
            if (code >= 0 && code < gtasks.size()) {
                gtasks[code](); // 执行对应任务
            }
        }
        // 读取到 0:表示管道写端关闭(父进程通知退出)
        else if (n == 0) {
            break; // 子进程退出循环
        }
        // 读取错误:直接退出
        else {
            break;
        }
    }
}

2.3 通道(Channel)类:封装父子进程通信

Channel 类封装了'管道写端 + 子进程 ID'的关联关系,简化父进程对单个子进程的管理(发任务、关管道、回收进程):

// 通道类:管理单个子进程的通信管道和进程 ID
class Channel {
public:
    // 构造函数:初始化管道写端、子进程 ID,生成通道名称
    Channel(int wfd, pid_t who) : _wfd(wfd), _sub_process_id(who) {
        _name = "Channel-" + std::to_string(_sub_process_id) + "-" + std::to_string(_wfd);
    }

    int Fd() { return _wfd; }
    pid_t SubId() { return _sub_process_id; }
    std::string Name() { return _name; }

    // 获取通道名称(调试用)
    // 关闭管道写端
    void Close() {
        if (_wfd >= 0) close(_wfd);
    }

    // 等待子进程退出(回收资源)
    void Wait() {
        pid_t rid = waitpid(_sub_process_id, nullptr, 0);
        (void)rid; // 屏蔽未使用变量警告
    }

    // 向子进程发送任务码(写管道)
    void SendTask(int taskcode) {
        ssize_t n = write(_wfd, &taskcode, sizeof(taskcode));
        (void)n; // 屏蔽未使用变量警告(实际场景应检查写操作是否成功)
    }

    ~Channel() {}

private:
    int _wfd;              // 管道写端文件描述符
    pid_t _sub_process_id; // 对应子进程 ID
    std::string _name;     // 通道名称(调试用)
};

2.4 进程池(ProcessPool)类:核心管理逻辑

ProcessPool 类是进程池的核心,负责创建子进程、管理通道、分发任务、停止进程池:

class ProcessPool {
private:
    // 轮询选择下一个子进程(负载均衡策略)
    int Next() {
        int choice = _next_choice;
        _next_choice++;
        _next_choice %= _channels.size(); // 取模实现循环
        return choice;
    }

public:
    // 构造函数:初始化进程池大小、轮询索引
    ProcessPool(int number) : _number(number), _next_choice(0) {
        std::cout << "number: " << _number << std::endl;
    }

    // 启动进程池(父进程执行):创建指定数量的子进程和管道
    void Start() {
        for (int i = 0; i < _number; i++) {
            // 1. 创建匿名管道
            int pipefd[2];
            int n = pipe(pipefd);
            if (n < 0) {
                perror("pipe");
                exit(2);
            }
            // 2. 创建子进程
            pid_t id = fork();
            if (id < 0) {
                perror("fork");
                exit(3);
            } else if (id == 0) // 子进程逻辑
            {
                // 这里后面还有些变化,为了解决下面那个 version2
                close(pipefd[1]); // 子进程关闭写端(只读)
                Work(pipefd[0]);  // 执行工作函数
                close(pipefd[0]); // 任务完成后关闭读端
                exit(0);          // 子进程退出
            } else // 父进程逻辑
            {
                close(pipefd[0]); // 父进程关闭读端(只写)
                // 创建通道对象并加入管理列表
                _channels.emplace_back(pipefd[1], id);
            }
        }
    }

    // 推送任务:选择子进程并发送任务码
    void PushTask(int taskcode) {
        // 轮询选择一个子进程
        int who = Next();
        _channels[who].SendTask(taskcode);
        // 打印任务分发日志(调试用)
        std::cout << "发送任务:" << TaskToString(taskcode) << "[" << taskcode << "]" << "给:" << _channels[who].Name() << std::endl;
    }

    // 停止进程池:关闭所有管道,回收子进程
    void Stop() {
        // version1 -- 可以成功
        // 1. 批量关闭所有管道写端(通知子进程退出)
        for (auto& channel : _channels) {
            channel.Close();
            std::cout << channel.Name() << " close success!" << std::endl;
        }
        sleep(3); // 等待子进程处理完最后任务并退出
        // 2. 批量回收子进程资源
        for (auto& channel : _channels) {
            channel.Wait();
            std::cout << channel.Name() << " wait success!" << std::endl;
        }
        // version2 -- 不能成功(原因:关闭管道后立即 wait,子进程可能还未处理完读操作,导致阻塞)
        // for(auto& channel: _channels)
        // {
        //     channel.Close();
        //     channel.Wait();
        //     std::cout << channel.Name() << " close and wait success!" << std::endl;
        // }
        // version3 -- 可以成功(逆序关闭 + 回收,避免资源竞争)
        // int end = _channels.size() - 1;
        // while(end >= 0)
        // {
        //     _channels[end].Close();
        //     _channels[end].Wait();
        //     std::cout << channel.Name() << " close and wait success!" << std::endl;
        //     end--;
        // }
    }

    // 调试打印:输出所有通道信息
    void DebugPrint() {
        std::cout << "------------------------------------" << std::endl;
        for (auto& channel : _channels) {
            std::cout << channel.Fd() << std::endl;
            std::cout << channel.SubId() << std::endl;
            std::cout << channel.Name() << std::endl;
        }
        std::cout << "------------------------------------" << std::endl;
    }

    ~ProcessPool() {}

private:
    std::vector<Channel> _channels; // 管理所有子进程的通道
    int _number;                    // 进程池大小(子进程数量)
    int _next_choice;               // 轮询索引(下一个要分发任务的子进程)
};

2.5 主函数:进程池使用示例

主函数是进程池的入口,负责初始化、启动、分发任务、停止进程池:

#ifdef __MAIN__
// 用法提示函数
static void Usage(const std::string &proc) {
    std::cout << "Usage:\n\t" << proc << " process_number" << std::endl;
}

// 程序入口:./process_pool 5(5 为子进程数量)
int main(int argc, char* argv[]) {
    // 检查命令行参数
    if (argc != 2) {
        Usage(argv[0]);
        exit(1);
    }
    int number = std::stoi(argv[1]);

    // 0. 初始化:加载任务、随机生成 50 个任务码
    srand(time(nullptr) ^ getpid()); // 设置随机数种子(结合时间 + 进程 ID)
    LoadTask();
    std::vector<int> task_codes;
    RandomTask(&task_codes);

    // 1. 创建进程池对象(智能指针自动管理内存)
    std::unique_ptr<ProcessPool> pp = std::make_unique<ProcessPool>(number);

    // 2. 启动进程池(创建子进程和管道)
    pp->Start();
    sleep(2); // 等待所有子进程初始化完成

    // 3. 分发所有随机任务
    for (auto task : task_codes) {
        pp->PushTask(task);
        usleep(500000); // 模拟任务分发间隔(500ms)
    }

    // 注释部分:交互式输入任务码(调试用)
    // while(true)
    // {
    //     int code = 0;
    //     std::cout << "Please Enter Your Task# ";
    //     std::cin >> code;
    //     if(code < 0 || code > gtasks.size())
    //     {
    //         std::cout << "任务码错误,请重新输入" << std::endl;
    //         continue;
    //     }
    //     pp->PushTask(code);
    // }

    // 4. 停止进程池(回收资源)
    pp->Stop();
    return 0;
}
#endif

三. 关键知识点解析

3.1 管道通信原理

  • 匿名管道 pipe() 创建的文件描述符对 pipefd[0](读)、pipefd[1](写)是单向的;
  • 父子进程继承管道文件描述符,通过关闭不需要的端实现'父写子读';
  • 当写端关闭后,读端 read() 会返回 0,子进程通过这个信号判断退出。

3.2 轮询负载均衡

Next() 函数通过递增取模的方式,循环选择子进程,确保任务均匀分发给所有子进程,避免单个子进程过载。

3.3 进程回收的坑

Stop() 函数中 version2 失败的原因:父进程关闭管道后立即 waitpid(),子进程可能还在阻塞读管道,此时父进程 waitpid() 会阻塞,而子进程读取到管道关闭后退出,但若所有子进程都处于这种状态,会导致死锁。version1 先批量关闭所有管道,等待 3 秒让子进程全部退出后再回收,避免了这个问题。

总结:
版本 2 失败的根本原因是父进程在等待一个子进程时,其他子进程的写端并未关闭 (因为都是继承了父进程,关闭了一个,但是后面的子进程关闭一个进行读还是不可避免的继承了上次之前的),导致它们无法退出,从而形成串行阻塞。版本 1 通过先关闭所有写端,让子进程并发退出,避免了这一风险。因此,在实际开发中,应当采用版本 1 或类似策略来确保进程池能够优雅地停止。

四. 完整代码展示

#include <iostream>
#include <string>
#include <vector>
#include <memory>
#include <functional>
#include <ctime>
#include <cstdlib>
#include <unistd.h>
#include <sys/wait.h>

#define __MAIN__

//////////////////////////////////////////////任务测试代码///////////////////////////////////////
using task_t = std::function<void()>;

void PrintLog() {
    std::cout << "我是一个打印日志的任务,pid" << getpid() << std::endl;
}

void DownLoad() {
    std::cout << "我是一个下载任务,pid" << getpid() << std::endl;
}

void ReadMysql() {
    std::cout << "我是一个访问数据库的任务,pid" << getpid() << std::endl;
}

void WriteRedis() {
    std::cout << "我是一个访问 redis 的任务,pid" << getpid() << std::endl;
}

std::vector<task_t> gtasks;

void LoadTask() {
    gtasks.push_back(PrintLog);
    gtasks.push_back(DownLoad);
    gtasks.push_back(ReadMysql);
    gtasks.push_back(WriteRedis);
}

// *: 输出型参数
// const &: 输入型参数
// &: 输入输出型
void RandomTask(std::vector<int>* out) {
    for (int i = 0; i < 50; i++) {
        int code = rand() % gtasks.size();
        usleep(23223);
        out->push_back(code);
    }
}

#define LOG_TASK 0
#define DOWNLOAD_TASK 1
#define MYSQL_TASK 2
#define REDIES_TASK 3

std::string TaskToString(int code) {
    switch (code) {
    case LOG_TASK:
        return "PrintLog";
    case DOWNLOAD_TASK:
        return "DownLoad";
    case MYSQL_TASK:
        return "ReadMysql";
    case REDIES_TASK:
        return "WriteRedies";
    default:
        return "Unknown";
    }
}

/////////////////////////进程池代码////////////////////////
void Work(int rfd) {
    while (true) {
        int code = 0;
        ssize_t n = read(rfd, &code, sizeof(code));
        if (n > 0 && n == sizeof(int)) {
            if (code >= 0 && code < gtasks.size()) {
                gtasks[code]();
            }
        } else if (n == 0) {
            break; // 子进程只要读到返回值为 0, 表明父进程让我退出
        } else {
            break;
        }
    }
}

class Channel {
public:
    Channel(int wfd, pid_t who) : _wfd(wfd), _sub_process_id(who) {
        _name = "Channel-" + std::to_string(_sub_process_id) + "-" + std::to_string(_wfd);
    }

    int Fd() { return _wfd; }
    pid_t SubId() { return _sub_process_id; }
    std::string Name() { return _name; }

    void Close() {
        if (_wfd >= 0) close(_wfd);
    }

    void Wait() {
        pid_t rid = waitpid(_sub_process_id, nullptr, 0);
        (void)rid;
    }

    void SendTask(int taskcode) {
        ssize_t n = write(_wfd, &taskcode, sizeof(taskcode));
        (void)n;
    }

    ~Channel() {}

private:
    int _wfd;
    pid_t _sub_process_id;
    std::string _name;
};

class ProcessPool {
private:
    int Next() {
        int choice = _next_choice;
        _next_choice++;
        _next_choice %= _channels.size();
        return choice;
    }

public:
    ProcessPool(int number) : _number(number), _next_choice(0) {
        std::cout << "number: " << _number << std::endl;
    }

    // 父进程
    void Start() {
        for (int i = 0; i < _number; i++) {
            // 1. 创建管道
            int pipefd[2];
            int n = pipe(pipefd);
            if (n < 0) {
                perror("pipe");
                exit(2);
            }
            // 2. 创建子进程
            pid_t id = fork();
            if (id < 0) {
                perror("fork");
                exit(3);
            } else if (id == 0) // 子进程
            {
                // 关闭父进程历史的 wfd!
                for (auto& channel : _channels) channel.Close();
                close(pipefd[1]);
                Work(pipefd[0]);
                close(pipefd[0]);
                exit(0);
            } else // 父进程
            {
                close(pipefd[0]);
                _channels.emplace_back(pipefd[1], id);
            }
        }
    }

    // 1. 什么任务?任务码决定
    // 2. 任务给谁?属于进程池内部操作,负载均衡 (我这里是用的轮询的机制)
    void PushTask(int taskcode) {
        // 选择一个子进程
        int who = Next();
        _channels[who].SendTask(taskcode);
        std::cout << "发送任务:" << TaskToString(taskcode) << "[" << taskcode << "]" << "给:" << _channels[who].Name() << std::endl;
    }

    // 有版本存在一些问题,后续会说为什么
    void Stop() {
        // version1 -- 可以成功
        // 1. 关闭 wfd
        for (auto& channel : _channels) {
            channel.Close();
            std::cout << channel.Name() << " close success!" << std::endl;
        }
        sleep(3);
        // 2. 回收子进程
        for (auto& channel : _channels) {
            channel.Wait();
            std::cout << channel.Name() << " wait success!" << std::endl;
        }
        // version2 -- 不能成功???
        // for(auto& channel: _channels)
        // {
        //     channel.Close();
        //     channel.Wait();
        //     std::cout << channel.Name() << " close and wait success!" << std::endl;
        // }
        // version3 -- 可以成功
        // int end = _channels.size() - 1;
        // while(end >= 0)
        // {
        //     _channels[end].Close();
        //     _channels[end].Wait();
        //     std::cout << channel.Name() << " close and wait success!" << std::endl;
        //     end--;
        // }
    }

    void DebugPrint() {
        std::cout << "------------------------------------" << std::endl;
        for (auto& channel : _channels) {
            std::cout << channel.Fd() << std::endl;
            std::cout << channel.SubId() << std::endl;
            std::cout << channel.Name() << std::endl;
        }
        std::cout << "------------------------------------" << std::endl;
    }

    ~ProcessPool() {}

private:
    std::vector<Channel> _channels;
    int _number;
    int _next_choice;
};

// 父进程
#ifdef __MAIN__
static void Usage(const std::string &proc) {
    std::cout << "Usage:\n\t" << proc << " process_number" << std::endl;
}

// ./process_pool 5
int main(int argc, char* argv[]) {
    if (argc != 2) {
        Usage(argv[0]);
        exit(1);
    }
    int number = std::stoi(argv[1]);

    // 0. 加载任务并随机生成任务
    srand(time(nullptr) ^ getpid());
    LoadTask();
    std::vector<int> task_codes;
    RandomTask(&task_codes);

    // 1. 创建进程池对象
    std::unique_ptr<ProcessPool> pp = std::make_unique<ProcessPool>(number);

    // 2. 启动进程池
    pp->Start();
    sleep(2);

    for (auto task : task_codes) {
        pp->PushTask(task);
        usleep(500000);
    }

    // 自己输入发送任务
    // while(true)
    // {
    //     int code = 0;
    //     std::cout << "Please Enter Your Task# ";
    //     std::cin >> code;
    //     if(code < 0 || code > gtasks.size())
    //     {
    //         std::cout << "任务码错误,请重新输入" << std::endl;
    //         continue;
    //     }
    //     pp->PushTask(code);
    // }

    pp->Stop();
    return 0;
}
#endif

五. 编译与运行(附 Makefile)

process_pool: process_pool.cc
	g++ -o$@ $^ -std=c++14

.PHONY: clean

clean:
	rm -f process_pool
  • 编译:直接 make;
  • 运行:./process_pool 5(5 为子进程数量,可自定义);
  • 输出:可以看到任务被轮询分发给不同子进程,每个任务打印对应的进程 ID,最后进程池正常停止并回收资源。

六. 扩展与优化方向

  • 错误处理:当前代码未处理 write()/read() 的错误返回值,实际场景应增加重试、日志记录;
  • 动态扩容:支持运行时增加/减少子进程数量;
  • 更优的负载均衡:基于子进程当前任务数、CPU 使用率等动态分发;
  • 任务队列:父进程增加任务队列,避免任务分发过快导致管道阻塞;
  • 信号处理:增加 SIGCHLD 信号处理,异步回收子进程,避免僵尸进程。

目录

  1. Linux 进程池实现:基于管道的任务分发系统
  2. 前言
  3. 一. 核心设计思路
  4. 二. 代码模块拆解
  5. 2.1 任务定义与随机任务生成
  6. 2.2 子进程任务处理逻辑
  7. 2.3 通道(Channel)类:封装父子进程通信
  8. 2.4 进程池(ProcessPool)类:核心管理逻辑
  9. 2.5 主函数:进程池使用示例
  10. 三. 关键知识点解析
  11. 3.1 管道通信原理
  12. 3.2 轮询负载均衡
  13. 3.3 进程回收的坑
  14. 四. 完整代码展示
  15. 五. 编译与运行(附 Makefile)
  16. 六. 扩展与优化方向
  • 💰 8折买阿里云服务器限时8折了解详情
  • Magick API 一键接入全球大模型注册送1000万token查看
  • 🤖 一键搭建Deepseek满血版了解详情
  • 一键打造专属AI 智能体了解详情
极客日志微信公众号二维码

微信扫一扫,关注极客日志

微信公众号「极客日志V2」,在微信中扫描左侧二维码关注。展示文案:极客日志V2 zeeklog

更多推荐文章

查看全部
  • AI 大模型系统学习路线:从入门到精通
  • Copilot 的 Ask、Edit、Agent、Plan 模式核心区别解析
  • OpenClaw 厂商全对比:主流 AI 智能体平台深度横评
  • whisper.cpp ggml-large-v3.bin 模型参数文件下载
  • 华为 OD 机试:压缩日志查询(多语言实现)
  • WorkBuddy 使用指南:配置 QQ 机器人实现桌面智能体
  • Linux Do 注册及 Claude AI 本地工具配置教程
  • Unity VR 全景视频高分辨率播放性能优化方案
  • Webmin CVE-2019-15107 漏洞深度解析:从后门到修复
  • llama.cpp Docker 部署:容器化推理服务搭建
  • 知网 AIGC 检测价格对比与省钱策略
  • 人工智能(AI)常见面试题及答案汇总
  • 高空长航时无人机热管理系统中抗辐照 MCU 的可靠性研究
  • FPGA 工程师职业方向详解:岗位分类与核心能力
  • PHP 低代码平台插件开发与架构设计
  • 竞争自适应重加权采样(CARS)算法在光谱数据变量选择中的应用
  • Flutter WebDriver 在 OpenHarmony 环境下的适配与实战
  • 用 Miniconda 快速搭建 Python 原型开发环境
  • OpenWebUI 联网搜索实战:用 SearXNG 让本地大模型获取实时信息
  • 清华大学发布 SuperTac 仿生多模态触觉传感器,感知精度接近人类

相关免费在线工具

  • 加密/解密文本

    使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online

  • Gemini 图片去水印

    基于开源反向 Alpha 混合算法去除 Gemini/Nano Banana 图片水印,支持批量处理与下载。 在线工具,Gemini 图片去水印在线工具,online

  • Base64 字符串编码/解码

    将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online

  • Base64 文件转换器

    将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online

  • Markdown转HTML

    将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online

  • HTML转Markdown

    将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online