跳到主要内容
极客日志极客日志面向AI+效率的开发者社区
首页博客GitHub 精选镜像工具UI配色美学隐私政策关于联系
搜索内容 / 工具 / 仓库 / 镜像...⌘K搜索
注册
博客列表
C++算法

Linux 高性能网络模式:Reactor 反应堆模式

综述由AI生成Reactor 模式是一种事件处理设计模式,通过事件循环监听多个文件描述符并将事件分发给处理器。 Reactor 模式的核心思想、组成组件,并通过 C++ 封装 epoll、Connection、Reactor、Listener 和 IOService 实现了基于 Reactor 的计算器案例。最后分析了多路转接对写的处理及 One Thread One Loop 最佳实践。

怪力乱神发布于 2026/2/4更新于 2026/5/283.4K 浏览
Linux 高性能网络模式:Reactor 反应堆模式

文章配图

一、Reactor 模式

文章配图

Reactor 模式是一种事件处理设计模式,用于处理多个并发输入事件。它通过事件驱动的方式,将事件分发给相应的处理程序,从而实现对并发事件的高效处理。Reactor 模式广泛应用于网络编程、服务器框架等领域,例如 Java 的 NIO、Netty 框架,以及 C++ 的 Boost.Asio 等。

Reactor 模式的核心思想

Reactor 模式的核心思想是:

  • 事件驱动:通过事件循环(Event Loop)监听多个事件源(多个文件描述符)(如 Socket、文件描述符等),该事件驱动器可以采用 select,poll,epoll 等。
  • 事件分发:当事件发生时,Reactor 将事件分发给对应的事件处理器(Event Handler)。
  • 非阻塞:Reactor 模式通常与非阻塞 I/O 结合使用,避免线程阻塞,所以需要将 Socket、文件描述符等通过 fcntl 函数设置为非阻塞状态。

Reactor 模式的组成

Reactor 模式通常由以下几个组件组成:

  1. Loop(反应器 / 事件循环)

    • 负责监听事件源(多个文件描述符)(如 Socket、文件描述符等)。
    • 当事件发生时,将事件分发给对应的事件处理器。
  2. Dispatcher(事件多路分发器)

    • 当事件发生时,通知 Event Handler 进行处理。
  3. Event Handler(事件处理器)

    • 定义处理事件的接口。
    • 每个事件源对应一个事件处理器。

在这里插入图片描述

二、案例使用——基于 Reactor 的计算器

封装 epoll

#pragma once
#include <iostream>
#include <sys/epoll.h>
#include "Log.hpp"
#include "Common.hpp"

using namespace LogModule;

namespace EpollMoudle {
class Epoller {
public:
    Epoller() : _epfd(-1) {}

    void Init() {
        _epfd = epoll_create(256);
        if (_epfd < 0) {
            LOG(LogLevel::ERROR) << "epoll_create error";
            exit(EPOLL_CREATE_ERR);
        }
        LOG(LogLevel::INFO) << "epoll_create success, epfd: " << _epfd;
    }

    int Wait(struct epoll_event revs[], int num, int timeout) // 输出就绪的 fd 和 events
    {
        int n = epoll_wait(_epfd, revs, num, timeout);
        if (n < 0) {
            LOG(LogLevel::WARNING) << "epoll_wait error";
        }
        return n;
    }

    void Ctrl(int sockfd, uint32_t events, int flag) {
        struct epoll_event ev;
        ev.events = events;
        ev.data.fd = sockfd;
        int n = epoll_ctl(_epfd, flag, sockfd, &ev);
        if (n < 0) {
            LOG(LogLevel::WARNING) << "epoll_ctl error";
        }
    }

    void Add(int sockfd, uint32_t events) { Ctrl(sockfd, events, EPOLL_CTL_ADD); }
    void Update(int sockfd, uint32_t events) { Ctrl(sockfd, events, EPOLL_CTL_MOD); }
    void Delete(int sockfd) {
        int n = epoll_ctl(_epfd, EPOLL_CTL_DEL, sockfd, nullptr);
        if (n < 0) {
            LOG(LogLevel::WARNING) << "epoll_ctl error";
        }
    }

    ~Epoller() {}

private:
    int _epfd;
};
}

