一、Reactor 模式

Reactor 模式是一种事件处理设计模式,用于处理多个并发输入事件。它通过事件驱动的方式,将事件分发给相应的处理程序,从而实现对并发事件的高效处理。Reactor 模式广泛应用于网络编程、服务器框架等领域,例如 Java 的 NIO、Netty 框架,以及 C++ 的 Boost.Asio 等。
Reactor 模式的核心思想
Reactor 模式的核心思想是:
- 事件驱动:通过事件循环(Event Loop)监听多个事件源(多个文件描述符)(如 Socket、文件描述符等),该事件驱动器可以采用 select,poll,epoll 等。
- 事件分发:当事件发生时,Reactor 将事件分发给对应的事件处理器(Event Handler)。
- 非阻塞:Reactor 模式通常与非阻塞 I/O 结合使用,避免线程阻塞,所以需要将 Socket、文件描述符等通过 fcntl 函数设置为非阻塞状态。
Reactor 模式的组成
Reactor 模式通常由以下几个组件组成:
-
Loop(反应器 / 事件循环)
- 负责监听事件源(多个文件描述符)(如 Socket、文件描述符等)。
- 当事件发生时,将事件分发给对应的事件处理器。
-
Dispatcher(事件多路分发器)
- 当事件发生时,通知 Event Handler 进行处理。
-
Event Handler(事件处理器)
- 定义处理事件的接口。
- 每个事件源对应一个事件处理器。

