Linux 管道通信详解
一、进程间通信基础
进程间通信(IPC)是操作系统中不同进程交换信息的关键机制,涉及数据传输、资源共享、事件通知及进程控制等场景。
常见的 IPC 方式包括管道、System V IPC 和 POSIX IPC。由于进程拥有独立的地址空间,实现通信的前提是它们能访问同一份共享资源。
二、管道机制
1. 什么是管道
管道是类 Unix 系统中最古老的 IPC 方式之一。它将一个进程的输出连接到另一个进程的输入,形成单向数据流,因此也称为单工通信。

管道主要分为两类:匿名管道和命名管道。
2. 匿名管道
匿名管道(pipe)主要用于有亲缘关系的进程之间(如父子进程)。它本质上是内核管理的一块内存区域,通过一对文件描述符实现读写分离,随进程退出自动销毁。
创建管道使用 pipe 系统调用,参数为文件描述符数组。成功返回 0,失败返回错误码。通常 fd[0] 为读端,fd[1] 为写端。


匿名管道没有文件名和路径,仅靠文件描述符传递,因此只能用于继承文件描述符表的亲缘进程间通信。
通信行为特点
- 阻塞机制:子进程写得慢,父进程会阻塞等待;子进程写得快且管道满时,子进程也会阻塞。
- EOF 处理:读端读完数据且写端关闭后,read 返回 0。
- 异常终止:若写端一直写而读端关闭,操作系统会向写端发送 SIGPIPE 信号(通常为 13),导致进程异常终止。
- 生命周期:管道是内核临时对象,当所有持有描述符的进程关闭后自动销毁。
- 同步互斥:多进程通信自带同步机制,读空或写满时会自动阻塞。
代码示例
下面是一个简单的匿名管道通信示例,演示父子进程间的数据传输:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/wait.h>
int main() {
int pipefd[2] = {0};
if (pipe(pipefd) != 0) {
exit(1);
}
// pipefd 内容通常是 3 和 4,3 为读端,4 为写端
printf("pipefd[0]: %d, pipefd[1]: %d\n", pipefd[0], pipefd[1]);
pid_t id = fork();
if (id == 0) {
// 子进程:关闭读端,只写
close(pipefd[0]);
char *msg = "hello pipe";
int cnt = 5;
char outbuffer[256];
while (cnt) {
snprintf(outbuffer, sizeof(outbuffer), "子->父# %s %d", msg, cnt--);
write(pipefd[1], outbuffer, strlen(outbuffer));
sleep(1);
}
close(pipefd[1]);
exit(0);
} else {
// 父进程:关闭写端,只读
close(pipefd[1]);
char inbuffer[1024];
while (1) {
inbuffer[0] = 0;
ssize_t n = read(pipefd[0], inbuffer, sizeof(inbuffer) - 1);
if (n > 0) {
inbuffer[n] = 0; // 手动添加结束符
printf("%s\n", inbuffer);
} else if (n == 0) {
printf("管道读取结束\n");
close(pipefd[0]);
break;
} else {
perror("read");
break;
}
}
waitpid(id, NULL, 0);
}
return 0;
}

3. 命名管道
如果需要在无亲缘关系的进程间通信,可以使用命名管道(FIFO)。它本质上是一个特殊类型的文件,存在于文件系统中,双方通过路径即可访问。
创建命令:mkfifo 文件名
创建函数:mkfifo(const char *pathname, mode_t mode)
删除文件:unlink("文件名")


