跳到主要内容
极客日志极客日志面向AI+效率的开发者社区
首页博客GitHub 精选镜像工具UI配色美学隐私政策关于联系
搜索内容 / 工具 / 仓库 / 镜像...⌘K搜索
注册
博客列表
C++算法

IO 多路转接:Reactor 框架与 Epoll 机制的封装设计

深入讲解基于 Epoll 与 Reactor 模式的高并发网络服务器设计与实现。内容涵盖水平触发与边缘触发模式差异,Socket 与 Epoll 接口封装,连接管理对象设计,事件循环任务派发流程。重点解析非阻塞 IO 配置、读写缓冲区处理、回调机制绑定,以及引入线程池时解决 shared_ptr 循环引用的 weak_ptr 方案。

橘子海发布于 2026/2/9更新于 2026/5/3030 浏览
IO 多路转接:Reactor 框架与 Epoll 机制的封装设计

前言

在高并发成为系统标配的今天,网络编程、中间件开发、分布式通信等场景中,如何高效处理海量 IO 请求始终是开发者绕不开的核心命题。传统一连接一线程的同步阻塞模型,因线程资源耗尽、CPU 上下文切换频繁、内存占用过高等问题,难以应对万级甚至十万级的并发连接;即便引入线程池优化,也无法从根本上解决等待 IO 时线程闲置的资源浪费困境。

正是在这样的需求下,基于事件驱动与 IO 多路转接的 Reactor 模式应运而生。它以少量线程监听多 IO、事件触发业务处理的核心逻辑,成为解决高并发 IO 的经典架构。小到 Netty 的网络通信内核、Redis 的事件循环,大到 Nginx 的请求处理框架、Kafka 的消息接收模块,其底层都能看到 Reactor 模式的影子。理解 Reactor 模式的实现逻辑,是掌握高并发系统设计的关键钥匙。

本文围绕 Reactor 模式实现展开:不局限于抽象原理,而是从底层技术依赖(IO 多路转接调用)切入,一步步拆解事件循环的构建、组件间的协作逻辑,帮助你构建出一个基于 Reactor 模式的服务器。

一、Epoll 的工作模式

Epoll 有两种工作模式:水平触发(Level Triggered,简称 LT)和边缘触发(Edge Triggered,简称 ET)。这两种模式的核心差异在于何时通知应用程序某个文件描述符(fd)就绪,直接影响高并发 IO 处理的效率和编程复杂度。

  1. 水平触发(LT):默认模式,状态持续触发。 当一个文件描述符(如 socket)处于就绪状态(例如:有数据可读、可写,或发生异常)时,epoll 会持续通知应用程序,直到该就绪状态被消除(例如:数据被完全读取、缓冲区被写满)。
  2. 边缘触发(ET):状态变化触发,高效但复杂。 epoll 仅在文件描述符的就绪状态发生变化瞬间通知一次,之后无论该状态是否持续,都不再通知。即只有在读写资源从没就绪到就绪的时候才会进行通知。

我们通常会认为 ET 模式的效率更高:

  • ET 当资源就绪的时候只会通知一次,并不需要反复通知。并且如果上层没有将数据读取完毕,也不会再进行通知了;
  • 因为 ET 模式只会进行通知一次,因此其会倒逼着上层在进行读取时要将数据一次全部取完,这样就可以空出一个更大的接收缓冲区,对方也可以发送更多的数据。

二、Reactor 服务器

下面我们开始进行基于 Reactor 模式设计的高性能网络服务器,通过事件驱动和 IO 多路转接技术,高效处理海量并发连接。

2.1 对网络套接字进行封装

关于网络套接字的细节并非本文重点,此处直接展示实现代码:

const std::string defaultip_ = "0.0.0.0";
enum SockErr { SOCKET_Err, BIND_Err };

class Sock {
public:
    Sock(uint16_t port) : port_(port), listensockfd_(-1) {}

    void Socket() {
        listensockfd_ = socket(AF_INET, SOCK_STREAM, 0);
        if (listensockfd_ < 0) {
            Log(Fatal) << "socket fail";
            exit(SOCKET_Err);
        }
        Log(Info) << "socket success";
    }