封装 Connection

所有的连接里都包含回指指针指向自己的 Reactor 模型

#pragma once
#include <iostream>
#include <string>
#include <memory>
#include <functional>
#include "InetAddr.hpp"

class Reactor; // 循环依赖的问题
// 普通 fd,Listensockfd
// 让对 fd 的处理方式采用同一种方式
// 描述一个连接
class Connection {
public:
    Connection() : _sockfd(-1), _events(0) {
        // 自动获得当前系统的时间戳
    }

    void UpdateTime() {
        // 更新时间,重新获取时间
    }

    void SetPeerInfo(const InetAddr &peer_addr) { _peer_addr = peer_addr; }
    void SetSockfd(int sockfd) { _sockfd = sockfd; }
    int Sockfd() { return _sockfd; }
    void SetEvents(uint32_t events) { _events = events; }
    uint32_t GetEvents() { return _events; }
    void SetOwner(Reactor *owner) { _owner = owner; }
    Reactor* GetOwner() { return _owner; }

    void Append(const std::string &in) // 把收到的数据添加到自己的接受缓冲区
    { _inbuffer += in; }

    void AppendToOut(const std::string &out) { _outbuffer += out; }
    void DiscardOutString(int n) { _outbuffer.erase(0, n); }
    bool isOutBufferEmpty() { return _outbuffer.empty(); }
    std::string &OutString() { return _outbuffer; }
    std::string &InBuffer() // 故意
    { return _inbuffer; }

    void Close() {
        if (_sockfd >= 0) close(_sockfd);
    }

    // 回调方法
    virtual void Sender() = 0;
    virtual void Recver() = 0;
    virtual void Excepter() = 0;

    ~Connection() {}

private:
    int _sockfd; // 每一个文件描述符都有自己的输入输出缓冲区
    std::string _inbuffer;
    std::string _outbuffer;
    InetAddr _peer_addr; // 对应哪一个客户端
    // 添加一个指针
    Reactor *_owner;
    // 我关心的事件
    uint32_t _events; // 我这个 connection 关心的事件
    // lastmodtime
    uint64_t _timestamp;
};

封装 Reactor

#pragma once
#include <iostream>
#include <memory>
#include <unordered_map>
#include "Epoller.hpp"
#include "Connection.hpp"

using namespace EpollMoudle;
using connection_t = std::shared_ptr<Connection>;

class Reactor {
    const static int event_num = 64;

private:
    bool IsConnectionExists(int sockfd) {
        return _connections.find(sockfd) == _connections.end() ? false : true;
    }

public:
    Reactor() : _isrunning(false), _epoller(std::make_unique<Epoller>()) {
        _epoller->Init();
    }

    void InsertConnection(connection_t conn) {
        auto iter = _connections.find(conn->Sockfd());
        if (iter == _connections.end()) {
            // 1. 把连接,放到 unordered_map 中进行管理
            _connections.insert(std::make_pair(conn->Sockfd(), conn));
            // 2. 把新插入进来的连接,写透到内核的 epoll 中
            _epoller->Add(conn->Sockfd(), conn->GetEvents());
            // 3. 设置关联关系,让 connection 回指当前对象
            conn->SetOwner(this);
        }
    }

