跳到主要内容
极客日志极客日志面向AI+效率的开发者社区
首页博客GitHub 精选镜像工具UI配色美学隐私政策关于联系
搜索内容 / 工具 / 仓库 / 镜像...⌘K搜索
注册
博客列表
C++算法

Linux 高级 IO:基于 ET 模式 epoll 的 Reactor 模型 TCP 服务器实现

综述由AI生成Linux 高级 IO 技术详解。文章介绍了基于边缘触发(ET)模式的 epoll 机制构建 Reactor 反应堆模型的 TCP 服务器实现。内容涵盖连接管理、非阻塞 IO 设置、读写缓冲区处理、事件分发器 Dispatcher 设计以及业务协议集成。通过 C++ 代码演示了 TcpServer、Connection、Epoller 等核心类的实现细节,解决了循环引用问题并实现了完整的请求响应流程。

极光发布于 2026/2/5更新于 2026/6/13.8K 浏览
Linux 高级 IO:基于 ET 模式 epoll 的 Reactor 模型 TCP 服务器实现

Linux 高级 IO:基于 ET 模式 epoll 的 Reactor 模型 TCP 服务器实现

前言

本文介绍基于边缘触发(ET)模式的 epoll 机制构建 Reactor 反应堆模型的 TCP 服务器实现。文章将在水平触发(LT)模式的基础上进行讲解,重点阐述如何将 epoll 设置为 ET 模式并实现完整的 Reactor 架构。

一、前置知识

  1. epoll 工作模式:epoll 默认使用 LT(Level Triggered)水平触发模式,也可设置为 ET(Edge Triggered)边缘触发模式。设置方法是在 epoll_ctl 中将事件按位与上 EPOLLET。
  2. 核心文件结构:
    • Epoller.hpp:封装 epoll 接口(创建、等待、控制),管理 epfd。
    • Log.hpp:日志模块,定义全局对象 lg。
    • Main.cc:主函数入口,调用服务器逻辑。
    • makefile:编译构建脚本。
    • nocopy.hpp:禁止拷贝基类,确保服务器和 Epoller 单例特性。
    • Socket.hpp:封装套接字原生接口。
    • TcpServer.hpp:包含连接描述类 Connection 及服务器类 TcpServer。
  3. 建议:在阅读本文前,建议先了解 LT 模式下的 epoll TCP 服务器实现,因为大部分基础逻辑相同。
  4. 实现思路:通过 epoll_ctl 将 fd 关心的事件与 EPOLLET 组合传入,即可开启 ET 模式。

二、第一阶段,基本框架的实现

Connection 类设计

在 ET 模式下,文件描述符 sock 必须设置为非阻塞,并通过循环读取直到出错来清空接收缓冲区。由于 TCP 是面向字节流的,报文可能不完整,因此需要维护用户级输入缓冲区 inbuffer 和输出缓冲区 outbuffer。

  • 成员变量:
    • _sock:文件描述符。
    • _inbuffer, _outbuffer:使用 std::string 类型,支持动态扩容。
    • _tcp_server_ptr:使用 std::weak_ptr<TcpServer> 打破循环引用。
    • _ip, _port:客户端信息。
  • 回调函数:
    • 定义 func_t = std::function<void(std::shared_ptr<Connection>)>。
    • _recv_cb, _send_cb, _except_cb:分别处理读、写、异常事件。
  • 生命周期管理:
    • 析构时关闭 socket。
    • 避免 shared_ptr 循环引用,Connection 内部对 TcpServer 使用 weak_ptr。
#include <iostream>
#include <memory>
#include <functional>
#include <string>
#include <unordered_map>
#include <sys/epoll.h>
#include "Log.hpp"
#include "Socket.hpp"
#include "nocopy.hpp"
#include "Epoller.hpp"

class Connection;
class TcpServer;

uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);
static const int g_buffer_size = 128;
using func_t = std::function<void(std::shared_ptr<Connection>)>;

class Connection {
public:
    Connection(int sock, std::weak_ptr<TcpServer> tcp_server_ptr)
        : _sock(sock), _tcp_server_ptr(tcp_server_ptr) {}

