跳到主要内容
极客日志极客日志
首页博客AI提示词GitHub精选代理工具
搜索
|注册
博客列表
C++算法

Linux 管道通信实战:匿名进程池与命名管道服务端模型

Linux 进程间通信核心机制解析。涵盖匿名管道用于亲缘进程单向数据流传输,通过 pipe 函数创建,配合 fork 实现父子进程通信及进程池模型。命名管道(FIFO)解决非亲缘进程通信需求,基于文件系统路径,支持服务端客户端架构。提供完整 C/C++ 代码示例,包含文件描述符管理、读写阻塞机制及资源清理细节。

abccba发布于 2026/3/15更新于 2026/4/263 浏览
Linux 管道通信实战:匿名进程池与命名管道服务端模型

Linux 管道通信详解

一、进程间通信基础

进程间通信(IPC)是操作系统中不同进程交换信息的关键机制,涉及数据传输、资源共享、事件通知及进程控制等场景。

常见的 IPC 方式包括管道、System V IPC 和 POSIX IPC。由于进程拥有独立的地址空间,实现通信的前提是它们能访问同一份共享资源。

二、管道机制

1. 什么是管道

管道是类 Unix 系统中最古老的 IPC 方式之一。它将一个进程的输出连接到另一个进程的输入,形成单向数据流,因此也称为单工通信。

管道通信示意图

管道主要分为两类:匿名管道和命名管道。

2. 匿名管道

匿名管道(pipe)主要用于有亲缘关系的进程之间(如父子进程)。它本质上是内核管理的一块内存区域,通过一对文件描述符实现读写分离,随进程退出自动销毁。

创建管道使用 pipe 系统调用,参数为文件描述符数组。成功返回 0,失败返回错误码。通常 fd[0] 为读端,fd[1] 为写端。

pipe 函数示意图

pipe 返回值示意图

匿名管道没有文件名和路径,仅靠文件描述符传递,因此只能用于继承文件描述符表的亲缘进程间通信。

通信行为特点
  • 阻塞机制:子进程写得慢,父进程会阻塞等待;子进程写得快且管道满时,子进程也会阻塞。
  • EOF 处理:读端读完数据且写端关闭后,read 返回 0。
  • 异常终止:若写端一直写而读端关闭,操作系统会向写端发送 SIGPIPE 信号(通常为 13),导致进程异常终止。
  • 生命周期:管道是内核临时对象,当所有持有描述符的进程关闭后自动销毁。
  • 同步互斥:多进程通信自带同步机制,读空或写满时会自动阻塞。
代码示例

下面是一个简单的匿名管道通信示例,演示父子进程间的数据传输:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/wait.h>

int main() {
    int pipefd[2] = {0};
    if (pipe(pipefd) != 0) {
        exit(1);
    }

    // pipefd 内容通常是 3 和 4,3 为读端,4 为写端
    printf("pipefd[0]: %d, pipefd[1]: %d\n", pipefd[0], pipefd[1]);

    pid_t id = fork();
    if (id == 0) {
        // 子进程:关闭读端,只写
        close(pipefd[0]);
        char *msg = "hello pipe";
        int cnt = 5;
        char outbuffer[256];
        while (cnt) {
            snprintf(outbuffer, sizeof(outbuffer), "子->父# %s %d", msg, cnt--);
            write(pipefd[1], outbuffer, strlen(outbuffer));
            sleep(1);
        }
        close(pipefd[1]);
        exit(0);
    } else {
        // 父进程:关闭写端,只读
        close(pipefd[1]);
        char inbuffer[1024];
        while (1) {
            inbuffer[0] = 0;
            ssize_t n = read(pipefd[0], inbuffer, sizeof(inbuffer) - 1);
            if (n > 0) {
                inbuffer[n] = 0; // 手动添加结束符
                printf("%s\n", inbuffer);
            } else if (n == 0) {
                printf("管道读取结束\n");
                close(pipefd[0]);
                break;
            } else {
                perror("read");
                break;
            }
        }
        waitpid(id, NULL, 0);
    }
    return 0;
}

匿名管道运行效果

3. 命名管道

如果需要在无亲缘关系的进程间通信,可以使用命名管道(FIFO)。它本质上是一个特殊类型的文件,存在于文件系统中,双方通过路径即可访问。

创建命令:mkfifo 文件名 创建函数:mkfifo(const char *pathname, mode_t mode) 删除文件:unlink("文件名")

mkfifo 函数示意图

unlink 函数示意图

匿名管道由 pipe 创建并打开,命名管道由 mkfifo 创建后用 open 打开,两者在语义上基本一致。

三、实例:匿名管道实现进程池

进程池是指提前创建好多个子进程,需要任务时直接分配,避免频繁创建销毁的开销。

核心思路:父进程维护多个通道(管道),每个通道对应一个子进程。父进程随机选择通道发送任务,子进程监听通道接收任务执行。