    void EnableReadWrite(int sockfd, bool readable, bool writeable) {
        if (IsConnectionExists(sockfd)) {
            // 修改用户层 connection 的事件
            uint32_t events = ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET);
            _connections[sockfd]->SetEvents(events);
            // 写透到内核中
            _epoller->Update(sockfd, _connections[sockfd]->GetEvents());
        }
    }

    void DelConnection(int sockfd) {
        if (IsConnectionExists(sockfd)) {
            // 1. 从内核中移除对 sockfd 的关心
            _epoller->Delete(sockfd);
            // 2. 关闭特定的文件描述符
            _connections[sockfd]->Close();
            // 3. 从_connectionns 中移除对应的 connection
            _connections.erase(sockfd);
        }
    }

    // 基于事件驱动的事件派发器
    void Dispatcher(int n) {
        for (int i = 0; i < n; i++) {
            // 开始进行派发,派发给指定的模块
            int sockfd = _revs[i].data.fd;
            uint32_t revents = _revs[i].events;
            if ((revents & EPOLLERR) || (revents & EPOLLHUP)) revents = (EPOLLIN | EPOLLOUT); // 异常事件,转换成为读写事件

            if ((revents & EPOLLIN) && IsConnectionExists(sockfd)) {
                _connections[sockfd]->Recver();
            }
            if ((revents & EPOLLOUT) && IsConnectionExists(sockfd)) {
                _connections[sockfd]->Sender();
            }
        }
    }

    void LoopOnce(int timeout) {
        int n = _epoller->Wait(_revs, event_num, timeout);
        Dispatcher(n);
    }

    void DebugPrint() {
        std::cout << "Epoller 管理的 fd: ";
        for (auto &iter : _connections) {
            std::cout << iter.first << " ";
        }
        std::cout << std::endl;
    }

    void Loop() {
        _isrunning = true;
        // int timeout = -1;
        int timeout = 1000;
        while (_isrunning) {
            LoopOnce(timeout);
            DebugPrint();
            // 超时管理
            // 简单的,遍历_connections 判断当前时间 - connection 的最近访问时间 > XXX
            // 超时了
        }
        _isrunning = false;
    }

    void Stop() {
        _isrunning = false;
    }

    ~Reactor() {}

private:
    std::unique_ptr<Epoller> _epoller;
    std::unordered_map<int, connection_t> _connections; // fd: Connection 服务器内部所有的连接
    bool _isrunning;
    struct epoll_event _revs[event_num];
};

封装 Listener(专门负责获取连接的模块)

#pragma once
#include <iostream>
#include <memory>
#include "Socket.hpp"
#include "InetAddr.hpp"
#include "Connection.hpp"
#include "Epoller.hpp"
#include "Log.hpp"
#include "IOService.hpp"
#include "Reactor.hpp"
#include "Protocol.hpp"
#include "Calculator.hpp"

using namespace SocketModule;
using namespace LogModule;

// 专门负责获取连接的模块
// 连接管理器
class Listener : public Connection {
public:
    Listener(uint16_t port) : _listensock(std::make_unique<TcpSocket>()), _port(port) {
        _listensock->BuildTcpSocketMethod(_port);
        SetSockfd(_listensock->Fd());
        SetEvents(EPOLLIN | EPOLLET);
    }

    virtual void Sender() override {}
    // 我们回调到这里 天然就有父类 connection
    virtual void Recver() override {
        // 读就绪,而且是 listensock 就绪
        // IO 处理 --- 获取新连接
        // 你怎么知道 一次来的就是一个连接呢 怎么保证一次读完了 ET 工作模式 !
        while (true) {
            InetAddr peer;
            int aerrno = 0;
            // accept 非阻塞的时候,就是 IO,我们就像处理 read 一样,处理 accept
            int sockfd = _listensock->Accepter(&peer, &aerrno);
            if (sockfd > 0) {
                // success
                // 不能直接读取 !
                // sockfd 添加到 epoll !
                // epollserver 只认 connection
                LOG(LogLevel::DEBUG) << "Accepter success: " << sockfd;
                // 普通的文件描述符,处理 IO 的 也是 connection
                // 2. sockfd 包装成为 Connection !
                auto conn = std::make_shared<IOService>(sockfd);
                conn->RegisterOnMessage(HandlerRequest);
                // 3. 插入到 EpollServer
                GetOwner()->InsertConnection(conn);
            } else {
                if (aerrno == EAGAIN || aerrno == EWOULDBLOCK) {
                    LOG(LogLevel::DEBUG) << "accepter all connection ... done";
                    break;
                } else if (aerrno == EINTR) {
                    LOG(LogLevel::DEBUG) << "accepter intr by signal, continue";
                    continue;
                } else {
                    LOG(LogLevel::DEBUG) << "accepter error ... Ignore";
                    break;
                }
            }
        }
    }

    virtual void Excepter() override {}

    int Sockfd() { return _listensock->Fd(); }

    ~Listener() {
        _listensock->Close();
    }

private:
    std::unique_ptr<Socket> _listensock;
    uint16_t _port;
};

封装 IOService(只负责 IO 的)