    void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb) {
        _recv_cb = recv_cb;
        _send_cb = send_cb;
        _except_cb = except_cb;
    }

    void AppendInBuffer(const std::string& info) { _inbuffer += info; }
    void AppendOutBuffer(const std::string& info) { _outbuffer += info; }

    std::string& InBuffer() { return _inbuffer; }
    std::string& OutBuffer() { return _outbuffer; }

    int SockFd() { return _sock; }

    ~Connection() {
        if (_sock > 0) close(_sock);
    }

private:
    int _sock;
    std::string _inbuffer;
    std::string _outbuffer;

public:
    func_t _recv_cb;
    func_t _send_cb;
    func_t _except_cb;
    std::weak_ptr<TcpServer> _tcp_server_ptr;
    std::string _ip;
    uint16_t _port;
};

Main.cc

使用 shared_ptr 管理 TcpServer,调用 Init 和 Loop。

#include <iostream>
#include <memory>
#include "TcpServer.hpp"

int main() {
    std::shared_ptr<TcpServer> epoll_svr(new TcpServer(8080));
    epoll_svr->Init();
    epoll_svr->Loop();
    return 0;
}

TcpServer 类实现

  • 继承:enable_shared_from_this<TcpServer> 以安全获取 shared_ptr,nocopy 防止拷贝。
  • 成员变量:
    • _quit:运行标志。
    • _listensock_ptr, _epoller_ptr:智能指针管理资源。
    • _connections:哈希表管理所有连接。
    • _revs:epoll 就绪数组。
  • 关键方法:
    • Loop():死循环执行 Dispatcher。
    • Dispatcher():检测事件,分发回调。
    • Init():初始化 Socket,绑定端口,监听,注册 listensock。
    • Accepter():循环 accept 获取新连接,注册到 epoll。
    • Recver/Sender/Excepter():读写及异常处理回调。
class TcpServer : public std::enable_shared_from_this<TcpServer>, public nocopy {
    static const int num = 64;
public:
    TcpServer(uint16_t port)
        : _quit(true), _port(port), _listensock_ptr(new Sock()), _epoller_ptr(new Epoller()) {}

    void Init() {}
    void Dispatcher(int timeout) {}
    void Loop() {
        _quit = false;
        while (!_quit) {
            Dispatcher(-1);
            PrintConnection();
        }
        _quit = true;
    }
    void PrintConnection() {
        std::cout << "_connections fd list: " << std::endl;
        for (auto& connection : _connections) {
            std::cout << connection.first << ", ";
            std::cout << "inbuffer: " << connection.second->InBuffer() << std::endl;
        }
        std::cout << std::endl;
    }
    ~TcpServer() {}

private:
    bool _quit;
    uint16_t _port;
    std::shared_ptr<Sock> _listensock_ptr;
    std::shared_ptr<Epoller> _epoller_ptr;
    std::unordered_map<int, std::shared_ptr<Connection>> _connections;
    struct epoll_event _revs[num];
};
辅助功能
  • 非阻塞设置:使用 fcntl 设置 O_NONBLOCK。
  • AddConnection:创建 Connection 对象,注册到哈希表和 epoll。
  • EnableEvent:动态修改 epoll 关注的事件(如写事件)。

三、第二阶段,引入业务协议

业务层集成

引入网络版本计算器协议(Protocol.hpp, ServerCal.hpp)。TcpServer 新增 _OnMessage 回调,用于通知上层处理数据。

Recver 实现

循环读取数据至 _inbuffer,直到 EWOULDBLOCK 或错误。完成后调用 _OnMessage。

void Recver(std::shared_ptr<Connection> connection) {
    int sock = connection->SockFd();
    char buffer[g_buffer_size];
    while (true) {
        memset(buffer, 0, sizeof(buffer));
        ssize_t n = recv(sock, buffer, sizeof(buffer) - 1, 0);
        if (n > 0) {
            connection->AppendInBuffer(buffer);
        } else if (n == 0) {
            lg(Info, "sockfd: %d, client info -> %s:%d quit...", sock,
               connection->_ip.c_str(), connection->_port);
            connection->_except_cb(connection);
            return;
        } else {
            if (errno == EWOULDBLOCK) break;
            else if (errno == EINTR) continue;
            else {
                lg(Warning, "sockfd: %d, client info -> %s:%d recv error...", sock,
                   connection->_ip.c_str(), connection->_port);
                connection->_except_cb(connection);
                return;
            }
        }
    }
    // 调用回调函数将数据交付上层处理
    _OnMessage(connection);
}