#include <cassert>
#include <cstdio>
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <vector>

// 状态定义
enum { OK = 0, PIPE_ERR, FORK_ERR, READ_ERR, WRITE_ERR, WAIT_ERR };

const int gprocessnum = 7;

void task1() { std::cout << "这是下载数据任务" << std::endl; }
void task2() { std::cout << "这是打印日志任务" << std::endl; }
void task3() { std::cout << "这是刷新磁盘任务" << std::endl; }
void task4() { std::cout << "这是更新用户状态任务" << std::endl; }

typedef void (*task_t)();
const int gtasknum = 4;
task_t tasks[gtasknum] = {task1, task2, task3, task4};

class ProcessPool {
private:
    class Channel {
    private:
        int _wfd;
        pid_t _id;
        std::string channel_name;
    public:
        Channel(int wfd, pid_t id) : _wfd(wfd), _id(id) {
            channel_name = "channel-" + std::to_string(id);
        }
        void ClosePipe() { close(_wfd); }
        void PrintInfo() {
            printf("管道 wfd: %d, 通道名:%s\n", _wfd, channel_name.c_str());
        }
        int getfd() { return _wfd; }
        const char* getname() { return channel_name.c_str(); }
        void Wait() {
            pid_t rid = waitpid(_id, nullptr, 0);
            if (rid < 0) exit(WAIT_ERR);
            std::cout << "回收子进程:" << _id << std::endl;
        }
    };

public:
    ProcessPool() { srand(time(NULL)); }

    void Init() { CreateProcessChannels(); }
    void Debug() {
        for (auto& channel : channels) channel.PrintInfo();
    }
    void Run() {
        int cnt = 10;
        while (cnt--) {
            int itask = SelectTask();
            int ichannel = SelectChannel();
            printf("向通道%s发送任务 task%d...\n", channels[ichannel].getname(), itask + 1);
            SendTask2Channel(itask, ichannel);
            sleep(1);
        }
    }
    void Quit() {
        for (auto& channel : channels) {
            channel.ClosePipe();
            channel.Wait();
        }
    }

private:
    void CreateProcessChannels() {
        for (int i = 0; i < gprocessnum; i++) {
            int pipefd[2] = {0};
            int n = pipe(pipefd);
            if (n < 0) exit(PIPE_ERR);
            pid_t id = fork();
            if (id < 0) exit(FORK_ERR);
            else if (id == 0) {
                // 子进程逻辑
                if (!channels.empty()) {
                    for (auto& channel : channels) channel.ClosePipe();
                }
                close(pipefd[1]);
                DoTask(pipefd[0]);
                exit(OK);
            } else {
                // 父进程逻辑
                close(pipefd[0]);
                channels.emplace_back(pipefd[1], id);
                printf("创建子进程%d成功\n", id);
            }
        }
    }

    void DoTask(int fd) {
        while (1) {
            int task_code;
            ssize_t n = read(fd, &task_code, sizeof(task_code));
            if (n == sizeof(task_code)) {
                if (task_code >= 0 && task_code < gtasknum) {
                    tasks[task_code]();
                }
            } else if (n == 0) {
                printf("%d任务退出\n", getpid());
                break;
            } else {
                exit(READ_ERR);
            }
        }
    }

    int SelectTask() { return rand() % gtasknum; }
    int SelectChannel() {
        static int i = 0;
        int selected = i;
        i++;
        i %= gprocessnum;
        return selected;
    }

    void SendTask2Channel(int itask, int ichannel) {
        assert(0 <= itask && itask < gtasknum && ichannel >= 0 && ichannel < gprocessnum);
        ssize_t n = write(channels[ichannel].getfd(), &itask, sizeof(itask));
        if (n < 0) exit(WRITE_ERR);
    }

    std::vector<Channel> channels;
};

int main() {
    ProcessPool pp;
    pp.Init();
    pp.Debug();
    pp.Run();
    pp.Quit();
    return 0;
}

进程池运行效果

四、实例:命名管道实现服务端客户端通信模型

命名管道适合非亲缘进程间的通信,典型应用如服务端与客户端模型。

1. 管道封装类

// Fifo.hpp
#pragma once
#include <cstdio>
#include <cstring>
#include <fcntl.h>
#include <iostream>
#include <string>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#define FOR_READ 1
#define FOR_WRITE 2

const std::string myfifo = "./fifo";

class Fifo {
public:
    Fifo(const std::string& filename = myfifo) : _filename(filename), _mode(0666), _fd(-1) {}

    void Build() {
        if (IsExist()) return;
        int n = mkfifo(_filename.c_str(), _mode);
        if (n < 0) {
            std::cerr << "mkfifo error: " << strerror(errno) << std::endl;
            exit(1);
        }
        std::cout << "mkfifo success" << std::endl;
    }