    void Bind() {
        struct sockaddr_in server;
        server.sin_family = AF_INET;
        server.sin_port = htons(port_);
        inet_pton(AF_INET, defaultip_.c_str(), &server.sin_addr);
        if (bind(listensockfd_, (struct sockaddr*)&server, sizeof(server)) < 0) {
            Log(Fatal) << "bind fail";
            exit(BIND_Err);
        }
        Log(Info) << "bind success";
    }

    void Listen() {
        if (listen(listensockfd_, 10) < 0) {
            Log(Warning) << "listen fail";
        }
        Log(Info) << "listen success";
    }

    int Accept() {
        struct sockaddr_in client;
        socklen_t len = sizeof(client);
        int fd = accept(listensockfd_, (sockaddr*)&client, &len);
        return fd;
    }

    int Accept(std::string& ip, uint16_t& port) {
        struct sockaddr_in client;
        socklen_t len = sizeof(client);
        int fd = accept(listensockfd_, (sockaddr*)&client, &len);
        port = ntohs(client.sin_port);
        char bufferip[64];
        inet_ntop(AF_INET, &client.sin_addr, bufferip, sizeof(bufferip) - 1);
        ip = bufferip;
        return fd;
    }

    int Get_fd() { return listensockfd_; }

    ~Sock() { close(listensockfd_); }

private:
    uint16_t port_;
    int listensockfd_;
};

2.2 对 Epoll 接口进行封装

关于 Epoll 具体的细节可参考相关文档,此处直接展示封装后的接口使用:

enum EpollErr { CREAR_Err };

class Epoll {
public:
    Epoll() {
        _epfd = epoll_create(1);
        if (_epfd < 0) {
            Log(Fatal) << "epoll_create fail";
            exit(CREAR_Err);
        }
        Log(Info) << "epoll create success";
    }

    void Add_fd(int fd, uint32_t event) {
        struct epoll_event epevt;
        epevt.events = event;
        epevt.data.fd = fd;
        if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &epevt) < 0) {
            Log(Warning) << "epoll add error : " << strerror(errno);
        }
        Log(Info) << "epoll add success , fd : " << fd;
    }

    void Del_fd(int fd) {
        if (epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr) < 0) {
            Log(Warning) << "epoll del error : " << strerror(errno);
        }
        Log(Info) << "epoll del success , fd : " << fd;
    }

    void Mod_fd(int fd, uint32_t event) {
        struct epoll_event epevt;
        epevt.events = event;
        epevt.data.fd = fd;
        if (epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &epevt) < 0) {
            Log(Warning) << "epoll mod error : " << strerror(errno);
        }
    }

    int Wait(struct epoll_event* ep_array, int max_size, int timeout) {
        return epoll_wait(_epfd, ep_array, max_size, timeout);
    }

private:
    int _epfd;
};

2.3 设计一个管理连接的类

因为 TCP 通信传递的是字节流,无法确定每次获取到的数据是一个有效报文,因此需要将所有获取到的数据先存储起来:

  1. 需要一个整形,存储连接对应的文件描述符;
  2. 需要两个缓冲区:输入缓冲区和输出缓冲区;
  3. 为降低代码耦合性,将不同文件描述符处理读写以及异常事件的方法也放到 Connection 类中。 这些方法的参数统一都设置为 std::shared_ptr<Connection>,保证跳转到外界执行代码时依旧可以拿到文件描述符的相关资源。
class Connection;
using func_t = std::function<void(std::shared_ptr<Connection>)>;

class Connection {
public:
    Connection(int fd, func_t recv, func_t sender, func_t exception)
        : _fd(fd), _Recv(recv), _Sender(sender), _Exception(exception) {}

private:
    int _fd; // 对应的文件描述符
    std::string _inbuffer; // 输入缓冲区
    std::string _outbuffer; // 输出缓冲区

public:
    func_t _Recv; // 处理接收的逻辑
    func_t _Sender; // 处理发送的逻辑
    func_t _Exception; // 处理出现异常时的逻辑
};