Sender 实现

循环发送 _outbuffer 数据,根据是否发送完毕动态调整写事件关注状态。

void Sender(std::shared_ptr<Connection> connection) {
    int sock = connection->SockFd();
    std::string& outbuffer = connection->OutBuffer();
    while (true) {
        ssize_t n = send(sock, outbuffer.c_str(), outbuffer.size(), 0);
        if (n > 0) {
            outbuffer.erase(0, n);
            if (outbuffer.empty()) break;
        } else if (n == 0) {
            return;
        } else {
            if (errno == EWOULDBLOCK) break;
            else if (errno == EINTR) continue;
            else {
                lg(Warning, "sockfd: %d, client info -> %s:%d send error...", sock,
                   connection->_ip.c_str(), connection->_port);
                connection->_except_cb(connection);
                return;
            }
        }
    }
    if (!outbuffer.empty()) {
        EnableEvent(sock, true, true);
    } else {
        EnableEvent(sock, true, false);
    }
}

Excepter 实现

移除 epoll 关注,关闭 socket,从哈希表删除连接。

Makefile

需链接 jsoncpp 库。

.PHONY: all
all: reactor_client reactor_server
reactor_client: ClientCal.cc
	g++ -o $@ $^ -std=c++11 -ljsoncpp
reactor_server: Main.cc
	g++ -o $@ $^ -std=c++11 -ljsoncpp
.PHONY: clean
clean: rm -f reactor_client reactor_server

四、拓展

  1. 多线程模型:主线程 Reactor 负责 Accept,子线程 Reactor 负责 IO 处理,可接入线程池。
  2. 连接管理:引入定时器,使用最小堆管理连接过期时间,清理不活跃连接。
  3. 半同步半异步:真正的 Reactor 模型通常为主线程只负责等待(半同步),子线程负责处理(半异步),以提高并发能力。

六、源代码

Comm.hpp

#pragma once
#include <cstdlib>
#include <unistd.h>
#include <fcntl.h>
#include "Socket.hpp"

void SetNonBlockOrDie(int sock) {
    int fl = fcntl(sock, F_GETFL);
    if (fl < 0) exit(NON_BLOCK_ERR);
    fcntl(sock, F_SETFL, fl | O_NONBLOCK);
}

Epoller.hpp

#include <iostream>
#include <cstring>
#include <unistd.h>
#include <sys/epoll.h>
#include "Log.hpp"
#include "nocopy.hpp"

class Epoller : public nocopy {
    static const int size = 128;
public:
    Epoller() {
        _epfd = epoll_create(size);
        if (_epfd == -1) {
            lg(Error, "epoll_create error: %s", strerror(errno));
        } else {
            lg(Info, "epoller_create success, epfd: %d", _epfd);
        }
    }
    int EpollerWait(struct epoll_event revents[], int num, int timeout) {
        int n = epoll_wait(_epfd, revents, num, timeout);
        return n;
    }
    int EpollerUpdate(int oper, int sock, uint32_t event) {
        int n = 0;
        if (oper == EPOLL_CTL_DEL) {
            n = epoll_ctl(_epfd, oper, sock, nullptr);
            if (n == -1) lg(Error, "epoll_ctl delete error");
        } else {
            struct epoll_event ev;
            ev.data.fd = sock;
            ev.events = event;
            n = epoll_ctl(_epfd, oper, sock, &ev);
            if (n == -1) lg(Error, "epoll_ctl error");
        }
        return n;
    }
    ~Epoller() {
        if (_epfd >= 0) close(_epfd);
    }
private:
    int _epfd;
};

Log.hpp

#pragma once
#include <iostream>
#include <string>
#include <ctime>
#include <cstdio>
#include <cstdarg>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#define SIZE 1024
#define Info 0
#define Debug 1
#define Warning 2
#define Error 3
#define Fatal 4
#define Screen 1
#define OneFile 2
#define ClassFile 3
#define LogFile "log.txt"