#pragma once
#include <iostream>
#include <memory>
#include <functional>
#include "Socket.hpp"
#include "InetAddr.hpp"
#include "Connection.hpp"
#include "Epoller.hpp"
#include "Log.hpp"
#include "Reactor.hpp"

using func_t = std::function<std::string(std::string &)>;
using namespace SocketModule;
using namespace LogModule;

// 只负责 IO
class IOService : public Connection {
    static const int size = 1024;

public:
    IOService(int sockfd) {
        // 1. 设置文件描述符非阻塞
        SetNonBlock(sockfd);
        SetSockfd(sockfd);
        SetEvents(EPOLLIN | EPOLLET);
    }

    virtual void Sender() override {
        // UpdateTime();
        // 直接写
        while (true) {
            ssize_t n = send(Sockfd(), OutString().c_str(), OutString().size(), 0);
            if (n > 0) {
                // 成功
                DiscardOutString(n); // 移除 N 个
            } else if (n == 0) {
                break;
            } else {
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                    // 缓冲区写满了,下次再来
                    break;
                } else if (errno == EINTR) {
                    continue;
                } else {
                    Excepter();
                    return;
                }
            }
        }
        // 一种:outbuffer empty
        // 一种:发送缓冲区写满了 && outbuffer 没有 empty 写条件不满足 使能 sockfd 在 epoll 中的事件
        if (!isOutBufferEmpty()) {
            // 修改对 sockfd 的读事件关心!--开启对写事件关心
            // 按需设置!
            GetOwner()->EnableReadWrite(Sockfd(), true, true);
            // 读事件一般常设 写事件一般按需设置
        } else {
            GetOwner()->EnableReadWrite(Sockfd(), true, false);
            // 按需设置 关闭了 !!!
        }
    }

    virtual void Recver() override {
        // UpdateTime();
        // 1. 读取所有数据
        while (true) // ET 模式
        {
            char buffer[size];
            ssize_t s = recv(Sockfd(), buffer, sizeof(buffer) - 1, 0); // 非阻塞
            if (s > 0) {
                buffer[s] = 0; // 读取成功
                Append(buffer);
            } else if (s == 0) {
                // 对端关闭连接
                Excepter();
                return;
            } else {
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                    break;
                } else if (errno == EINTR) {
                    continue;
                } else {
                    // 发生错误了
                    Excepter();
                    return;
                }
            }
        }
        // 走到下面,我一定把本轮数据读完了
        std::cout << "outbuffer: \n" << InBuffer() << std::endl;
        // 你能确保你读到的消息,就是一个完整的报文吗
        // 我怎么知道 读到了完整的请求呢?? 协议 ! ! !
        std::string result;
        if (_on_message) result = _on_message(InBuffer());
        // 添加应答信息
        AppendToOut(result);
        // 如何处理写的问题 outbuffer 发送给对方的问题
        if (!isOutBufferEmpty()) {
            // 方案一:Sender(); // 直接发送,推荐做法
            // Sender();
            // 方案二:使能 writeable 即可
            GetOwner()->EnableReadWrite(Sockfd(), true, true);
        }
    }

    virtual void Excepter() override {
        // IO 读取的时候,所有的异常处理,全部都会转化成为这一个函数的调用
        // 出异常 怎么做?
        // 打印日志 差错处理 关闭连接,Reactor 异常 connection,从内核中,移除对 fd 的关心
        LOG(LogLevel::INFO) << "客户端连接可能结束,进行异常处理:" << Sockfd();
        GetOwner()->DelConnection(Sockfd());
    }

    void RegisterOnMessage(func_t on_message) {
        _on_message = on_message;
    }

    ~IOService() {}

private:
    func_t _on_message;
};

Main.cc

#include <iostream>
#include <string>
#include "Log.hpp"
#include "Listener.hpp"
#include "Connection.hpp"
#include "Reactor.hpp"

using namespace LogModule;

int main(int argc, char *argv[]) {
    ENABLE_CONSOLE_LOG();
    if (argc != 2) {
        std::cout << "Usage: " << argv[0] << " port" << std::endl;
        return 0;
    }
    uint16_t local_port = std::stoi(argv[1]);
    Reactor reactor;
    auto conn = std::make_shared<Listener>(local_port);
    reactor.InsertConnection(conn);
    reactor.Loop();
    return 0;
}

