【Linux系统】理解管道通信,匿名管道实现进程池+命名管道实现服务端客户端通信模型(附源码)
文章目录
一、进程间通信是什么
进程间通信(IPC),顾名思义,进程之间需要进行信息交换。
如:数据传输、资源共享、通知事件、进程控制。
进程间通信的方式有:管道、System V IPC、POSIX IPC。
由于进程具有独立性,进程间通信的前提就是,不同的进程能看到同一份资源。
二、管道
1. 什么是管道
管道是类Unix系统中最古老的进程间通信的方式。我们把从一个进程连接到另一个进程的数据流称为一个“管道”。

管道是单向通信的,称为单工通信。
管道分为匿名管道和命名管道。
2. 匿名管道
匿名管道(pipe)是亲缘进程间单向通信机制,本质是内核管理的一份文件,两个进程通过一对文件描述符实现一端读一端写,随进程退出自动销毁。匿名管道只能用在有血缘关系的进程之间!
系统调用pipe,用于创建一个匿名管道。

参数是一个文件描述符数组,管道创建后, fd[0]表示读端,fd[1]表示写端。
成功创建返回0,失败则返回错误码。

匿名管道是一个纯内存级的文件,不需要打开磁盘文件,没有路径,所以称为匿名管道。匿名管道没有名字、没有文件实体,只靠文件描述符来传递。这就是为什么它只能用在有血缘的进程之间,因为这些进程能拷贝文件描述符表,才能拿到同一根管道的读写端。
匿名管道通信有以下几种情况:
- 子进程写得慢,父进程就要阻塞等待。等到管道有数据,父进程才能读。
- 子进程写得快,父进程不读,管道一旦写满,子进程必须阻塞了。
- 读端一直读,写端关闭,读端读完管道中的数据时,read返回0,表明读到文件末尾。
- 写端一直写,读端关闭,操作系统会杀掉写端进程,进程异常终止,终止信号为13!
管道还有以下特点:
- 管道只能单向通信。如果想要两个进程间互相通信,需要创建两个管道。
- 匿名管道只能用在有血缘关系的进程之间,因为必须继承文件描述符表。
- 管道是面向字节流的。多次写入的字节流,在读取时可能被一次读取完,也可能被拆分成多次。
- 管道的生命周期随进程。管道是内核中的临时对象,没有持久化到磁盘。当所有持有管道文件描述符的进程都关闭后,管道会被内核自动销毁,数据也随之丢失。
- 管道通信,对于多进程,自带同步与互斥机制。读空管道时,读进程会阻塞等待数据;写满管道时,写进程会阻塞等待空间。
使用演示:
#include<stdio.h>#include<stdlib.h>#include<string.h>#include<sys/types.h>#include<unistd.h>#include<sys/wait.h>intmain(){int pipefd[2]={0};if(pipe(pipefd)!=0){exit(1);}// 根据文件描述符分配规则,这里pipefd内容应该是3 4,3为读端,4为写端printf("pipefd[0]: %d, pipefd[1]: %d\n", pipefd[0], pipefd[1]);// 下面测试子进程向父进程通信pid_t id =fork();if(id ==0){// 子进程中,要向管道写,所以要关闭读端,也就是关闭文件描述符pipefd[0]close(pipefd[0]);char* msg ="hello pipe";int cnt =5;char outbuffer[256];while(cnt){snprintf(outbuffer,sizeof(outbuffer),"子->父# %s %d", msg, cnt--);// 向管道中写write(pipefd[1], outbuffer,strlen(outbuffer));sleep(1);}close(pipefd[1]);exit(0);}// 父进程中,要从管道读,所以要关闭写端,也就是关闭文件描述符pipefd[1]close(pipefd[1]);char inbuffer[1024];while(1){ inbuffer[0]=0;// 从管道中读ssize_t n =read(pipefd[0], inbuffer,sizeof(inbuffer)-1);// -1为了给\0预留一个位置,避免缓冲区溢出。if(n >0){ inbuffer[n]=0;// 管道也是文件,结尾不会自动加\0,需要手动设置printf("%s\n", inbuffer);}elseif(n ==0){printf("管道读取结束\n");close(pipefd[0]);break;}else{perror("read");break;}}pid_t rid =waitpid(id,NULL,0);return0;}
3. 命名管道
匿名管道只能用在有血缘关系的进程之间。
如果我们想用在不相关的进程之间通信,可以使用命名管道(FIFO)完成!
无关的进程之间想要通信,必须看到同一份资源,所以命名管道必须有路径(名字),双方才都能看到他。
命名管道本质是一种特殊类型的文件——管道文件。
命名管道可以从命令行上创建,使用命令mkfifo 文件名
也可以在程序中创建,使用函数mkfifo,第一个参数是文件名,第二个参数是文件权限。成功创建返回0,失败返回-1:

