跳到主要内容
极客日志极客日志
首页博客AI提示词GitHub精选代理工具
搜索
|注册
博客列表
C++算法

手搓简易 Linux 进程池:基于管道实现任务分发

综述由AI生成Linux 进程池通过预创建子进程避免频繁 fork 开销。基于匿名管道实现任务分发,父进程轮询将任务码写入管道,子进程读取执行。核心涉及管道通信机制、负载均衡策略及进程回收逻辑,重点解析了停止阶段可能出现的死锁问题及解决方案。

Pythonist发布于 2026/3/23更新于 2026/4/284 浏览
手搓简易 Linux 进程池:基于管道实现任务分发

手搓简易 Linux 进程池:基于管道实现任务分发

在 Linux 环境下,进程池是一种高效的并发编程模型。它通过预先创建一组子进程来处理任务,避免了频繁创建和销毁进程的开销。本文将拆解一个基于管道通信的进程池实现代码,带你理解其核心设计思路、通信原理和任务分发机制。

核心设计思路

本进程池的核心逻辑如下:

  • 进程创建:父进程创建指定数量的子进程,通过匿名管道与每个子进程建立单向通信(父写子读);
  • 任务分发:父进程采用轮询策略将任务分发给不同子进程,实现简单的负载均衡;
  • 任务执行:子进程循环读取管道中的任务码,执行对应任务;
  • 资源回收:父进程通过关闭管道写端通知子进程退出,并回收所有子进程资源。

代码模块拆解

1. 任务定义与生成

这部分定义了进程池要执行的具体任务,以及随机生成任务的工具函数,模拟业务场景。

#include <iostream>
#include <string>
#include <vector>
#include <memory>
#include <functional>
#include <ctime>
#include <cstdlib>
#include <unistd.h>
#include <sys/wait.h>

using task_t = std::function<void()>;

// 具体任务示例
void PrintLog() {
    std::cout << "我是一个打印日志的任务,pid" << getpid() << std::endl;
}

void DownLoad() {
    std::cout << "我是一个下载任务,pid" << getpid() << std::endl;
}

void ReadMysql() {
    std::cout << "我是一个访问数据库的任务,pid" << getpid() << std::endl;
}

void WriteRedis() {
    std::cout << "我是一个访问 Redis 的任务,pid" << getpid() << std::endl;
}

// 全局任务列表
std::vector<task_t> gtasks;

void LoadTask() {
    gtasks.push_back(PrintLog);
    gtasks.push_back(DownLoad);
    gtasks.push_back(ReadMysql);
    gtasks.push_back(WriteRedis);
}

// 随机生成任务码
void RandomTask(std::vector<int>* out) {
    for (int i = 0; i < 50; i++) {
        int code = rand() % gtasks.size();
        usleep(23223); // 模拟任务产生的时间间隔
        out->push_back(code);
    }
}

#define LOG_TASK 0
#define DOWNLOAD_TASK 1
#define MYSQL_TASK 2
#define REDIS_TASK 3

std::string TaskToString(int code) {
    switch (code) {
        case LOG_TASK: return "PrintLog";
        case DOWNLOAD_TASK: return "DownLoad";
        case MYSQL_TASK: return "ReadMysql";
        case REDIS_TASK: return "WriteRedis";
        default: return "Unknown";
    }
}

2. 子进程工作逻辑

Work 函数是子进程的核心,负责从管道读取任务码并执行。当读到管道关闭信号时退出。

void Work(int rfd) {
    while (true) {
        int code = 0;
        // 阻塞读
        ssize_t n = read(rfd, &code, sizeof(code));
        
        if (n > 0 && n == sizeof(int)) {
            if (code >= 0 && code < gtasks.size()) {
                gtasks[code](); // 执行任务
            }
        } else if (n == 0) {
            break; // 管道写端关闭,退出
        } else {
            break; // 错误退出
        }
    }
}

3. 通道类封装

Channel 类封装了管道写端和子进程 ID 的关联,简化父进程对单个子进程的管理。

class Channel {
public:
    Channel(int wfd, pid_t who) : _wfd(wfd), _sub_process_id(who) {
        _name = "Channel-" + std::to_string(_sub_process_id) + "-" + std::to_string(_wfd);
    }

