跳到主要内容Linux 高级 IO:基于 ET 模式 epoll 的 Reactor 模型 TCP 服务器实现 | 极客日志C++算法
Linux 高级 IO:基于 ET 模式 epoll 的 Reactor 模型 TCP 服务器实现
Linux 高级 IO 技术详解。文章介绍了基于边缘触发(ET)模式的 epoll 机制构建 Reactor 反应堆模型的 TCP 服务器实现。内容涵盖连接管理、非阻塞 IO 设置、读写缓冲区处理、事件分发器 Dispatcher 设计以及业务协议集成。通过 C++ 代码演示了 TcpServer、Connection、Epoller 等核心类的实现细节,解决了循环引用问题并实现了完整的请求响应流程。
极光3.8K 浏览 Linux 高级 IO:基于 ET 模式 epoll 的 Reactor 模型 TCP 服务器实现
前言
本文介绍基于边缘触发(ET)模式的 epoll 机制构建 Reactor 反应堆模型的 TCP 服务器实现。文章将在水平触发(LT)模式的基础上进行讲解,重点阐述如何将 epoll 设置为 ET 模式并实现完整的 Reactor 架构。
一、前置知识
- epoll 工作模式:epoll 默认使用 LT(Level Triggered)水平触发模式,也可设置为 ET(Edge Triggered)边缘触发模式。设置方法是在
epoll_ctl 中将事件按位与上 EPOLLET。
- 核心文件结构:
Epoller.hpp:封装 epoll 接口(创建、等待、控制),管理 epfd。
Log.hpp:日志模块,定义全局对象 lg。
Main.cc:主函数入口,调用服务器逻辑。
makefile:编译构建脚本。
nocopy.hpp:禁止拷贝基类,确保服务器和 Epoller 单例特性。
Socket.hpp:封装套接字原生接口。
TcpServer.hpp:包含连接描述类 Connection 及服务器类 TcpServer。
- 建议:在阅读本文前,建议先了解 LT 模式下的 epoll TCP 服务器实现,因为大部分基础逻辑相同。
- 实现思路:通过
epoll_ctl 将 fd 关心的事件与 EPOLLET 组合传入,即可开启 ET 模式。
二、第一阶段,基本框架的实现
Connection 类设计
在 ET 模式下,文件描述符 sock 必须设置为非阻塞,并通过循环读取直到出错来清空接收缓冲区。由于 TCP 是面向字节流的,报文可能不完整,因此需要维护用户级输入缓冲区 inbuffer 和输出缓冲区 outbuffer。
- 成员变量:
_sock:文件描述符。
_inbuffer, _outbuffer:使用 std::string 类型,支持动态扩容。
_tcp_server_ptr:使用 std::weak_ptr<TcpServer> 打破循环引用。
_ip, _port:客户端信息。
- 回调函数:
- 定义
func_t = std::function<void(std::shared_ptr<Connection>)>。
_recv_cb, _send_cb, _except_cb:分别处理读、写、异常事件。
- 生命周期管理:
- 析构时关闭 socket。
- 避免
shared_ptr 循环引用,Connection 内部对 TcpServer 使用 weak_ptr。
#include <iostream>
;
;
EVENT_IN = (EPOLLIN | EPOLLET);
EVENT_OUT = (EPOLLOUT | EPOLLET);
g_buffer_size = ;
= std::function<(std::shared_ptr<Connection>)>;
{
:
( sock, std::weak_ptr<TcpServer> tcp_server_ptr)
: _sock(sock), _tcp_server_ptr(tcp_server_ptr) {}
{
_recv_cb = recv_cb;
_send_cb = send_cb;
_except_cb = except_cb;
}
{ _inbuffer += info; }
{ _outbuffer += info; }
{ _inbuffer; }
{ _outbuffer; }
{ _sock; }
~() {
(_sock > ) (_sock);
}
:
_sock;
std::string _inbuffer;
std::string _outbuffer;
:
_recv_cb;
_send_cb;
_except_cb;
std::weak_ptr<TcpServer> _tcp_server_ptr;
std::string _ip;
_port;
};
微信扫一扫,关注极客日志
微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
相关免费在线工具
- 加密/解密文本
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
- Base64 文件转换器
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
- Markdown转HTML
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
- HTML转Markdown
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
- JSON 压缩
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online
#include <memory>
#include <functional>
#include <string>
#include <unordered_map>
#include <sys/epoll.h>
#include "Log.hpp"
#include "Socket.hpp"
#include "nocopy.hpp"
#include "Epoller.hpp"
class
Connection
class
TcpServer
uint32_t
uint32_t
static
const
int
128
using
func_t
void
class
Connection
public
Connection
int
void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb)
void AppendInBuffer(const std::string& info)
void AppendOutBuffer(const std::string& info)
std::string& InBuffer()
return
std::string& OutBuffer()
return
int SockFd()
return
Connection
if
0
close
private
int
public
func_t
func_t
func_t
uint16_t
Main.cc
使用 shared_ptr 管理 TcpServer,调用 Init 和 Loop。
#include <iostream>
#include <memory>
#include "TcpServer.hpp"
int main() {
std::shared_ptr<TcpServer> epoll_svr(new TcpServer(8080));
epoll_svr->Init();
epoll_svr->Loop();
return 0;
}
TcpServer 类实现
- 继承:
enable_shared_from_this<TcpServer> 以安全获取 shared_ptr,nocopy 防止拷贝。
- 成员变量:
_quit:运行标志。
_listensock_ptr, _epoller_ptr:智能指针管理资源。
_connections:哈希表管理所有连接。
_revs:epoll 就绪数组。
- 关键方法:
Loop():死循环执行 Dispatcher。
Dispatcher():检测事件,分发回调。
Init():初始化 Socket,绑定端口,监听,注册 listensock。
Accepter():循环 accept 获取新连接,注册到 epoll。
Recver/Sender/Excepter():读写及异常处理回调。
class TcpServer : public std::enable_shared_from_this<TcpServer>, public nocopy {
static const int num = 64;
public:
TcpServer(uint16_t port)
: _quit(true), _port(port), _listensock_ptr(new Sock()), _epoller_ptr(new Epoller()) {}
void Init() {}
void Dispatcher(int timeout) {}
void Loop() {
_quit = false;
while (!_quit) {
Dispatcher(-1);
PrintConnection();
}
_quit = true;
}
void PrintConnection() {
std::cout << "_connections fd list: " << std::endl;
for (auto& connection : _connections) {
std::cout << connection.first << ", ";
std::cout << "inbuffer: " << connection.second->InBuffer() << std::endl;
}
std::cout << std::endl;
}
~TcpServer() {}
private:
bool _quit;
uint16_t _port;
std::shared_ptr<Sock> _listensock_ptr;
std::shared_ptr<Epoller> _epoller_ptr;
std::unordered_map<int, std::shared_ptr<Connection>> _connections;
struct epoll_event _revs[num];
};
辅助功能
- 非阻塞设置:使用
fcntl 设置 O_NONBLOCK。
- AddConnection:创建 Connection 对象,注册到哈希表和 epoll。
- EnableEvent:动态修改 epoll 关注的事件(如写事件)。
三、第二阶段,引入业务协议
业务层集成
引入网络版本计算器协议(Protocol.hpp, ServerCal.hpp)。TcpServer 新增 _OnMessage 回调,用于通知上层处理数据。
Recver 实现
循环读取数据至 _inbuffer,直到 EWOULDBLOCK 或错误。完成后调用 _OnMessage。
void Recver(std::shared_ptr<Connection> connection) {
int sock = connection->SockFd();
char buffer[g_buffer_size];
while (true) {
memset(buffer, 0, sizeof(buffer));
ssize_t n = recv(sock, buffer, sizeof(buffer) - 1, 0);
if (n > 0) {
connection->AppendInBuffer(buffer);
} else if (n == 0) {
lg(Info, "sockfd: %d, client info -> %s:%d quit...", sock,
connection->_ip.c_str(), connection->_port);
connection->_except_cb(connection);
return;
} else {
if (errno == EWOULDBLOCK) break;
else if (errno == EINTR) continue;
else {
lg(Warning, "sockfd: %d, client info -> %s:%d recv error...", sock,
connection->_ip.c_str(), connection->_port);
connection->_except_cb(connection);
return;
}
}
}
_OnMessage(connection);
}
Sender 实现
循环发送 _outbuffer 数据,根据是否发送完毕动态调整写事件关注状态。
void Sender(std::shared_ptr<Connection> connection) {
int sock = connection->SockFd();
std::string& outbuffer = connection->OutBuffer();
while (true) {
ssize_t n = send(sock, outbuffer.c_str(), outbuffer.size(), 0);
if (n > 0) {
outbuffer.erase(0, n);
if (outbuffer.empty()) break;
} else if (n == 0) {
return;
} else {
if (errno == EWOULDBLOCK) break;
else if (errno == EINTR) continue;
else {
lg(Warning, "sockfd: %d, client info -> %s:%d send error...", sock,
connection->_ip.c_str(), connection->_port);
connection->_except_cb(connection);
return;
}
}
}
if (!outbuffer.empty()) {
EnableEvent(sock, true, true);
} else {
EnableEvent(sock, true, false);
}
}
Excepter 实现
移除 epoll 关注,关闭 socket,从哈希表删除连接。
Makefile
.PHONY: all
all: reactor_client reactor_server
reactor_client: ClientCal.cc
g++ -o $@ $^ -std=c++11 -ljsoncpp
reactor_server: Main.cc
g++ -o $@ $^ -std=c++11 -ljsoncpp
.PHONY: clean
clean: rm -f reactor_client reactor_server
四、拓展
- 多线程模型:主线程 Reactor 负责 Accept,子线程 Reactor 负责 IO 处理,可接入线程池。
- 连接管理:引入定时器,使用最小堆管理连接过期时间,清理不活跃连接。
- 半同步半异步:真正的 Reactor 模型通常为主线程只负责等待(半同步),子线程负责处理(半异步),以提高并发能力。
六、源代码
Comm.hpp
#pragma once
#include <cstdlib>
#include <unistd.h>
#include <fcntl.h>
#include "Socket.hpp"
void SetNonBlockOrDie(int sock) {
int fl = fcntl(sock, F_GETFL);
if (fl < 0) exit(NON_BLOCK_ERR);
fcntl(sock, F_SETFL, fl | O_NONBLOCK);
}
Epoller.hpp
#include <iostream>
#include <cstring>
#include <unistd.h>
#include <sys/epoll.h>
#include "Log.hpp"
#include "nocopy.hpp"
class Epoller : public nocopy {
static const int size = 128;
public:
Epoller() {
_epfd = epoll_create(size);
if (_epfd == -1) {
lg(Error, "epoll_create error: %s", strerror(errno));
} else {
lg(Info, "epoller_create success, epfd: %d", _epfd);
}
}
int EpollerWait(struct epoll_event revents[], int num, int timeout) {
int n = epoll_wait(_epfd, revents, num, timeout);
return n;
}
int EpollerUpdate(int oper, int sock, uint32_t event) {
int n = 0;
if (oper == EPOLL_CTL_DEL) {
n = epoll_ctl(_epfd, oper, sock, nullptr);
if (n == -1) lg(Error, "epoll_ctl delete error");
} else {
struct epoll_event ev;
ev.data.fd = sock;
ev.events = event;
n = epoll_ctl(_epfd, oper, sock, &ev);
if (n == -1) lg(Error, "epoll_ctl error");
}
return n;
}
~Epoller() {
if (_epfd >= 0) close(_epfd);
}
private:
int _epfd;
};
Log.hpp
#pragma once
#include <iostream>
#include <string>
#include <ctime>
#include <cstdio>
#include <cstdarg>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#define SIZE 1024
#define Info 0
#define Debug 1
#define Warning 2
#define Error 3
#define Fatal 4
#define Screen 1
#define OneFile 2
#define ClassFile 3
#define LogFile "log.txt"
class Log {
public:
Log() { printMethod = Screen; path = "./log/"; }
void Enable(int method) { printMethod = method; }
~Log() {}
std::string levelToString(int level) {
switch (level) {
case Info: return "Info";
case Debug: return "Debug";
case Warning: return "Warning";
case Error: return "Error";
case Fatal: return "Fatal";
default: return "";
}
}
void operator()(int level, const char* format, ...) {
time_t t = time(nullptr);
struct tm* ctime = localtime(&t);
char leftbuffer[SIZE];
snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%d-%d-%d %d:%d:%d]",
levelToString(level).c_str(), ctime->tm_year + 1900,
ctime->tm_mon + 1, ctime->tm_mday, ctime->tm_hour,
ctime->tm_min, ctime->tm_sec);
va_list s;
va_start(s, format);
char rightbuffer[SIZE];
vsnprintf(rightbuffer, sizeof(rightbuffer), format, s);
va_end(s);
char logtxt[2 * SIZE];
snprintf(logtxt, sizeof(logtxt), "%s %s", leftbuffer, rightbuffer);
printLog(level, logtxt);
}
void printLog(int level, const std::string& logtxt) {
switch (printMethod) {
case Screen: std::cout << logtxt << std::endl; break;
case OneFile: printOneFile(LogFile, logtxt); break;
case ClassFile: printClassFile(level, logtxt); break;
default: break;
}
}
void printOneFile(const std::string& logname, const std::string& logtxt) {
std::string _logname = path + logname;
int fd = open(_logname.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0666);
if (fd < 0) return;
write(fd, logtxt.c_str(), logtxt.size());
close(fd);
}
void printClassFile(int level, const std::string& logtxt) {
std::string filename = LogFile;
filename += ".";
filename += levelToString(level);
printOneFile(filename, logtxt);
}
private:
int printMethod;
std::string path;
};
Log lg;
Protocol.hpp
#pragma once
#include <iostream>
#include <string>
#include <jsoncpp/json/json.h>
const std::string black_space_sep = " ";
const std::string protocol_sep = "\n";
std::string Encode(const std::string& content) {
std::string package = std::to_string(content.size());
package += protocol_sep;
package += content;
package += protocol_sep;
return package;
}
bool Decode(std::string& package, std::string* content) {
size_t pos = package.find(protocol_sep);
if (pos == std::string::npos) return false;
std::string len_str = package.substr(0, pos);
size_t len = std::stoi(len_str);
int total_len = len + 1 + len_str.size() + 1;
if (package.size() < total_len) return false;
*content += package.substr(pos + 1, len);
package.erase(0, total_len);
return true;
}
class Request {
public:
Request(int data1, int data2, char oper) : x(data1), y(data2), op(oper) {}
Request() {}
bool Serialize(std::string* out) {
#ifdef MySelf
std::string s = std::to_string(x);
s += black_space_sep;
s += op;
s += black_space_sep;
s += std::to_string(y);
*out = s;
return true;
#else
Json::Value root;
root["x"] = x;
root["y"] = y;
root["op"] = op;
Json::StyledWriter w;
*out = w.write(root);
return true;
#endif
}
bool Deserialize(const std::string& in) {
#ifdef MySelf
size_t left = in.find(black_space_sep);
if (left == std::string::npos) return false;
std::string part_x = in.substr(0, left);
size_t right = in.rfind(black_space_sep);
if (right == std::string::npos) return false;
std::string part_y = in.substr(right + 1);
if (left + 2 != right) return false;
x = std::stoi(part_x);
y = std::stoi(part_y);
op = in[left + 1];
return true;
#else
Json::Value root;
Json::Reader r;
r.parse(in, root);
x = root["x"].asInt();
y = root["y"].asInt();
op = root["op"].asInt();
return true;
#endif
}
void DebugPrint() {
std::cout << "新请求构建完成:" << x << op << y << "=?" << std::endl;
}
public:
int x;
int y;
char op;
};
class Response {
public:
Response(int res, int c) : result(res), code(c) {}
Response() {}
bool Serialize(std::string* out) {
#ifdef MySelf
std::string s = std::to_string(result);
s += black_space_sep;
s += std::to_string(code);
*out = s;
return true;
#else
Json::Value root;
root["result"] = result;
root["code"] = code;
Json::StyledWriter w;
*out = w.write(root);
return true;
#endif
}
bool Deserialize(const std::string& in) {
#ifdef MySelf
size_t pos = in.find(black_space_sep);
if (pos == std::string::npos) return false;
std::string part_left = in.substr(0, pos);
std::string part_right = in.substr(pos + 1);
result = std::stoi(part_left);
code = std::stoi(part_right);
return true;
#else
Json::Value root;
Json::Reader r;
r.parse(in, root);
result = root["result"].asInt();
code = root["code"].asInt();
return true;
#endif
}
void DebugPrint() {
std::cout << "结果响应完成,result: " << result << ", code: " << code << std::endl;
}
public:
int result;
int code;
};
TcpServer.hpp
#include <iostream>
#include <memory>
#include <functional>
#include <string>
#include <unordered_map>
#include <cerrno>
#include <sys/epoll.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include "Log.hpp"
#include "Socket.hpp"
#include "nocopy.hpp"
#include "Epoller.hpp"
#include "Comm.hpp"
class Connection;
class TcpServer;
uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);
static const int g_buffer_size = 128;
using func_t = std::function<void(std::shared_ptr<Connection>)>;
class Connection {
public:
Connection(int sock, std::weak_ptr<TcpServer> tcp_server_ptr)
: _sock(sock), _tcp_server_ptr(tcp_server_ptr) {}
void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb) {
_recv_cb = recv_cb;
_send_cb = send_cb;
_except_cb = except_cb;
}
void AppendInBuffer(const std::string& info) { _inbuffer += info; }
void AppendOutBuffer(const std::string& info) { _outbuffer += info; }
std::string& InBuffer() { return _inbuffer; }
std::string& OutBuffer() { return _outbuffer; }
int SockFd() { return _sock; }
~Connection() {
if (_sock > 0) close(_sock);
}
private:
int _sock;
std::string _inbuffer;
std::string _outbuffer;
public:
func_t _recv_cb;
func_t _send_cb;
func_t _except_cb;
std::weak_ptr<TcpServer> _tcp_server_ptr;
std::string _ip;
uint16_t _port;
};
class TcpServer : public std::enable_shared_from_this<TcpServer>, public nocopy {
static const int num = 64;
public:
TcpServer(uint16_t port, func_t OnMessage)
: _quit(true), _port(port), _listensock_ptr(new Sock()),
_epoller_ptr(new Epoller()), _OnMessage(OnMessage) {}
void Init() {
_listensock_ptr->Socket();
SetNonBlockOrDie(_listensock_ptr->Fd());
_listensock_ptr->Bind(_port);
_listensock_ptr->Listen();
lg(Info, "create listen socket success, listensock: %d", _listensock_ptr->Fd());
AddConnection(_listensock_ptr->Fd(), EVENT_IN,
std::bind(&TcpServer::Accepter, this, std::placeholders::_1),
nullptr, nullptr);
}
void AddConnection(int sock, uint32_t events, func_t recv_cb, func_t send_cb,
func_t except_cb, const std::string& ip = "0.0.0.0", uint16_t port = 0) {
std::shared_ptr<Connection> new_connection(new Connection(sock, shared_from_this()));
new_connection->SetHandler(recv_cb, send_cb, except_cb);
new_connection->_ip = ip;
new_connection->_port = port;
_connections.insert(std::make_pair(sock, new_connection));
_epoller_ptr->EpollerUpdate(EPOLL_CTL_ADD, sock, events);
lg(Debug, "add a new connection success, sockfd: %d", sock);
}
void Accepter(std::shared_ptr<Connection> connection) {
while (true) {
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int sock = ::accept(connection->SockFd(), (struct sockaddr*)&peer, &len);
if (sock >= 0) {
uint16_t port = ntohs(peer.sin_port);
char ip[128];
inet_ntop(AF_INET, &(peer.sin_addr), ip, sizeof(ip));
lg(Debug, "get a new client, get info -> [%s:%d], sockfd: %d", ip, port, sock);
SetNonBlockOrDie(sock);
AddConnection(sock, EVENT_IN,
std::bind(&TcpServer::Recver, this, std::placeholders::_1),
std::bind(&TcpServer::Sender, this, std::placeholders::_1),
std::bind(&TcpServer::Excepter, this, std::placeholders::_1),
ip, port);
} else {
if (errno == EWOULDBLOCK) break;
else if (errno == EINTR) continue;
else break;
}
}
}
void Recver(std::shared_ptr<Connection> connection) {
int sock = connection->SockFd();
char buffer[g_buffer_size];
while (true) {
memset(buffer, 0, sizeof(buffer));
ssize_t n = recv(sock, buffer, sizeof(buffer) - 1, 0);
if (n > 0) {
connection->AppendInBuffer(buffer);
} else if (n == 0) {
lg(Info, "sockfd: %d, client info -> %s:%d quit...", sock,
connection->_ip.c_str(), connection->_port);
connection->_except_cb(connection);
return;
} else {
if (errno == EWOULDBLOCK) break;
else if (errno == EINTR) continue;
else {
lg(Warning, "sockfd: %d, client info -> %s:%d recv error...", sock,
connection->_ip.c_str(), connection->_port);
connection->_except_cb(connection);
return;
}
}
}
_OnMessage(connection);
}
void Sender(std::shared_ptr<Connection> connection) {
int sock = connection->SockFd();
std::string& outbuffer = connection->OutBuffer();
while (true) {
ssize_t n = send(sock, outbuffer.c_str(), outbuffer.size(), 0);
if (n > 0) {
outbuffer.erase(0, n);
if (outbuffer.empty()) break;
} else if (n == 0) {
return;
} else {
if (errno == EWOULDBLOCK) break;
else if (errno == EINTR) continue;
else {
lg(Warning, "sockfd: %d, client info -> %s:%d send error...", sock,
connection->_ip.c_str(), connection->_port);
connection->_except_cb(connection);
return;
}
}
}
if (!outbuffer.empty()) {
EnableEvent(sock, true, true);
} else {
EnableEvent(sock, true, false);
}
}
void Excepter(std::shared_ptr<Connection> connection) {
int fd = connection->SockFd();
lg(Warning, "Excepter handler socket: %d, client info -> %s:%d, excepter handler",
fd, connection->_ip.c_str(), connection->_port);
_epoller_ptr->EpollerUpdate(EPOLL_CTL_DEL, fd, 0);
lg(Debug, "close %d done...", fd);
close(fd);
lg(Debug, "remove %d from _connections...", fd);
_connections.erase(fd);
}
void EnableEvent(int sock, bool readable, bool writeable) {
uint32_t events = 0;
events |= ((readable == true ? EPOLLIN : 0) |
(writeable == true ? EPOLLOUT : 0) | EPOLLET);
_epoller_ptr->EpollerUpdate(EPOLL_CTL_MOD, sock, events);
}
bool IsConnectionSafe(int fd) {
auto iter = _connections.find(fd);
if (iter == _connections.end()) return false;
return true;
}
void Dispatcher(int timeout) {
int n = _epoller_ptr->EpollerWait(_revs, num, timeout);
for (int i = 0; i < n; i++) {
int sock = _revs[i].data.fd;
uint32_t events = _revs[i].events;
if ((events & EPOLLERR) | (events & EPOLLHUP)) events |= (EPOLLIN | EPOLLOUT);
if ((events & EPOLLIN) && IsConnectionSafe(sock)) {
if (_connections[sock]->_recv_cb)
_connections[sock]->_recv_cb(_connections[sock]);
}
if ((events & EPOLLOUT) && IsConnectionSafe(sock)) {
if (_connections[sock]->_send_cb)
_connections[sock]->_send_cb(_connections[sock]);
}
}
}
void Loop() {
_quit = false;
while (!_quit) {
Dispatcher(-1);
PrintConnection();
}
_quit = true;
}
void PrintConnection() {
std::cout << "_connections fd list: " << std::endl;
for (auto& connection : _connections) {
std::cout << connection.first << ", ";
std::cout << "inbuffer: " << connection.second->InBuffer() << std::endl;
}
std::cout << std::endl;
}
~TcpServer() {}
private:
bool _quit;
uint16_t _port;
std::shared_ptr<Sock> _listensock_ptr;
std::shared_ptr<Epoller> _epoller_ptr;
std::unordered_map<int, std::shared_ptr<Connection>> _connections;
struct epoll_event _revs[num];
func_t _OnMessage;
};
总结
本文详细介绍了基于 ET 模式 epoll 的 Reactor 模型 TCP 服务器的完整实现过程,涵盖了从基础框架搭建到业务协议集成的各个阶段。通过合理设计 Connection 类和 TcpServer 类,解决了循环引用和非阻塞 IO 处理等关键问题,实现了高效稳定的网络服务架构。