二、案例使用——基于 Reactor 的计算器
封装 epoll
#pragma once
#include <iostream>
#include <sys/epoll.h>
#include "Log.hpp"
#include "Common.hpp"
using namespace LogModule;
namespace EpollMoudle {
class Epoller {
public:
Epoller() : _epfd(-1) {}
void Init() {
_epfd = epoll_create(256);
if (_epfd < 0) {
LOG(LogLevel::ERROR) << "epoll_create error";
exit(EPOLL_CREATE_ERR);
}
LOG(LogLevel::INFO) << "epoll_create success, epfd: " << _epfd;
}
int Wait(struct epoll_event revs[], int num, int timeout) // 输出就绪的 fd 和 events
{
int n = epoll_wait(_epfd, revs, num, timeout);
if (n < 0) {
LOG(LogLevel::WARNING) << "epoll_wait error";
}
return n;
}
void Ctrl(int sockfd, uint32_t events, int flag) {
struct epoll_event ev;
ev.events = events;
ev.data.fd = sockfd;
int n = epoll_ctl(_epfd, flag, sockfd, &ev);
if (n < 0) {
LOG(LogLevel::WARNING) << "epoll_ctl error";
}
}
void Add(int sockfd, uint32_t events) { Ctrl(sockfd, events, EPOLL_CTL_ADD); }
void Update(int sockfd, uint32_t events) { Ctrl(sockfd, events, EPOLL_CTL_MOD); }
void Delete(int sockfd) {
int n = epoll_ctl(_epfd, EPOLL_CTL_DEL, sockfd, nullptr);
if (n < 0) {
LOG(LogLevel::WARNING) << "epoll_ctl error";
}
}
~Epoller() {}
private:
int _epfd;
};
}
封装 Connection
所有的连接里都包含回指指针指向自己的 Reactor 模型
#pragma once
#include <iostream>
#include <string>
#include <memory>
#include <functional>
#include "InetAddr.hpp"
class Reactor; // 循环依赖的问题
// 普通 fd,Listensockfd
// 让对 fd 的处理方式采用同一种方式
// 描述一个连接
class Connection {
public:
Connection() : _sockfd(-1), _events(0) {
// 自动获得当前系统的时间戳
}
void UpdateTime() {
// 更新时间,重新获取时间
}
void SetPeerInfo(const InetAddr &peer_addr) { _peer_addr = peer_addr; }
void SetSockfd(int sockfd) { _sockfd = sockfd; }
int Sockfd() { return _sockfd; }
void SetEvents(uint32_t events) { _events = events; }
uint32_t GetEvents() { return _events; }
void SetOwner(Reactor *owner) { _owner = owner; }
Reactor* GetOwner() { return _owner; }
void Append(const std::string &in) // 把收到的数据添加到自己的接受缓冲区
{ _inbuffer += in; }
void AppendToOut(const std::string &out) { _outbuffer += out; }
void DiscardOutString(int n) { _outbuffer.erase(0, n); }
bool isOutBufferEmpty() { return _outbuffer.empty(); }
std::string &OutString() { return _outbuffer; }
std::string &InBuffer() // 故意
{ return _inbuffer; }
void Close() {
if (_sockfd >= 0) close(_sockfd);
}
// 回调方法
virtual void Sender() = 0;
virtual void Recver() = 0;
virtual void Excepter() = 0;
~Connection() {}
private:
int _sockfd; // 每一个文件描述符都有自己的输入输出缓冲区
std::string _inbuffer;
std::string _outbuffer;
InetAddr _peer_addr; // 对应哪一个客户端
// 添加一个指针
Reactor *_owner;
// 我关心的事件
uint32_t _events; // 我这个 connection 关心的事件
// lastmodtime
uint64_t _timestamp;
};
封装 Reactor
#pragma once
#include <iostream>
#include <memory>
#include <unordered_map>
#include "Epoller.hpp"
#include "Connection.hpp"
using namespace EpollMoudle;
using connection_t = std::shared_ptr<Connection>;
class Reactor {
const static int event_num = 64;
private:
bool IsConnectionExists(int sockfd) {
return _connections.find(sockfd) == _connections.end() ? false : true;
}
public:
Reactor() : _isrunning(false), _epoller(std::make_unique<Epoller>()) {
_epoller->Init();
}
void InsertConnection(connection_t conn) {
auto iter = _connections.find(conn->Sockfd());
if (iter == _connections.end()) {
// 1. 把连接,放到 unordered_map 中进行管理
_connections.insert(std::make_pair(conn->Sockfd(), conn));
// 2. 把新插入进来的连接,写透到内核的 epoll 中
_epoller->Add(conn->Sockfd(), conn->GetEvents());
// 3. 设置关联关系,让 connection 回指当前对象
conn->SetOwner(this);
}
}
void EnableReadWrite(int sockfd, bool readable, bool writeable) {
if (IsConnectionExists(sockfd)) {
// 修改用户层 connection 的事件
uint32_t events = ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET);
_connections[sockfd]->SetEvents(events);
// 写透到内核中
_epoller->Update(sockfd, _connections[sockfd]->GetEvents());
}
}
void DelConnection(int sockfd) {
if (IsConnectionExists(sockfd)) {
// 1. 从内核中移除对 sockfd 的关心
_epoller->Delete(sockfd);
// 2. 关闭特定的文件描述符
_connections[sockfd]->Close();
// 3. 从_connectionns 中移除对应的 connection
_connections.erase(sockfd);
}
}
// 基于事件驱动的事件派发器
void Dispatcher(int n) {
for (int i = 0; i < n; i++) {
// 开始进行派发,派发给指定的模块
int sockfd = _revs[i].data.fd;
uint32_t revents = _revs[i].events;
if ((revents & EPOLLERR) || (revents & EPOLLHUP)) revents = (EPOLLIN | EPOLLOUT); // 异常事件,转换成为读写事件
if ((revents & EPOLLIN) && IsConnectionExists(sockfd)) {
_connections[sockfd]->Recver();
}
if ((revents & EPOLLOUT) && IsConnectionExists(sockfd)) {
_connections[sockfd]->Sender();
}
}
}
void LoopOnce(int timeout) {
int n = _epoller->Wait(_revs, event_num, timeout);
Dispatcher(n);
}
void DebugPrint() {
std::cout << "Epoller 管理的 fd: ";
for (auto &iter : _connections) {
std::cout << iter.first << " ";
}
std::cout << std::endl;
}
void Loop() {
_isrunning = true;
// int timeout = -1;
int timeout = 1000;
while (_isrunning) {
LoopOnce(timeout);
DebugPrint();
// 超时管理
// 简单的,遍历_connections 判断当前时间 - connection 的最近访问时间 > XXX
// 超时了
}
_isrunning = false;
}
void Stop() {
_isrunning = false;
}
~Reactor() {}
private:
std::unique_ptr<Epoller> _epoller;
std::unordered_map<int, connection_t> _connections; // fd: Connection 服务器内部所有的连接
bool _isrunning;
struct epoll_event _revs[event_num];
};
封装 Listener(专门负责获取连接的模块)
#pragma once
#include <iostream>
#include <memory>
#include "Socket.hpp"
#include "InetAddr.hpp"
#include "Connection.hpp"
#include "Epoller.hpp"
#include "Log.hpp"
#include "IOService.hpp"
#include "Reactor.hpp"
#include "Protocol.hpp"
#include "Calculator.hpp"
using namespace SocketModule;
using namespace LogModule;
// 专门负责获取连接的模块
// 连接管理器
class Listener : public Connection {
public:
Listener(uint16_t port) : _listensock(std::make_unique<TcpSocket>()), _port(port) {
_listensock->BuildTcpSocketMethod(_port);
SetSockfd(_listensock->Fd());
SetEvents(EPOLLIN | EPOLLET);
}
virtual void Sender() override {}
// 我们回调到这里 天然就有父类 connection
virtual void Recver() override {
// 读就绪,而且是 listensock 就绪
// IO 处理 --- 获取新连接
// 你怎么知道 一次来的就是一个连接呢 怎么保证一次读完了 ET 工作模式 !
while (true) {
InetAddr peer;
int aerrno = 0;
// accept 非阻塞的时候,就是 IO,我们就像处理 read 一样,处理 accept
int sockfd = _listensock->Accepter(&peer, &aerrno);
if (sockfd > 0) {
// success
// 不能直接读取 !
// sockfd 添加到 epoll !
// epollserver 只认 connection
LOG(LogLevel::DEBUG) << "Accepter success: " << sockfd;
// 普通的文件描述符,处理 IO 的 也是 connection
// 2. sockfd 包装成为 Connection !
auto conn = std::make_shared<IOService>(sockfd);
conn->RegisterOnMessage(HandlerRequest);
// 3. 插入到 EpollServer
GetOwner()->InsertConnection(conn);
} else {
if (aerrno == EAGAIN || aerrno == EWOULDBLOCK) {
LOG(LogLevel::DEBUG) << "accepter all connection ... done";
break;
} else if (aerrno == EINTR) {
LOG(LogLevel::DEBUG) << "accepter intr by signal, continue";
continue;
} else {
LOG(LogLevel::DEBUG) << "accepter error ... Ignore";
break;
}
}
}
}
virtual void Excepter() override {}
int Sockfd() { return _listensock->Fd(); }
~Listener() {
_listensock->Close();
}
private:
std::unique_ptr<Socket> _listensock;
uint16_t _port;
};
封装 IOService(只负责 IO 的)
#pragma once
#include <iostream>
#include <memory>
#include <functional>
#include "Socket.hpp"
#include "InetAddr.hpp"
#include "Connection.hpp"
#include "Epoller.hpp"
#include "Log.hpp"
#include "Reactor.hpp"
using func_t = std::function<std::string(std::string &)>;
using namespace SocketModule;
using namespace LogModule;
// 只负责 IO
class IOService : public Connection {
static const int size = 1024;
public:
IOService(int sockfd) {
// 1. 设置文件描述符非阻塞
SetNonBlock(sockfd);
SetSockfd(sockfd);
SetEvents(EPOLLIN | EPOLLET);
}
virtual void Sender() override {
// UpdateTime();
// 直接写
while (true) {
ssize_t n = send(Sockfd(), OutString().c_str(), OutString().size(), 0);
if (n > 0) {
// 成功
DiscardOutString(n); // 移除 N 个
} else if (n == 0) {
break;
} else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 缓冲区写满了,下次再来
break;
} else if (errno == EINTR) {
continue;
} else {
Excepter();
return;
}
}
}
// 一种:outbuffer empty
// 一种:发送缓冲区写满了 && outbuffer 没有 empty 写条件不满足 使能 sockfd 在 epoll 中的事件
if (!isOutBufferEmpty()) {
// 修改对 sockfd 的读事件关心!--开启对写事件关心
// 按需设置!
GetOwner()->EnableReadWrite(Sockfd(), true, true);
// 读事件一般常设 写事件一般按需设置
} else {
GetOwner()->EnableReadWrite(Sockfd(), true, false);
// 按需设置 关闭了 !!!
}
}
virtual void Recver() override {
// UpdateTime();
// 1. 读取所有数据
while (true) // ET 模式
{
char buffer[size];
ssize_t s = recv(Sockfd(), buffer, sizeof(buffer) - 1, 0); // 非阻塞
if (s > 0) {
buffer[s] = 0; // 读取成功
Append(buffer);
} else if (s == 0) {
// 对端关闭连接
Excepter();
return;
} else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
break;
} else if (errno == EINTR) {
continue;
} else {
// 发生错误了
Excepter();
return;
}
}
}
// 走到下面,我一定把本轮数据读完了
std::cout << "outbuffer: \n" << InBuffer() << std::endl;
// 你能确保你读到的消息,就是一个完整的报文吗
// 我怎么知道 读到了完整的请求呢?? 协议 ! ! !
std::string result;
if (_on_message) result = _on_message(InBuffer());
// 添加应答信息
AppendToOut(result);
// 如何处理写的问题 outbuffer 发送给对方的问题
if (!isOutBufferEmpty()) {
// 方案一:Sender(); // 直接发送,推荐做法
// Sender();
// 方案二:使能 writeable 即可
GetOwner()->EnableReadWrite(Sockfd(), true, true);
}
}
virtual void Excepter() override {
// IO 读取的时候,所有的异常处理,全部都会转化成为这一个函数的调用
// 出异常 怎么做?
// 打印日志 差错处理 关闭连接,Reactor 异常 connection,从内核中,移除对 fd 的关心
LOG(LogLevel::INFO) << "客户端连接可能结束,进行异常处理:" << Sockfd();
GetOwner()->DelConnection(Sockfd());
}
void RegisterOnMessage(func_t on_message) {
_on_message = on_message;
}
~IOService() {}
private:
func_t _on_message;
};
Main.cc
#include <iostream>
#include <string>
#include "Log.hpp"
#include "Listener.hpp"
#include "Connection.hpp"
#include "Reactor.hpp"
using namespace LogModule;
int main(int argc, char *argv[]) {
ENABLE_CONSOLE_LOG();
if (argc != 2) {
std::cout << "Usage: " << argv[0] << " port" << std::endl;
return 0;
}
uint16_t local_port = std::stoi(argv[1]);
Reactor reactor;
auto conn = std::make_shared<Listener>(local_port);
reactor.InsertConnection(conn);
reactor.Loop();
return 0;
}
三、复盘分析
多路转接对写的处理
- 写事件是否就绪:发送缓冲器,是否有空间!
有空间。sockfd,发送缓冲区中的空间,默认就是有的!!!,只有当他被填写满了,写条件才不具备
- 如何正确处理写入:直接写 (不会被阻塞),写缓冲区可能会越来越短,写满了,写条件不具备,才托管给
epoll,让它帮我关心!!!
- 直接写!写入默认就是就绪的 !
- 写入失败,才托管给
epoll! - 多路转接的方案设计的时候,写事件关心,永远不能常开启。
- 写事件关心,按需设置 !
- 读事件一般常设,写事件一般按需设置 !
- 只要使能开启写,
内核+epoll会自动事件派发处理底层的发送任务。 - 从此往后,我们专注于业务即可 !
最佳实践 One Thread One loop
一个 fd (Connection) 的全生命周期,只能由一个线程统一管理,这样就不会存在任何 (过多) 并发问题

🧊一个
loop就是 一个Reactor

四、总结
以上就是我对 【Linux】高性能网络模式:Reactor 反应堆模式 的理解。