在该类中,后续需要先缓冲区中进行读写操作:

std::string& Get_Inbuffer() { return _inbuffer; }
std::string& Get_Outbuffer() { return _outbuffer; }
void Add_In(const std::string& mes) { _inbuffer += mes; }
void Add_Out(const std::string& mes) { _outbuffer += mes; }
int Get_fd() { return _fd; }

可能后续还需要使用一些操作,在后面再进行补充。

2.4 设计 Reactor 服务器类

  1. 需要一个 Sock 对象来从网络中获取客户端的连接;
  2. 需要一个 Epoll 对象来使用 epoll 多路转接的接口;
  3. 使用一个哈希表来存储每一个文件描述符与之对应的 Connection 资源,方便后面获取一个文件描述符的输入缓冲区和输出缓冲区;
  4. 还需要一个缓冲区,负责接收 epoll 模型等待结束后返回的就绪队列中的文件描述符信息。
class Rserver {
    static const int array_num_max = 1024;
public:
    Rserver(uint16_t port) : _sock_ptr(new Sock(port)), _epoll_ptr(new Epoll) {}

private:
    std::shared_ptr<Sock> _sock_ptr;
    std::shared_ptr<Epoll> _epoll_ptr;
    std::unordered_map<int, std::shared_ptr<Connection>> _connections;
    struct epoll_event _epl_array[array_num_max];
};

2.5 将文件描述符设置为非阻塞

在 ET 模式下,我们要保证一次将所有的资源都获取上来,因此我们需要 while 式的对资源进行读取,这就使得如果没有资源了我们也不能让其堵塞住,因此要将所有文件描述符设置为非阻塞状态。

此时使用 int fcntl(int fd, int op, ...) 接口进行设置:

int SetNoBlock(int fd) {
    int fl = fcntl(fd, F_GETFL);
    fl |= O_NONBLOCK;
    int n = fcntl(fd, F_SETFL, fl);
    return n;
}

2.6 所有文件描述符的处理方法

2.6.1 普通文件描述符的处理方法

首先就是普通文件的接收方法:

  1. 将缓冲区中的数据全部读取到 connection 中;
  2. 调用外界函数判断是否含有一个完整的报文;
  3. 含有完整报文就进行处理。

对于第二步,我们可以先向外界开放一个接口,让外界将数据进行处理,将处理好的数据再给我,由服务器进行发送,因此我们在服务端的类中添加一个成员,负责回调:

using callback_func = std::function<std::string(std::shared_ptr<Connection>)>;

class Rserver {
    static const int array_num_max = 1024;
public:
    Rserver(uint16_t port, callback_func Onmessage)
        : _sock_ptr(new Sock(port)), _epoll_ptr(new Epoll), _Onmessage(Onmessage) {}

private:
    std::shared_ptr<Sock> _sock_ptr;
    std::shared_ptr<Epoll> _epoll_ptr;
    std::unordered_map<int, std::shared_ptr<Connection>> _connections;
    struct epoll_event _epl_array[array_num_max];
    callback_func _Onmessage; // 负责回调
};

关于普通文件描述符的接收问题,需要注意的就是 read 的不同返回值进行不同的处理:

void Recv(std::shared_ptr<Connection> con_ptr) {
    // 1. 将缓冲区中的数据全部读取到 Connection 中
    // 2. 调用外界函数判断是否含有一个完整的报文
    // 3. 先客户端返回结果
    char inbuffer[1024];
    while (1) {
        int n = read(con_ptr->Get_fd(), inbuffer, sizeof(inbuffer) - 1);
        if (n > 0) {
            // 有数据
            inbuffer[n] = 0;
            con_ptr->Add_In(inbuffer);
        } else if (n == 0) {
            // 对方关闭了文件,断开连接了
            // 1. 将文件描述符从 epoll 模型中移除
            // 2. 将文件描述符从哈希表中移除
            // 3. 将文件描述符关闭
            int fd = con_ptr->Get_fd();
            _epoll_ptr->Del_fd(fd);
            _connections.erase(fd);
            close(fd);
            return;
        } else {
            // 此次有两种情况:1. 数据读取完了 2. 读取出错了
            if (errno == EAGAIN) // 读取完了
                break;
            else // 出错了
            {
                // 此处调用文件对应的异常处理
                con_ptr->_Exception(con_ptr);
                return;
            }
        }
    }
    std::string ret = _Onmessage(con_ptr);
    con_ptr->Add_Out(ret);
}