命名管道使用完需要我们手动删除,可以使用函数unlink删除文件的方式:

匿名管道由pipe函数创建并打开;命名管道由mkfifo函数创建,用open打开。它们的唯一区别在于创建与打开的方式不同,这些工作完成时候,它们具有相同的语义。
三、实例:匿名管道实现进程池
#include<cassert>#include<cstdio>#include<cstdlib>#include<ctime>#include<iostream>#include<string>#include<sys/types.h>#include<sys/wait.h>#include<unistd.h>#include<vector>// 进程池,是指提前创建好多个子进程,在需要使用时直接分配任务。省去了创建子进程的开销// 父进程需要管理“通道”,组织管理子进程enum{ OK =0, PIPE_ERR, FORK_ERR, READ_ERR, WRITE_ERR, WAIT_ERR };// 全局定义好子进程数量,任务数量constint gprocessnum =7;voidtask1(){ std::cout <<"这是下载数据任务"<< std::endl;}voidtask2(){ std::cout <<"这是打印日志任务"<< std::endl;}voidtask3(){ std::cout <<"这是刷新磁盘任务"<< std::endl;}voidtask4(){ std::cout <<"这是更新用户状态任务"<< std::endl;}typedefvoid(*task_t)();// 任务表constint gtasknum =4; task_t tasks[gtasknum]={task1, task2, task3, task4};classProcessPool{private:// 内部类维护子进程通道classChannel{private:int _wfd;// 当前子进程通道的管道写端fd pid_t _id;// 子进程id std::string channel_name;// 自定义通道的名字public:Channel(int wfd, pid_t id):_wfd(wfd),_id(id){ channel_name ="channel-"+ std::to_string(id);}voidClosePipe(){close(_wfd);}voidPrintInfo(){printf("管道wfd: %d, 通道名: %s\n", _wfd, channel_name.c_str());}intgetfd(){return _wfd;}constchar*getname(){return channel_name.c_str();}voidWait(){ pid_t rid =waitpid(_id,nullptr,0);if(rid <0){// wait出错exit(WAIT_ERR);} std::cout <<"回收子进程: "<< _id << std::endl;}};public:ProcessPool(){// 种下随机数种子srand(time(NULL));}// 初始化进程池,创建好若干个子进程通道voidInit(){CreateProcessChannels();}// 打印通道信息验证voidDebug(){for(auto& channel : channels){ channel.PrintInfo();}}// 随机分配任务执行voidRun(){int cnt =10;while(cnt--){int itask =SelectTask();int ichannel =SelectChannel();printf("向通道%s发送任务task%d...\n", channels[ichannel].getname(), itask +1);SendTask2Channel(itask, ichannel);sleep(1);}}voidQuit(){for(auto& channel : channels){ channel.ClosePipe(); channel.Wait();}}private:voidCreateProcessChannels(){for(int i =0; i < gprocessnum; i++){int pipefd[2]={0};int n =pipe(pipefd);if(n <0){// 管道创建出错exit(PIPE_ERR);} pid_t id =fork();if(id <0){// 子进程创建出错exit(FORK_ERR);}elseif(id ==0){// 子进程 read// 子进程的文件描述符表是拷贝父进程的。父进程fd表中有指向其他管道的wfd,子进程必须关闭指向其他管道的wfd!if(!channels.empty()){for(auto& channel : channels) channel.ClosePipe();}// 子进程从管道中读数据,关闭写端close(pipefd[1]);// 子进程进入待执行任务状态,将来执行完成后回来退出DoTask(pipefd[0]);exit(OK);}else{// 父进程 write// 关闭管道读端close(pipefd[0]); channels.emplace_back(pipefd[1], id);printf("创建子进程%d成功\n", id);}}}voidDoTask(int fd){// 子进程需要持续监听管道,等待父进程下发任务,直到父进程主动关闭管道写端。while(1){int task_code; ssize_t n =read(fd,&task_code,sizeof(task_code));if(n ==sizeof(task_code)){// 根据读取到的task_code从任务表中选择函数执行if(task_code >=0&& task_code < gtasknum){ tasks[task_code]();}}elseif(n ==0){// 读到了文件尾,说明管道写端关闭了printf("%d任务退出\n",getpid());break;}else{// read出错exit(READ_ERR);}}}intSelectTask(){// 随机选一个任务returnrand()% gtasknum;}intSelectChannel(){// 依次选择子进程staticint i =0;int selected = i; i++; i %= gprocessnum;return selected;}voidSendTask2Channel(int itask,int ichannel){assert(0<= itask && itask < gtasknum && ichannel >=0&& ichannel < gprocessnum); ssize_t n =write(channels[ichannel].getfd(),&itask,sizeof(itask));if(n <0){// write出错exit(WRITE_ERR);}}private:// 组织所有的子进程通道 std::vector<Channel> channels;};intmain(){ ProcessPool pp;// 初始化进程池,创建好若干个子进程通道 pp.Init();// 打印通道信息验证 pp.Debug();// 随机分配任务执行 pp.Run();// 释放管道,回收子进程 pp.Quit();return0;}效果演示:

