跳到主要内容C++ 仿 Muduo 库实战:Server 服务器模块实现(上) | 极客日志C++
C++ 仿 Muduo 库实战:Server 服务器模块实现(上)
C++ 网络编程实战,基于 Muduo 架构设计 Server 模块。涵盖 Buffer 内存管理、Socket 封装与端口复用细节、Channel 事件监控逻辑、Poller 层 epoll 集成,以及 EventLoop 线程安全机制与定时器任务调度。重点解析非阻塞 IO、事件驱动模型及跨模块协作流程,提供完整代码实现与关键问题排查思路。
云间运维1 浏览 C++ 仿 Muduo 库实战:Server 服务器模块实现(上)

一、Buffer 模块设计
本质:缓冲区模板类。
功能:负责数据的存储与取出,是网络 IO 读写的基础。
核心思想:
- 内存空间:底层使用
std::vector<char> 管理线性内存。
- 关键要素:
- 默认初始大小。
- 当前读取位置偏移量(
_read_idx)。
- 当前写入位置偏移量(
_write_idx)。
- 操作逻辑:
- 写入数据:从写位置开始写入。若剩余空闲不足,需判断是否可复用读后空闲区(前移数据)或直接扩容。
- 读取数据:从读位置开始读取,可读大小为
写位置 - 读位置。
接口设计:
class Buffer {
public:
private:
std::vector<char> _buffer;
uint64_t _read_idx;
uint64_t _write_idx;
};
代码实现:
class Buffer {
private:
char* Begin { &*_buffer.(); }
{
(len <= ());
std::((), () + len, (*)data);
(len);
}
{
(len <= ) ;
(len);
* d = < *>(data);
std::(d, d + len, ());
(len);
}
{
(buf.(), buf.());
}
{
(str.(), str.());
}
:
( size = ) : _reader_idx(), _writer_idx() {
_buffer.(size);
}
{ () + _writer_idx; }
{ () + _reader_idx; }
{
(len <= ());
_reader_idx += len;
}
{
(len <= () + ());
_writer_idx += len;
}
{
* pos = ();
(pos == ) ;
(pos - () + );
}
{ _buffer.() - _writer_idx; }
{ _reader_idx; }
{ _writer_idx - _reader_idx; }
{
(() >= len) ;
(len <= () + ()) {
readable_size = ();
std::((), () + readable_size, ());
_writer_idx = readable_size;
_reader_idx = ;
} {
new_size = _buffer.() * ;
(new_size < len) new_size *= ;
_buffer.(new_size);
}
}
{
(data, len);
(len);
}
{
(str);
(str.());
}
{
(buf);
(buf.());
}
{
(buf, len);
(len);
}
{
(len <= ());
std::string str;
str.(len);
(&str[], len);
str;
}
{
std::string str = (len);
(len);
str;
}
{
* res = (*)std::((), , ());
res;
}
{
std::string str = ();
(str.());
str;
}
{
_buffer.();
_reader_idx = ;
_writer_idx = ;
}
:
std::vector<> _buffer;
_reader_idx;
_writer_idx;
};
()
return
begin
void ReadData(void* data, uint64_t len)
assert
ReadableSize
copy
GetReadPos
GetReadPos
char
MoveReadOffset
void WriteData(const void* data, uint64_t len)
if
0
return
EnsureWriteSpace
const
char
static_cast
const
char
copy
GetWritePos
MoveWriteOffset
void WriteBuffer(Buffer& buf)
WriteData
GetReadPos
ReadableSize
void WriteString(const std::string& str)
WriteData
c_str
size
public
Buffer
uint64_t
1024
0
0
resize
char* GetWritePos()
return
Begin
char* GetReadPos()
return
Begin
void MoveReadOffset(uint64_t len)
assert
ReadableSize
void MoveWriteOffset(uint64_t len)
assert
BufferHeadSize
BufferTailSize
std::string GetLine()
char
FindCRLF
if
nullptr
return
""
return
ReadAsString
ReadPos
1
uint64_t BufferTailSize()
return
size
uint64_t BufferHeadSize()
return
uint64_t ReadableSize()
return
void EnsureWriteSpace(uint64_t len)
if
BufferTailSize
return
if
BufferHeadSize
BufferTailSize
uint64_t
ReadableSize
copy
GetReadPos
GetReadPos
Begin
0
else
uint64_t
size
2
while
2
resize
void WriteAndPush(const void* data, uint64_t len)
WriteData
MoveWriteOffset
void WriteStringAndPush(const std::string& str)
WriteString
MoveWriteOffset
size
void WriteBufferAndPush(Buffer& buf)
WriteBuffer
MoveWriteOffset
ReadableSize
void ReadAndPop(void* buf, uint64_t len)
ReadData
MoveReadOffset
std::string ReadAsString(uint64_t len)
assert
ReadableSize
resize
ReadData
0
return
std::string ReadAsStringAndPop(uint64_t len)
ReadAsString
MoveReadOffset
return
char* FindCRLF()
char
char
memchr
GetReadPos
'\n'
ReadableSize
return
std::string GetLineAndPop()
GetLine
MoveReadOffset
size
return
void clear()
clear
0
0
private
char
uint64_t
uint64_t
int main() {
Buffer buffer;
for (int i = 0; i < 300; i++) {
std::string str = "hello world" + std::to_string(i) + "\n";
buffer.WriteStringAndPush(str);
}
while (buffer.GetReadableSize() > 0) {
std::string line = buffer.GetLineAndPop();
std::cout << "Line: " << line << std::endl;
}
return 0;
}
二、日志模块
这里采用宏定义方式简化日志输出,注意宏中的变量作用域问题。
#define INF 0
#define DBG 1
#define ERR 2
#define LOG_LEVEL -1
#define LOG(level, format, ...) do { \
if (level < LOG_LEVEL) break; \
time_t t = time(nullptr); \
struct tm* tm = localtime(&t); \
char buf[64]; \
strftime(buf, sizeof(buf) - 1, "%Y-%m-%d %H:%M:%S", tm); \
printf("%s [%s:%d] " format "\n", buf, __FILE__, __LINE__, ##__VA_ARGS__); \
} while (0)
#define LOG_INFO(format, ...) LOG(INF, format, ##__VA_ARGS__)
#define LOG_DEBUG(format, ...) LOG(DBG, format, ##__VA_ARGS__)
#define LOG_ERROR(format, ...) LOG(ERR, format, ##__VA_ARGS__)
三、套接字 Socket 设计
Socket 层主要负责封装系统调用,屏蔽底层细节。
1. 代码实现
#define MAXLISTEN 1024
class Socket {
private:
bool Create() {
_sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (_sockfd < 0) {
LOG_ERROR("CREATE SOCKET ERROR");
return false;
}
return true;
}
bool Bind(const std::string& ip, uint16_t port) {
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
socklen_t len = sizeof(struct sockaddr_in);
int ret = bind(_sockfd, (struct sockaddr*)&addr, len);
if (ret < 0) {
LOG_ERROR("BIND SOCKET ERROR");
return false;
}
return true;
}
bool Listen(int backlog = MAXLISTEN) {
int ret = listen(_sockfd, backlog);
if (ret < 0) {
LOG_ERROR("LISTEN SOCKET ERROR");
return false;
}
return true;
}
bool Connect(const std::string& ip, uint16_t port) {
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
socklen_t len = sizeof(struct sockaddr_in);
int ret = connect(_sockfd, (struct sockaddr*)&addr, len);
if (ret < 0) {
LOG_ERROR("CONNECT SOCKET ERROR");
return false;
}
return true;
}
public:
Socket() : _sockfd(-1) {}
~Socket() { Close(); }
Socket(int sockfd) : _sockfd(sockfd) {}
Socket(const Socket&) = delete;
Socket& operator=(const Socket&) = delete;
int Fd() const { return _sockfd; }
int Accept() {
int newfd = accept(_sockfd, nullptr, nullptr);
if (newfd < 0) {
LOG_ERROR("ACCEPT SOCKET ERROR");
return -1;
}
return newfd;
}
ssize_t Recv(void* buf, size_t len, int flag = 0) {
ssize_t ret = recv(_sockfd, buf, len, flag);
if (ret <= 0) {
if (errno == EAGAIN || errno == EINTR) return 0;
LOG_ERROR("Recv SOCKET %s", strerror(errno));
return -1;
}
return ret;
}
ssize_t NonBlockRecv(void* buf, size_t len) {
return Recv(buf, len, MSG_DONTWAIT);
}
ssize_t Send(const void* buf, size_t len, int flag = 0) {
ssize_t ret = send(_sockfd, buf, len, flag);
if (ret < 0) {
if (errno == EAGAIN || errno == EINTR) return 0;
LOG_ERROR("SEND SOCKET %s", strerror(errno));
return -1;
}
return ret;
}
ssize_t NonBlockSend(const void* buf, size_t len) {
if (len == 0) return 0;
return Send(buf, len, MSG_DONTWAIT);
}
void Close() {
if (_sockfd != -1) {
close(_sockfd);
_sockfd = -1;
}
}
void ReuseAddr() {
int opt = 1;
setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void*)&opt, sizeof(opt));
opt = 1;
setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void*)&opt, sizeof(opt));
}
void NonBlock() {
int flag = fcntl(_sockfd, F_GETFL, 0);
if (flag == -1) {
LOG_ERROR("GET SOCKET FLAG ERROR");
return;
}
int ret = fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);
if (ret < 0) {
LOG_ERROR("SET SOCKET NONBLOCK ERROR");
return;
}
}
bool CreateServer(uint16_t port, const std::string& ip = "0.0.0.0", bool nonblock_flag = false) {
if (!Create()) return false;
if (nonblock_flag) NonBlock();
ReuseAddr();
if (!Bind(ip, port)) return false;
if (!Listen()) return false;
return true;
}
bool CreateClient(uint16_t port, const std::string& ip) {
if (!Create()) return false;
if (!Connect(ip, port)) return false;
return true;
}
private:
int _sockfd;
};
2. 代码检测
int main() {
Socket lst_sock;
lst_sock.CreateServer(8080);
while (1) {
int newfd = lst_sock.Accept();
if (newfd < 0) continue;
Socket cli_sock(newfd);
char buf[1024] = {0};
int ret = cli_sock.Recv(buf, 1023);
if (ret < 0) {
cli_sock.Close();
continue;
}
cli_sock.Send(buf, ret);
cli_sock.Close();
}
lst_sock.Close();
return 0;
}
int main() {
Socket cli_sock;
cli_sock.CreateClient(8080, "127.0.0.1");
cli_sock.Send("Hello IsLand", strlen("Hello IsLand"));
char buf[1024] = {0};
cli_sock.Recv(buf, 1023);
LOG_DEBUG("recv data: %s", buf);
cli_sock.Close();
return 0;
}
3. 细节处理
细节 1:处理 Recv 函数时,errno 的来源以及为啥不用 EWOULDBLOCK
- errno 来源:
errno 是 C 标准库全局变量,用于存储系统调用失败时的错误码。recv 返回 -1 时,具体原因通过 errno 获取。
- EAGAIN vs EWOULDBLOCK:在 Linux 中两者值相同(均为 11),通常视为等价。当非阻塞 socket 无数据时,返回
EAGAIN。
细节 2:MSG_DONTWAIT 的概述
- 这是
recv 系统调用的标志位,用于临时启用非阻塞模式。即使 socket 本身是阻塞的,加上此标志也会立即返回。若无数据,返回 -1 并设 errno 为 EAGAIN。
细节 3:关于 ReuseAddr()
- SO_REUSEADDR:允许重用本地地址和端口,常用于服务快速重启,避免
TIME_WAIT 导致绑定失败。
- SO_REUSEPORT:允许多个套接字绑定到完全相同的地址和端口,用于多进程负载均衡。
- 建议:普通服务仅保留
SO_REUSEADDR;高并发场景再考虑 SO_REUSEPORT。
为什么默认不允许端口复用?
TCP/IP 协议规定一个 IP+ 端口组合只能被一个 socket 绑定,防止冲突。但 SO_REUSEADDR 提供了例外规则:
- 原 socket 已关闭。
- 新 socket 设置了
SO_REUSEADDR。
此时内核认为安全,允许复用。
TIME_WAIT 问题:
服务关闭后立即重启常报 bind: Address already in use。这是因为 TCP 连接进入 TIME_WAIT 状态(约 60 秒)。设置 SO_REUSEADDR 可绕过此限制。
细节 4:宏污染
日志宏中定义的局部变量 char buf[64] 可能与外部同名变量冲突。虽然作用域不同,但在特定栈布局下可能导致意外覆盖。建议宏内变量名加前缀或避免使用常见名。
四、Channel 类设计
- 事件管理:监控可读、可写、挂断等状态。
- 回调管理:事件触发后执行对应的回调函数。
EPOLLIN:可读
EPOLLOUT:可写
EPOLLRDHUP:连接断开
EPOLLPRI:优先数据
EPOLLERR:出错
EPOLLHUP:挂断
1. 代码实现
class Channel {
public:
using EventCallback = std::function<void()>;
explicit Channel(int fd) : _fd(fd), _events(0), _revents(0) {}
~Channel() {
if (_fd != -1) {
close(_fd);
_fd = -1;
}
}
int Fd() const { return _fd; }
uint32_t Events() const { return _events; }
bool ReadAble() const { return (_events & EPOLLIN); }
bool WriteAble() const { return (_events & EPOLLOUT); }
void SetReadCallback(const EventCallback& cb) { _read_callback = cb; }
void SetWriteCallback(const EventCallback& cb) { _write_callback = cb; }
void SetCloseCallback(const EventCallback& cb) { _close_callback = cb; }
void SetErrorCallback(const EventCallback& cb) { _error_callback = cb; }
void SetEventCallback(const EventCallback& cb) { _event_callback = cb; }
void SetREvents(uint32_t events) { _revents = events; }
void EnableRead() { _events |= EPOLLIN; }
void EnableWrite() { _events |= EPOLLOUT; }
void DisableRead() { _events &= ~EPOLLIN; }
void DisableWrite() { _events &= ~EPOLLOUT; }
void DisableAll() { _events = 0; }
void Remove() {}
void Update() {}
void HandleEvent() {
if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)) {
if (_event_callback) _event_callback();
if (_read_callback) _read_callback();
} else if (_revents & EPOLLOUT) {
if (_event_callback) _event_callback();
if (_write_callback) _write_callback();
} else if (_revents & EPOLLERR) {
if (_error_callback) _error_callback();
} else if (_revents & EPOLLHUP) {
if (_close_callback) _close_callback();
}
}
private:
int _fd;
uint32_t _events;
uint32_t _revents;
EventCallback _read_callback;
EventCallback _write_callback;
EventCallback _close_callback;
EventCallback _error_callback;
EventCallback _event_callback;
};
2. 细节处理
细节 1:HandleEvent 中使用 if-else if 结构
使用 if-else if 是为了保障资源安全和明确事件优先级。
- 优先级:错误 (
EPOLLERR) 和挂起 (EPOLLHUP) 优先级更高,应优先处理,避免在无效连接上继续读写。
- 资源安全:写事件处理可能触发连接关闭,若后续继续处理其他事件可能导致访问已释放对象。
else if 能阻断后续逻辑。
五、Poller 模块实现
意义:通过 epoll 实现对描述符的 IO 事件监控。
- 添加/修改描述符的事件监控。
- 移除描述符的事件监控。
- 拥有 epoll 句柄
_epfd。
- 维护活跃事件数组
_events。
- 使用 hash 表管理
fd -> Channel 映射。
1. 代码实现
#define MAX_EPOLLER_EVENTS 1024
class Poller {
private:
void Update(Channel* channel, int op) {
int fd = channel->Fd();
struct epoll_event event;
event.data.fd = fd;
event.events = channel->Events();
int ret = epoll_ctl(_epfd, op, fd, &event);
if (ret < 0) LOG_ERROR("EPOLL_CTL ERROR");
}
bool HasChannel(Channel* channel) {
return _channels.find(channel->Fd()) != _channels.end();
}
public:
Poller() {
_epfd = epoll_create1(EPOLL_CLOEXEC);
if (_epfd < 0) {
LOG_ERROR("EPOLL_CREATE ERROR");
abort();
}
}
Poller(const Poller&) = delete;
Poller& operator=(const Poller&) = delete;
void UpdateEvent(Channel* channel) {
bool ret = HasChannel(channel);
if (!ret) {
_channels.insert(std::make_pair(channel->Fd(), channel));
return Update(channel, EPOLL_CTL_ADD);
}
return Update(channel, EPOLL_CTL_MOD);
}
void RemoveEvent(Channel* channel) {
auto it = _channels.find(channel->Fd());
if (it != _channels.end()) _channels.erase(it);
Update(channel, EPOLL_CTL_DEL);
}
void Poll(std::vector<Channel*>* active) {
int nfds = epoll_wait(_epfd, _events, MAX_EPOLLER_EVENTS, -1);
if (nfds < 0) {
if (errno == EINTR) return;
LOG_ERROR("EPOLL_WAIT ERROR: %s\n", strerror(errno));
abort();
}
for (int i = 0; i < nfds; ++i) {
int fd = _events[i].data.fd;
auto it = _channels.find(fd);
assert(it != _channels.end());
Channel* channel = it->second;
channel->SetREvents(_events[i].events);
active->push_back(channel);
}
}
private:
int _epfd;
struct epoll_event _events[MAX_EPOLLER_EVENTS];
std::unordered_map<int, Channel*> _channels;
};
2. 细节处理
- LT vs ET:默认水平触发(LT),数据未读完会持续通知;边缘触发(ET)仅在状态变化时通知一次,需配合非阻塞 IO。
- Poll 返回值:遍历
epoll_wait 结果,填充 active 列表并设置 Channel 的 _revents。
- 多线程支持:不支持多线程同时调用
Poll,需每个线程独立实例或使用锁。
- 超时时间 -1:表示无限等待,适合服务器模型,但需结合定时任务机制调整。
3. 与 Channel 的整合测试
需修改 Channel 以持有 Poller 指针,实现 Remove 和 Update 的委托调用。
#include "../../source/server.hpp"
void HandleClose(Channel* channel) {
std::cout << "HandleClose: " << channel->Fd() << std::endl;
channel->Remove();
delete channel;
}
void HandleRead(Channel* channel) {
int fd = channel->Fd();
char buf[1024] = {0};
ssize_t ret = recv(fd, buf, 1023, 0);
if (ret <= 0) return HandleClose(channel);
std::cout << buf << std::endl;
channel->EnableWrite();
}
void HandleWrite(Channel* channel) {
int fd = channel->Fd();
const char* data = "I miss You";
ssize_t ret = send(fd, data, strlen(data), 0);
if (ret < 0) return HandleClose(channel);
channel->DisableWrite();
}
void HandleError(Channel* channel) { return HandleClose(channel); }
void HandlEvent(Channel* channel) { std::cout << "有了一个事件" << std::endl; }
void Acceptor(Poller* poller, Channel* lst_channel) {
int fd = lst_channel->Fd();
int newfd = accept(fd, nullptr, nullptr);
if (newfd < 0) return;
Channel* channel = new Channel(poller, newfd);
channel->SetReadCallback(std::bind(HandleRead, channel));
channel->SetWriteCallback(std::bind(HandleWrite, channel));
channel->SetCloseCallback(std::bind(HandleClose, channel));
channel->SetErrorCallback(std::bind(HandleError, channel));
channel->SetEventCallback(std::bind(HandlEvent, channel));
channel->EnableRead();
}
int main() {
Poller poller;
Socket lst_sock;
lst_sock.CreateServer(8080);
Channel channel(&poller, lst_sock.Fd());
channel.SetReadCallback(std::bind(Acceptor, &poller, &channel));
channel.EnableRead();
while (1) {
std::vector<Channel*> actives;
poller.Poll(&actives);
for (auto& a : actives) a->HandleEvent();
}
lst_sock.Close();
return 0;
}
六、EventLoop 模块实现
1. 关于 eventfd 函数
eventfd 是 Linux 提供的轻量级 IPC 机制,用于进程间事件通知。它像一个计数器,支持读写操作。
initval:初始计数值。
flags:如 EFD_CLOEXEC(exec 时关闭)、EFD_NONBLOCK(非阻塞)。
write:累加计数器。
read:读取并清零(或减 1),若为 0 且非阻塞则返回 EAGAIN。
2. EventLoop 模块概述
EventLoop 负责事件监控及处理,关键点在于线程亲和性:一个连接的所有操作必须在同一个线程中执行,以避免线程安全问题。
- 在线程中监控描述符。
- 有就绪事件则处理。
- 所有就绪事件处理完后,执行任务队列中的任务。
唤醒机制:
为防止 IO 阻塞导致任务队列无法执行,引入 eventfd。当有新任务时,向 eventfd 写入数据,唤醒 epoll_wait。
class EventLoop {
public:
using Functor = std::function<void()>;
void RunAllTask() {
std::vector<Functor> tasks;
{
std::lock_guard<std::mutex> lock(_mutex);
tasks.swap(_tasks);
}
for (auto& t : tasks) t();
}
static int CreateEventfd() {
int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (efd < 0) {
LOG_ERROR("CREATE EVENTFD ERROR");
abort();
}
return efd;
}
void ReadEventFd() {
uint64_t data = 0;
ssize_t ret = read(_event_fd, &data, sizeof(data));
if (ret < 0) {
if (errno == EAGAIN || errno == EINTR) return;
LOG_ERROR("READ EVENTFD ERROR");
abort();
}
}
void WakeupEventFd() {
uint64_t data = 1;
ssize_t ret = write(_event_fd, &data, sizeof(data));
if (ret < 0) {
if (errno == EAGAIN || errno == EINTR) return;
LOG_ERROR("WRITE EVENTFD ERROR");
abort();
}
}
EventLoop() : _thread_id(std::this_thread::get_id()),
_event_fd(CreateEventfd()),
_event_channel(new Channel(this, _event_fd)) {
_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventFd, this));
_event_channel->EnableRead();
}
bool IsInLoop() { return _thread_id == std::this_thread::get_id(); }
void UpdateEvent(Channel* channel) {
assert(IsInLoop());
_poller.UpdateEvent(channel);
}
void RemoveEvent(Channel* channel) {
assert(IsInLoop());
_poller.RemoveEvent(channel);
}
void Start() {
std::vector<Channel*> actives;
_poller.Poll(&actives);
for (auto& channel : actives) channel->HandleEvent();
RunAllTask();
}
void QueueInLoop(const Functor& cb) {
{
std::lock_guard<std::mutex> lock(_mutex);
_tasks.emplace_back(cb);
}
WakeupEventFd();
}
void RunInLoop(const Functor& cb) {
if (IsInLoop()) {
cb();
} else {
QueueInLoop(cb);
}
}
private:
std::thread::id _thread_id;
int _event_fd;
std::unique_ptr<Channel> _event_channel;
Poller _poller;
std::vector<Functor> _tasks;
std::mutex _mutex;
};
3. 与 TimeWheel 模块整合
定时器任务需绑定到 EventLoop,确保回调在事件循环线程中执行。
- 绑定关系:
TimerWheel 依赖 EventLoop 实例。
- 事件驱动:通过
timerfd 的可读事件触发 OnTime。
- 线程安全:利用
RunInLoop 保证操作串行化。
class EventLoop {
public:
void AddTimer(uint64_t id, uint32_t delay, const TaskFunc& cb) {
return _timer_wheel.AddTimer(id, delay, cb);
}
void RefreshTimer(uint64_t id) {
return _timer_wheel.RefreshTimer(id);
}
void CancelTimer(uint64_t id) {
return _timer_wheel.CancelTimer(id);
}
private:
TimerWheel _timer_wheel;
};
class TimerWheel {
public:
void AddTimer(uint64_t id, uint32_t delay, const TaskFunc& cb) {
_loop->RunInLoop(std::bind(&TimerWheel::TimerAddInLoop, this, id, delay, cb));
}
void RefreshTimer(uint64_t id) {
_loop->RunInLoop(std::bind(&TimerWheel::TimerRefreshInLoop, this, id));
}
void CancelTimer(uint64_t id) {
_loop->RunInLoop(std::bind(&TimerWheel::TimerCanceInLoop, this, id));
}
private:
void TimerAddInLoop(uint64_t id, uint32_t delay, const TaskFunc& cb) {
PtrTask pt(new TimerTask(id, delay, cb));
pt->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id));
int pos = (_tick + delay) % _capacity;
_wheel[pos].push_back(pt);
_timers[id] = WeakTask(pt);
}
void OnTime() {
int times = ReadTimerfd();
for (int i = 0; i < times; ++i) RunTimerTask();
}
void RunTimerTask() {
_tick = (_tick + 1) % _capacity;
auto& tasks = _wheel[_tick];
for (auto& task : tasks) {
if (!task->_canceled) task->_cb();
task->_release();
}
tasks.clear();
}
static int CreateTimerfd() {
int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);
if (timerfd < 0) {
LOG_ERROR("Create timerfd error");
abort();
}
struct itimerspec itime;
itime.it_value.tv_sec = 1;
itime.it_value.tv_nsec = 0;
itime.it_interval.tv_sec = 1;
itime.it_interval.tv_nsec = 0;
timerfd_settime(timerfd, 0, &itime, nullptr);
return timerfd;
}
int ReadTimerfd() {
uint64_t times = 0;
ssize_t ret = read(_timerfd, ×, sizeof(times));
if (ret < 0) {
LOG_ERROR("READ TIMERFD ERROR");
abort();
}
return times;
}
private:
using WeakTask = std::weak_ptr<TimerTask>;
using PtrTask = std::shared_ptr<TimerTask>;
const int _capacity;
int _tick;
int _timerfd;
std::unique_ptr<Channel> _timer_channel;
EventLoop* _loop;
std::vector<std::vector<PtrTask>> _wheel;
std::unordered_map<uint64_t, WeakTask> _timers;
};
4. 代码测试
#include "../../source/server.hpp"
void HandleClose(Channel* channel) {
LOG_DEBUG("close fd: %d", channel->Fd());
channel->Remove();
delete channel;
}
void HandleRead(Channel* channel) {
int fd = channel->Fd();
char buf[1024] = {0};
ssize_t ret = recv(fd, buf, 1023, 0);
if (ret <= 0) return HandleClose(channel);
LOG_DEBUG("Read: %s", buf);
channel->EnableWrite();
}
void HandleWrite(Channel* channel) {
int fd = channel->Fd();
const char* data = "I miss You";
ssize_t ret = send(fd, data, strlen(data), 0);
if (ret < 0) return HandleClose(channel);
channel->DisableWrite();
}
void HandleError(Channel* channel) { return HandleClose(channel); }
void HandlEvent(EventLoop* loop, Channel* channel, uint64_t timerid) {
loop->RefreshTimer(timerid);
}
void Acceptor(EventLoop* loop, Channel* lst_channel) {
int fd = lst_channel->Fd();
int newfd = accept(fd, nullptr, nullptr);
if (newfd < 0) return;
uint64_t timerid = rand() % 1000;
Channel* channel = new Channel(loop, newfd);
channel->SetReadCallback(std::bind(HandleRead, channel));
channel->SetWriteCallback(std::bind(HandleWrite, channel));
channel->SetCloseCallback(std::bind(HandleClose, channel));
channel->SetErrorCallback(std::bind(HandleError, channel));
channel->SetEventCallback(std::bind(HandlEvent, loop, channel, timerid));
loop->AddTimer(timerid, 5, std::bind(HandleClose, channel));
channel->EnableRead();
}
int main() {
srand(time(nullptr));
EventLoop loop;
Socket lst_sock;
lst_sock.CreateServer(8080);
Channel channel(&loop, lst_sock.Fd());
channel.SetReadCallback(std::bind(Acceptor, &loop, &channel));
channel.EnableRead();
while (1) {
loop.Start();
}
lst_sock.Close();
return 0;
}
5. 细节分析
细节 1:定时器任务中异步执行回调
之前实现中若在析构函数中直接 detach 子线程执行回调,会导致回调在非 EventLoop 线程中运行,进而触发 assert(IsInLoop()) 失败。正确做法是将回调提交到 EventLoop 的任务队列中执行。
细节 2:服务器端关闭再启动的文件描述符(fd)不变
Linux 遵循'最小可用原则'分配 fd。只要连接关闭时正确调用 close(fd) 并释放 Channel 对象,系统会回收 fd 并在下次连接时复用。无需担心 fd 冲突。
细节 3:Channel 类中的 Remove 和 Update 方法为何调用 EventLoop 的接口?
职责分离:Channel 仅负责事件注册,Poller 负责底层监控,EventLoop 统一管理增删改,确保状态一致性。
细节 4:如何避免定时器任务的重复添加?
通过 HasTimer 检查或刷新机制(RefreshTimer)替代新增,确保线程安全。
相关免费在线工具
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
- Base64 文件转换器
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
- Markdown转HTML
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
- HTML转Markdown
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
- JSON 压缩
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online
- JSON美化和格式化
将JSON字符串修饰为友好的可读格式。 在线工具,JSON美化和格式化在线工具,online