接下来就是编写发送的接口:

思考:对于发送接口是否需要判断,写事件是否就绪???

在大多数时候,写事件都是就绪的;因此如果将其加入到判断中 epoll_wait 就会频繁的返回,会影响效率;所以一般不对写事件加入到等待中,除非写缓冲区满了,此时才将写加入到等待中。

  • 在代码中表现为:在调用 write 接口的时候,实际写入的大小比我字符串要小。
void Sender(std::shared_ptr<Connection> con_ptr) {
    // 进行数据的发送
    // 直接进行发送
    std::string& outbuffer = con_ptr->Get_Outbuffer();
    int fd = con_ptr->Get_fd();
    // 循环式的进行发送
    while (1) {
        int n = write(fd, outbuffer.c_str(), outbuffer.size());
        if (n > 0) {
            // 1. 将已经发送的数据从字符串中移除
            outbuffer.erase(0, n);
            if (outbuffer.empty()) break; // 已经写完了
        } else if (n == 0) {
            break;
        } else {
            if (errno == EAGAIN) // 已经写完了
                break;
            else // 出错了
            {
                // 此处调用文件对应的异常处理
                con_ptr->_Exception(con_ptr);
                return;
            }
        }
    }
    // 判断发送缓冲区中是否还有数据
    if (!outbuffer.empty()) {
        // 发送缓冲区满了
        _epoll_ptr->Mod_fd(fd, EPOLLIN | EPOLLOUT | EPOLLET);
    } else {
        // 缓冲区没满,不需要对写事件进行检测
        _epoll_ptr->Mod_fd(fd, EPOLLIN | EPOLLET);
    }
}

最后一步就是对异常情况的处理了:

  1. 打印日志信息;
  2. 将文件描述符从 epoll 模型从移除;
  3. 将文件描述符从哈希表中移除;
  4. 关闭文件描述符。
void Exception(std::shared_ptr<Connection> con_ptr) {
    int fd = con_ptr->Get_fd();
    _epoll_ptr->Del_fd(fd);
    _connections.erase(fd);
    close(fd);
}
2.6.2 套接字的处理方法

对于套接字来说,只需要负责将建立好的链接拿上来就行了,不需要进行写入和异常处理。

在创建为新的文件描述符创建 Connection 对象的时候,我们需要传入可执行对象,但是我们在进行统一接口的时候参数都是 std::shared_ptr<Connection>,并且上述的 Recv, Sender, Expection 都是类成员函数,都有一个隐含的参数 this 指针,所以对于可调用对象在进行传参的是否要使用 bind 进行绑定。

void Accept(std::shared_ptr<Connection> con_ptr) {
    // 1. 获取文件描述符
    while (1) {
        int newfd = _sock_ptr->Accept();
        if (newfd >= 0) {
            // 有新连接
            // 2. 将文件描述符设置为非阻塞
            // 3. 将文件加入到 epoll 模型中
            // 4. 将文件描述符加入到哈希表中
            if (SetNoBlock(newfd) < 0) {
                Log(Warning) << "set no block fail";
                continue;
            }
            _epoll_ptr->Add_fd(newfd, EPOLLIN | EPOLLET);
            std::shared_ptr<Connection> con_ptr(new Connection(
                newfd,
                std::bind(&Rserver::Recv, this, std::placeholders::_1),
                std::bind(&Rserver::Sender, this, std::placeholders::_1),
                std::bind(&Rserver::Exception, this, std::placeholders::_1)
            ));
            _connections.emplace(newfd, con_ptr);
        } else {
            if (errno == EAGAIN)
                break;
            else {
                // 出错了
                Log(Warning) << "accept fail";
            }
        }
    }
}