    int Fd() { return _wfd; }
    pid_t SubId() { return _sub_process_id; }
    std::string Name() { return _name; }

    void Close() {
        if (_wfd >= 0) close(_wfd);
    }

    void Wait() {
        pid_t rid = waitpid(_sub_process_id, nullptr, 0);
        (void)rid;
    }

    void SendTask(int taskcode) {
        ssize_t n = write(_wfd, &taskcode, sizeof(taskcode));
        (void)n;
    }

private:
    int _wfd;
    pid_t _sub_process_id;
    std::string _name;
};

4. 进程池管理逻辑

ProcessPool 类负责创建子进程、管理通道、分发任务和停止进程池。

class ProcessPool {
private:
    // 轮询选择下一个子进程
    int Next() {
        int choice = _next_choice;
        _next_choice++;
        _next_choice %= _channels.size();
        return choice;
    }

public:
    ProcessPool(int number) : _number(number), _next_choice(0) {
        std::cout << "number: " << _number << std::endl;
    }

    void Start() {
        for (int i = 0; i < _number; i++) {
            int pipefd[2];
            if (pipe(pipefd) < 0) {
                perror("pipe");
                exit(2);
            }

            pid_t id = fork();
            if (id < 0) {
                perror("fork");
                exit(3);
            } else if (id == 0) {
                // 子进程逻辑
                close(pipefd[1]); // 关闭写端
                Work(pipefd[0]);   // 执行工作
                close(pipefd[0]);
                exit(0);
            } else {
                // 父进程逻辑
                close(pipefd[0]); // 关闭读端
                _channels.emplace_back(pipefd[1], id);
            }
        }
    }

    void PushTask(int taskcode) {
        int who = Next();
        _channels[who].SendTask(taskcode);
        std::cout << "发送任务:" << TaskToString(taskcode) << "[" << taskcode << "]" 
                  << "给:" << _channels[who].Name() << std::endl;
    }

    void Stop() {
        // 版本 1:先批量关闭所有管道,再统一回收
        for (auto& channel : _channels) {
            channel.Close();
            std::cout << channel.Name() << " close success!" << std::endl;
        }
        sleep(3); // 等待子进程处理完最后任务并退出

        for (auto& channel : _channels) {
            channel.Wait();
            std::cout << channel.Name() << " wait success!" << std::endl;
        }
    }

private:
    std::vector<Channel> _channels;
    int _number;
    int _next_choice;
};

5. 主函数入口

#ifdef __MAIN__
static void Usage(const std::string &proc) {
    std::cout << "Usage:\n\t" << proc << " process_number" << std::endl;
}

int main(int argc, char* argv[]) {
    if (argc != 2) {
        Usage(argv[0]);
        exit(1);
    }
    int number = std::stoi(argv[1]);

    srand(time(nullptr) ^ getpid());
    LoadTask();
    std::vector<int> task_codes;
    RandomTask(&task_codes);

    auto pp = std::make_unique<ProcessPool>(number);
    pp->Start();
    sleep(2); // 等待子进程初始化

    for (auto task : task_codes) {
        pp->PushTask(task);
        usleep(500000); // 模拟分发间隔
    }

    pp->Stop();
    return 0;
}
#endif

关键知识点解析

1. 管道通信原理

匿名管道 pipe() 创建的文件描述符对中,pipefd[0] 为读端,pipefd[1] 为写端。父子进程继承后,通过关闭不需要的端实现'父写子读'。当所有写端关闭后,读端 read() 会返回 0,子进程据此判断退出。

2. 轮询负载均衡

Next() 函数通过递增取模的方式循环选择子进程,确保任务均匀分发,避免单点过载。

3. 进程回收的坑

在 Stop() 阶段,如果父进程关闭某个管道的写端后立即调用 waitpid(),而该子进程可能还在阻塞读取其他未关闭的管道(因为 fork 继承),或者父进程在等待一个子进程时导致其他子进程无法收到退出信号,从而引发死锁或僵尸进程。

