跳到主要内容
Linux 进程池原理与实现:从核心逻辑到代码实战 | 极客日志
C++ 算法
Linux 进程池原理与实现:从核心逻辑到代码实战 Linux 进程池通过预创建子进程并复用,解决高并发场景下频繁创建销毁进程的系统开销问题。基于 C++ 和 Linux 系统调用(fork、pipe),从零实现一个基础进程池。核心包括任务封装、子进程工作循环、主从进程通信(Channel)及负载均衡策略。通过轮询分配任务,避免资源浪费,并妥善处理僵尸进程回收。代码涵盖完整流程,适合理解 IPC 机制与多进程管理。
KernelLab 发布于 2026/3/26 更新于 2026/5/21 13 浏览Linux 进程池原理与实现:从核心逻辑到代码实战
在 Linux 后台开发中,频繁创建和销毁进程会带来巨大的系统开销。进程创建需要分配资源、初始化 PCB,销毁则需要回收资源,尤其在高并发场景(如 Web 服务器、任务调度)中,这种开销会严重影响程序性能。而进程池,就是解决这个问题的利器。
今天我们就来从零实现一个简单但可复用的 Linux 进程池,从原理拆解到代码实现,带你搞懂进程池的核心逻辑,学会如何通过进程复用提升程序效率。
一、先搞懂:进程池是什么?
进程池,顾名思义,就是提前创建一定数量的子进程,通过匿名管道 统一管理、复用这些子进程来处理任务,而不是每次有任务就创建新进程、任务结束就销毁进程。
核心优势
降低系统开销 :避免频繁创建/销毁进程的资源消耗,子进程可重复使用
提升响应速度 :任务到来时,无需等待进程创建,直接分配空闲子进程处理
便于管理 :统一管理子进程的创建、销毁、任务分配,避免子进程泄露
二、手搓进程池:分步实现
1. 前期准备——定义任务类型与测试任务
首先定义任务的统一类型,封装各类测试任务,为后续进程池执行任务奠定基础。这是进程池'可处理多类型任务'的核心前提。
#include <iostream>
#include <vector>
#include <string>
#include <functional>
#include <ctime>
#include <cstdlib>
#include <unistd.h>
#include <memory>
#include <sys/wait.h>
#define __MAIN__
using = std::function< ()>;
{
std::cout << << () << std::endl;
}
{
std::cout << << () << std::endl;
}
{
std::cout << << () << std::endl;
}
{
std::cout << << () << std::endl;
}
std::vector< > gtasks;
{
gtasks. (printlog);
gtasks. (download);
gtasks. (readmysql);
gtasks. (writeredis);
}
{
( i= ; i< ; i++) {
code = () % gtasks. ();
( );
out-> (code);
}
}
{
(code) {
LOG_TASK: ;
DOWNLOAD_TASK: ;
MYSQL_TASK: ;
REDIS_TASK: ;
: ;
}
}
task_t
void
void printlog ()
"我是一个打印日志的任务,pid: "
getpid
void download ()
"我是一个下载任务,pid: "
getpid
void readmysql ()
"我是一个访问数据库的操作,pid: "
getpid
void writeredis ()
"我是一个访问 redis 的任务,pid: "
getpid
task_t
void LoadTask ()
push_back
push_back
push_back
push_back
void RandomTask (std::vector<int > *out)
for
int
0
50
int
rand
size
usleep
23223
push_back
#define LOG_TASK 0
#define DOWNLOAD_TASK 1
#define MYSQL_TASK 2
#define REDIS_TASK 3
std::string Task2String (int code)
switch
case
return
"printlog"
case
return
"download"
case
return
"readmysql"
case
return
"writeredis"
default
return
"unknown"
这里有个细节要注意:用 std::function<void()> 定义任务类型,实现'任务解耦',后续可轻松添加新任务,无需修改进程池核心逻辑。任务码(0-3)对应不同任务,通过 RandomTask 模拟真实场景中随机到来的任务请求。
2. 实现子进程工作逻辑 子进程的核心作用是'等待任务、执行任务',通过读取管道中的任务码,调用对应的任务函数,这是进程复用的核心逻辑。
void Work (int rfd) {
while (true ) {
int code = 0 ;
ssize_t n = read (rfd, &code, sizeof (code));
if (n == sizeof (int )) {
if (code >= 0 && code < gtasks.size ()) {
gtasks[code]();
}
} else if (n == 0 ) {
break ;
} else {
break ;
}
}
}
子进程通过 read 从管道读端获取任务码,阻塞等待任务,实现'空闲时待命、有任务时执行'。只有读取到完整的任务码(4 字节,int 类型),才执行任务,避免任务错乱。当主进程关闭管道写端,子进程读取到 n=0,主动退出,避免僵尸进程。
3. 封装 Channel 类——管理通信与子进程 Channel 类是进程池的'通信桥梁',封装了主进程与单个子进程的管道写端、子进程 PID,统一管理任务发送、管道关闭、子进程回收,简化进程池的管理逻辑。
class Channel {
public :
Channel (int wfd, pid_t who) : _wfd(wfd), _sub_process_id(who) {
_name = "Channel-" + std::to_string (_sub_process_id) + "-" + std::to_string (_wfd);
}
int Fd () { return _wfd; }
pid_t SubId () { return _sub_process_id; }
std::string Name () { return _name; }
void Close () {
if (_wfd >= 0 ) close (_wfd);
}
void Wait () {
pid_t rid = waitpid (_sub_process_id, nullptr , 0 );
(void )rid;
}
void SendTask (int taskcode) {
ssize_t n = write (_wfd, &taskcode, sizeof (taskcode));
(void )n;
}
~Channel () {}
private :
int _wfd;
pid_t _sub_process_id;
std::string _name;
};
每个子进程对应一个 Channel 对象,主进程通过 Channel 的 SendTask 发送任务,通过 Close 和 Wait 回收子进程。封装后,进程池无需直接操作管道和子进程 PID,降低耦合度。
4. 封装 ProcessPool 类——核心管理逻辑 ProcessPool 类是整个进程池的'管理者',负责创建子进程、管理所有 Channel、实现负载均衡分配任务、终止进程池,是面向对象设计的核心。
class ProcessPool {
private :
int Next () {
int choice = _next_choice;
_next_choice++;
_next_choice %= _channels.size ();
return choice;
}
public :
ProcessPool (int number) :_number(number), _next_choice(0 ) {}
void Start () {
for (int i = 0 ; i < _number; i++) {
int pipefd[2 ];
int n = pipe (pipefd);
if (n < 0 ) {
perror ("pipe:" );
exit (2 );
}
pid_t id = fork();
if (id < 0 ) {
perror ("fork:" );
exit (3 );
} else if (id == 0 ) {
close (pipefd[1 ]);
Work (pipefd[0 ]);
close (pipefd[0 ]);
exit (0 );
} else {
close (pipefd[0 ]);
_channels.emplace_back (pipefd[1 ], id);
}
}
}
void PushTask (int taskcode) {
int who = Next ();
_channels[who].SendTask (taskcode);
std::cout << "发送任务:" << Task2String (taskcode) << "[" << taskcode << "]" << "给:" << _channels[who].Name () << std::endl;
}
void Stop () {
int end = _channels.size () - 1 ;
while (end >= 0 ) {
_channels[end].Close ();
_channels[end].Wait ();
std::cout << _channels[end].Name () << " close and wait success!" << std::endl;
end--;
}
}
void DebugPrint () {
std::cout << "--------------------------------" << std::endl;
for (auto &channel : _channels) {
std::cout << channel.Fd () << std::endl;
std::cout << channel.SubId () << std::endl;
std::cout << channel.Name () << std::endl;
}
std::cout << "--------------------------------" << std::endl;
}
~ProcessPool () {}
private :
std::vector<Channel> _channels;
int _number;
int _next_choice;
};
Start() 函数批量创建子进程和管道,主进程关闭管道读端、保存写端到 Channel,子进程关闭写端、进入 Work 循环等待任务。Next() 函数通过轮询分配任务,确保所有子进程负载均匀。Stop() 函数从后往前关闭管道写端、回收子进程,确保所有资源被正确回收。
5. 主函数测试 主函数实现进程池的完整调用流程:解析命令行参数、加载任务、生成随机任务、启动进程池、推送任务、终止进程池。
#ifdef __MAIN__
static void Usage (const std::string &proc) {
std::cout << "Usage:\n\t" << proc << " process_number" << std::endl;
}
int main (int argc, char *argv[]) {
if (argc != 2 ) {
Usage (argv[0 ]);
exit (1 );
}
int number = std::stoi (argv[1 ]);
srand (time (nullptr )^getpid ());
LoadTask ();
std::vector<int > task_codes;
RandomTask (&task_codes);
std::unique_ptr<ProcessPool> pp = std::make_unique <ProcessPool>(number);
pp->Start ();
sleep (2 );
for (auto task : task_codes) {
pp->PushTask (task);
usleep (500000 );
}
pp->Stop ();
return 0 ;
}
#endif
用 std::unique_ptr 管理 ProcessPool 对象,自动释放内存,避免手动管理内存导致的泄漏。命令行参数解析:输入进程池大小(如 ./process_pool 5,表示创建 5 个子进程),符合 Linux 程序的使用习惯。
三、编译运行与结果分析 process_pool: process_pool.cc
g++ -o $@ $^ -std=c++14
.PHONY : clean
clean:
rm -f process_pool
编译 :直接 make;
运行 :./process_pool 5(5 为子进程数量,可自定义);
5 个子进程循环复用,处理 50 个任务,无需频繁创建/销毁子进程;
销毁进程池时,所有子进程正常退出,资源被回收,无僵尸进程。
四、完整代码展示 为了方便大家直接测试和修改,以下是整合后的完整代码:
#include <iostream>
#include <vector>
#include <string>
#include <functional>
#include <ctime>
#include <cstdlib>
#include <unistd.h>
#include <memory>
#include <sys/wait.h>
#define __MAIN__
using task_t = std::function<void ()>;
void printlog () {
std::cout << "我是一个打印日志的任务,pid: " << getpid () << std::endl;
}
void download () {
std::cout << "我是一个下载任务,pid: " << getpid () << std::endl;
}
void readmysql () {
std::cout << "我是一个访问数据库的操作,pid: " << getpid () << std::endl;
}
void writeredis () {
std::cout << "我是一个访问 redis 的任务,pid: " << getpid () << std::endl;
}
std::vector<task_t > gtasks;
void LoadTask () {
gtasks.push_back (printlog);
gtasks.push_back (download);
gtasks.push_back (readmysql);
gtasks.push_back (writeredis);
}
void RandomTask (std::vector<int > *out) {
for (int i=0 ; i<50 ; i++) {
int code = rand () % gtasks.size ();
usleep (23223 );
out->push_back (code);
}
}
#define LOG_TASK 0
#define DOWNLOAD_TASK 1
#define MYSQL_TASK 2
#define REDIS_TASK 3
std::string Task2String (int code) {
switch (code) {
case LOG_TASK: return "printlog" ;
case DOWNLOAD_TASK: return "download" ;
case MYSQL_TASK: return "readmysql" ;
case REDIS_TASK: return "writeredis" ;
default : return "unknown" ;
}
}
void Work (int rfd) {
while (true ) {
int code = 0 ;
ssize_t n = read (rfd, &code, sizeof (code));
if (n == sizeof (int )) {
if (code >= 0 && code < gtasks.size ()) {
gtasks[code]();
}
} else if (n == 0 ) {
break ;
} else {
break ;
}
}
}
class Channel {
public :
Channel (int wfd, pid_t who) : _wfd(wfd), _sub_process_id(who) {
_name = "Channel-" + std::to_string (_sub_process_id) + '-' + std::to_string (_wfd);
}
int Fd () { return _wfd; }
pid_t SubId () { return _sub_process_id; }
std::string Name () { return _name; }
void Close () { if (_wfd >= 0 ) close (_wfd); }
void Wait () { pid_t rid = waitpid (_sub_process_id, nullptr , 0 ); (void )rid; }
void SendTask (int taskcode) { ssize_t n = write (_wfd, &taskcode, sizeof (taskcode)); (void )n; }
~Channel () {}
private :
int _wfd;
pid_t _sub_process_id;
std::string _name;
};
class ProcessPool {
private :
int Next () {
int choice = _next_choice;
_next_choice++;
_next_choice %= _channels.size ();
return choice;
}
public :
ProcessPool (int number) :_number(number), _next_choice(0 ) {}
void Start () {
for (int i = 0 ; i < _number; i++) {
int pipefd[2 ];
int n = pipe (pipefd);
if (n < 0 ) { perror ("pipe:" ); exit (2 ); }
pid_t id = fork();
if (id < 0 ) { perror ("fork:" ); exit (3 ); }
else if (id == 0 ) {
close (pipefd[1 ]);
Work (pipefd[0 ]);
close (pipefd[0 ]);
exit (0 );
} else {
close (pipefd[0 ]);
_channels.emplace_back (pipefd[1 ], id);
}
}
}
void PushTask (int taskcode) {
int who = Next ();
_channels[who].SendTask (taskcode);
std::cout << "发送任务:" << Task2String (taskcode) << "[" << taskcode << "]" << "给:" << _channels[who].Name () << std::endl;
}
void Stop () {
int end = _channels.size () - 1 ;
while (end >= 0 ) {
_channels[end].Close ();
_channels[end].Wait ();
std::cout << _channels[end].Name () << "close and wait success!" << std::endl;
end--;
}
}
void DebugPrint () {
std::cout << "--------------------------------" << std::endl;
for (auto &channel : _channels) {
std::cout << channel.Fd () << std::endl;
std::cout << channel.SubId () << std::endl;
std::cout << channel.Name () << std::endl;
}
std::cout << "--------------------------------" << std::endl;
}
~ProcessPool () {}
private :
std::vector<Channel> _channels;
int _number;
int _next_choice;
};
#ifdef __MAIN__
static void Usage (const std::string &proc) {
std::cout << "Usage:\n\t" << proc << " process_number" << std::endl;
}
int main (int argc, char *argv[]) {
if (argc != 2 ) {
Usage (argv[0 ]);
exit (1 );
}
int number = std::stoi (argv[1 ]);
srand (time (nullptr )^getpid ());
LoadTask ();
std::vector<int > task_codes;
RandomTask (&task_codes);
std::unique_ptr<ProcessPool> pp = std::make_unique <ProcessPool>(number);
pp->Start ();
sleep (2 );
for (auto task : task_codes) {
pp->PushTask (task);
usleep (500000 );
}
pp->Stop ();
return 0 ;
}
#endif
五、进阶优化:让进程池更实用 我们手搓的是基础版进程池,实际生产中可根据需求优化以下几点:
动态调整进程池大小 :根据任务量自动增加/减少子进程(避免空闲子进程浪费资源);
任务类型扩展 :支持传入函数指针和参数,让进程池能处理不同类型的任务(而非固定打印);
非阻塞添加任务 :任务队列满时,返回错误而非阻塞,提高主进程灵活性;
信号处理优化 :处理 SIGCHLD 信号,及时回收异常退出的子进程,并重新创建子进程,保证进程池稳定性;
使用更高效的 IPC :将管道替换为消息队列(支持消息优先级)或共享内存(更高吞吐量)。
六、常见坑点与注意事项
管道读写阻塞 :子进程 read() 会阻塞,主进程 write() 在管道满时也会阻塞,需根据需求调整为非阻塞;
僵尸进程 :必须用 waitpid() 回收子进程,尤其是子进程异常退出时,避免僵尸进程占用资源;
管道关闭顺序 :主进程需先关闭写端,子进程才能读取到 EOF 并正常退出;
内存泄漏 :进程池销毁时,必须释放所有分配的内存;
任务队列溢出 :需设置合理的 max_task,避免任务堆积导致内存溢出。
七、总结 通过手搓这个基础版 Linux 进程池,我们搞懂了进程池的核心逻辑——提前创建子进程、复用子进程、统一管理任务 ,本质上是用空间换时间,减少进程创建/销毁的开销。
本文的代码虽然简单,但涵盖了进程池的核心流程,你可以在此基础上进行扩展,适配实际业务场景(如 Web 服务器的请求处理、后台任务调度等)。
如果运行过程中遇到问题,可重点检查管道操作、子进程回收和任务队列的逻辑,这些都是实现进程池的关键。动手敲一遍代码,你会对 Linux 进程控制和 IPC 有更深刻的理解~
相关免费在线工具 加密/解密文本 使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
Gemini 图片去水印 基于开源反向 Alpha 混合算法去除 Gemini/Nano Banana 图片水印,支持批量处理与下载。 在线工具,Gemini 图片去水印在线工具,online
Base64 字符串编码/解码 将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
Base64 文件转换器 将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
Markdown转HTML 将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
HTML转Markdown 将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online