跳到主要内容C++ 仿 Muduo 库:Server 服务器模块实现(上) | 极客日志C++
C++ 仿 Muduo 库:Server 服务器模块实现(上)
C++ 仿 Muduo 库 Server 服务器模块实现,涵盖 Buffer 缓冲区、Socket 封装、Channel 事件描述符、Poller 模块及 EventLoop 事件循环。重点解析非阻塞 IO、epoll 水平/边缘触发、端口复用 SO_REUSEADDR、定时器轮盘 TimeWheel 及线程安全机制。通过完整代码示例展示模块整合与测试,解决 TIME_WAIT、宏污染、文件描述符生命周期等关键问题,构建高性能网络服务框架。
奶糖兔29 浏览 C++ 仿 Muduo 库:Server 服务器模块实现(上)
一、Buffer 模块
本质:缓冲区模板
功能:存储数据,取出数据
实现思想:
- 实现缓冲区得有一块内存空间,采用
vector<char>,vector 底层其实使用的是一个线性的内存空间。
- 要素:
- 默认的空间大小
- 当前的读取数据位置
- 当前的写入数据位置
- 操作:
- 写入数据:当前写入位置指向哪里,就从哪里开始写入。如果后续剩余空闲空间不够了,考虑缓冲区空闲空间是否足够(因为读位置也会向后偏移,也就是说前面也可能有空闲空间)。
- 足够:将数据移动到起始位置即可。
- 不够:扩容,从当前写位置开始扩容足够大小。
- 数据一旦写入成功,当前写位置就要往后偏移。
- 读取数据:当前读取位置指向哪里,就从哪里开始读取,前提是有数据可读。
设计如下:
class Buffer {
public:
private:
std::vector<char> _buffer;
uint64_t _read_idx;
uint64_t _write_idx;
};
代码如下:
class Buffer {
private:
char *Begin() { &*_buffer.(); }
{
(len <= ());
std::((), () + len, (*)data);
(len);
}
{
(len <= ) ;
(len);
*d = < *>(data);
std::(d, d + len, ());
(len);
}
{
(buf.(), buf.());
}
{
(str.(), str.());
}
:
( size = ) : _reader_idx(), _writer_idx() {
_buffer.(size);
}
{ () + _writer_idx; }
{ () + _reader_idx; }
{
(len <= ());
_reader_idx += len;
}
{
(len <= () + ());
_writer_idx += len;
}
{
*pos = ();
(pos == ) { ; }
(pos - () + );
}
{ _buffer.() - _writer_idx; }
{ _reader_idx; }
{ _writer_idx - _reader_idx; }
{
(() >= len) ;
(len <= () + ()) {
readable_size = ();
std::((), () + readable_size, ());
_writer_idx = readable_size;
_reader_idx = ;
} {
new_size = _buffer.() * ;
(new_size < len) { new_size *= ; }
_buffer.(new_size);
}
}
{
(data, len);
(len);
}
{
(str);
(str.());
}
{
(buf);
(buf.());
}
{
(buf, len);
(len);
}
{
(len <= ());
std::string str;
str.(len);
(&str[], len);
str;
}
{
std::string str = (len);
(len);
str;
}
{
*res = (*)std::((), , ());
res;
}
{
std::string str = ();
(str.());
str;
}
{
_buffer.();
_reader_idx = ;
_writer_idx = ;
}
:
std::vector<> _buffer;
_reader_idx;
_writer_idx;
};
return
begin
void ReadData(void *data, uint64_t len)
assert
ReadableSize
copy
GetReadPos
GetReadPos
char
MoveReadOffset
void WriteData(const void *data, uint64_t len)
if
0
return
EnsureWriteSpace
const
char
static_cast
const
char
copy
GetWritePos
MoveWriteOffset
void WriteBuffer(Buffer &buf)
return
WriteData
GetReadPos
ReadableSize
void WriteString(const std::string &str)
return
WriteData
c_str
size
public
Buffer
uint64_t
1024
0
0
resize
char *GetWritePos()
return
Begin
char *GetReadPos()
return
Begin
void MoveReadOffset(uint64_t len)
assert
ReadableSize
void MoveWriteOffset(uint64_t len)
assert
BufferHeadSize
BufferTailSize
std::string GetLine()
char
FindCRLF
if
nullptr
return
""
return
ReadAsString
ReadPos
1
uint64_t BufferTailSize()
return
size
uint64_t BufferHeadSize()
return
uint64_t ReadableSize()
return
void EnsureWriteSpace(uint64_t len)
if
BufferTailSize
return
if
BufferHeadSize
BufferTailSize
uint64_t
ReadableSize
copy
GetReadPos
GetReadPos
Begin
0
else
uint64_t
size
2
while
2
resize
void WriteAndPush(const void *data, uint64_t len)
WriteData
MoveWriteOffset
void WriteStringAndPush(const std::string &str)
WriteString
MoveWriteOffset
size
void WriteBufferAndPush(Buffer &buf)
WriteBuffer
MoveWriteOffset
ReadableSize
void ReadAndPop(void *buf, uint64_t len)
ReadData
MoveReadOffset
std::string ReadAsString(uint64_t len)
assert
ReadableSize
resize
ReadData
0
return
std::string ReadAsStringAndPop(uint64_t len)
ReadAsString
MoveReadOffset
return
char *FindCRLF()
char
char
memchr
GetReadPos
'\n'
ReadableSize
return
std::string GetLineAndPop()
GetLine
MoveReadOffset
size
return
void clear()
clear
0
0
private
char
uint64_t
uint64_t
int main() {
Buffer buffer;
for (int i = 0; i < 300; i++) {
std::string str = "hello world" + std::to_string(i) + "\n";
buffer.WriteStringAndPush(str);
}
while (buffer.GetReadableSize() > 0) {
std::string line = buffer.GetLineAndPop();
std::cout << "Line: " << line << std::endl;
}
return 0;
}
二、日志模块
#define INF 0
#define DBG 1
#define ERR 2
#define LOG_LEVEL -1
#define LOG(level, format, ...) do { \
if (level < LOG_LEVEL) break; \
time_t t = time(nullptr); \
struct tm* tm = localtime(&t); \
char buf[64]; \
strftime(buf, sizeof(buf) - 1, "%Y-%m-%d %H:%M:%S", tm); \
printf("%s [%s:%d] " format "\n", buf, __FILE__, __LINE__, ##__VA_ARGS__); \
} while (0)
#define LOG_INFO(format, ...) LOG(INF, format, ##__VA_ARGS__)
#define LOG_DEBUG(format, ...) LOG(DBG, format, ##__VA_ARGS__)
#define LOG_ERROR(format, ...) LOG(ERR, format, ##__VA_ARGS__)
三、套接字 Socket 设计
1. 代码实现
#define MAXLISTEN 1024
class Socket {
private:
bool Create() {
_sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (_sockfd < 0) {
LOG_ERROR("CREATE SOCKET ERROR");
return false;
}
return true;
}
bool Bind(const std::string &ip, uint16_t port) {
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
socklen_t len = sizeof(struct sockaddr_in);
int ret = bind(_sockfd, (struct sockaddr*)&addr, len);
if (ret < 0) {
LOG_ERROR("BIND SOCKET ERROR");
return false;
}
return true;
}
bool Listen(int backlog = MAXLISTEN) {
int ret = listen(_sockfd, backlog);
if (ret < 0) {
LOG_ERROR("LISTEN SOCKET ERROR");
return false;
}
return true;
}
bool Connect(const std::string &ip, uint16_t port) {
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
socklen_t len = sizeof(struct sockaddr_in);
int ret = connect(_sockfd, (struct sockaddr*)&addr, len);
if (ret < 0) {
LOG_ERROR("CONNECT SOCKET ERROR");
return false;
}
return true;
}
public:
Socket() : _sockfd(-1) {}
~Socket() { Close(); }
Socket(int sockfd) : _sockfd(sockfd) {}
Socket(const Socket&) = delete;
Socket& operator=(const Socket&) = delete;
int Fd() const { return _sockfd; }
int Accept() {
int newfd = accept(_sockfd, nullptr, nullptr);
if (newfd < 0) {
LOG_ERROR("ACCEPT SOCKET ERROR");
return -1;
}
return newfd;
}
ssize_t Recv(void* buf, size_t len, int flag = 0) {
ssize_t ret = recv(_sockfd, buf, len, flag);
if (ret <= 0) {
if (errno == EAGAIN || errno == EINTR) {
return 0;
}
LOG_ERROR("Recv SOCKET %s", strerror(errno));
return -1;
}
return ret;
}
ssize_t NonBlockRecv(void* buf, size_t len) {
return Recv(buf, len, MSG_DONTWAIT);
}
ssize_t Send(const void* buf, size_t len, int flag = 0) {
ssize_t ret = send(_sockfd, buf, len, flag);
if (ret < 0) {
if (errno == EAGAIN || errno == EINTR) {
return 0;
}
LOG_ERROR("SEND SOCKET %s", strerror(errno));
return -1;
}
return ret;
}
ssize_t NonBlockSend(const void* buf, size_t len) {
if (len == 0) return 0;
return Send(buf, len, MSG_DONTWAIT);
}
void Close() {
if (_sockfd != -1) {
close(_sockfd);
_sockfd = -1;
}
}
void ReuseAddr() {
int opt = 1;
setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void*)&opt, sizeof(opt));
opt = 1;
setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void*)&opt, sizeof(opt));
}
void NonBlock() {
int flag = fcntl(_sockfd, F_GETFL, 0);
if (flag == -1) {
LOG_ERROR("GET SOCKET FLAG ERROR");
return;
}
int ret = fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);
if (ret < 0) {
LOG_ERROR("SET SOCKET NONBLOCK ERROR");
return;
}
}
bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool nonblock_flag = false) {
if (!Create()) return false;
if (nonblock_flag) NonBlock();
ReuseAddr();
if (!Bind(ip, port)) return false;
if (!Listen()) return false;
return true;
}
bool CreateClient(uint16_t port, const std::string &ip) {
if (!Create()) return false;
if (!Connect(ip, port)) return false;
return true;
}
private:
int _sockfd;
};
2. 细节处理
细节 1:处理 Recv 函数时,errno 的来源以及为啥不用 EWOULDBLOCK
errno 的来源:errno 是 C 标准库中定义的全局变量,用于存储系统调用或库函数失败时的错误码。当 recv 返回 -1 时,具体的错误原因通过 errno 传递。
EAGAIN 和 EWOULDBLOCK 的关系:在 Linux 系统中,两者的值相同(均为 11),定义在 /usr/include/asm-generic/errno-base.h 中。
细节 2:MSG_DONTWAIT 的概述
在 NonBlockRecv 和 NonBlockSend 函数中,使用了 MSG_DONTWAIT 标志来实现非阻塞接收。即使当前套接字本身是阻塞模式,也会让本次 recv 调用立即返回。如果此时接收缓冲区中没有数据,recv 会返回 -1,并将 errno 设置为 EAGAIN 或 EWOULDBLOCK。
细节 3:关于 ReuseAddr()
SO_REUSEADDR:安全复用,允许同一地址和端口被多个套接字绑定(常用于快速重启服务)。
SO_REUSEPORT:多进程共享端口,允许多个套接字绑定到完全相同的地址和端口(需所有套接字均设置此选项),用于负载均衡。
- 建议:若无需多进程/线程共享端口,仅保留
SO_REUSEADDR。
为什么默认不允许端口复用?
每个 TCP/UDP 连接由五元组唯一标识。如果多个 socket 绑定到相同的地址和端口,系统将无法判断哪个 socket 应该处理新连接。
- 用途:快速重启服务,避免 "Address already in use" 错误。
- 原理:允许绑定到已被其他 socket 使用的地址,但前提是原 socket 已关闭或也设置了该选项。
SO_REUSEPORT:允许多个 socket 同时绑定到相同地址和端口
- 用途:负载均衡,高并发场景。
- 原理:多个 socket 可以绑定到相同的地址和端口,内核负责将连接请求分发给各个 socket。
细节 4:宏污染
由于最开始的时候,日志实现代码和测试代码都用了相同的局部变量 char buf。虽然从语法上看,宏中的 buf 是局部变量,不会影响外部的 buf,但在某些编译器或特定优化条件下,栈内存的布局可能会导致 buf 被意外覆盖。建议修改宏内部变量名以避免冲突。
四、Channel 类设计
- 事件管理:描述符是否可读写,对描述符的监控可读可写,解除事件监控。
- 事件触发后处理的管理:需要处理的事件(可读,可写,挂断,错误,任意),事件处理的回调函数。
epoll 进行事件监控:EPOLLIN(可读),EPOLLOUT(可写),EPOLLRDHUP(连接断开),EPOLLPRI(优先数据),EPOLLERR(出错),EPOLLHUP(挂断)。
1. 代码实现
class Channel {
public:
using EventCallback = std::function<void()>;
explicit Channel(Poller* poller, int fd) : _fd(fd), _events(0), _revents(0), _poller(poller) {}
~Channel() {
if (_fd != -1) {
close(_fd);
_fd = -1;
}
}
int Fd() const { return _fd; }
uint32_t Events() const { return _events; }
bool ReadAble() const { return (_events & EPOLLIN); }
bool WriteAble() const { return (_events & EPOLLOUT); }
void SetReadCallback(const EventCallback& cb) { _read_callback = cb; }
void SetWriteCallback(const EventCallback& cb) { _write_callback = cb; }
void SetCloseCallback(const EventCallback& cb) { _close_callback = cb; }
void SetErrorCallback(const EventCallback& cb) { _error_callback = cb; }
void SetEventCallback(const EventCallback& cb) { _event_callback = cb; }
void SetREvents(uint32_t events) { _revents = events; }
void EnableRead() { _events |= EPOLLIN; Update(); }
void EnableWrite() { _events |= EPOLLOUT; Update(); }
void DisableRead() { _events &= ~EPOLLIN; Update(); }
void DisableWrite() { _events &= ~EPOLLOUT; Update(); }
void DisableAll() { _events = 0; Update(); }
void Remove() { return _poller->RemoveEvent(this); }
void Update() { return _poller->UpdateEvent(this); }
void HandleEvent() {
if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)) {
if (_event_callback) _event_callback();
if (_read_callback) _read_callback();
} else if (_revents & EPOLLOUT) {
if (_event_callback) _event_callback();
if (_write_callback) _write_callback();
} else if (_revents & EPOLLERR) {
if (_error_callback) _error_callback();
} else if (_revents & EPOLLHUP) {
if (_close_callback) _close_callback();
}
}
private:
int _fd;
uint32_t _events;
uint32_t _revents;
Poller* _poller;
EventCallback _read_callback;
EventCallback _write_callback;
EventCallback _close_callback;
EventCallback _error_callback;
EventCallback _event_callback;
};
2. 细节处理
细节 1:在 HandleEvent 函数中使用 if-else if 结构而非多个独立的 if
使用 if-else if 是为了保障资源安全、明确事件优先级,并避免因同时处理多个事件导致的未定义行为。错误和挂起事件的优先级更高,且互斥处理可避免访问已释放对象。
五、Poller 模块实现
意义:通过 epoll 实现对描述符的 IO 事件监控。
- 添加 / 修改描述符的事件监控。
- 移除描述符的事件监控。
- 必须拥有一个 epoll 的操作句柄。
- 拥有一个
struct epoll_event 的结构数组,监控时保存所有的活跃事件。
- 使用 hash 表管理描述符与描述符对应的事件管理
Channel 对象。
1. 代码实现
#define MAX_EPOLLER_EVENTS 1024
class Poller {
private:
void Update(Channel* channel, int op) {
int fd = channel->Fd();
struct epoll_event event;
event.data.fd = fd;
event.events = channel->Events();
int ret = epoll_ctl(_epfd, op, fd, &event);
if (ret < 0) {
LOG_ERROR("EPOLL_CTL ERROR");
}
return;
}
bool HasChannel(Channel* channel) {
return _channels.find(channel->Fd()) != _channels.end();
}
public:
Poller() {
_epfd = epoll_create1(EPOLL_CLOEXEC);
if (_epfd < 0) {
LOG_ERROR("EPOLL_CREATE ERROR");
abort();
}
}
Poller(const Poller&) = delete;
Poller& operator=(const Poller&) = delete;
void UpdateEvent(Channel* channel) {
bool ret = HasChannel(channel);
if (!ret) {
_channels.insert(std::make_pair(channel->Fd(), channel));
return Update(channel, EPOLL_CTL_ADD);
}
return Update(channel, EPOLL_CTL_MOD);
}
void RemoveEvent(Channel* channel) {
auto it = _channels.find(channel->Fd());
if (it != _channels.end()) {
_channels.erase(it);
}
Update(channel, EPOLL_CTL_DEL);
}
void Poll(std::vector<Channel*>* active) {
int nfds = epoll_wait(_epfd, _events, MAX_EPOLLER_EVENTS, -1);
if (nfds < 0) {
if (errno == EINTR) {
return;
}
LOG_ERROR("EPOLL_WAIT ERROR: %s\n", strerror(errno));
abort();
}
for (int i = 0; i < nfds; ++i) {
int fd = _events[i].data.fd;
auto it = _channels.find(fd);
assert(it != _channels.end());
Channel* channel = it->second;
channel->SetREvents(_events[i].events);
active->push_back(channel);
}
}
private:
int _epfd;
struct epoll_event _events[MAX_EPOLLER_EVENTS];
std::unordered_map<int, Channel*> _channels;
};
2. 细节处理
细节 1:epoll 是水平触发(LT)还是边缘触发(ET)?区别是什么?
答案:默认是 LT,LT 在数据未处理完时会持续通知;ET 仅在状态变化时通知一次,需配合非阻塞 I/O 使用。
细节 2:Poll 方法返回的活跃事件是如何处理的?
答案:遍历 epoll_wait 返回的事件,填充到 active 列表中,并设置 Channel 的 _revents。
细节 3:Poller 是否支持多线程同时调用 Poll 方法?
答案:不支持,需通过锁或每个线程使用独立的 epoll 实例。
细节 4:epoll_wait 的超时时间为何设置为 -1?是否合理?
答案:-1 表示无限等待,适合服务器模型;但需根据业务需求调整,如设置超时处理定时任务。
六、EventLoop 模块实现
1. 关于 eventfd 函数
eventfd 是 Linux 提供的一种轻量级的进程间通信(IPC)机制,用于在进程或线程之间传递事件通知。
1.1 函数概述
#include <sys/eventfd.h>
int eventfd(unsigned int initval, int flags);
initval: 初始化计数器的值。
flags: 常见标志包括 EFD_CLOEXEC, EFD_NONBLOCK, EFD_SEMAPHORE。
1.2 代码示例
#include <stdio.h>
#include <stdint.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/eventfd.h>
int main() {
int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (efd < 0) {
perror("eventfd");
return -1;
}
uint64_t val = 1;
write(efd, &val, sizeof(val));
write(efd, &val, sizeof(val));
write(efd, &val, sizeof(val));
uint64_t res = 0;
read(efd, &res, sizeof(res));
printf("res = %lu\n", res);
close(efd);
return 0;
}
1.3 使用场景及注意事项
- 线程间同步:实现生产者 - 消费者模型或信号量机制。
- 事件通知:在多线程或多进程环境中,用于通知某些事件的发生。
- 与
epoll 配合:将 eventfd 文件描述符加入 epoll,用于事件驱动的程序中。
2. EventLoop 模块概述
EventLoop:进行事件监控,以及事件处理的模块(关键点:这个模块和线程是一一对应的)。
- 在线程中对描述符进行事件监控。
- 有描述符就绪则对描述符进行事件处理。
- 所有的就绪事件处理完了,这时候再去将任务队列中的所有任务执行。
class EventLoop {
public:
using Functor = std::function<void()>;
EventLoop() : _thread_id(std::this_thread::get_id()),
_event_fd(CreateEventFd()),
_event_channel(new Channel(this, _event_fd)) {
_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventFd, this));
_event_channel->EnableRead();
}
bool IsInLoop() { return _thread_id == std::this_thread::get_id(); }
void UpdateEvent(Channel* channel) {
assert(IsInLoop());
_poller.UpdateEvent(channel);
}
void RemoveEvent(Channel* channel) {
assert(IsInLoop());
_poller.RemoveEvent(channel);
}
void RunAllTask() {
std::vector<Functor> tasks;
{
std::lock_guard<std::mutex> lock(_mutex);
tasks.swap(_tasks);
}
for (auto& t : tasks) {
t();
}
return;
}
static int CreateEventFd() {
int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (efd < 0) {
LOG_ERROR("CREATE EVENTFD ERROR");
abort();
}
return efd;
}
void ReadEventFd() {
uint64_t data = 0;
ssize_t ret = read(_event_fd, &data, sizeof(data));
if (ret < 0) {
if (errno == EAGAIN || errno == EINTR) {
return;
}
LOG_ERROR("READ EVENTFD ERROR");
abort();
}
return;
}
void WakeupEventFd() {
uint64_t data = 1;
ssize_t ret = write(_event_fd, &data, sizeof(data));
if (ret < 0) {
if (errno == EAGAIN || errno == EINTR) {
return;
}
LOG_ERROR("WRITE EVENTFD ERROR");
abort();
}
return;
}
void Start() {
std::vector<Channel*> actives;
_poller.Poll(&actives);
for (auto& channel : actives) {
channel->HandleEvent();
}
RunAllTask();
}
void QueueInLoop(const Functor& cb) {
{
std::lock_guard<std::mutex> lock(_mutex);
_tasks.emplace_back(cb);
}
WakeupEventFd();
}
void RunInLoop(const Functor& cb) {
if (IsInLoop()) {
cb();
} else {
QueueInLoop(cb);
}
}
private:
std::thread::id _thread_id;
int _event_fd;
std::unique_ptr<Channel> _event_channel;
Poller _poller;
std::vector<Functor> _tasks;
std::mutex _mutex;
};
3. 与 TimeWheel 模块整合
将定时器任务与事件循环绑定,确保定时器回调在 EventLoop 线程中执行。
class TimerWheel {
public:
void AddTimer(uint64_t id, uint32_t delay, const TaskFunc& cb) {
_loop->RunInLoop(std::bind(&TimerWheel::TimerAddInLoop, this, id, delay, cb));
}
void RefreshTimer(uint64_t id) {
_loop->RunInLoop(std::bind(&TimerWheel::TimerRefreshInLoop, this, id));
}
void CancelTimer(uint64_t id) {
_loop->RunInLoop(std::bind(&TimerWheel::TimerCancelInLoop, this, id));
}
private:
void TimerAddInLoop(uint64_t id, uint32_t delay, const TaskFunc& cb) {
PtrTask pt(new TimerTask(id, delay, cb));
pt->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id));
int pos = (_tick + delay) % _capacity;
_wheel[pos].push_back(pt);
_timers[id] = WeakTask(pt);
}
void TimerRefreshInLoop(uint64_t id) {
auto it = _timers.find(id);
if (it == _timers.end()) return;
PtrTask pt = it->second.lock();
if (!pt) return;
int remaining = pt->DelayTime();
int pos = (_tick + remaining) % _capacity;
_wheel[pos].push_back(pt);
}
void TimerCancelInLoop(uint64_t id) {
auto it = _timers.find(id);
if (it != _timers.end()) _timers.erase(it);
}
void RunTimerTask() {
_tick = (_tick + 1) % _capacity;
auto& tasks = _wheel[_tick];
for (auto& task : tasks) {
if (!task->_canceled) task->_cb();
task->_release();
}
tasks.clear();
}
void OnTime() {
int times = ReadTimerfd();
for (int i = 0; i < times; ++i) {
RunTimerTask();
}
}
static int CreateTimerfd() {
int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);
if (timerfd < 0) {
LOG_ERROR("Create timerfd error");
abort();
}
struct itimerspec itime;
itime.it_value.tv_sec = 1;
itime.it_value.tv_nsec = 0;
itime.it_interval.tv_sec = 1;
itime.it_interval.tv_nsec = 0;
timerfd_settime(timerfd, 0, &itime, nullptr);
return timerfd;
}
int ReadTimerfd() {
uint64_t times = 0;
ssize_t ret = read(_timerfd, ×, sizeof(times));
if (ret < 0) {
LOG_ERROR("READ TIMERFD ERROR");
abort();
}
return times;
}
private:
using WeakTask = std::weak_ptr<TimerTask>;
using PtrTask = std::shared_ptr<TimerTask>;
const int _capacity;
int _tick;
int _timerfd;
std::unique_ptr<Channel> _timer_channel;
EventLoop* _loop;
std::vector<std::vector<PtrTask>> _wheel;
std::unordered_map<uint64_t, WeakTask> _timers;
};
4. 细节分析
细节 1:定时器任务中异步执行回调
如果在非事件循环线程中调用了 RemoveEvent 或 Channel::Remove(),而 EventLoop 的所有操作都要求必须在事件循环线程中执行(通过 assert(IsInLoop()) 检查),会导致程序崩溃。因此,定时器回调必须在 EventLoop 线程中执行。
细节 2:服务器端关闭再启动的文件描述符(fd)不变
Linux 系统中,文件描述符的分配遵循 '最小可用原则' 。当一个 fd 被关闭后,它会被标记为'可重用',下次分配新文件或 socket 时会优先使用这些被释放的 fd。
细节 3:Channel 类中的 Remove 和 Update 方法为何调用 EventLoop 的接口?
职责分离:Channel 仅负责事件注册,Poller 负责底层 I/O 事件监控。通过 EventLoop 统一管理事件增删改,确保事件状态一致性。
细节 4:如何避免定时器任务的重复添加?
HasTimer 检查:在添加定时任务前调用 HasTimer(id) 避免重复。
- 刷新替代新增:若定时任务已存在,调用
RefreshTimer(id) 延迟销毁时间。
- 线程安全:所有定时器操作通过
EventLoop 串行化执行。
相关免费在线工具
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
- Base64 文件转换器
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
- Markdown转HTML
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
- HTML转Markdown
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
- JSON 压缩
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online
- JSON美化和格式化
将JSON字符串修饰为友好的可读格式。 在线工具,JSON美化和格式化在线工具,online