2.7 初始化服务器

  1. 创建套接字;
  2. 进行绑定;
  3. 设置监听模式;
  4. 将网络套接字加入到 epoll 模型中,并创建 connection 加入到_connections 中进行管理;
  5. 在创建 Connection 对象的时候,我们还需要设计一个套接字的 Recv 方法。
void Init() {
    // 1. 创建套接字
    // 2. 进行绑定
    // 3. 设置监听模式
    // 4. 将网络套接字加入到 epoll 模型中,并创建 Connection 加入到_connections 中进行管理
    _sock_ptr->Socket();
    _sock_ptr->Bind();
    _sock_ptr->Listen();

    int listensock = _sock_ptr->Get_fd();
    SetNoBlock(listensock);
    _epoll_ptr->Add_fd(listensock, EPOLLIN | EPOLLET);

    std::shared_ptr<Connection> conptr(new Connection(
        listensock,
        std::bind(&Rserver::Accept, this, std::placeholders::_1),
        nullptr,
        nullptr
    ));
    _connections.emplace(listensock, conptr);

    // 将 IP 和端口号设置为可复用的
    int opt = 1;
    setsockopt(listensock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
}

2.8 进行任务派发

因为我们之前已经将每个文件描述符对应的处理方法加入到了 Connection 对象中了,因此直接进行调用即可。

在进行任务派发的时候有一个细节:可以将异常处理嫁接到读写事件中的异常处理,这样就不需要再单独对异常进行处理了。

void Dispatcher(int n) {
    for (int i = 0; i < n; i++) {
        int fd = _epl_array[i].data.fd;
        short events = _epl_array[i].events;
        auto& con_ptr = _connections[fd];
        // 将异常处理,转化为读写处理
        if (events & EPOLLERR) {
            events |= (EPOLLIN | EPOLLOUT);
        }
        if (_connections.count(fd) && con_ptr->_Recv) {
            con_ptr->_Recv(con_ptr);
        }
        if (_connections.count(fd) && con_ptr->_Sender) {
            con_ptr->_Sender(con_ptr);
        }
    }
}

服务器的主循环

服务器的主循环就比较简单了,直接进行 epoll_wait 即可,将操作系统中的就绪队列拿到:

void Run() {
    while (1) {
        int n = _epoll_ptr->Wait(_epl_array, array_num_max, -1);
        if (n > 0) {
            Dispatcher(n);
        } else if (n == 0) {
            Log(Info) << "no message";
        } else {
            Log(Warning) << "epoll wait fail";
        }
    }
}

以上就是整个服务器的实现过程了,下面我们对服务器接入一下事件,让服务器能够处理一些业务。

三、补充

3.1 实现在线计算器

此处我们引入手动实现序列化和反序列化的代码,来实现一个在线计算器:

std::string Onmessage(std::shared_ptr<Connection> con_ptr) {
    static Calculator cal;
    std::string& inbuffer = con_ptr->Get_Inbuffer();
    std::string ret = cal(inbuffer); // 对请求进行处理,返回一个序列化后的字符串
    return ret;
}

3.2 引入线程池

对于引入线程池,此代码就需要进行重构了。在 Connection 对象中我们需要存储一个 Server 的回指指针,但是此处不能直接使用 shared_ptr<> 否则会出现循环引用,因此要采用 weak_ptr 来实现。

但是注意:我们是在类的成员函数中使用其 this 指针来构建一个 shared_ptr,从而初始化 weak_ptr;

如果在类的成员函数中,直接通过 this 指针创建新的 shared_ptr,会导致两个独立的 shared_ptr 管理同一个对象,但它们的引用计数是分开的:

  • 原有的 shared_ptr(创建服务器时候的)计数减到 0 时,会释放对象;
  • 新创建的 shared_ptr(this 指针创建的)计数减到 0 时,会再次尝试释放已被销毁的对象,导致双重释放(double free)或未定义行为。

此处我们需要使用 enable_shared_from_this 继承来进行解决:

  • 当类 T 继承 enable_shared_from_this 后,该类会隐式包含一个 weak_ptr 成员(内部维护)。当 T 的对象被 shared_ptr 管理时,这个 weak_ptr 会与管理该对象的 shared_ptr 共享控制块(记录引用计数的结构)。

此时,通过调用 shared_from_this() 方法,可返回一个指向自身的 shared_ptr,这个新的 shared_ptr 会复用原有的引用计数,避免双重释放。

服务器类定义:

class Rserver : public std::enable_shared_from_this<Rserver> {
public:
    // ......
};

在 Connection 类中增加一个成员:weak_ptr _loop_svr.

对于创建 Connection 对象部分也要进行修改:

std::shared_ptr<Connection> conptr(new Connection(
    listensock,
    shared_from_this(),
    std::bind(&Rserver::Accept, this, std::placeholders::_1),
    nullptr,
    nullptr
));

在第二个实参中,传入 this 指针来构建 Connection 中的 weak_ptr。

关于线程池部分的代码因为文章篇幅就不再叙述了,读者可在此基础上扩展线程池功能以进一步提升性能。

目录

  1. 前言
  2. 一、Epoll 的工作模式
  3. 二、Reactor 服务器
  4. 2.1 对网络套接字进行封装
  5. 2.2 对 Epoll 接口进行封装
  6. 2.3 设计一个管理连接的类
  7. 2.4 设计 Reactor 服务器类
  8. 2.5 将文件描述符设置为非阻塞
  9. 2.6 所有文件描述符的处理方法
  10. 2.6.1 普通文件描述符的处理方法
  11. 2.6.2 套接字的处理方法
  12. 2.7 初始化服务器
  13. 2.8 进行任务派发
  14. 服务器的主循环
  15. 三、补充
  16. 3.1 实现在线计算器
  17. 3.2 引入线程池
  • 💰 8折买阿里云服务器限时8折了解详情
  • Magick API 一键接入全球大模型注册送1000万token查看
  • 🤖 一键搭建Deepseek满血版了解详情
  • 一键打造专属AI 智能体了解详情
极客日志微信公众号二维码

微信扫一扫,关注极客日志

微信公众号「极客日志V2」,在微信中扫描左侧二维码关注。展示文案:极客日志V2 zeeklog

更多推荐文章

查看全部
  • openYuanrong 分布式强化学习 Agent 训练实战指南
  • Web 端 IM 聊天信息加密的三种实现方案
  • 渗透测试基础概念、流程与内网渗透技术详解
  • 华为云码道 CodeArts 介绍
  • C++ 100 道高频经典面试题及答案解析
  • Python 日常高频写法与内置函数速查
  • C++上位机利用Snap7实现西门子S7-200与合信M226ES通信
  • 2026 毕业季 AIGC 检测标准解读与应对策略
  • Java 泛型核心概念:类、方法与上下限
  • 大语言模型 LoRA 技术综述:原理、变体与实战应用
  • macOS 终端工具实战:MobaXterm 替代方案与组合建议
  • Neovim + LazyVim 现代化配置指南 (Linux)
  • LeetCode 热题 100 算法回顾
  • MySQL 索引详解
  • Spring Boot 核心模块详解:12 个关键组件功能分析
  • AI 驱动的产品经理工作流:从需求挖掘到上线全流程管控
  • 从零搭建支持 ChatGPT 等多平台的 GEO 监控系统
  • Microsoft C++ Build Tools 安装与验证指南
  • Sora2 文生视频生成指南:漫剧创作与 Python 实现
  • Linux 系统版本控制工具 Git 基础教程

相关免费在线工具

  • 加密/解密文本

    使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online

  • Gemini 图片去水印

    基于开源反向 Alpha 混合算法去除 Gemini/Nano Banana 图片水印,支持批量处理与下载。 在线工具,Gemini 图片去水印在线工具,online

  • Base64 字符串编码/解码

    将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online

  • Base64 文件转换器

    将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online

  • Markdown转HTML

    将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online

  • HTML转Markdown

    将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online