    void Open(int mode) {
        if (mode == FOR_READ) _fd = open(_filename.c_str(), O_RDONLY);
        else if (mode == FOR_WRITE) _fd = open(_filename.c_str(), O_WRONLY);
        if (_fd < 0) {
            std::cerr << "open error: " << strerror(errno) << std::endl;
            exit(2);
        }
        std::cout << "open success" << std::endl;
    }

    void Delete() {
        if (!IsExist()) return;
        int n = unlink(_filename.c_str());
        if (n < 0) {
            std::cerr << "delete error: " << strerror(errno) << std::endl;
            exit(3);
        }
        std::cout << "delete success" << std::endl;
    }

    void Send(std::string& msgin) {
        ssize_t n = write(_fd, msgin.c_str(), msgin.size());
    }

    int Receive(std::string& msgout) {
        char buffer[128];
        ssize_t n = read(_fd, buffer, sizeof(buffer) - 1);
        if (n > 0) {
            buffer[n] = '\0';
            msgout = buffer;
            return n;
        } else if (n == 0) return 0;
        else return -1;
    }

private:
    bool IsExist() {
        struct stat st;
        int n = stat(_filename.c_str(), &st);
        if (n == 0) return true;
        else {
            errno = 0;
            return false;
        }
    }

private:
    std::string _filename;
    mode_t _mode;
    int _fd;
};

2. 服务端代码

// Server.cc
#include "Fifo.hpp"

int main() {
    Fifo pipefile;
    pipefile.Build();
    pipefile.Open(FOR_READ);
    std::string msg;
    while (1) {
        int n = pipefile.Receive(msg);
        if (n > 0) {
            std::cout << "客户端说:" << msg << std::endl;
        } else {
            break;
        }
    }
    pipefile.Delete();
    return 0;
}

3. 客户端代码

// Client.cc
#include "Fifo.hpp"

int main() {
    Fifo fileclient;
    fileclient.Open(FOR_WRITE);
    while (1) {
        std::cout << "请输入:" << std::endl;
        std::string msg;
        std::getline(std::cin, msg);
        fileclient.Send(msg);
    }
    return 0;
}

运行上述代码,即可看到命名管道在服务端和客户端之间的实时通信效果。

目录

  1. Linux 管道通信详解
  2. 一、进程间通信基础
  3. 二、管道机制
  4. 1. 什么是管道
  5. 2. 匿名管道
  6. 通信行为特点
  7. 代码示例
  8. 3. 命名管道
  9. 三、实例:匿名管道实现进程池
  10. 四、实例:命名管道实现服务端客户端通信模型
  11. 1. 管道封装类
  12. 2. 服务端代码
  13. 3. 客户端代码
  • 💰 8折买阿里云服务器限时8折了解详情
  • 💰 8折买阿里云服务器限时8折购买
  • 🦞 5分钟部署阿里云小龙虾了解详情
  • 🤖 一键搭建Deepseek满血版了解详情
  • 一键打造专属AI 智能体了解详情
极客日志微信公众号二维码

微信扫一扫,关注极客日志

微信公众号「极客日志V2」,在微信中扫描左侧二维码关注。展示文案:极客日志V2 zeeklog

更多推荐文章

查看全部
  • AIGC 背景下图文内容社区数据指标体系构建实践
  • Flutter anthropic_sdk_dart 鸿蒙化适配指南
  • JavaSE 核心知识点总结:面向对象、String 及 IO 流
  • 主流 AI 编程辅助工具横向对比:Cursor、Copilot 与国产方案解析
  • 用老 Mac 跑本地 AI:OpenClaw 环境一键搭建
  • AIGC 22 个基本概念详解:从原理到应用
  • LangChain4J Java 集成指南:从入门到高级应用
  • GLM-4.7-Flash 本地 Copilot 工具构建实战教程
  • Z-Image-Turbo WebUI 本地部署与使用指南
  • MCP Apps:重构 Web 应用,开启 AI 助手的小程序时代
  • Python 入门与应用:从基础语法到数据分析实战
  • C++与Linux基础:虚拟文件系统 VFS 详解
  • 归并排序与数组中的逆序对算法解析
  • VSCode + Continue + Ollama 搭建本地 AI 编程助手
  • OpenLLaMA 智能文案生成系统构建指南
  • N46Whisper 云端日语语音转字幕工具指南
  • GOPLA框架在Stretch 3机器人上实现空间常识突破
  • baoyu-skills:使用 AI 辅助技术文章配图与排版
  • OpenClaw 与本地千问模型搭建电脑 AI 助理
  • Ollama 本地大模型 WebAPI 调用实战指南

相关免费在线工具

  • 加密/解密文本

    使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online

  • Gemini 图片去水印

    基于开源反向 Alpha 混合算法去除 Gemini/Nano Banana 图片水印,支持批量处理与下载。 在线工具,Gemini 图片去水印在线工具,online

  • Base64 字符串编码/解码

    将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online

  • Base64 文件转换器

    将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online

  • Markdown转HTML

    将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online

  • HTML转Markdown

    将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online