四、实例:命名管道实现服务端客户端通信模型
// Fifo.hpp#pragmaonce#include<cstdio>#include<cstring>#include<fcntl.h>#include<iostream>#include<string>#include<sys/stat.h>#include<sys/types.h>#include<unistd.h>#defineFORREAD1#defineFORWRITE2const std::string myfifo ="./fifo";classFifo{public:Fifo(const std::string& filename = myfifo):_filename(filename),_mode(0666),_fd(-1){}// 创建管道voidBuild(){// 如果管道文件已存在,就returnif(IsExist())return;int n =mkfifo(_filename.c_str(), _mode);if(n <0){ std::cerr <<"mkfifo error: "<<strerror(errno)<< std::endl;exit(1);} std::cout <<"mkfifo success"<< std::endl;}// 打开管道voidOpen(int mode){if(mode == FORREAD){ _fd =open(_filename.c_str(), O_RDONLY);}elseif(mode == FORWRITE){ _fd =open(_filename.c_str(), O_WRONLY);}if(_fd <0){ std::cerr <<"open error: "<<strerror(errno)<< std::endl;exit(2);} std::cout <<"open success"<< std::endl;}// 删除管道voidDelete(){if(!IsExist()){return;}int n =unlink(_filename.c_str());if(n <0){ std::cerr <<"delete error: "<<strerror(errno)<< std::endl;exit(3);} std::cout <<"delete success"<< std::endl;}// 发送消息voidSend(std::string& msgin){ ssize_t n =write(_fd, msgin.c_str(), msgin.size());}// 接受消息intReceive(std::string& msgout){char buffer[128]; ssize_t n =read(_fd, buffer,sizeof(buffer)-1);if(n >0){ buffer[n]='\0'; msgout = buffer;return n;}elseif(n ==0){return0;}else{return-1;}}private:boolIsExist(){structstat st;// stat函数用于查询一个文件的属性,如果查到了返回0// 利用这一点判断管道文件是否存在int n =stat(_filename.c_str(),&st);if(n ==0){returntrue;}else{ errno =0;// 消除这次失败对后面代码的影响returnfalse;}}private: std::string _filename; mode_t _mode;int _fd;};// Server.cc#include"Fifo.hpp"intmain(){// 服务端 创建并打开管道 Fifo pipefile; pipefile.Build(); pipefile.Open(FORREAD); std::string msg;while(1){int n = pipefile.Receive(msg);if(n >0){ std::cout <<"客户端说: "<< msg << std::endl;}else{break;}} pipefile.Delete();return0;}// Client.cc#include"Fifo.hpp"intmain(){// 客户端 写入信息 Fifo fileclient; fileclient.Open(FORWRITE);while(1){ std::cout <<"请输入:"<< std::endl; std::string msg; std::getline(std::cin, msg); fileclient.Send(msg);}return0;}效果演示:
命名管道实现服务端客户端通信演示
本篇完,感谢阅读