解决方案:采用'先批量关闭所有写端,等待一段时间,再统一回收'的策略(如上述代码中的版本 1)。这样能确保所有子进程同时感知到退出信号,避免串行阻塞。

编译与运行

使用 Makefile 进行编译:

process_pool: process_pool.cc
	g++ -o $@ $^ -std=c++14

.PHONY: clean
clean:
	rm -f process_pool
  • 编译:直接执行 make;
  • 运行:./process_pool 5(5 为子进程数量);
  • 输出:任务被轮询分发给不同子进程,每个任务打印对应的进程 ID,最后进程池正常停止并回收资源。

扩展与优化方向

  • 错误处理:当前代码未处理 write()/read() 的错误返回值,实际场景应增加重试或日志记录;
  • 动态扩容:支持运行时增加或减少子进程数量;
  • 更优的负载均衡:基于子进程当前任务数、CPU 使用率等动态分发;
  • 任务队列:父进程增加任务队列,避免任务分发过快导致管道阻塞;
  • 信号处理:增加 SIGCHLD 信号处理,异步回收子进程,彻底避免僵尸进程。

本文通过拆解一个极简的进程池实现,涵盖了 Linux 进程间通信、进程管理的核心知识点。这个进程池虽然简单,但包含了并发编程的关键设计思想,是学习 Linux 系统编程的绝佳案例。

目录

  1. 手搓简易 Linux 进程池:基于管道实现任务分发
  2. 核心设计思路
  3. 代码模块拆解
  4. 1. 任务定义与生成
  5. 2. 子进程工作逻辑
  6. 3. 通道类封装
  7. 4. 进程池管理逻辑
  8. 5. 主函数入口
  9. 关键知识点解析
  10. 1. 管道通信原理
  11. 2. 轮询负载均衡
  12. 3. 进程回收的坑
  13. 编译与运行
  14. 扩展与优化方向
  • 💰 8折买阿里云服务器限时8折了解详情
  • GPT-5.5 超高智商模型1元抵1刀ChatGPT中转购买
  • 代充Chatgpt Plus/pro 帐号了解详情
  • 🤖 一键搭建Deepseek满血版了解详情
  • 一键打造专属AI 智能体了解详情
极客日志微信公众号二维码

微信扫一扫,关注极客日志

微信公众号「极客日志V2」,在微信中扫描左侧二维码关注。展示文案:极客日志V2 zeeklog

更多推荐文章

查看全部
  • 阿里开源纯前端浏览器自动化 PageAgent 技术解析
  • Virt-A-Mate (VAM) 虚拟实境模拟软件技术解析
  • AI 图形界面操作技术演进对职场岗位的影响分析
  • Python 异步编程进阶与 asyncio 高级应用
  • AKSHARE 结合 AI 实现金融数据爬取与分析
  • Flutter 集成 dart_openai 实现 OpenHarmony 应用 AI 对话
  • Flutter 2 跨平台开发实战指南:从基础到进阶
  • KingbaseES SQL 防火墙的工程化实践与性能分析
  • HS-FPN:微小目标检测的频域与空间感知架构
  • 程序员接单兼职平台盘点与选择指南
  • KingbaseES ksql 指南:创建与管理索引和视图
  • Python 数据库连接池深度解析与调优实战
  • ChatGPT Web Share 集成实战:API 优化与生产部署
  • Java 中对象的几种比较方式
  • Python 实现空间注意力神经网络(SANN)的设计与实战
  • Java 代码块详解:控制流、方法、实例、静态与同步代码块
  • DeerFlow 2.0 开源:打造真正的超级智能体
  • Stable Diffusion 本地运行硬件配置选择与优化指南
  • Whisper-medium.en 快速部署与配置指南
  • Llama-3.2V-11B-cot 数学几何题解能力演示:看图推理与定理应用

相关免费在线工具

  • 加密/解密文本

    使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online

  • Gemini 图片去水印

    基于开源反向 Alpha 混合算法去除 Gemini/Nano Banana 图片水印,支持批量处理与下载。 在线工具,Gemini 图片去水印在线工具,online

  • Base64 字符串编码/解码

    将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online

  • Base64 文件转换器

    将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online

  • Markdown转HTML

    将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online

  • HTML转Markdown

    将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online