class Log {
public:
    Log() { printMethod = Screen; path = "./log/"; }
    void Enable(int method) { printMethod = method; }
    ~Log() {}
    std::string levelToString(int level) {
        switch (level) {
            case Info: return "Info";
            case Debug: return "Debug";
            case Warning: return "Warning";
            case Error: return "Error";
            case Fatal: return "Fatal";
            default: return "";
        }
    }
    void operator()(int level, const char* format, ...) {
        time_t t = time(nullptr);
        struct tm* ctime = localtime(&t);
        char leftbuffer[SIZE];
        snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%d-%d-%d %d:%d:%d]",
                 levelToString(level).c_str(), ctime->tm_year + 1900,
                 ctime->tm_mon + 1, ctime->tm_mday, ctime->tm_hour,
                 ctime->tm_min, ctime->tm_sec);
        va_list s;
        va_start(s, format);
        char rightbuffer[SIZE];
        vsnprintf(rightbuffer, sizeof(rightbuffer), format, s);
        va_end(s);
        char logtxt[2 * SIZE];
        snprintf(logtxt, sizeof(logtxt), "%s %s", leftbuffer, rightbuffer);
        printLog(level, logtxt);
    }
    void printLog(int level, const std::string& logtxt) {
        switch (printMethod) {
            case Screen: std::cout << logtxt << std::endl; break;
            case OneFile: printOneFile(LogFile, logtxt); break;
            case ClassFile: printClassFile(level, logtxt); break;
            default: break;
        }
    }
    void printOneFile(const std::string& logname, const std::string& logtxt) {
        std::string _logname = path + logname;
        int fd = open(_logname.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0666);
        if (fd < 0) return;
        write(fd, logtxt.c_str(), logtxt.size());
        close(fd);
    }
    void printClassFile(int level, const std::string& logtxt) {
        std::string filename = LogFile;
        filename += ".";
        filename += levelToString(level);
        printOneFile(filename, logtxt);
    }
private:
    int printMethod;
    std::string path;
};
Log lg;

Protocol.hpp

#pragma once
#include <iostream>
#include <string>
#include <jsoncpp/json/json.h>

const std::string black_space_sep = " ";
const std::string protocol_sep = "\n";

std::string Encode(const std::string& content) {
    std::string package = std::to_string(content.size());
    package += protocol_sep;
    package += content;
    package += protocol_sep;
    return package;
}

bool Decode(std::string& package, std::string* content) {
    size_t pos = package.find(protocol_sep);
    if (pos == std::string::npos) return false;
    std::string len_str = package.substr(0, pos);
    size_t len = std::stoi(len_str);
    int total_len = len + 1 + len_str.size() + 1;
    if (package.size() < total_len) return false;
    *content += package.substr(pos + 1, len);
    package.erase(0, total_len);
    return true;
}

class Request {
public:
    Request(int data1, int data2, char oper) : x(data1), y(data2), op(oper) {}
    Request() {}
    bool Serialize(std::string* out) {
#ifdef MySelf
        std::string s = std::to_string(x);
        s += black_space_sep;
        s += op;
        s += black_space_sep;
        s += std::to_string(y);
        *out = s;
        return true;
#else
        Json::Value root;
        root["x"] = x;
        root["y"] = y;
        root["op"] = op;
        Json::StyledWriter w;
        *out = w.write(root);
        return true;
#endif
    }
    bool Deserialize(const std::string& in) {
#ifdef MySelf
        size_t left = in.find(black_space_sep);
        if (left == std::string::npos) return false;
        std::string part_x = in.substr(0, left);
        size_t right = in.rfind(black_space_sep);
        if (right == std::string::npos) return false;
        std::string part_y = in.substr(right + 1);
        if (left + 2 != right) return false;
        x = std::stoi(part_x);
        y = std::stoi(part_y);
        op = in[left + 1];
        return true;
#else
        Json::Value root;
        Json::Reader r;
        r.parse(in, root);
        x = root["x"].asInt();
        y = root["y"].asInt();
        op = root["op"].asInt();
        return true;
#endif
    }
    void DebugPrint() {
        std::cout << "新请求构建完成:" << x << op << y << "=?" << std::endl;
    }
public:
    int x;
    int y;
    char op;
};

