跳到主要内容Linux epoll ET 模式下的 Reactor 网络服务器实现 | 极客日志C++
Linux epoll ET 模式下的 Reactor 网络服务器实现
综述由AI生成基于 Linux epoll 边缘触发(ET)模式构建的 Reactor 网络服务器框架,采用 C++ 语言实现。核心包含 Socket 抽象层、TCP 协议编解码、连接管理及事件分发器。利用非阻塞 IO 解决高并发场景下的资源消耗问题,通过自定义报文格式处理 TCP 粘包。代码展示了如何注册文件描述符、监听可读可写事件,并在边缘触发模式下确保数据完整读取。适合学习高性能网络编程及 IO 多路复用机制。
内存管理20 浏览 Linux epoll ET 模式下的 Reactor 网络服务器实现
Reactor 反应堆模式是一种经典的事件驱动设计模式,特别适用于高并发 I/O 场景。它通过一个事件分发器(Reactor)监听多个文件描述符上的事件,当事件发生时,将控制权分发给对应的事件处理器。在 Linux 环境下,结合 epoll 的边缘触发(Edge Triggered, ET)模式,可以显著提升系统性能,减少不必要的系统调用。
核心架构与组件
整个框架由几个关键模块组成:Socket 封装、协议编解码、连接管理以及事件分发器。
Socket 抽象层
为了屏蔽底层系统调用的差异,我们定义了一个面向对象的 Socket 基类。TcpSocket 继承自 Socket,提供了创建、绑定、监听及读写的具体实现。这里的关键改进在于所有套接字均设置为非阻塞模式,这是配合 epoll ET 模式的前提。
#include <iostream>
#include <string>
#include <functional>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#include <pthread.h>
#include <memory>
#include "InetAddr.hpp"
#include "Log.hpp"
#include "Comm.hpp"
namespace socket_ns {
class ;
gbacklog = ;
socket_sptr = std::shared_ptr<Socket>;
{ SOCKET_ERROR = , BIND_ERROR, LISTEN_ERROR, USAGE_ERROR };
{
:
= ;
= ;
= ;
= ;
= ;
= ;
= ;
= ;
= ;
{
();
(addr);
();
}
{
();
(addr);
}
};
: Socket {
:
( sockfd = ) : _sockfd(sockfd) {}
{
_sockfd = (AF_INET, SOCK_STREAM, );
(_sockfd < ) {
(FATAL, );
(SOCKET_ERROR);
}
(_sockfd);
(DEBUG, , _sockfd);
}
{
local;
(&local, , (local));
local.sin_family = AF_INET;
local.sin_port = (addr.());
local.sin_addr.s_addr = (addr.().());
n = (_sockfd, ( sockaddr*)&local, (local));
(n < ) {
(FATAL, );
(BIND_ERROR);
}
(DEBUG, , _sockfd);
}
{
n = (_sockfd, gbacklog);
(n < ) {
(FATAL, );
(LISTEN_ERROR);
}
(DEBUG, , _sockfd);
}
{
peer;
len = (peer);
sockfd = (_sockfd, ( sockaddr*)&peer, &len);
*code = errno;
(sockfd < ) {
(WARNING, );
;
}
*addr = peer;
(sockfd);
sockfd;
}
{
server;
(&server, , (server));
server.sin_family = AF_INET;
server.sin_addr.s_addr = (addr.().());
server.sin_port = (addr.());
n = (_sockfd, ( sockaddr*)&server, (server));
(n < ) {
std::cerr << << std::endl;
;
}
;
}
{
inbuffer[];
n = (_sockfd, inbuffer, (inbuffer) - , );
(n > ) {
inbuffer[n] = ;
*out += inbuffer;
}
n;
}
{
n = (_sockfd, in.(), in.(), );
n;
}
{ _sockfd; }
{
(_sockfd > ) ::(_sockfd);
}
~() {}
:
_sockfd;
};
}
Socket
const
static
int
8
using
enum
1
class
Socket
public
virtual void CreateSocketOrDie()
0
virtual void BindSocketOrDie(InetAddr& addr)
0
virtual void ListenSocketOrDie()
0
virtual int Accepter(InetAddr* addr, int* code)
0
virtual bool Connector(InetAddr &addr)
0
virtual int SockFd()
0
virtual int Recv(std::string *out)
0
virtual int Send(const std::string &in)
0
virtual void Close()
0
void BuildListenSocket(InetAddr& addr)
CreateSocketOrDie
BindSocketOrDie
ListenSocketOrDie
bool BuildClientSocket(InetAddr &addr)
CreateSocketOrDie
return
Connector
class
TcpSocket
public
public
TcpSocket
int
-1
void CreateSocketOrDie() override
socket
0
if
0
LOG
"socket error"
exit
SetNonBlock
LOG
"socket create success, sockfd is : %d\n"
void BindSocketOrDie(InetAddr& addr) override
struct
sockaddr_in
memset
0
sizeof
htons
Port
inet_addr
Ip
c_str
int
bind
struct
sizeof
if
0
LOG
"bind error"
exit
LOG
"bind success, sockfd is : %d\n"
void ListenSocketOrDie() override
int
listen
if
0
LOG
"listen error"
exit
LOG
"listen success, sockfd is : %d\n"
int Accepter(InetAddr* addr, int* code) override
struct
sockaddr_in
socklen_t
sizeof
int
accept
struct
if
0
LOG
"accept error\n"
return
-1
SetNonBlock
return
virtual bool Connector(InetAddr& addr)
struct
sockaddr_in
memset
0
sizeof
inet_addr
Ip
c_str
htons
Port
int
connect
struct
sizeof
if
0
"connect error"
return
false
return
true
int Recv(std::string *out) override
char
1024
ssize_t
recv
sizeof
1
0
if
0
0
return
int Send(const std::string &in) override
int
send
c_str
size
0
return
int SockFd() override
return
void Close() override
if
-1
close
TcpSocket
private
int
协议与业务逻辑
TCP 是流式协议,存在粘包问题。我们通过自定义的报文格式(长度 + 数据)来解决。protocol.hpp 中定义了请求和响应的序列化/反序列化逻辑,使用 JSON 作为有效载荷。
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
#include <memory>
#include <jsoncpp/json/json.h>
namespace protocol_ns {
const std::string SEP = "\r\n";
std::string Encode(const std::string &json_str) {
int json_str_len = json_str.size();
std::string proto_str = std::to_string(json_str_len);
proto_str += SEP;
proto_str += json_str;
proto_str += SEP;
return proto_str;
}
std::string Decode(std::string &inbuffer) {
auto pos = inbuffer.find(SEP);
if (pos == std::string::npos) return std::string();
std::string len_str = inbuffer.substr(0, pos);
if (len_str.empty()) return std::string();
int packlen = std::stoi(len_str);
int total = packlen + len_str.size() + 2 * SEP.size();
if (inbuffer.size() < total) return std::string();
std::string package = inbuffer.substr(pos + SEP.size(), packlen);
inbuffer.erase(0, total);
return package;
}
class Request {
public:
Request() {}
Request(int x, int y, char oper) : _x(x), _y(y), _oper(oper) {}
bool Serialize(std::string* out) {
Json::Value root;
root["x"] = _x;
root["y"] = _y;
root["oper"] = _oper;
Json::FastWriter writer;
*out = writer.write(root);
return true;
}
bool DeSerialize(const std::string& in) {
Json::Value root;
Json::Reader reader;
bool res = reader.parse(in, root);
if (!res) return false;
_x = root["x"].asInt();
_y = root["y"].asInt();
_oper = root["oper"].asInt();
return true;
}
public:
int _x;
int _y;
char _oper;
};
class Response {
public:
Response() {}
Response(int result, int code) : _result(result), _code(code) {}
bool Serialize(std::string *out) {
Json::Value root;
root["result"] = _result;
root["code"] = _code;
Json::FastWriter writer;
*out = writer.write(root);
return true;
}
bool Deserialize(const std::string &in) {
Json::Value root;
Json::Reader reader;
bool res = reader.parse(in, root);
if (!res) return false;
_result = root["result"].asInt();
_code = root["code"].asInt();
return true;
}
public:
int _result;
int _code;
};
}
连接管理与事件处理
Connection 类维护了单个连接的上下文,包括输入输出缓冲区。HandlerConnection 则负责具体的 IO 回调逻辑,如接收数据填充缓冲区、发送数据清空缓冲区等。这里需要特别注意非阻塞 IO 下的错误处理,例如 EWOULDBLOCK 表示当前无数据可读或缓冲区满,应暂停操作而非报错。
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include "Reactor.hpp"
#include "InetAddr.hpp"
#include <unistd.h>
class Connection;
class Reactor;
using func_t = std::function<void(Connection *)>;
class Connection {
public:
Connection(int sock) : _sock(sock), _R(nullptr) {}
int Sockfd() { return _sock; }
void SetEvents(int events) { _events = events; }
uint32_t Events() { return _events; }
void Register(func_t recver, func_t sender, func_t excepter) {
_recver = recver;
_sender = sender;
_excepter = excepter;
}
void SetSelf(Reactor *R) { _R = R; }
void AppendInBuffer(const std::string &buff) { _inbuffer += buff; }
std::string& Inbuffer() { return _inbuffer; }
void AppendOutBuffer(const std::string &buff) { _outbuffer += buff; }
std::string &Outbuffer() { return _outbuffer; }
bool OutbufferEmpty() { return _outbuffer.empty(); }
void OutbufferRemove(int n) { _outbuffer.erase(0, n); }
void Close() {
if (_sock >= 0) ::close(_sock);
}
~Connection() {}
func_t _recver;
func_t _sender;
func_t _excepter;
Reactor *_R;
private:
int _sock;
std::string _inbuffer;
std::string _outbuffer;
InetAddr _addr;
uint32_t _events;
};
Epoller 封装
Epoller 类是对 Linux epoll 接口的轻量级封装,负责添加、修改和删除事件监听。在 ET 模式下,必须确保一次性读取完所有数据,否则后续不会再次通知,直到有新数据到达。
#pragma once
#include <iostream>
#include <sys/epoll.h>
#include <unistd.h>
#include "Log.hpp"
#include "Comm.hpp"
static const int gsize = 128;
class Epoller {
private:
bool EventMethodCore(int fd, u_int32_t events, int type) {
struct epoll_event ev;
ev.events = events;
ev.data.fd = fd;
int n = ::epoll_ctl(_epfd, type, fd, &ev);
if (n < 0) {
LOG(ERROR, "epoll_ctl error!\n");
return false;
}
LOG(DEBUG, "epoll_ctl add %d success!\n", fd);
return true;
}
public:
Epoller() {
_epfd = ::epoll_create(gsize);
if (_epfd < 0) {
LOG(FATAL, "epoll create error!\n");
exit(EPOLL_CREATE_ERROR);
}
LOG(FATAL, "epoll create success, epfd: %d\n", _epfd);
}
bool AddEvent(int fd, uint32_t events) {
return EventMethodCore(fd, events, EPOLL_CTL_ADD);
}
bool ModEvent(int fd, uint32_t events) {
return EventMethodCore(fd, events, EPOLL_CTL_MOD);
}
bool DelEvent(int fd) {
return ::epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);
}
int Wait(struct epoll_event revs[], int num, int timeout) {
int n = ::epoll_wait(_epfd, revs, num, timeout);
return n;
}
~Epoller() {
if (_epfd >= 0) ::close(_epfd);
}
private:
int _epfd;
};
Reactor 主循环
Reactor 是整个框架的核心,它维护着所有连接的映射表,并不断调用 epoll_wait 等待事件。一旦有事件就绪,便根据事件类型(读/写)调用对应的回调函数。在 ET 模式下,如果一次读取未读完,需要重新注册可写事件以便后续发送响应。
#pragma once
#include <iostream>
#include <unordered_map>
#include "Epoller.hpp"
#include "Connection.hpp"
class Reactor {
const static int gnum = 64;
public:
Reactor() : _isrunning(false) {}
void AddConnection(int fd, uint32_t events, func_t recver, func_t sender, func_t excepter) {
Connection *conn = new Connection(fd);
conn->SetEvents(events);
conn->Register(recver, sender, excepter);
conn->SetSelf(this);
_epller.AddEvent(conn->Sockfd(), conn->Events());
_connections.insert(std::make_pair(conn->Sockfd(), conn));
}
bool ConnectionIsExists(int sockfd) {
auto iter = _connections.find(sockfd);
return iter != _connections.end();
}
void EnableReadWrite(int sockfd, bool readable, bool writeable) {
uint32_t events = (readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET;
if (ConnectionIsExists(sockfd)) {
_connections[sockfd]->SetEvents(events);
_epller.ModEvent(sockfd, events);
}
}
void RemoveConnection(int sockfd) {
if (!ConnectionIsExists(sockfd)) return;
_epller.DelEvent(sockfd);
_connections[sockfd]->Close();
delete _connections[sockfd];
_connections.erase(sockfd);
}
void LoopOnce(int timeout) {
int n = _epller.Wait(revs, gnum, timeout);
for (int i = 0; i < n; i++) {
int sockfd = revs[i].data.fd;
uint32_t revents = revs[i].events;
if (revents & EPOLLHUP) revents |= (EPOLLIN | EPOLLOUT);
if (revents & EPOLLERR) revents |= (EPOLLIN | EPOLLOUT);
if (revents & EPOLLIN) {
if (ConnectionIsExists(sockfd) && (_connections[sockfd]->_recver != nullptr)) {
_connections[sockfd]->_recver(_connections[sockfd]);
}
}
if (revents & EPOLLOUT) {
if (ConnectionIsExists(sockfd) && (_connections[sockfd]->_sender != nullptr)) {
_connections[sockfd]->_sender(_connections[sockfd]);
}
}
}
}
void Dispatcher() {
_isrunning = true;
int timeout = 3000;
while (_isrunning) {
LoopOnce(timeout);
Debug();
}
_isrunning = false;
}
void Debug() {
std::cout << "------------------------------------" << std::endl;
for (auto& connection : _connections) {
std::cout << "fd : " << connection.second->Sockfd() << ", ";
uint32_t events = connection.second->Events();
if ((events & EPOLLIN) && (events & EPOLLET)) std::cout << "EPOLLIN | EPOLLET, ";
if ((events & EPOLLOUT) && (events & EPOLLET)) std::cout << "EPOLLOUT | EPOLLET";
std::cout << std::endl;
}
std::cout << "------------------------------------" << std::endl;
}
~Reactor() {}
private:
std::unordered_map<int, Connection *> _connections;
struct epoll_event revs[gnum];
Epoller _epller;
bool _isrunning;
};
主程序入口
最后,我们在 main.cc 中组装各个组件。创建一个 Reactor 实例,初始化 Listener 监听端口,并将监听套接字注册到 Reactor 中。启动后,Reactor 将进入无限循环,处理所有连接的生命周期。
#include <iostream>
#include <memory>
#include "Reactor.hpp"
#include "Connection.hpp"
#include "Listener.hpp"
#include "PackageParse.hpp"
#include "HandlerConnection.hpp"
#include "Log.hpp"
int main(int argc, char* argv[]) {
if (argc != 2) {
std::cout << "Usage: " << argv[0] << " port" << std::endl;
return 0;
}
uint16_t port = std::stoi(argv[1]);
EnableScreen();
std::unique_ptr<Reactor> react = std::make_unique<Reactor>();
HandlerConnection hc(PackageParse::Parse);
Listener listener(port, hc);
react->AddConnection(listener.Sockfd(), EPOLLIN | EPOLLET,
std::bind(&Listener::Accepter, &listener, std::placeholders::_1),
nullptr, nullptr);
react->Dispatcher();
}
该实现展示了如何从零构建一个基于 C++ 的高性能网络服务器。通过合理运用 epoll ET 模式和面向对象的设计,能够有效支撑高并发场景下的通信需求。
相关免费在线工具
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
- Base64 文件转换器
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
- Markdown转HTML
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
- HTML转Markdown
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
- JSON 压缩
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online
- JSON美化和格式化
将JSON字符串修饰为友好的可读格式。 在线工具,JSON美化和格式化在线工具,online