三、复盘分析

多路转接对写的处理

  1. 写事件是否就绪:发送缓冲器,是否有空间!

有空间。sockfd,发送缓冲区中的空间,默认就是有的!!!,只有当他被填写满了,写条件才不具备

  1. 如何正确处理写入:直接写 (不会被阻塞),写缓冲区可能会越来越短,写满了,写条件不具备,才托管给 epoll,让它帮我关心!!!
  • 直接写!写入默认就是就绪的 !
  • 写入失败,才托管给 epoll!
  • 多路转接的方案设计的时候,写事件关心,永远不能常开启。
  • 写事件关心,按需设置 !
  • 读事件一般常设,写事件一般按需设置 !
  • 只要使能开启写,内核+epoll 会自动事件派发处理底层的发送任务。
  • 从此往后,我们专注于业务即可 !

最佳实践 One Thread One loop

一个 fd (Connection) 的全生命周期,只能由一个线程统一管理,这样就不会存在任何 (过多) 并发问题

在这里插入图片描述

🧊一个 loop 就是 一个 Reactor

在这里插入图片描述

四、总结

以上就是我对 【Linux】高性能网络模式:Reactor 反应堆模式 的理解。

目录

  1. 一、Reactor 模式
  2. Reactor 模式的核心思想
  3. Reactor 模式的组成
  4. 二、案例使用——基于 Reactor 的计算器
  5. 封装 epoll
  6. 封装 Connection
  7. 封装 Reactor
  8. 封装 Listener(专门负责获取连接的模块)
  9. 封装 IOService(只负责 IO 的)
  10. Main.cc
  11. 三、复盘分析
  12. 多路转接对写的处理
  13. 最佳实践 One Thread One loop
  14. 四、总结
  • 💰 8折买阿里云服务器限时8折了解详情
  • Magick API 一键接入全球大模型注册送1000万token查看
  • 🤖 一键搭建Deepseek满血版了解详情
  • 一键打造专属AI 智能体了解详情
极客日志微信公众号二维码

微信扫一扫,关注极客日志

微信公众号「极客日志V2」,在微信中扫描左侧二维码关注。展示文案:极客日志V2 zeeklog

更多推荐文章

查看全部
  • Java结构体实战:歌唱比赛评分与旗鼓相当对手匹配
  • 【大模型 】API 对接指南:OpenAI/Claude/LLaMA 3 调用技巧
  • 2026年最新全球AI大模型深度研究报告
  • MySQL 在 C/C++ 中的使用
  • (5)ModalAI VOXL2
  • Java 二分查找算法详解、复杂度分析与 LeetCode 实战
  • OpenClaw 智能体实战:从零搭建你的第一个 AI 员工
  • C++ Qt 窗口框架与菜单栏 QMenuBar 使用指南
  • AI 大模型通信机制:流式传输与数据封装逻辑
  • 基于 PySide6 和 VTK 的 STL 文件曲面选择器与导出工具
  • 算法实战:Z 字形变换与外观数列模拟解法
  • VS Code Copilot 完整使用教程
  • 基于 FPGA 的图像形态学腐蚀处理 Verilog 开发与硬件测试
  • 大模型行业三大核心竞争力:资金、人才与数据
  • ComfyUI-BrushNet 图像编辑实战指南
  • C++ 哈希桶链地址法实现
  • ToDesk、顺网云与海马云部署 DeepSeek 对比评测
  • C++ 智能指针:使用场景、实现原理与内存泄漏防治
  • Xilinx FPGA 开发:Vivado 与 Vitis 流程解析
  • SpringDoc 基本使用指南

相关免费在线工具

  • 加密/解密文本

    使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online

  • Gemini 图片去水印

    基于开源反向 Alpha 混合算法去除 Gemini/Nano Banana 图片水印,支持批量处理与下载。 在线工具,Gemini 图片去水印在线工具,online

  • Base64 字符串编码/解码

    将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online

  • Base64 文件转换器

    将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online

  • Markdown转HTML

    将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online

  • HTML转Markdown

    将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online