匿名管道由 pipe 创建并打开,命名管道由 mkfifo 创建后用 open 打开,两者在语义上基本一致。
三、实例:匿名管道实现进程池
进程池是指提前创建好多个子进程,需要任务时直接分配,避免频繁创建销毁的开销。
核心思路:父进程维护多个通道(管道),每个通道对应一个子进程。父进程随机选择通道发送任务,子进程监听通道接收任务执行。
#include <cassert>
#include <cstdio>
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <vector>
// 状态定义
enum { OK = 0, PIPE_ERR, FORK_ERR, READ_ERR, WRITE_ERR, WAIT_ERR };
const int gprocessnum = 7;
void task1() { std::cout << "这是下载数据任务" << std::endl; }
void task2() { std::cout << "这是打印日志任务" << std::endl; }
void task3() { std::cout << "这是刷新磁盘任务" << std::endl; }
void task4() { std::cout << "这是更新用户状态任务" << std::endl; }
typedef void (*task_t)();
const int gtasknum = 4;
task_t tasks[gtasknum] = {task1, task2, task3, task4};
class ProcessPool {
private:
class Channel {
private:
int _wfd;
pid_t _id;
std::string channel_name;
public:
Channel(int wfd, pid_t id) : _wfd(wfd), _id(id) {
channel_name = "channel-" + std::to_string(id);
}
void ClosePipe() { close(_wfd); }
void PrintInfo() {
printf("管道 wfd: %d, 通道名:%s\n", _wfd, channel_name.c_str());
}
int getfd() { return _wfd; }
const char* getname() { return channel_name.c_str(); }
void Wait() {
pid_t rid = waitpid(_id, nullptr, 0);
if (rid < 0) exit(WAIT_ERR);
std::cout << "回收子进程:" << _id << std::endl;
}
};
public:
ProcessPool() { srand(time(NULL)); }
void Init() { CreateProcessChannels(); }
void Debug() {
for (auto& channel : channels) channel.PrintInfo();
}
void Run() {
int cnt = 10;
while (cnt--) {
int itask = SelectTask();
int ichannel = SelectChannel();
printf("向通道%s发送任务 task%d...\n", channels[ichannel].getname(), itask + 1);
SendTask2Channel(itask, ichannel);
sleep(1);
}
}
void Quit() {
for (auto& channel : channels) {
channel.ClosePipe();
channel.Wait();
}
}
private:
void CreateProcessChannels() {
for (int i = 0; i < gprocessnum; i++) {
int pipefd[2] = {0};
int n = pipe(pipefd);
if (n < 0) exit(PIPE_ERR);
pid_t id = fork();
if (id < 0) exit(FORK_ERR);
else if (id == 0) {
// 子进程逻辑
if (!channels.empty()) {
for (auto& channel : channels) channel.ClosePipe();
}
close(pipefd[1]);
DoTask(pipefd[0]);
exit(OK);
} else {
// 父进程逻辑
close(pipefd[0]);
channels.emplace_back(pipefd[1], id);
printf("创建子进程%d成功\n", id);
}
}
}
void DoTask(int fd) {
while (1) {
int task_code;
ssize_t n = read(fd, &task_code, sizeof(task_code));
if (n == sizeof(task_code)) {
if (task_code >= 0 && task_code < gtasknum) {
tasks[task_code]();
}
} else if (n == 0) {
printf("%d任务退出\n", getpid());
break;
} else {
exit(READ_ERR);
}
}
}
int SelectTask() { return rand() % gtasknum; }
int SelectChannel() {
static int i = 0;
int selected = i;
i++;
i %= gprocessnum;
return selected;
}
void SendTask2Channel(int itask, int ichannel) {
assert(0 <= itask && itask < gtasknum && ichannel >= 0 && ichannel < gprocessnum);
ssize_t n = write(channels[ichannel].getfd(), &itask, sizeof(itask));
if (n < 0) exit(WRITE_ERR);
}
std::vector<Channel> channels;
};
int main() {
ProcessPool pp;
pp.Init();
pp.Debug();
pp.Run();
pp.Quit();
return 0;
}

四、实例:命名管道实现服务端客户端通信模型
命名管道适合非亲缘进程间的通信,典型应用如服务端与客户端模型。
1. 管道封装类
// Fifo.hpp
#pragma once
#include <cstdio>
#include <cstring>
#include <fcntl.h>
#include <iostream>
#include <string>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#define FOR_READ 1
#define FOR_WRITE 2
const std::string myfifo = "./fifo";
class Fifo {
public:
Fifo(const std::string& filename = myfifo) : _filename(filename), _mode(0666), _fd(-1) {}
void Build() {
if (IsExist()) return;
int n = mkfifo(_filename.c_str(), _mode);
if (n < 0) {
std::cerr << "mkfifo error: " << strerror(errno) << std::endl;
exit(1);
}
std::cout << "mkfifo success" << std::endl;
}
void Open(int mode) {
if (mode == FOR_READ) _fd = open(_filename.c_str(), O_RDONLY);
else if (mode == FOR_WRITE) _fd = open(_filename.c_str(), O_WRONLY);
if (_fd < 0) {
std::cerr << "open error: " << strerror(errno) << std::endl;
exit(2);
}
std::cout << "open success" << std::endl;
}
void Delete() {
if (!IsExist()) return;
int n = unlink(_filename.c_str());
if (n < 0) {
std::cerr << "delete error: " << strerror(errno) << std::endl;
exit(3);
}
std::cout << "delete success" << std::endl;
}
void Send(std::string& msgin) {
ssize_t n = write(_fd, msgin.c_str(), msgin.size());
}
int Receive(std::string& msgout) {
char buffer[128];
ssize_t n = read(_fd, buffer, sizeof(buffer) - 1);
if (n > 0) {
buffer[n] = '\0';
msgout = buffer;
return n;
} else if (n == 0) return 0;
else return -1;
}
private:
bool IsExist() {
struct stat st;
int n = stat(_filename.c_str(), &st);
if (n == 0) return true;
else {
errno = 0;
return false;
}
}
private:
std::string _filename;
mode_t _mode;
int _fd;
};
2. 服务端代码
// Server.cc
#include "Fifo.hpp"
int main() {
Fifo pipefile;
pipefile.Build();
pipefile.Open(FOR_READ);
std::string msg;
while (1) {
int n = pipefile.Receive(msg);
if (n > 0) {
std::cout << "客户端说:" << msg << std::endl;
} else {
break;
}
}
pipefile.Delete();
return 0;
}
3. 客户端代码
// Client.cc
#include "Fifo.hpp"
int main() {
Fifo fileclient;
fileclient.Open(FOR_WRITE);
while (1) {
std::cout << "请输入:" << std::endl;
std::string msg;
std::getline(std::cin, msg);
fileclient.Send(msg);
}
return 0;
}
运行上述代码,即可看到命名管道在服务端和客户端之间的实时通信效果。


