Linux 进程通信核心原理与实战:匿名管道、命名管道、进程池
Linux 进程通信通过操作系统提供的共享内存空间实现。主要方式包括匿名管道(适用于有血缘关系的父子进程,基于 fork 共享文件描述符)和命名管道(FIFO,适用于无血缘关系进程,基于磁盘特殊文件)。进程池利用管道实现任务分发与负载均衡,需注意文件描述符继承导致的资源泄漏问题,正确关闭写端并回收子进程以避免僵尸进程。涵盖原理、特性、实操代码及常见 Bug 解决方案。

Linux 进程通信通过操作系统提供的共享内存空间实现。主要方式包括匿名管道(适用于有血缘关系的父子进程,基于 fork 共享文件描述符)和命名管道(FIFO,适用于无血缘关系进程,基于磁盘特殊文件)。进程池利用管道实现任务分发与负载均衡,需注意文件描述符继承导致的资源泄漏问题,正确关闭写端并回收子进程以避免僵尸进程。涵盖原理、特性、实操代码及常见 Bug 解决方案。

如果未来进程之间要协同,一个进程要把自己的数据交给另一个进程,或者一个进程要命令另外一个进程做其他事。但是,由于进程之间是具有独立性的,如果想把一个进程的数据交给另一个进程,基本不可能。
由此诞生出如何实现进程通信。
进程之间是有独立性的?还需要保证这两者之间可以进行通信,由此我们被迫使用第三者,他可以对这两个进程通信起来。那他是谁呢?无疑我们第一个想到的便是操作系统。
结论:进程间通信的前提是:让不同的进程看到同一份资源。这份资源由 OS 提供,而资源一定是某种形式的内存空间!
既然有了对进程通信概念理解,那么让进程实现通信就落实到资源这一块问题了。
进程通信分类如图所示。
我们前面有接触过管道,但是并没有谈及具体含义,现在就可以来解决这个问题了。
大家都知道,我们程序员是非常'偷懒'的,有可以复用的代码绝不会多写一遍,实际上这也是一种'巧智',从某方面提高了可维护性。
管道就是一种'取巧'的方式,它是基于文件的通信方法。
但是随着时代的发展,大家发现管道并不能解决所有问题,由于一批新问题的产生导致程序员必须创造一个真正的资源,供进程间通信,由此产生了 System V 标准和消息队列等...
System V 进程间通信 --> 单独设计通信模块 --> 制定标准了 --> 只能进行本地通信 (自己的电脑)。
这里谈及下 System V 的生态问题:
System V 标准并非由单一公司定制,而是由 AT&T(美国电话电报公司)主导开发的一套 UNIX 操作系统标准。
实际上定标准和实现标准对应的代码是两批人。
一般定标准的人都是不需要定标准的,而是有另一层进行代码编写。那为什么另一层凭什么听从定标准的人呢?因为定标准的人在技术上一定是处于技术领先的地位 --> 这意味着他的产品领先 --> 我所定下的标准,时代会跟着走。如果你不跟着我的标准走,那么你就被时代淘汰,就会没落。反之,如果你跟着我定的方向走,就能继续发展。--> 从而使你被迫听从定标准的这批人。
我们把从一个进程连接到另一个进程的一个数据流称为一个'管道'。
如何证明两边都是进程呢?
我们之前写过让父子进程同时向显示器进行打印内容,这是如何做到的呢?
让父子进程看到了同一份资源。在上图中,父进程创建子进程,子进程会以拷贝父进程的内核数据结构,此时父子都指向一个 struct file,那么 struct file 里是什么呢?里面不是有个文件缓冲区吗?这不意味着父子看到了同一份资源?那么就可以基于这个原理进行进程的通信。
父子进程可以看到同一份资源了,但是会出现一个问题:如果父进程写到 101 位置时,此时位置是停留在 101 这个位置的,此时子进程读是往 101 后读,因此会读到空内容。为了实现父子进程之间的通信,我们不得不让 struct file 也拷贝一份,这样便能实现父子进程间的通信。实际上,这便是管道的设计原理。
管道的定义:管道是一个基于文件系统的一个内存级的单向通信的文件,主要用来进程间通信 (IPC, Inter-Process Communication) 的。
看待管道,就如同看待文件一样!管道的使用和文件一致,迎合了'Linux 一切皆文件思想'。
创建匿名管道 [0] 表示:读 fd,[1] 表示:写 fd 成功返回 0,失败返回错误代码。
父子进程打开了这个文件,同时可以实现读写功能。但是,我们的这个文件是没有文件名的啊!这说明该文件是内存级文件!没有名字 --> 称为匿名管道。
既然是内存级文件,那么就不存在所谓的向磁盘刷新的概念,因此也不需要路径,文件名概念。
实际上,这种通信方式存在一定的缺陷:
父进程定义全局数据,本来就可以被子进程所看到(初始可见,但不是'共享修改',父子进程的内存空间是相互隔离的)啊!父子进程之间的缓冲区互相都看不到,修改各自的副本,无法通信。如果要对数据进行修改呢,如何做呢?所以就有了管道,但我们说管道是一种单向通信,因为如果父写子写就会出现数据交织的情况。
数据只能从写端流向读端,无法反向流动。
管道里面没有数据,读端就会被阻塞!
管道被写满了的话,就不在写入了!
read 会读到返回值为 0,表示读到文件结尾!
OS 会自动杀掉写进程!为什么这样做呢?OS 不会做无效动作,不会做浪费时间的事!
生活中,我们常常会听遣父母的安排,母亲对大儿说:最近工作有些累,帮母亲锤下背;对二女说:劳烦女儿今天扫扫地,倒下垃圾;对三孩说:再过一会要炒菜了,把刚买的白菜清洗下。三个孩子都等待着父母给他们派发任务。而在计算机中,匿名管道也可以实现这种需求。
Task.hpp
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <functional>
// 4 种任务
// task_t[4];
using task_t = std::function<void()>;
void Download() {
std::cout << "我是一个 downlowd 任务" << std::endl;
}
void MySql() {
std::cout << "我是一个 MySQL 任务" << std::endl;
}
void Sync() {
std::cout << "我是一个数据刷新同步的任务" << std::endl;
}
void Log() {
std::cout << "我是一个日志保存任务" << std::endl;
}
std::vector<task_t> tasks; // 任务表
class Init {
public:
Init() {
tasks.push_back(Download);
tasks.push_back(MySql);
tasks.push_back(Sync);
tasks.push_back(Log);
}
};
Init ginit;
ProcessPool.hpp
#ifndef __PROCESS_POOL_HPP__
#define __PROCESS_POOL_HPP__
#include <iostream>
#include <cstdlib>
#include <string>
#include <vector>
#include <functional>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <ctime>
#include "Task.hpp"
const int gdefault_process_num = 5;
// typedef std::function<void (int fd)> func_t;
using callback_t = std::function<void(int fd)>;
// 先描述
class Channel {
public:
Channel() { }
Channel(int fd, const std::string &name, pid_t id) : _wfd(fd), _name(name), _sub_target(id) { }
void DebugPrint() { printf("channel name: %s, wfd: %d, target pid: %d\n", _name.(), _wfd, _sub_target); }
~() {}
{ _wfd; }
{ _name; }
{ _sub_target; }
{ (_wfd); }
{ rid = (_sub_target, , ); ()rid; }
:
_wfd;
std::string _name;
_sub_target;
};
{
:
{
who = index;
index++;
index %= _channels.();
x = () % tasks.();
std::cout << << _channels[who].() << << _channels[who].() << std::endl;
(_channels[who].(), &x, (x));
();
}
:
( num = gdefault_process_num) : _processnum(num) {
(() ^ () ^ );
}
~() { }
{
( i = ; i < _processnum; i++) {
();
pipefd[] = {};
n = (pipefd);
(n < ) ;
id = fork();
(id < ) ;
(id == ) {
(pipefd[]);
(pipefd[]);
();
}
(pipefd[]);
std::string name = + std::(i);
_channels.(pipefd[], name, id);
}
;
}
{
index = ;
() {
(index);
}
}
{
(count < ) ;
index = ;
(count) {
(index);
count--;
}
}
{ }
{ }
{
std::cout << ;
( &c : _channels) std::cout << c.() << ;
std::cout << std::endl;
}
{
( &c : _channels) {
c.();
c.();
}
}
:
_processnum;
};
main.cc
#include "ProcessPool.hpp"
int main() {
// 1. 创建进程池
ProcessPool pp(5);
// 2. 初始化进程池
pp.InitProcessPool([](int fd){
while(true) {
int code = 0;
//std::cout << "子进程阻塞:" << getpid() << std::endl;
ssize_t n = read(fd, &code, sizeof(code));
if(n == sizeof(code)) // 任务码
{
std::cout << "子进程被唤醒:" << getpid() << ", fd: " << fd << std::endl;
if(code >= 0 && code < tasks.size()) {
tasks[code]();
} else {
std::cerr << "父进程给我的任务码是不对的:" << code << std::endl;
}
} else if(n == 0) {
std::cout << "子进程应该退出了:" << getpid() << std::endl;
break;
} else {
std::cerr << "read fd: " << fd << ", error" << std::endl;
break;
}
}
});
pp.ProcessPoolPrintFd();
// 3. 控制进程池
pp.PollingCtrlSubProcess();
pp.();
std::cout << << std::endl;
;
}
// 1. 先让所有子进程结束
for (auto &c : _channels) {
c.Close();
}
// 2. 你在回收所有的子进程僵尸状态
for (auto &c : _channels) {
c.Wait();
std::cout << "回收子进程:" << c.Target() << std::endl;
}
在回收子进程的时候,我们让子进程的读端关闭,此时写端是正常写的,但是 OS 不会做浪费时间的事,因此会杀掉该进程,而我们利用这一情况,将所有子进程的读端全部关闭,再全部进行回收。
但实际上,我们会发现这两者是可以合并在一起的啊!!!可以边关闭对应的子进程的读端,边进行回收,完全是可以放在一个循环体里的。
for (auto &c : _channels) {
c.Close();
c.Wait();
}
但是回收子进程却发现报错了,这是为什么?实际上这个原因很难发现,因为要对底层细节剖析的非常明白才能解决,接下来小编说明出 bug 的原因:
既然最后一个管道由子进程连着,那么我们倒着关管道,同时回收子进程不就可以了?
for(int end = _channels.size()-1; end >= 0; end--) {
_channels[end].Close();
_channels[end].Wait();
}
但是这种实现方式不太符合逻辑,且实现的不够优雅,实在想正着回收子进程,该如何做呢?
// 子进程除了要关闭自己的 w,同时也要关闭,自己从父进程哪里继承下来的所有的之前进程 w 端
// 我的子进程,要关闭的,从父进程哪里继承下来的 wfd 都在哪里??
// _channels 本身是被子进程继承下去的.
// 1. 子进程不要担心,父进程会影响自己的_channels.
// 2. fork 之后,当前进程,只会看到所有的历史进程的 wfd,并不受后续父进程 emplace_backd 的影响
std::cout << "进程:" << getpid() << ", 关闭了:";
for(auto &c : _channels) {
std::cout << c.Fd() << " ";
c.Close();
}
std::cout <<"\n";
打开规则:
如果当前打开操作是为读而打开 FIFO 时:O_NONBLOCK disable:阻塞直到有相应进程为写而打开该 FIFO O_NONBLOCK enable:立刻返回成功
如果当前打开操作是为写而打开 FIFO 时:O_NONBLOCK disable:阻塞直到有相应进程为读而打开该 FIFO O_NONBLOCK enable:立刻返回失败,错误码为 ENXIO
匿名管道是在内存级进行申请资源的特殊处理,而命名管道则是在磁盘上申请的特殊文件。当该文件被打开时会被系统做特殊处理,其表现是这个文件不需要往磁盘上刷新。
int mkfifo(const char *pathname, mode_t mode);
client 创建管道 tp,往 abc 文件读取内容,打开管道往管道里面写入内容;server 端打开管道并进行读取内容,再往 abc.backup 写入内容,完成文件之间的拷贝。
client.cpp
//读取管道,写入命名管道
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include"common.hpp"
#define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while (0)
int main(int argc, char *argv[]) {
mkfifo("tp", 0644);
int infd;
infd = open("abc", O_RDONLY);
if (infd == -1) ERR_EXIT("open");
int outfd;
outfd = open("tp", O_WRONLY);
if (outfd == -1) ERR_EXIT("open");
char buf[1024];
int n;
while ((n = read(infd, buf, 1024)) > 0) {
write(outfd, buf, n);
}
close(infd);
close(outfd);
return 0;
}
server.cpp
//读取管道,写入目标文件
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include"common.hpp"
#define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while (0)
int main(int argc, char *argv[]) {
int outfd;
outfd = open("abc.bak", O_WRONLY | O_CREAT | O_TRUNC, 0644);
if (outfd == -1) ERR_EXIT("open");
int infd;
infd = open("tp", O_RDONLY);
if (outfd == -1) ERR_EXIT("open");
char buf[1024];
int n;
while ((n = read(infd, buf, 1024)) > 0) {
write(outfd, buf, n);
}
close(infd);
close(outfd);
unlink("tp");
return ;
}
common.hpp
#ifndef __COMMON_HPP__
#define __COMMON_HPP__
#include<iostream>
#include <cstdio>
#include <string>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
const std::string fifoname = "fifo";
mode_t mode = 0666;
//int size = 128;在 Linux 是能这样的
#define SIZE 128
#endif
NamePipe.hpp
#pragma once
#include "common.hpp"
const int defaultfd = -1;
class NamedPipe {
public:
NamedPipe(const std::string &name) : _name(name), _fd(defaultfd) { }
~NamedPipe() { }
bool Create() {
int n = mkfifo(_name.c_str(), mode);
if (n == 0) {
std::cout << "mkfifo success" << std::endl;
} else {
std::cout << "mkfifo failed" << std::endl;
perror("mkfifo");
return false;
}
return true;
}
void Close() {
if (_fd == defaultfd) return;
else close(_fd);
}
bool OpenForRead() {
_fd = open(_name.c_str(), O_RDONLY);
if (_fd < 0) {
perror("open");
return false;
}
std::cout << << std::endl;
;
}
{
_fd = (_name.(), O_WRONLY);
(_fd < ) {
();
;
}
;
}
{
buffer[SIZE] = {};
num = (_fd, buffer, (buffer) - );
(num > ) {
buffer[num] = ;
*out = buffer;
} (num == ) {
;
} {
;
}
;
}
{
(_fd, in.(), in.());
}
{
m = (_name.());
()m;
}
:
std::string _name;
_fd;
};
client.cpp
#include "NamedPipe.hpp"
int main() {
NamedPipe named_pipe(fifoname);
named_pipe.OpenForWrite();
while (true) {
std::cout << "Please Enter# ";
std::string line;
std::getline(std::cin, line);
named_pipe.Write(line);
}
named_pipe.Close();
return 0;
}
server.cpp
#include "NamedPipe.hpp"
int main() {
NamedPipe pp(fifoname);
pp.Create();
pp.OpenForRead();
std::string message;
while (true) {
bool res = pp.Read(&message);
if (!res) break;
std::cout << "client say@" << message << std::endl;
}
// 归还资源
pp.Close();
pp.Remove();
return 0;
}
因此,命名管道主要解决的是毫无关系的进程之间,进行文件级进程通信!!!**
客户端关闭,写端还在读,返回值为 0.
ls -l 查看文件权限

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML 转 Markdown 互为补充。 在线工具,Markdown 转 HTML在线工具,online
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML 转 Markdown在线工具,online
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online
将JSON字符串修饰为友好的可读格式。 在线工具,JSON美化和格式化在线工具,online