class Response {
public:
    Response(int res, int c) : result(res), code(c) {}
    Response() {}
    bool Serialize(std::string* out) {
#ifdef MySelf
        std::string s = std::to_string(result);
        s += black_space_sep;
        s += std::to_string(code);
        *out = s;
        return true;
#else
        Json::Value root;
        root["result"] = result;
        root["code"] = code;
        Json::StyledWriter w;
        *out = w.write(root);
        return true;
#endif
    }
    bool Deserialize(const std::string& in) {
#ifdef MySelf
        size_t pos = in.find(black_space_sep);
        if (pos == std::string::npos) return false;
        std::string part_left = in.substr(0, pos);
        std::string part_right = in.substr(pos + 1);
        result = std::stoi(part_left);
        code = std::stoi(part_right);
        return true;
#else
        Json::Value root;
        Json::Reader r;
        r.parse(in, root);
        result = root["result"].asInt();
        code = root["code"].asInt();
        return true;
#endif
    }
    void DebugPrint() {
        std::cout << "结果响应完成,result: " << result << ", code: " << code << std::endl;
    }
public:
    int result;
    int code;
};

TcpServer.hpp

#include <iostream>
#include <memory>
#include <functional>
#include <string>
#include <unordered_map>
#include <cerrno>
#include <sys/epoll.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include "Log.hpp"
#include "Socket.hpp"
#include "nocopy.hpp"
#include "Epoller.hpp"
#include "Comm.hpp"

class Connection;
class TcpServer;

uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);
static const int g_buffer_size = 128;
using func_t = std::function<void(std::shared_ptr<Connection>)>;

class Connection {
public:
    Connection(int sock, std::weak_ptr<TcpServer> tcp_server_ptr)
        : _sock(sock), _tcp_server_ptr(tcp_server_ptr) {}
    void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb) {
        _recv_cb = recv_cb;
        _send_cb = send_cb;
        _except_cb = except_cb;
    }
    void AppendInBuffer(const std::string& info) { _inbuffer += info; }
    void AppendOutBuffer(const std::string& info) { _outbuffer += info; }
    std::string& InBuffer() { return _inbuffer; }
    std::string& OutBuffer() { return _outbuffer; }
    int SockFd() { return _sock; }
    ~Connection() {
        if (_sock > 0) close(_sock);
    }
private:
    int _sock;
    std::string _inbuffer;
    std::string _outbuffer;
public:
    func_t _recv_cb;
    func_t _send_cb;
    func_t _except_cb;
    std::weak_ptr<TcpServer> _tcp_server_ptr;
    std::string _ip;
    uint16_t _port;
};

class TcpServer : public std::enable_shared_from_this<TcpServer>, public nocopy {
    static const int num = 64;
public:
    TcpServer(uint16_t port, func_t OnMessage)
        : _quit(true), _port(port), _listensock_ptr(new Sock()),
          _epoller_ptr(new Epoller()), _OnMessage(OnMessage) {}

    void Init() {
        _listensock_ptr->Socket();
        SetNonBlockOrDie(_listensock_ptr->Fd());
        _listensock_ptr->Bind(_port);
        _listensock_ptr->Listen();
        lg(Info, "create listen socket success, listensock: %d", _listensock_ptr->Fd());
        AddConnection(_listensock_ptr->Fd(), EVENT_IN,
                      std::bind(&TcpServer::Accepter, this, std::placeholders::_1),
                      nullptr, nullptr);
    }

    void AddConnection(int sock, uint32_t events, func_t recv_cb, func_t send_cb,
                       func_t except_cb, const std::string& ip = "0.0.0.0", uint16_t port = 0) {
        std::shared_ptr<Connection> new_connection(new Connection(sock, shared_from_this()));
        new_connection->SetHandler(recv_cb, send_cb, except_cb);
        new_connection->_ip = ip;
        new_connection->_port = port;
        _connections.insert(std::make_pair(sock, new_connection));
        _epoller_ptr->EpollerUpdate(EPOLL_CTL_ADD, sock, events);
        lg(Debug, "add a new connection success, sockfd: %d", sock);
    }

