跳到主要内容基于 Linux epoll ET 模式的 Reactor 网络服务器实现 | 极客日志C++算法
基于 Linux epoll ET 模式的 Reactor 网络服务器实现
基于 Linux epoll 边缘触发(ET)模式实现的 Reactor 网络服务器框架。内容涵盖 Reactor 模式的基本概念、核心组件及工作流程,详细展示了 Socket 抽象层、协议编解码(Protocol)、连接管理(Connection)、事件处理(HandlerConnection)以及核心的 Epoller 和 Reactor 类实现。通过 C++ 代码示例,演示了如何构建高并发 I/O 多路复用服务器,包括非阻塞套接字设置、TCP 粘包处理、事件分发循环及连接生命周期管理。
未来可期1 浏览 Reactor 反应堆模式
Reactor 反应堆模式是一种事件驱动的设计模式,通常用于处理高并发的 I/O 操作,尤其是在服务器或网络编程中。
基本概念
Reactor 模式又称之为响应器模式,基于事件多路复用机制,使得单个线程能够同时管理大量并发连接,而不需要为每个连接创建一个独立的线程。它通过一个事件分发器(Reactor)来监听和管理不同的 I/O 事件,当事件发生时,分发器会将该事件分发给对应的事件处理器来处理。
核心组件
- 事件分发器(Reactor):负责监听各种事件源(如 socket、文件描述符)并将事件分发给相应的处理器。事件分发器通常使用 I/O 多路复用机制(如 select、poll、epoll)来同时监听多个 I/O 事件。
- 事件处理器(Event Handler):定义了如何处理特定事件。当事件分发器检测到某个事件时,就会触发相应的事件处理器中的回调函数。
- 同步事件分离器(Demultiplexer):本质上是系统调用,用于监听事件源上的事件,并将事件通知给事件分发器。例如,在 Linux 中,可以使用 select、poll 或 epoll 等系统调用来实现同步事件分离器。
工作流程
- 注册事件:事件分发器注册需要监听的 I/O 事件(如连接、读写),并关联相应的事件处理器。
- 进入循环:事件分发器进入循环,使用 I/O 多路复用机制来监听注册的 I/O 事件。
- 分发事件:一旦某个 I/O 事件发生,事件分发器会将该事件分发给对应的事件处理器。
- 处理事件:事件处理器执行预定义的操作来处理该事件。处理完成后,可能会重新注册事件或关闭连接。
epoll 服务器 (ET)
服务器监听一个指定的端口,当有新的连接请求到来时,服务器接受连接并将其注册到 Reactor 中,以便处理后续的数据读写事件。
Socket.hpp
包含了一个抽象基类 Socket 和一个继承自 Socket 的具体实现类 TcpSocket。提供一个面向对象的网络套接字编程接口,允许用户通过继承和实现基类中的纯虚函数来创建不同类型的套接字(例如 TCP 套接字)。
#include <iostream>
#include <string>
#include <functional>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include
socket_ns {
;
gbacklog = ;
socket_sptr = std::shared_ptr<Socket>;
{ SOCKET_ERROR = , BIND_ERROR, LISTEN_ERROR, USAGE_ERROR };
{
:
= ;
= ;
= ;
= ;
= ;
= ;
= ;
= ;
= ;
:
{
();
(addr);
();
}
{
();
(addr);
}
};
: Socket {
:
( sockfd = ) : _sockfd(sockfd) {}
{
_sockfd = (AF_INET, SOCK_STREAM, );
(_sockfd < ) {
(FATAL, );
(SOCKET_ERROR);
}
(_sockfd);
(DEBUG, , _sockfd);
}
{
local;
(&local, , (local));
local.sin_family = AF_INET;
local.sin_port = (addr.());
local.sin_addr.s_addr = (addr.().());
n = (_sockfd, ( sockaddr*)&local, (local));
(n < ) {
(FATAL, );
(BIND_ERROR);
}
(DEBUG, , _sockfd);
}
{
n = (_sockfd, gbacklog);
(n < ) {
(FATAL, );
(LISTEN_ERROR);
}
(DEBUG, , _sockfd);
}
{
peer;
len = (peer);
sockfd = (_sockfd, ( sockaddr*)&peer, &len);
*code = errno;
(sockfd < ) {
(WARNING, );
;
}
*addr = peer;
(sockfd);
sockfd;
}
{
server;
(&server, , (server));
server.sin_family = AF_INET;
server.sin_addr.s_addr = (addr.().());
server.sin_port = (addr.());
n = (_sockfd, ( sockaddr*)&server, (server));
(n < ) {
std::cerr << << std::endl;
;
}
;
}
{
inbuffer[];
n = (_sockfd, inbuffer, (inbuffer) - , );
(n > ) {
inbuffer[n] = ;
*out += inbuffer;
}
n;
}
{
n = (_sockfd, in.(), in.(), );
n;
}
{
_sockfd;
}
{
(_sockfd > )
::(_sockfd);
}
~() {}
:
_sockfd;
};
}
微信扫一扫,关注极客日志
微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
相关免费在线工具
- 加密/解密文本
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
- Base64 文件转换器
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
- Markdown 转 HTML
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML 转 Markdown 互为补充。 在线工具,Markdown 转 HTML在线工具,online
- HTML 转 Markdown
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML 转 Markdown在线工具,online
- JSON 压缩
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online
<cstring>
#include <pthread.h>
#include <memory>
#include "InetAddr.hpp"
#include "Log.hpp"
#include "Comm.hpp"
namespace
class
Socket
const
static
int
8
using
enum
1
class
Socket
public
virtual void CreateSocketOrDie()
0
virtual void BindSocketOrDie(InetAddr& addr)
0
virtual void ListenSocketOrDie()
0
virtual int Accepter(InetAddr* addr, int* code)
0
virtual bool Connector(InetAddr &addr)
0
virtual int SockFd()
0
virtual int Recv(std::string *out)
0
virtual int Send(const std::string &in)
0
virtual void Close()
0
public
void BuildListenSocket(InetAddr& addr)
CreateSocketOrDie
BindSocketOrDie
ListenSocketOrDie
bool BuildClientSocket(InetAddr &addr)
CreateSocketOrDie
return
Connector
class
TcpSocket
public
public
TcpSocket
int
-1
void CreateSocketOrDie() override
socket
0
if
0
LOG
"socket error"
exit
SetNonBlock
LOG
"socket create success, sockfd is : %d\n"
void BindSocketOrDie(InetAddr& addr) override
struct
sockaddr_in
memset
0
sizeof
htons
Port
inet_addr
Ip
c_str
int
bind
struct
sizeof
if
0
LOG
"bind error"
exit
LOG
"bind success, sockfd is : %d\n"
void ListenSocketOrDie() override
int
listen
if
0
LOG
"listen error"
exit
LOG
"listen success, sockfd is : %d\n"
int Accepter(InetAddr* addr, int* code) override
struct
sockaddr_in
socklen_t
sizeof
int
accept
struct
if
0
LOG
"accept error\n"
return
-1
SetNonBlock
return
virtual bool Connector(InetAddr& addr)
struct
sockaddr_in
memset
0
sizeof
inet_addr
Ip
c_str
htons
Port
int
connect
struct
sizeof
if
0
"connect error"
return
false
return
true
int Recv(std::string *out) override
char
1024
ssize_t
recv
sizeof
1
0
if
0
0
return
int Send(const std::string &in) override
int
send
c_str
size
0
return
int SockFd() override
return
void Close() override
if
-1
close
TcpSocket
private
int
Calculate.hpp
#pragma once
#include <iostream>
#include "ProToCol.hpp"
using namespace protocol_ns;
class Calculate {
public:
Calculate() {}
Response Excute(const Request &req) {
Response resp(0, 0);
switch (req._oper) {
case '+': resp._result = req._x + req._y; break;
case '-': resp._result = req._x - req._y; break;
case '*': resp._result = req._x * req._y; break;
case '/': {
if (req._y == 0) {
resp._code = 1;
} else {
resp._result = req._x / req._y;
}
break;
}
case '%': {
if (req._y == 0) {
resp._code = 2;
} else {
resp._result = req._x % req._y;
}
break;
}
default: resp._code = 3; break;
}
return resp;
}
~Calculate() {}
private:
};
protocol.hpp
用于处理网络通信中数据序列化和反序列化、编码和解码以及请求和响应对象生成的类和函数。
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
#include <memory>
#include <jsoncpp/json/json.h>
namespace protocol_ns {
const std::string SEP = "\r\n";
std::string Encode(const std::string &json_str) {
int json_str_len = json_str.size();
std::string proto_str = std::to_string(json_str_len);
proto_str += SEP;
proto_str += json_str;
proto_str += SEP;
return proto_str;
}
std::string Decode(std::string &inbuffer) {
auto pos = inbuffer.find(SEP);
if (pos == std::string::npos) return std::string();
std::string len_str = inbuffer.substr(0, pos);
if (len_str.empty()) return std::string();
int packlen = std::stoi(len_str);
int total = packlen + len_str.size() + 2 * SEP.size();
if (inbuffer.size() < total) return std::string();
std::string package = inbuffer.substr(pos + SEP.size(), packlen);
inbuffer.erase(0, total);
return package;
}
class Request {
public:
Request() {}
Request(int x, int y, char oper) : _x(x), _y(y), _oper(oper) {}
bool Serialize(std::string* out) {
Json::Value root;
root["x"] = _x;
root["y"] = _y;
root["oper"] = _oper;
Json::FastWriter writer;
*out = writer.write(root);
return true;
}
bool DeSerialize(const std::string& in) {
Json::Value root;
Json::Reader reader;
bool res = reader.parse(in, root);
if (!res) return false;
_x = root["x"].asInt();
_y = root["y"].asInt();
_oper = root["oper"].asInt();
return true;
}
public:
int _x;
int _y;
char _oper;
};
class Response {
public:
Response() {}
Response(int result, int code) : _result(result), _code(code) {}
bool Serialize(std::string *out) {
Json::Value root;
root["result"] = _result;
root["code"] = _code;
Json::FastWriter writer;
*out = writer.write(root);
return true;
}
bool Deserialize(const std::string &in) {
Json::Value root;
Json::Reader reader;
bool res = reader.parse(in, root);
if (!res) return false;
_result = root["result"].asInt();
_code = root["code"].asInt();
return true;
}
public:
int _result;
int _code;
};
class Factory {
public:
Factory() {
srand(time(nullptr) ^ getpid());
opers = "+-*/%^&|";
}
std::shared_ptr<Request> BuildRequest() {
int x = rand() % 10 + 1;
usleep(x * 10);
int y = rand() % 5;
usleep(y * x * 5);
char oper = opers[rand() % opers.size()];
std::shared_ptr<Request> req = std::make_shared<Request>(x, y, oper);
return req;
}
std::shared_ptr<Response> BuildResponse() {
return std::make_shared<Response>();
}
~Factory() {}
private:
std::string opers;
};
}
PackageParse.hpp
负责解析从连接中接收到的报文,处理这些报文,并将响应发送回客户端。
#pragma once
#include <iostream>
#include "Connection.hpp"
#include "ProToCol.hpp"
#include "CalCulate.hpp"
using namespace protocol_ns;
class PackageParse {
public:
static void Parse(Connection *conn) {
std::string package;
Request req;
Calculate cal;
while (true) {
package = Decode(conn->Inbuffer());
if (package.empty()) break;
std::cout << "------------------------begin---------------" << std::endl;
std::cout << "resq string:\n" << package << std::endl;
req.DeSerialize(package);
Response resp = cal.Excute(req);
std::string send_str;
resp.Serialize(&send_str);
std::cout << "resp Serialize:" << std::endl;
std::cout << send_str << std::endl;
send_str = Encode(send_str);
std::cout << "resp Encode:" << std::endl;
std::cout << send_str << std::endl;
conn->AppendOutBuffer(send_str);
}
if (!conn->OutbufferEmpty() && conn->_sender != nullptr) {
conn->_sender(conn);
conn->_R->EnableReadWrite(conn->Sockfd(), true, true);
}
}
};
Comm.hpp
#pragma once
#include <iostream>
#include <unistd.h>
#include <fcntl.h>
enum {
SOCKET_ERROR = 1,
BIND_ERROR,
LISTEN_ERROR,
USAGE_ERROR,
EPOLL_CREATE_ERROR,
};
void SetNonBlock(int fd) {
int fl = ::fcntl(fd, F_GETFL);
if (fl < 0) return;
fcntl(fd, F_SETFL, fl | O_NONBLOCK);
}
Connection.hpp
一个网络连接,用于在客户端和服务器之间传输数据。Connection 类与 Reactor 类一起工作,实现了事件驱动的网络编程模型。
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include "Reactor.hpp"
#include "InetAddr.hpp"
#include <unistd.h>
class Connection;
class Reactor;
using func_t = std::function<void(Connection *)>;
class Connection {
public:
Connection(int sock) : _sock(sock), _R(nullptr) {}
int Sockfd() { return _sock; }
void SetEvents(int events) { _events = events; }
uint32_t Events() { return _events; }
void Register(func_t recver, func_t sender, func_t excepter) {
_recver = recver;
_sender = sender;
_excepter = excepter;
}
void SetSelf(Reactor *R) { _R = R; }
void AppendInBuffer(const std::string &buff) { _inbuffer += buff; }
std::string& Inbuffer() { return _inbuffer; }
void AppendOutBuffer(const std::string &buff) { _outbuffer += buff; }
std::string &Outbuffer() { return _outbuffer; }
bool OutbufferEmpty() { return _outbuffer.empty(); }
void OutbufferRemove(int n) { _outbuffer.erase(0, n); }
void Close() {
if (_sock >= 0)
::close(_sock);
}
~Connection() {}
func_t _recver;
func_t _sender;
func_t _excepter;
Reactor *_R;
private:
int _sock;
std::string _inbuffer;
std::string _outbuffer;
InetAddr _addr;
uint32_t _events;
};
HandlerConnection.hpp
#pragma once
#include <iostream>
#include <sys/types.h>
#include <sys/socket.h>
#include <functional>
#include "Log.hpp"
#include "Connection.hpp"
class HandlerConnection {
public:
HandlerConnection(func_t func) : _func(func) {}
void HanlderRecv(Connection *conn) {
LOG(DEBUG, "HanlderRecv fd : %d\n", conn->Sockfd());
while (true) {
errno = 0;
char buffer[1024];
ssize_t n = ::recv(conn->Sockfd(), buffer, sizeof(buffer) - 1, 0);
if (n > 0) {
buffer[n] = 0;
conn->AppendInBuffer(buffer);
} else {
if (errno == EWOULDBLOCK || errno == EAGAIN) {
break;
} else if (errno == EINTR) {
continue;
} else {
conn->_excepter(conn);
return;
}
}
}
_func(conn);
}
void HanlderSend(Connection *conn) {
errno = 0;
while (true) {
ssize_t n = ::send(conn->Sockfd(), conn->Outbuffer().c_str(), conn->Outbuffer().size(), 0);
if (n > 0) {
conn->OutbufferRemove(n);
if (conn->OutbufferEmpty()) break;
} else if (n == 0) {
break;
} else {
if (errno == EWOULDBLOCK || errno == EAGAIN) {
break;
} else if (errno == EINTR) {
continue;
} else {
conn->_excepter(conn);
return;
}
}
}
if (!conn->OutbufferEmpty()) {
conn->_R->EnableReadWrite(conn->Sockfd(), true, true);
} else {
conn->_R->EnableReadWrite(conn->Sockfd(), true, false);
}
}
void HanlderExcpet(Connection *conn) {
errno = 0;
LOG(DEBUG, "client quit : %d\n", conn->Sockfd());
conn->_R->RemoveConnection(conn->Sockfd());
}
private:
func_t _func;
};
Epoller.hpp
封装了 Linux 中 epoll 接口的使用,用于高效地管理大量并发网络连接或文件描述符的事件通知。
#pragma once
#include <iostream>
#include <sys/epoll.h>
#include <unistd.h>
#include "Log.hpp"
#include "Comm.hpp"
static const int gsize = 128;
class Epoller {
private:
bool EventMethodCore(int fd, u_int32_t events, int type) {
struct epoll_event ev;
ev.events = events;
ev.data.fd = fd;
int n = ::epoll_ctl(_epfd, type, fd, &ev);
if (n < 0) {
LOG(ERROR, "epoll_ctl error!\n");
return false;
}
LOG(DEBUG, "epoll_ctl add %d success!\n", fd);
return true;
}
public:
Epoller() {
_epfd = ::epoll_create(gsize);
if (_epfd < 0) {
LOG(FATAL, "epoll create error!\n");
exit(EPOLL_CREATE_ERROR);
}
LOG(FATAL, "epoll create success, epfd: %d\n", _epfd);
}
bool AddEvent(int fd, uint32_t events) {
return EventMethodCore(fd, events, EPOLL_CTL_ADD);
}
bool ModEvent(int fd, uint32_t events) {
return EventMethodCore(fd, events, EPOLL_CTL_MOD);
}
bool DelEvent(int fd) {
return ::epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);
}
int Wait(struct epoll_event revs[], int num, int timeout) {
int n = ::epoll_wait(_epfd, revs, num, timeout);
return n;
}
~Epoller() {
if (_epfd >= 0)
::close(_epfd);
}
private:
int _epfd;
};
Listener.hpp
Listener 的类:用于在指定端口上监听并接受新的连接请求的。
#pragma once
#include <iostream>
#include <memory>
#include "Connection.hpp"
#include "Epoller.hpp"
#include "Socket.hpp"
#include "InetAddr.hpp"
#include "HandlerConnection.hpp"
using namespace socket_ns;
class Listener {
public:
Listener(int port, HandlerConnection &hc) : _port(port), _listensock(std::make_unique<TcpSocket>()), _hc(hc) {
InetAddr addr("0", _port);
_listensock->BuildListenSocket(addr);
}
void Accepter(Connection *conn) {
while (true) {
InetAddr clientaddr;
int code = 0;
int sockfd = _listensock->Accepter(&clientaddr, &code);
cout << sockfd << endl;
if (sockfd >= 0) {
conn->_R->AddConnection(sockfd, EPOLLIN | EPOLLET,
std::bind(&HandlerConnection::HanlderRecv, &_hc, std::placeholders::_1),
std::bind(&HandlerConnection::HanlderSend, &_hc, std::placeholders::_1),
std::bind(&HandlerConnection::HanlderExcpet, &_hc, std::placeholders::_1));
} else {
if (code == EWOULDBLOCK || code == EAGAIN) {
LOG(DEBUG, "accepter all link!\n");
break;
} else if (code == EINTR) {
LOG(DEBUG, "accepter interupt by signal!\n");
continue;
} else {
LOG(WARNING, "accept error!\n");
break;
}
}
}
}
int Sockfd() { return _listensock->SockFd(); }
~Listener() { _listensock->Close(); }
private:
uint16_t _port;
std::unique_ptr<Socket> _listensock;
HandlerConnection &_hc;
};
Reactor.hpp
一个使用 epoll 作为底层事件通知机制的网络服务器框架的核心部分。这个类管理着网络连接,并对这些连接上的事件进行监听和处理。
#pragma once
#include <iostream>
#include <unordered_map>
#include "Epoller.hpp"
#include "Connection.hpp"
class Reactor {
const static int gnum = 64;
public:
Reactor() : _isrunning(false) {}
void AddConnection(int fd, uint32_t events, func_t recver, func_t sender, func_t excepter) {
Connection *conn = new Connection(fd);
conn->SetEvents(events);
conn->Register(recver, sender, excepter);
conn->SetSelf(this);
_epller.AddEvent(conn->Sockfd(), conn->Events());
_connections.insert(std::make_pair(conn->Sockfd(), conn));
}
bool ConnectionIsExists(int sockfd) {
auto iter = _connections.find(sockfd);
return iter != _connections.end();
}
void EnableReadWrite(int sockfd, bool readable, bool writeable) {
uint32_t events = (readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET;
if (ConnectionIsExists(sockfd)) {
_connections[sockfd]->SetEvents(events);
_epller.ModEvent(sockfd, events);
}
}
void RemoveConnection(int sockfd) {
if (!ConnectionIsExists(sockfd)) return;
_epller.DelEvent(sockfd);
_connections[sockfd]->Close();
delete _connections[sockfd];
_connections.erase(sockfd);
}
void LoopOnce(int timeout) {
int n = _epller.Wait(revs, gnum, timeout);
for (int i = 0; i < n; i++) {
int sockfd = revs[i].data.fd;
uint32_t revents = revs[i].events;
if (revents & EPOLLHUP) revents |= (EPOLLIN | EPOLLOUT);
if (revents & EPOLLERR) revents |= (EPOLLIN | EPOLLOUT);
if (revents & EPOLLIN) {
if (ConnectionIsExists(sockfd) && (_connections[sockfd]->_recver != nullptr)) {
_connections[sockfd]->_recver(_connections[sockfd]);
}
}
if (revents & EPOLLOUT) {
if (ConnectionIsExists(sockfd) && (_connections[sockfd]->_sender != nullptr)) {
_connections[sockfd]->_sender(_connections[sockfd]);
}
}
}
}
void Dispatcher() {
_isrunning = true;
int timeout = 3000;
while (_isrunning) {
LoopOnce(timeout);
Debug();
}
_isrunning = false;
}
void Debug() {
std::cout << "------------------------------------" << std::endl;
for (auto& connection : _connections) {
std::cout << "fd : " << connection.second->Sockfd() << ", ";
uint32_t events = connection.second->Events();
if ((events & EPOLLIN) && (events & EPOLLET)) std::cout << "EPOLLIN | EPOLLET, ";
if ((events & EPOLLOUT) && (events & EPOLLET)) std::cout << "EPOLLOUT | EPOLLET";
std::cout << std::endl;
}
std::cout << "------------------------------------" << std::endl;
}
~Reactor() {}
private:
std::unordered_map<int, Connection *> _connections;
struct epoll_event revs[gnum];
Epoller _epller;
bool _isrunning;
};
Main.cc
#include <iostream>
#include <memory>
#include "Reactor.hpp"
#include "Connection.hpp"
#include "Listener.hpp"
#include "PackageParse.hpp"
#include "HandlerConnection.hpp"
#include "Log.hpp"
int main(int argc, char* argv[]) {
if (argc != 2) {
std::cout << "Usage: " << argv[0] << " port" << std::endl;
return 0;
}
uint16_t port = std::stoi(argv[1]);
EnableScreen();
std::unique_ptr<Reactor> react = std::make_unique<Reactor>();
HandlerConnection hc(PackageParse::Parse);
Listener listener(port, hc);
react->AddConnection(listener.Sockfd(), EPOLLIN | EPOLLET,
std::bind(&Listener::Accepter, &listener, std::placeholders::_1),
nullptr, nullptr);
react->Dispatcher();
}
- 创建一个 Reactor 类的智能指针实例,这是主服务组件。
- 创建一个 HandlerConnection 对象 hc,它使用 PackageParse::Parse 函数来处理数据包的解析。
- 创建一个 Listener 对象 listener,负责监听指定端口上的连接请求。
- 通过 react->AddConnection() 方法,将监听套接字注册到 Reactor 中,设置监听事件为读事件(EPOLLIN)和边缘触发模式(EPOLLET)。
- 调用 react->Dispatcher() 开始事件分发循环,不断监听事件并调用相应的事件处理函数。