    void Accepter(std::shared_ptr<Connection> connection) {
        while (true) {
            struct sockaddr_in peer;
            socklen_t len = sizeof(peer);
            int sock = ::accept(connection->SockFd(), (struct sockaddr*)&peer, &len);
            if (sock >= 0) {
                uint16_t port = ntohs(peer.sin_port);
                char ip[128];
                inet_ntop(AF_INET, &(peer.sin_addr), ip, sizeof(ip));
                lg(Debug, "get a new client, get info -> [%s:%d], sockfd: %d", ip, port, sock);
                SetNonBlockOrDie(sock);
                AddConnection(sock, EVENT_IN,
                              std::bind(&TcpServer::Recver, this, std::placeholders::_1),
                              std::bind(&TcpServer::Sender, this, std::placeholders::_1),
                              std::bind(&TcpServer::Excepter, this, std::placeholders::_1),
                              ip, port);
            } else {
                if (errno == EWOULDBLOCK) break;
                else if (errno == EINTR) continue;
                else break;
            }
        }
    }

    void Recver(std::shared_ptr<Connection> connection) {
        int sock = connection->SockFd();
        char buffer[g_buffer_size];
        while (true) {
            memset(buffer, 0, sizeof(buffer));
            ssize_t n = recv(sock, buffer, sizeof(buffer) - 1, 0);
            if (n > 0) {
                connection->AppendInBuffer(buffer);
            } else if (n == 0) {
                lg(Info, "sockfd: %d, client info -> %s:%d quit...", sock,
                   connection->_ip.c_str(), connection->_port);
                connection->_except_cb(connection);
                return;
            } else {
                if (errno == EWOULDBLOCK) break;
                else if (errno == EINTR) continue;
                else {
                    lg(Warning, "sockfd: %d, client info -> %s:%d recv error...", sock,
                       connection->_ip.c_str(), connection->_port);
                    connection->_except_cb(connection);
                    return;
                }
            }
        }
        _OnMessage(connection);
    }

    void Sender(std::shared_ptr<Connection> connection) {
        int sock = connection->SockFd();
        std::string& outbuffer = connection->OutBuffer();
        while (true) {
            ssize_t n = send(sock, outbuffer.c_str(), outbuffer.size(), 0);
            if (n > 0) {
                outbuffer.erase(0, n);
                if (outbuffer.empty()) break;
            } else if (n == 0) {
                return;
            } else {
                if (errno == EWOULDBLOCK) break;
                else if (errno == EINTR) continue;
                else {
                    lg(Warning, "sockfd: %d, client info -> %s:%d send error...", sock,
                       connection->_ip.c_str(), connection->_port);
                    connection->_except_cb(connection);
                    return;
                }
            }
        }
        if (!outbuffer.empty()) {
            EnableEvent(sock, true, true);
        } else {
            EnableEvent(sock, true, false);
        }
    }

    void Excepter(std::shared_ptr<Connection> connection) {
        int fd = connection->SockFd();
        lg(Warning, "Excepter handler socket: %d, client info -> %s:%d, excepter handler",
           fd, connection->_ip.c_str(), connection->_port);
        _epoller_ptr->EpollerUpdate(EPOLL_CTL_DEL, fd, 0);
        lg(Debug, "close %d done...", fd);
        close(fd);
        lg(Debug, "remove %d from _connections...", fd);
        _connections.erase(fd);
    }

    void EnableEvent(int sock, bool readable, bool writeable) {
        uint32_t events = 0;
        events |= ((readable == true ? EPOLLIN : 0) |
                   (writeable == true ? EPOLLOUT : 0) | EPOLLET);
        _epoller_ptr->EpollerUpdate(EPOLL_CTL_MOD, sock, events);
    }

    bool IsConnectionSafe(int fd) {
        auto iter = _connections.find(fd);
        if (iter == _connections.end()) return false;
        return true;
    }

    void Dispatcher(int timeout) {
        int n = _epoller_ptr->EpollerWait(_revs, num, timeout);
        for (int i = 0; i < n; i++) {
            int sock = _revs[i].data.fd;
            uint32_t events = _revs[i].events;
            if ((events & EPOLLERR) | (events & EPOLLHUP)) events |= (EPOLLIN | EPOLLOUT);
            if ((events & EPOLLIN) && IsConnectionSafe(sock)) {
                if (_connections[sock]->_recv_cb)
                    _connections[sock]->_recv_cb(_connections[sock]);
            }
            if ((events & EPOLLOUT) && IsConnectionSafe(sock)) {
                if (_connections[sock]->_send_cb)
                    _connections[sock]->_send_cb(_connections[sock]);
            }
        }
    }

    void Loop() {
        _quit = false;
        while (!_quit) {
            Dispatcher(-1);
            PrintConnection();
        }
        _quit = true;
    }

    void PrintConnection() {
        std::cout << "_connections fd list: " << std::endl;
        for (auto& connection : _connections) {
            std::cout << connection.first << ", ";
            std::cout << "inbuffer: " << connection.second->InBuffer() << std::endl;
        }
        std::cout << std::endl;
    }

    ~TcpServer() {}

private:
    bool _quit;
    uint16_t _port;
    std::shared_ptr<Sock> _listensock_ptr;
    std::shared_ptr<Epoller> _epoller_ptr;
    std::unordered_map<int, std::shared_ptr<Connection>> _connections;
    struct epoll_event _revs[num];
    func_t _OnMessage;
};

总结

本文详细介绍了基于 ET 模式 epoll 的 Reactor 模型 TCP 服务器的完整实现过程,涵盖了从基础框架搭建到业务协议集成的各个阶段。通过合理设计 Connection 类和 TcpServer 类,解决了循环引用和非阻塞 IO 处理等关键问题,实现了高效稳定的网络服务架构。

目录

  1. Linux 高级 IO:基于 ET 模式 epoll 的 Reactor 模型 TCP 服务器实现
  2. 前言
  3. 一、前置知识
  4. 二、第一阶段,基本框架的实现
  5. Connection 类设计
  6. Main.cc
  7. TcpServer 类实现
  8. 辅助功能
  9. 三、第二阶段,引入业务协议
  10. 业务层集成
  11. Recver 实现
  12. Sender 实现
  13. Excepter 实现
  14. Makefile
  15. 四、拓展
  16. 六、源代码
  17. Comm.hpp
  18. Epoller.hpp
  19. Log.hpp
  20. Protocol.hpp
  21. TcpServer.hpp
  22. 总结
  • 💰 8折买阿里云服务器限时8折了解详情
  • Magick API 一键接入全球大模型注册送1000万token查看
  • 🤖 一键搭建Deepseek满血版了解详情
  • 一键打造专属AI 智能体了解详情
极客日志微信公众号二维码

微信扫一扫,关注极客日志

微信公众号「极客日志V2」,在微信中扫描左侧二维码关注。展示文案:极客日志V2 zeeklog

更多推荐文章

查看全部
  • ComfyUI 按需付费部署与成本优化方案
  • C++ 在线判题系统(OJ)设计与实现
  • WordPress 基础配置与 Spring Boot + MyBatis-Plus 实战
  • 计算机网络:陆地无线电信道概述
  • HTTP 请求方式详解:GET、POST 与常用方法实战
  • ChatGPT 结构化 Prompt 高级应用指南
  • FPGA RGB 转 HDMI 显示系统原理与实现
  • OpenClaw 基于 Rust+Tauri 构建带安全沙箱的跨平台清理 Skill
  • 决策树算法在Java金融风控系统中的工程化实践
  • 信息安全认证解析:CISP、CISAW 区别及 CISSP 含金量对比
  • OpenClaw TTS 语音合成技术详解与实战配置
  • Python AI 模型构建、训练与评估实战指南
  • 6 款免费 AI 写作软件测评与去 AI 味工具推荐
  • Spring Boot 安全认证与授权核心解析
  • 智能客服系统从零搭建:基于 Python 的 NLP 实战与架构设计
  • 2020 年信奥赛 C++ 提高组 CSP-S 初赛真题解析(选择题 6-10)
  • Neo4j Desktop 安装与使用指南:本地实例、远程连接及数据导入
  • Git Commit Message 提交信息规范指南
  • 前缀和技巧详解:和为 K、被 K 整除、连续数组及矩阵区域和
  • Kotaemon 与 GraphRAG 集成:构建高效文档问答系统

相关免费在线工具

  • 加密/解密文本

    使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online

  • Gemini 图片去水印

    基于开源反向 Alpha 混合算法去除 Gemini/Nano Banana 图片水印,支持批量处理与下载。 在线工具,Gemini 图片去水印在线工具,online

  • Base64 字符串编码/解码

    将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online

  • Base64 文件转换器

    将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online

  • Markdown转HTML

    将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online

  • HTML转Markdown

    将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online