项目--协程库(C++)模块解析篇
参考:sylar的协程库
GitHub链接:Cookies-CGQ/myCoroutine: 协程库,支持协程调度、定时器、事件调度功能
协程库模块的详解与代码分析
需要的类
- thread:这个模块的意义主要是使用多线程配合多协程更好地利用多核CPU的资源
- fiber:负责协程的创建、执行、暂停等真正运行任务的地方
- scheduler:调度器,负责调度协程的执行和暂停等,无需用户手动执行和暂停,做到自动调度
- ioscheduler:io+scheduler,协程库是需要使用在服务器上的,服务器上的fd都配合不了,那不就成了一个玩具项目,此类使用epoll监听fd上绑定的读写事件,当读写事件触发将其放入到调度器中等待调度
- timer:服务器定时器,用于定时执行任务,所以此类完成定时器的创建、删除、取消,使用最小堆的结果,将超时定时器触发作为固定信号tickle来触发ioscheduler等待的epoll_wait
- hook:hook+ioscheduler才能完全体现出一个非阻塞的服务器框架,虽然前面实现了协程的调度,但是每个系统调用中,不去改变函数内部结构无法做出协程的挂起和恢复,也就是sleep(1)该睡多久还是多久,无法体现我们使用协程的优势,所以就将使用hook改变原始函数增添内容,更好的搭配上我们的ioscheduler,将其sleep(1)作为定时任务,放入到time中的定时器堆中等待超时触发,后唤醒epoll去放入到调度器中执行
模块之间的协作
- fiber 是任务的执行单元,所有任务运行在其中
- scheduler 管理 fiber 的执行顺序,实现协程的挂起和恢复
- ioscheduler 扩展了 scheduler 的功能,支持IO事件触发调度
- timer 为 scheduler 和 ioscheduler 提供定时任务支持
- hook 改变系统调用行为,使其与协程框架更好地配合
Thread模块
在Thread模块分为两个部分:Semaphore类和Thread类
Semaphore类 -- condition_variable、semaphore
C++11的condition_variable(条件变量):condition_variable是C++11提供的一种线程间的同步机制,主要用于在多线程环境下实现条件等待(wait)和通知(notify),从而协调线程之间的执行顺序;条件变量通常用于需要等待某个条件成立之后才能继续执行的场景,一般需要配合C++11的互斥锁std::unique_lock一起使用。
关于C++11的条件变量详细可以参考文档:并发支持库 (C++11 起) - cppreference.cn - C++参考手册
semaphore信号量:在操作系统中我们学习过进程间通信其中信号量pv操作,一般P操作代表减去公共共享的资源,V操作代表增加上减去的资源,可以简单理解为是一个-1,+1操作;其中PV操作还可以实现同步或者互斥,比如共享资源只有1个那么经过P操作后剩余0个,此时其他进程或者线程想要获取资源是不行的要等到执行完V操作后才能获取。同步则是共享资源为0时因为P操作无法执行,只能等待V操作先添加资源后才能继续,这个就是同步。
对于semaphore,C++20才开始支持,这里我们利用条件变量来自己编写一个信号量,代码参考后续。
对于条件变量和信号量,除了C++库的实现,linux系统调用本身也有实现,如果感兴趣可以自己去了解一下,这里我们条件变量使用C++11的实现,信号量使用条件变量自己实现。
linux下条件变量的系统调用:Linux -- 线程、锁_linux thread-ZEEKLOG博客
linux下信号量的系统调用:Linux -- 进程间通信(IPC)-- 进程间通信、管道、system V 共享内存、system V 消息队列、责任链模式 、system V 信号量、建造者模式、IPC资源管理_system()管道-ZEEKLOG博客
Thread模块代码
thread.h:
thread.cc:
模块测试:
Fiber模块
fiber类提供了协程的基本功能,包括创建、管理、切换和销毁协程;它使用了ucontext_t结构(主要使用非对称协程)保存和恢复协程的上下文,并通过std::function来存储协程的执行逻辑。
协程的实现模型协程的状态:准备(READY)、正在运行(RUNNING)、运行结束(TERM)
状态转换:READY <=> RUNNING -> TERM
有栈协程:每个协程都分配一部分内存空间,每个协程都具有独立的执行栈(栈可重复使用,避免重复开辟空间的开销)。非对称协程:当resume时,恢复协程运行;当yield时,暂停执行让出执行权;而对于非对称协程,由主协程(或调度协程)resume子协程将执行权交给子协程,子协程yield时将子协程的执行权让出还给主协程(或调度协程)。也就是说没有实现嵌套的协程,也就是协程和子协程的调度无法实现,本项目的任务工作主要是由子协程去完成,但是在使用协程之前必须调用一次Getthis()来初始化主协程和调度协程。协程的结束没有专门的操作,协程函数运行结束时即结束,协程结束时会自动调用一次yield从子协程返回到主协程。
同一个线程,存在一个主协程、一个调度协程、多个子协程;
注意:调度协程也可以是主协程,初始化一个主协程时,调度协程默认是主协程。
| 协程类型 | 核心职责 | 关键定位 |
|---|---|---|
| 主协程 (Main Fiber) | 1. 程序入口:通常是线程的起点,如 main 函数所在协程。2. 初始化调度器:负责创建调度器实例并配置参数。 3. 提交任务:将需要执行的异步任务(子协程)添加到调度器的任务队列中。 4. 启动/停止调度:触发调度开始,并在适当时机安全地停止调度器,等待所有任务完成。 | 线程的管理者和调度任务的发起者,不直接参与任务的具体调度与执行,而是负责整个异步流程的掌控。 |
| 调度协程 (Scheduler Fiber) | 1. 任务循环:核心工作是运行一个循环,不断从共享任务队列中取出待执行的子协程。 2. 协程切换:执行权在调度协程和子协程之间来回切换。它唤醒一个子协程执行,待子协程挂起或完成后,再将执行权收回,继续调度下一个。 3. 空闲处理:当任务队列为空时,会切到一个特殊的 idle协程 上空转或等待,避免忙等消耗CPU。 | 线程内部的调度中心,是子协程得以被轮询执行的根本驱动力,确保任务被有序、高效地执行。 |
| 子协程 (Task Fiber) | 1. 执行具体任务:每个子协程封装一段具体的业务逻辑,如计算、网络请求等。 2. 协作式让出:在执行过程中,主动通过 yield 关键字让出执行权,以便调度协程可以调度其他任务,从而实现协作式并发。3. 动态添加任务:一个子协程在执行过程中,还可以向调度器添加新的子协程任务。 |
Fiber模块代码
fiber.h:
构造函数和析构函数:无参构造:用于初始化当前线程的协程功能,构造线程的主协程对象,以及对线程局部存储的变量t_fiber、t_thread_fiber、t_scheduler_fiber进行赋值。这个无参构造被定位成私有的成员方法,不允许在类外部调用,只能通过GetThis()方法调用,在返回当前正在运行的协程时,如果发现当前线程的主协程未被初始化,那就用不带参数的构造函数初始化主协程。因为GetThis()兼具初始化主协程的功能,在使用协程之前必须调用一次GetThis()初始化主协程。带参构造:用于构造子协程,初始化子协程的ucontext_t上下文和栈空间,要求传入协程的入口函数,以及可选协程栈大小,构造函数负责分配栈内存空间,并且后面是否使用调度器的bool类型变量。析构函数:减少活跃的协程的数量,判断是否有栈,有独立栈的肯定是子协程所以此时直接判断协程的状态并释放空间。
协程的切换:在我们实现的协程中,根据协程的_m_runInScheduler状态来决定是否被调度器调度,resume可以分为两种情况:
1、_m_runInScheduler为true,即为使用了调度器,代表此时这个子协程需要调度协程参与调度,SetThis(this)用来设置此时运行的协程是子协程,然后使用swapcontext来保存此时线程局部存储的调度协程的变量的上下文,然后切换到子协程的上下文去做任务。
2、_m_runInScheduler为false,即为这个子协程不需要调度协程参与调度,而是主协程代替调度协程这个角色,去完成后续过程,就是保存主协程的上下文,切换到子协程的上下文,这个过程就不需要调度协程进行参与了。yield也可以分为两种情况,与resume相似:
1、_m_runInScheduler为true,那就是保存子协程的上下文,将执行权切换到调度协程。
2、_m_runInScheduler为false,那就是保存子协程的上下文,将执行权切换到主协程。
Reset协程资源的复用:主要目的:复用一个已经终止的协程对象,从而避免频繁创建和销毁对象带来的开销,通过重置协程的状态和任务,可以在执行新的任务时重新利用这个协程对象。具体使用场景:例如有一个协程池,管理了一批协程对象,每当有新任务需要执行时,从池中取出一个空闲的协程对象并重置。
MainFunc协程入口函数:这个函数封装了入口函数,可以实现协程在结束时自动重置资源并执行yield的操作。首先调用了GetThis()获取正在运行的协程,正常来说正在运行执行任务的是子协程(因为resume的时候调用了SetThis()),所以子协程对象运行其函数对象_m_cb才是真正执行任务的地方,然后运行完之后需要让其状态变为TERM代表结束,然后使用raw_ptr去调用yield,避免潜在的生命周期管理的问题。为什么需要raw_ptr去调用yield,避免潜在的生命周期管理的问题?因为使用shared_ptr的curr去调用yield,那么在yield返回之前,这个shared_ptr的引用计数是不会减少的,因此协程对象无法在合适的事件销毁。
fiber.cc:
fiber模块测试:
Scheduler模块
在fiber模块,协程的调度都是由用户进行resume或yield的,这就好比让用户充当了调度器的工作,显然是不够灵活的。引入了协程调度后,则可以先创建一个协程调度器,然后把这些要调度的协程传递给协程调度器,让其一个个消耗。
调度器的实现模型
调度算法:
调度器主要工作就是调度,那就要有调度算法,项目使用的是FIFO先来先服务算法,例如上面fiber模块测试时候test_fiberOfScheduler()里的简单调度器就是FIFO,当然这个简答调度器是一种特殊情况,这是只有一个线程的调度器的情况,然而我们即将实现的调度器是支持多线程的。
调度器任务的定义:
对于协程调度器来说,协程和函数都是需要调度的任务,为什么函数也是?因为协程本身就是函数和函数运行状态的组合,所以协程可以看作是一个函数,所以普通函数也是可以被协程调度器调度的,但实际上运行中还是要把函数包装成协程去运行,协程调度器的实现重点还是以协程为主。
支持多线程(即为多个调度线程):一个线程同一时刻只能运行一个协程,即便一个线程里有很多个协程,所以为了协程调度器的效率,使用多线程多协程,这样同一时刻有多个线程多个协程同时运行,效率高于单线程。所以协程调度器里必须支持线程池的创建。既然多线程能提高效率,那么,能不能把调度器所在的线程(称为caller线程)也加入进来作为调度线程(实际进行调度任务的线程)呢?比如典型地,在main函数中定义的调度器,能不能把main函数所在的线程也用来执行任务?
首先肯定是没问题的,毕竟创建线程也是需要一定的开销,而且实际上做事的只是调度协程和子协程,此时完全可以让caller线程参与调度,省去了创建调度线程的时间和效率。
调度器的运行:
调度器创建后,内部首先会创建一个调度线程池,调度开始后,所有调度线程按顺序从任务队列里取任务执行,调度线程数越多,能够同时调度的任务也就越多,当所有任务都调度完后,调度线程就停止下来等新的任务进来。
添加调度任务:
添加调度任务的本质就是往调度器的任务队列里塞任务,但是只添加任务是不够的,还要有一种方式用于通知调度线程有新的任务加进来了,因为调度线程并不一定知道有新任务进来。当然可以考虑不断轮询有没有新任务,但是这样cpu占用率会很高。
调度器的停止:
调度器应该支持停止调度的功能,以便回收调度线程的资源,只有当所有的调度线程都结束后,调度器才算真正的停止。
总结:
调度器内部维护一个任务队列和一个调度线程池。开始调度后,线程池从任务队列里按顺序取任务执行。调度线程可以包含caller(调度器的线程也可也参与调度)。当全部的任务执行完成后,线程池停止调度,等待新的任务到来。当新的任务到来,通知线程池重新开始调度,然后线程池重新开始运行调度。停止调度时,各调度线程退出,调度器停止工作。
调度时的协程切换问题:
1、当主线程不参与调度时,即_use_caller(主线程是否参与调度)为false时,就必须要创建其他线程进行协程调度:
因为有单独的线程用于协程的调度(调度器线程或者main主线程),那只需要让调度线程的入口函数作为调度协程,从任务队列里取任务执行就行了;main函数(main主线程和调度器线程)不参与协程的调度,所以只需要将任务添加到调度器中即可,在适当的时机停止调度器,当调度器停止时,main函数要等待调度线程结束后再退出。
注意:主线程和调度器线程不参与调度(不作为调度线程)并且其主协程、调度协程、子协程也不参与调度(协程不参与任何任务调度和任务执行),它做的只是创建了调度线程,然后这个新线程启动后会运行调度器的主循环,负责从任务队列中取出任务并根据_m_runIScheduler的状态判断此时的是true还是false,如果是true此时就是新线程的调度协程和子协程进行上下文的切换,如果是false就是新线程的主协程和子协程的上下文切换。
2、当主线程也参与调度时,即为_use_caller为true时,可以是多线程,也可以是单线程,多线程时的切换和上面一样,但是不一定需要创建另外一个线程作为调度协程的入口函数,现在主线程也可以参与调度,可以看作调度协程去与子协程切换,并且还负责了任务的分配和调度器的停止工作。
主线程参与调度的情况下,单线程的切换:
也就是说main函数线程要运行的协程有三类:main函数对应的主协程调度协程待调度的任务子协程
这三类协程的运行顺序大概是这样的:main函数主协程运行,创建调度器main函数继续运行,向调度器添加任务开始协程调度,main函数除了要添加任务还有适当时机停止调度器外,主协程让出执行权到调度协程,调度协程从任务队列里按顺序执行所有的任务(执行任务切换到子协程)每次执行一个任务,调度协程都要让出执行权,再切换到该任务的协程里去执行,执行结束之后,还要切回调度协程,继续下一个任务的调度所有任务都执行完之后,调度协程还要让出执行权并切回main函数的主协程,以保证程序的顺利结束总体的过程: main创建调度器->添加任务->主协程->调度协程->从任务队列按顺序拿取任务->执行权切换->子协程->所有任务执行完->调度协程->主协程。
关于协程的上下文保存:
在Fiber模块已经实现,给每个线程增加线程局部变量用来保存协程的上下文,这样每个线程就可以同时保存三个协程(主协程、调度协程、当前运行的协程)的上下文,有了这三个上下文,协程就可以根据自己的身份来选择每次和哪个协程进行交换,具体过程如下:给协程类增加一个bool类型的成员_m_runInScheduler,用于记录该协程是否通过调度器来运行创建协程时,根据协程的身份指定对应的协程类型,具体来说,想让调度器调度的协程的_m_Scheduler的值设置为true,线程的主协程和线程的调度协程的_m_runInScheduler都为false。resume一个协程时,如果这个协程的_m_runInScheduler的值为true,表示这个调度器参与这个协程的调度,那么此时这个协程就应该和三个线程局部变量中的调度协程进行切换,同理,在yield时,也应该恢复调度协程的上下文,表示子协程切换回调度协程如果协程的_m_runInScheduler值为false,表示这个协程不参与调度器调度,那么在resume协程时,直接和线程主协程切换就可以了,相当于默认不去使用调度协程,yield也一样,应该恢复线程主写成的上下文。_m_runInScheduler值为false的协程上下文切换完全和调度协程无关,可以脱离调度器使用。
Scheduler模块代码
首先Scheduler是围绕协程和线程设计的,就如上面说到调度器实现思路一样,需要引入协程和线程,我们要利用多线程和多协程并且不需要像单一的Fiber类一样需要用户手动resume和yield协程充当调度器,从而做到自动化调度。所以调度器用来分配多线程去拿取任务,然后让协程去执行,这也是这个模块设计的核心。
scheduler.h:
构造函数:初始化线程池的数量n(后续还需要根据主线程/调度器线程是否参与调度来调整数量)调度器线程或主线程(这里主线程其实就是调度器线程)是否参与调度_use_caller:
1、如果_use_caller为true,即为主线程/调度器线程参与调度,则线程池的数量n-1,因为主线程/调度线程参与调度,所以需要调度协程,所以需要调用一次Fiber::GetThis初始化线程局部变量的主协程和调度协程,并通过Fiber::reset创建新的调度协程,覆盖Fiber::GetThis初始化的调度协程,将主线程的id存储到工作线程的线程id中,最后将剩余的线程数threads的总和放入到_m_threadCount中。
2、如果_use_caller为false,即为主线程/调度器线程不参与调度,则线程池的数量n设置调度器名字
析构函数:防止资源不释放占用系统资源
stopping判断调度器是否退出:用于判断调度器是否退出,在后续的stop函数中如果stopping为true代表调度器已经退出直接返回return不进行任何的操作。调度器退出的条件:_m_stopping为true && 任务队列为空 && 活跃线程数量为0
start函数:初始化线程池让初始化后的线程运行到run函数中:
1、如果是单线程caller调度且执行任务的模型,那这个start函数啥也不做,原因是上面提到的单线程的主线程作为调度线程进行调度就不需要创建额外的线程,直接将run函数作为调度协程的入口函数去和子协程进行任务调度。
2、如果是多线程模型,就把run函数作为调度协程的入口函数,并后续与子协程进行任务调度。
run入口函数:线程实现调度协程:上面说一般的调度线程在初始化后都会进入到run函数方法,然后需要判断此线程是否是主线程,因为只有主线程在构造函数调用了Fiber::GetThis有了主协程和调度协程,但是通过start初始化的线程是没有的,所以每个调度线程需要调用一次GetThis来初始化主协程,调度协程和正在运行的协程(对于每个调度线程来说,主协程就是调度协程)。为了防止空闲线程没有任务时退出,同时减少空闲线程对CPU的占用率,需要创建空闲协程idle,这个协程是为了在没有任务的时候,防止线程退出和进入一个忙等待状态占用CPU,idle协程内部的工作就是不断地resume/yield,不断得和该线程的调度协程进行切换(因为生成idle的时候使用的是默认值),同时idle内部使用sleep来降低空闲线程对CPU的占用率,这样互相切换直到等到有任务,有任务后就是子协程和调度协程进行互相切换了。调度协程循环调度任务子协程或空闲协程
idle空闲协程的入口函数:内部通过yield将执行权交还给调度协程,当线程没有任务需要调度时,就是调度协程与空闲协程相互切换。使用sleep来降低线程对CPU的占用率,避免空转浪费资源。
stop函数:stop函数的主要目的就是保证线程和协程都正常得退出需要注意的是,当主线程/调度器线程作为调度线程的情况下,当运行到start方法时候因为没有创建其他线程运行调度,所以调度任务并不会立即执行,只有当执行到stop方法的时候调度器才会真正在caller线程上开始执行,所以主线程不仅要执行任务的分配和程序的退出,还需要去负责调度任务。
scheduler.cc:
模块测试:
通过结果打印的任务执行的线程id可以看出,主线程/调度线程参与调度,调度的时机是 std::cout << "stop scheduler" << std::endl;scheduler->stop();之后,这与上面说的,主线程/调度线程参与调度,并不是一开始就参与调度,而是主线程/调度线程调用stop函数后才开始参与任务的调度。
Timer模块
Timer模块实现定时器(Timer)和定时器管理器(TimerManager)的功能,主要用于管理定时任务。定时器允许我们在设定的时间后执行某些操作,定时器管理器可以管理多个定时器,比如添加、删除、刷新等操作。
为什么要定时器?
为了实现协程调度器对定时任务的调度,服务器上经常要处理定时事件,比如3秒后关闭一个连接,或是定期检测一个客户端的连接状态。定时器也是后面hook模块的基础。
定时器管理器的实现模型关于tick信号:常见的定时器的实现有升序链表、高性能时间轮、时间堆等,我们的定时器管理器使用时间堆的实现方式,也就是最小时间堆。无论是升序链表还是时间轮的设计都依赖一个固定周期触发的tick信号,比如以2s为一个标准触发信号然后检查定时器是否超时,没有就继续等待下一个3s。
其实还可以使用另外一种设计思路,我们采用这个:具体操作就是每次取出所有定时器中超时时间最小的超时值作为一个tick信号,这样,一旦tick信号触发,超时时间最小的定时器必然到期。处理完已超时的定时器之后,再继续从剩余的定时器中找出超时时间最小的一个,并将这个最小时间作为下一个tick信号,如此反复,就可以实现较为精确的定时。采用最小堆的设计:使用最小堆是因为可以很快得获取到当前最小超时时间,所有的定时器根据绝对的超时时间点进行排序,每次取出离当前时间最近的一个超时时间点,计算出超时需要等待的时间,然后等待超时。超时时间到了之后,获取当前的绝对时间点,然后把最小堆里超时时间点小于这个时间点的定时器都收集起来,执行回调函数。(注意:在注册定时事件时,一般提供的是相对时间,比如相对于当前时间n秒后执行,我们需要根据传入的相对时间和当前的绝对时间计算出定时器超时的绝对时间点,然后根据这个绝对时间点对定时器进行排序;同时,因为依赖的是系统绝对时间,所以需要考虑校时的因素)关于定时器和IOScheduler(IO协程调度器,这是下个模块):在等待IO事件发生时,加入定时器的功能后,epoll_wait的超时时间改用当前定时器的最小超时时间来代替。epoll_wait返回后,根据当前的绝对时间把已超时的所有定时器收集起来,执行它们的回调函数。epoll触发一定是超时了吗?
由于epoll_wait的返回并不一定是超时引起的,也有可能是IO事件唤醒的,所以在epoll返回时不能想当然的以为是定时器超时,所以这个时候定时器的好处就出来了,可以通过比较当前的绝对时间和定时器的绝对时间判断一下,就可以确定一个定时器到底有没有超时。
Timer模块代码
所有的Timer对象都是由TimerManager类进行管理,TimerManager包含一个std::set类型的Timer集合,这个集合就是定时器的最小堆结构,因为set里的元素总是排过序的,所以总是可以很方便地获取到当前的最小定时器。
TimerManager提供创建定时器、获取最近一个定时器的超时时间、以及获取全部已经超时的定时器回调函数的方法,并且提供一个onTimerInsertedAt方法,这是虚函数,可以由IOScheduler继承实现,当新的定时器插入到Timer集合首部时,TimerManager
通过该方法来通知IOManager立刻更新当前的epoll_wait超时。
TimerManager同时还需要负责检测是否发生系统时间问题并进行校对,由detectClockRollover方法实现。
Timer.h:
Timer.cc:
模块测试:
IOScheduler模块
IOScheduler模块结合了IO多路复用和协程调度器:
- 之前实现的Scheduler调度器,最基本的功能就是将任务封装成协程并把这个协程添加到调度器的任务队列里,根据先到先服务的逻辑执行协程完成任务。
- 在服务器中需要处理大量的socketfd,比如连接、对应的读写事件处理,如果线程一直在某个fd上等待读写资源,那线程就会一直阻塞,不利于后续任务的高效执行,所以需要使用到IO多路复用来高效管理这些大量的fd,所以设计IO+协程调度器,目的就是让协程参与到其中;虽然IO多路复用处理的时候不需要阻塞等待数据,比如执行read()系统调用的时候看到是有读的数据才进行处理,如果在read()执行过程中想要同时处理其他事情,在没有协程的情况下是无法实现的。因此,我们引入“IO + 协程”的方案:它既解决了资源阻塞的问题,又能在处理复杂函数的同时兼顾简单任务的执行,从而提升了灵活性与并发效率(避免异步编程时的回调地狱问题,降低了代码复杂度)。
IO协程调度器有什么功能:
- IO协程调度器可以看成是增强版的协程调度器。在前面的协程调度器模块中,调度器对协程的调度是无条件执行的,在调度器已经启动调用的情况下,任务一旦添加成功,就会排队等待调度器执行。调度器不支持删除调度任务,并且调度器在正常退出之前一定会执行完全部的调度任务,所以某种程度上可以认为,把一个协程添加到调度器的任务队列,就相当于调用了协程的resume方法。
- IO协程调度器支持协程调度的全部功能,因为IO协程调度器是直接继承协程调度器实现的,除了协程调度功能外,IO协程调度器还增加了IO事件调度的功能,这个功能针对描述符(一般是套接字描述符)的。IO协程调度器支持为描述符注册可读和可写事件,当描述符可读或可写时,执行对应的回调函数(可以把回调函数等效为协程,因为实际还是用函数转换成协程)。
IO协程调度器的实现模型
IOScheduler模块基于什么实现:基于epoll实现,只支持Linux平台。对于每个fd,该模块支持两类事件,一类是可读事件(对应epoll的EPOLLIN),一类是可写事件(对应epoll的EPOLLOUT)。epoll除了支持EPOLLIN和EPOLLOUT两类事件外,还支持其他事件,比如EPOLLRDHUP(对端关闭)、EPOLLERR(错误事件)、EPOLLHUP(挂起事件)。对于这些事件,我们对其进行归并,分别对应到EPOLLIN和EPOLLOUT中,也就是所有的事件都可以表示为可读或可写事件,甚至有的事件还可以同时表示可读或可写,比如EPOLLERR事件发生,fd同时可读或可写。
IO协程调度包含的三元组信息:对于IO协程调度,每次调度都包含一个三元组信息,分别是:描述符-事件类型(可读可写事件)-回调函数,调度器记录全部需要调度的三元组信息,其中描述符和事件类型用于epoll_wait,回到函数用于协程调度。这三元组信息在源码上通过FdContext结构体来存储,在执行epoll_wait时通过epoll_event的私有数据指针data.ptr来保存FdContext的结构体信息。在FdContext源码中,处理fd字段,event字段,还有一个EventContext字段,EventContext字段是为了在fd触发了event事件(读或写或读写组合)的时候具体让执行的系统调用(如read()、write()、send())作为协程的入口函数进行任务的调度,目的是为了提高IO等待read()、write()数据较多的时候可以先解决小任务,增加灵活性和提高效率。所以EventContext有函数对象、协程、协程调度器,目的是为了在触发事件后找到对应的调度器对象去运行协程或者函数对象绑定的入口函数(read()、write())。
IO协程调度器对Scheduler类的idle协程的改造:在IO协程调度器的idle中,空闲协程idle会epoll_wait所有已经注册的fd,如果fd满足条件,epoll_wait返回,从私有数据data.ptr中拿到fd的事件上下文信息,并且执行其中的回调函数(实际是idle协程只负责收集所有已经触发的fd的回调函数并将其加入调度器的任务队列,真正的执行时机是idle协程退出后,调度器在下一轮调度时执行)。
IO协程调度器支持取消事件功能:所谓“取消事件”,是指不再关注某个文件描述符(fd)的某一类事件(如可读、可写事件)。若某个fd的所有可读/可写事件都被取消,该fd会从调度器的epoll_wait监听队列中被移除。
此外,调度器还支持IO事件调度:针对套接字描述符,可为其注册“可读事件”或“可写事件”的回调函数;当对应事件触发时,会自动执行该回调函数。
同时,调度器还额外集成了一个定时器,用于处理定时类任务(例如sleep、usleep等需要延迟执行的操作)。
//事件枚举 enum Event { NONE = 0x0, //没有事件 READ = 0x1, //读事件,READ == EPOLLIN == 0x1,对应epoll的EPOLLIN WRITE = 0x4, //写事件,WRITE == EPOLLOUT == 0x4,对应epoll的EPOLLOUT }; private: //用于描述一个文件描述符fd的事件上下文 //每一个socket fd都对应一个FdContext,包括fd的值,fd上的事件,以及fd的读写事件上下文 struct FdContext { //描述一个具体事件的上下文,如读事件和写事件 struct EventContext { //scheduler,关联的调度器 Scheduler* scheduler = nullptr; //callback fiber,关联的回调线程(协程) std::shared_ptr<Fiber> fiber; //callback function,关联的回调函数(都会注册为协程对象) std::function<void()> cb; }; //读事件上下文 EventContext read; //写事件上下文 EventContext write; //事件关联的fd值(句柄) int fd = 0; //当前注册的事件,可能是READ、WRITE、READ|WRITE,可以看成是位图 Event events = NONE; //事件上下文的互斥锁 std::mutex mutex; //根据事件类型获取相应的事件上下文(如读事件上下文或写事件上下文) EventContext& getEventContext(Event event); //重置事件上下文 void resetEventContext(EventContext& ctx); //触发事件,根据事件类型调用对应上下文结构的调度器去调度协程或函数 void triggerEvent(Event event); };IOScheduler模块代码
ioManager.h:
IOManager::GetThis():先看代码:dynamic_cast和static_cast:
1、dynamic_cast:dynamic_cast 主要用于处理多态类型的转换,即在继承层次结构中进行向上、向下或横向转换。它会在运行时检查转换的有效性,如果转换失败,对于指针类型会返回 nullptr,对于引用类型会抛出 std::bad_cast 异常。dynamic_cast 只能用于具有虚函数的类(多态类型),因为它依赖于运行时类型信息(RTTI运行时类型识别)。
2、static_cast:
是一种编译时类型转换,可以用于各种类型转换,包括基本类型之间的转换、非多态类的向上或向下转换,以及显式调用构造函数或转换运算符。它不会进行运行时类型检查,因此转换的安全性由程序员保证。static_cast 不能用于多态类型的向下转换(除非明确知道类型),因为它不会检查运行时类型信息。
void IOManager::idle() override:IOManager重写的idle,同样要支持上面的功能外,还支持将定时器超时任务加入到调度器任务队列中,并且监听事件是否触发,如果事件触发就把该事件对应的任务执行(调用triggerEvent,会把任务添加到任务队列)。
其他的解释看代码,结合注释食用更佳:
IOManager.cc:
模块测试:
在看这个重写的idle函数之前,先回顾之前在scheduler的idle函数,Scheduler的idle函数的作用通常是在调度线程没有任务处理时运行,线程也会在idel()中持续休眠并等待新的任务,保证了线程不会退出,以便于后续任务来了继续执行任务。

// 获取当前线程的调度器对象,然后将其动态转换成IOManager*类型, // 如果转换成功,表示当前线程的调度器对象确实是一个IOManager对象, // 否则,如果是转化的是指针类型返回nullptr。引用类型抛出std::bad_cast异常 IOManager *IOManager::GetThis() { // dynamic_cast 是 C++ 中用于在继承层次结构间进行安全类型转换的运算符,它在运行时检查转换的有效性。 // 如果转换成功,返回转换后的指针,否则返回 nullptr。 return dynamic_cast<IOManager *>(Scheduler::GetThis()); }Hook模块
什么是Hook?Hook(钩子)是一种编程技术,允许在特定事件或函数执行前后插入自定义代码逻辑。它通常用于拦截、修改或扩展系统或应用程序的默认行为,而无需修改原始代码。Hook广泛应用于操作系统、软件开发、游戏修改等领域。对于本项目来说,Hook是对原API的同名封装,在调用这个接口的时候首先执行的是我们封装好的同名API,比如在系统提供的malloc()和free()进行一些自定义的隐藏操作,在真正进行内存分配和释放之前,统计内存的引用计数,以排查内存泄漏的问题。
本项目使用Hook有什么作用?协程的好处就是可以随时在发生阻塞的情况下,随时切换、提供资源的利用率,我们的协程库就是要实现使用我们协程库的服务器没有阻塞的阶段,也就是当协程发生阻塞的时候,切换到其他协程去运行,直到需要的资源就绪后再返回来执行。所以Hook额IO协程调度器的关系是密切的。
例如使用协程调度器处理三个任务(同一个线程):
协程1:sleep(2)睡眠2s后返回
协程2:在socket fd1上send 100k的数据
协程3:在socket fd2上recv直到数据接收成功
如果在未使用hook的情况下执行的流程:
先执行协程1,因为sleep(2)在协程1阻塞2s,也就是整个线程阻塞2s,直到sleep结束后协程1让出控制权;执行协程2,这里会因为send等不到要发送的写资源一样进入阻塞,也就是整个线程阻塞,等到写资源来了,才会让出执行权;最后执行协程3,但是此时又阻塞在recv的读上,线程阻塞,直到资源就绪。这样这几个协程的执行就变成了一个阻塞并且同步的框架,完全实现不了我们的非阻塞协程的切换。
导致这样的原因是什么?
在协程1执行resum函数的时候先去执行sleep(),执行完之后才yield,那此时sleep需要等待2s从而阻塞在这个协程12s,因为resume函数中执行完才能yield,所以其他协程在协程1阻塞期间是不能工作的(因为都在同一个线程上),只能等到协程1执行完让出执行权之后才能执行其他协程。导致这样的原因就是sleep函数内部无法yield,因为没有改变sleep内部的执行逻辑,里面不能进行yield所以只能一步步走了,想解决这个问题也很简单使用Hook技术内部实现即可,比如sleep改成加了一个超时定时器后就可以直接暂停让出执行权去处理其他协程,等到tickle信号(或epoll触发),然后添加任务到协程调度器中调度,就可以完成sleep(2)的任务了,在阻塞2s的期间可以切换到其他协程执行任务,这就做到了非阻塞,体现了协程的灵活性。
在使用hook的情况下执行的流程:
resume协程1时,执行的sleep函数不是原始的sleep库函数,而是我们封装的同名函数sleep,在这个我们封装的函数时,会先执行我们自定义的逻辑:先添加一个2s的定时器(定时器超时触发后会在ioManager类中idle函数被listExpiredCb函数回收后将任务放入调度器等待调度),添加完定时器后协程1就yield让出执行权,协程2就不需要同步等待sleep(2)之后才可以执行,而是立刻就可以执行。同理,执行协程2时,会执行我们封装的同名函数send,里面会有我们自己的逻辑,由于不知道fd是否可以马上写,所以在ioManager中利用addevnet给fd添加一个写事件(触发写事件就会resume回来),然后yield让出执行权,不阻塞协程;接着协程3就可以执行了,协程3的操作与协程2类似,给fd注册一个读事件然后yield,等到资源到了之后触发事件再resume回来。
1、等2s超时定时器触发tickle信号,通过listExpiredCb放入到协程调度器的任务,被调度协程1resume继续执行。
2、等协程2的fd可写,一旦可写(epoll事件触发会将其加入到协程调度器任务队列),在协程调度器中run方法调用写事件将协程2resume以便继续执行send。
3、协程3类似。
在IO协程调度中对相关的系统调用进行hook,也就是对系统调用进行同名封装,里面会有我们自己的逻辑,以便于阻塞时yield,后续资源到位后resume,可以让调度线程尽可能进行任务执行的操作上,减少阻塞,提高线程效率。
需要注意:hook的重点是在替换API的底层实现的同时完全模拟器原本的行为,因为调用方是不知道hook的细节的,在调用被hook的API时,如果其行为与原本的行为不一致,就会给调用方造成困惑。比如,所有socket fd在进行IO调度时都会被设置成NONBLOCK模式,如果用户未显示地对fd设置NONBLOCK,那就要处理好fcntl,不要对用户暴露fd已经是NONBLOCK的事实,这点也说明了,除了IO相关的函数要进行hook外,对fcntl、setsockopt之类的功能函数也要进行hook,才能保证API的一致性。
Hook的两种方式:侵入式和外挂式
侵入式:侵入式Hook通过直接修改目标进程的内存或代码实现功能拦截或扩展。
外挂式:外挂式Hook是一种通过优先加载自定义动态库来实现对后加载动态库进行Hook的技术。它允许开发者在程序运行时拦截和修改函数调用,而无需修改原始代码。
本项目使用的是外挂式Hook,外挂式Hook的工作原理:
外挂式Hook的核心机制是全局符号介入+动态库优先加载。当程序运行时,系统会按照特定的顺序加载动态库,通过设置LD_PRELOAD环境变量,可以强制系统优先加载自定义的动态库。
具体实现步骤:创建与目标函数同名的自定义函数将自定义函数编译成动态库(如libhook.so)设置LD_PRELOAD环境变量指向该动态库运行目标程序,系统会优先加载自定义函数
全局符号介入机制和LD_PRELOAD:
当程序(如test.out)加载时,系统会先加载LD_PRELOAD指定的动态库(libhook.so)。此时,程序的全局符号表中会先出现函数的符号(来自libhook.so)。后续加载系统库(如libc.so)时,若发现同名符号,由于“全局符号介入”规则,系统库的同名符号会被忽略,最终程序中使用的write是自定义的版本。LD_PRELOAD:Linux下的环境变量,指定优先加载的动态库。程序启动时,会先加载LD_PRELOAD中的库,再加载其他库(如libc)。
全局符号介入机制:当程序加载多个动态库时,若不同库中存在同名全局符号(如函数、变量),先加载的库的符号会“覆盖”后加载的库的符号。因此,libhook.so先加载,其函数会劫持libc的函数。
例如有hook.cc和执行程序test.out
#生成动态库 g++ -shared -fPIC -o libhook.so hook.cc #利用全局符号介入机制,优先加载libhook.so LD_PRELOAD="./libhook.so" ./test.outHook模块实现
关于hook模块和IO协程调度的整合,一共有三类接口需要hook:sleep延时接口,包括sleep/usleep/nanosleep。对于这些接口的hook,只需要给IO协程调度器注册一个定时事件,在定时事件触发后再进行执行当前协程即可。当前协程在注册完定时事情后即可yield让出协程。socket IO系列接口,包括read/write/recv等,connect及accept也可也归到这类接口中。这类接口的hook首先需要判断操作的fd是否scoket fd,以及用户是否显示地对fd设置过非阻塞模式,如果不是socket fd 或是用户显示设置过非阻塞模式,那么就不需要使用hook了,直接调用操作系统的IO接口即可(就是原始的系统调用)。如果需要hook,那么首先在IO协程调度器上注册对应的读写事件,等事件发生后再继续执行当前协程。当前协程再注册完IO事件即可yield让出执行权让其他协程执行。socket/fcntl/ioctl/close等接口,这类接口主要处理的是边缘情况,比如分配fd上下文,处理超时及用户显式设置非阻塞问题。首先是socet fd 上下文和FdManager的实现,这两个类用于记录fd上下文和保存全部的fd上下文。
FdCtx、FdManager、Singleton类:用于管理文件描述符的上下文和其相关的操作FdCtx类主要用于管理与文件描述符相关的状态和操作;FdCtx类在用户态记录了fd的读写超时和非阻塞信息,其中非阻塞包括用户显示设置的非阻塞和hook内部设置的非阻塞,区分这两种非阻塞可以有效应对用户对fd设置/获取NONBLOCK模式的情形。FdManager类主要用于管理FdCtx对象的集合,它提供了对文件文件描述符上下文的访问和管理功能。Singleton类实现单例模式,确保一个类只有一个实例,并提供全局访问点。这里用于形成单例FdManager对象。
Hook拦截并重定向系统调用:
hook.h:
hook.cc:
HOOK_FUN(XX)宏和XX(name)宏HOOK_FUN(XX)利用宏展开机制,相当于一个"容器",里面存储了多个XX(name),再配合XX(name)可以做到依次根据每一个name函数名称来生成一系列代码。这样可以有效减少重复代码,提高代码的可读性和维护性。dlsym():
dlsym函数的功能就是可以从共享库(动态库)中获取符号(全局变量与函数符号)地址,通常用于获取函数符号地址,这样可用于对共享库中函数的包装;
因为我们使用了hook外挂式改变了本应该加载的libc共享库,全局符号表中已经有我们的sleep了,此时如果想在代码中继续获取原始的系统调用就需要借助dlsym(从动态库中获取符号地址函数)和RTLD_NEXT,二者结合的就说RTLD_NEXT告诉dlsym查找原始符号从当前库或者程序之后继续搜索,也就是搜索没被加载进来的libc共享库的sleep原始系统调用。
do_io通用模板:do_io模板用来做一个同一的规范化检查全局钩子、文件描述符(fd)的有效性以及是否为非阻塞模式,以决定采用何种 IO 策略。支持为 IO 操作设置时间限制(如使用SO_SNDTIMEO参数)。在非阻塞 IO 操作因资源暂时不可用(例如网络缓冲区已满)而无法立即完成时,do_io会利用事件监听机制挂起当前协程,让出 CPU 资源。当 IO 条件就绪(如可写)时,再恢复协程执行,从而高效地处理高并发请求。
其他函数看代码和详细注释理解:
模块测试:
// 用于读写函数的通用模板 template <typename OriginFun, typename... Args> static ssize_t do_io(int fd, OriginFun fun, const char *hook_fun_name, uint32_t event, int timeout_so, Args &&...args) // 这里的&&是万能引用,用于完美转发参数 { // 如果全局钩子动能未启用,则直接调用原始的I/O函数 if (!nsCoroutine::t_hook_enable) { // 完美转发参数,避免参数被复制 return fun(fd, std::forward<Args>(args)...); } // 获取与文件描述符fd相关联的上下文ctx,如果上下文不存在,则直接调用原始的I/O函数 std::shared_ptr<nsCoroutine::FdCtx> ctx = nsCoroutine::FdMgr::GetInstance()->get(fd); if (!ctx) { return fun(fd, std::forward<Args>(args)...); } // 如果文件描述符已经关闭,设置errno为EBADF并返回-1 if (ctx->isClosed()) { errno = EBADF; // 表示文件描述符无效或已经关闭 return -1; } // 如果文件描述符不是一个socket或者用户设置了非阻塞模式,则直接调用原始的I/O操作函数 if (!ctx->isSocket() || ctx->getUserNonblock()) { return fun(fd, std::forward<Args>(args)...); } // 获取超时设置并初始化timer_info结构体,用于后续的超时管理和取消操作。 uint64_t timeout = ctx->getTimeout(timeout_so); // 条件定时器的条件 std::shared_ptr<timer_info> tinfo(new timer_info); // 调用原始的I/O操作函数,并处理超时情况;如果由于系统中断(EINTR)导致操作失败,函数会重试 retry: // 调用原始I/O操作函数 ssize_t n = fun(fd, std::forward<Args>(args)...); // 由于系统中断(EINTR)导致操作失败,函数重试 while (n == -1 && errno == EINTR) { n = fun(fd, std::forward<Args>(args)...); } // 如果I/O操作因为资源暂时不可用(EAGAIN)而失败,函数会添加一个事件监听器来等待资源可用; // 同时,如果有超时设置,还会启动一个条件计时器来取消事件 if (n == -1 && errno == EAGAIN) { nsCoroutine::IOManager *iom = nsCoroutine::IOManager::GetThis(); // timer std::shared_ptr<nsCoroutine::Timer> timer; std::weak_ptr<timer_info> winfo(tinfo); // 如果执行的read等函数在Fdmanager管理的Fdctx中fd设置了超时时间,就会走到这里,添加addconditionTimer事件 if (timeout != (uint64_t)-1) { timer = iom->addConditionTimer(timeout, [winfo, fd, iom, event]() { auto t = winfo.lock(); // 如果 timer_info 对象已被释放(!t),或者操作已被取消(t->cancelled 非 0),则直接返回。 if(!t || t->cancelled) { return; } t->cancelled = ETIMEDOUT; //如果超时时间到达并且事件尚未被处理(即cancelled任然是0); // 取消该文件描述符上的事件,并立即触发一次事件(即恢复被挂起的协程) iom->cancelEvent(fd, (nsCoroutine::IOManager::Event)(event)); }, winfo); } // 这行代码的作用是将 fd(文件描述符)和 event(要监听的事件,如读或写事件)添加到 IOManager 中进行管理。IOManager 会监听这个文件描述符上的事件,当事件触发时,它会调度相应的协程来处理这个事件 int rt = iom->addEvent(fd, (nsCoroutine::IOManager::Event)(event)); if (rt == -1) { std::cout << hook_fun_name << " addEvent(" << fd << ", " << event << ")"; // 如果 rt 为-1,说明 addEvent 失败。此时,会打印一条调试信息,并且因为添加事件失败所以要取消之前设置的定时器,避免误触发。 if (timer) { timer->cancel(); } return -1; } else { // 如果 addEvent 成功(rt 为 0),当前协程会调用 yield() 函数,将自己挂起,等待事件的触发。 nsCoroutine::Fiber::GetThis()->yield(); // 当协程被恢复时(例如,事件触发后),它会继续执行 yield() 之后的代码。 // 如果之前设置了定时器(timer 不为 nullptr),则在事件处理完毕后取消该定时器。取消定时器的原因是,该定时器的唯一目的是在 I/O 操作超时时取消事件。如果事件已经正常处理完毕,那么定时器就不再需要了。 if (timer) { timer->cancel(); } // 接下来检查 tinfo->cancelled 是否等于 ETIMEDOUT。如果等于,说明该操作因超时而被取消,因此设置 errno 为 ETIMEDOUT 并返回 -1,表示操作失败。 if (tinfo->cancelled == ETIMEDOUT) { errno = tinfo->cancelled; return -1; } // 如果没有超时,则跳转到 retry 标签,重新尝试这个操作。 goto retry; } } return n; } // 宏定义,用于声明所有需要hook的函数 // 配合 #define XX(name) name##_f = (name##_fun)dlsym(RTLD_NEXT, #name); 使用 #define HOOK_FUN(XX) \ XX(sleep) \ XX(usleep) \ XX(nanosleep) \ XX(socket) \ XX(connect) \ XX(accept) \ XX(read) \ XX(readv) \ XX(recv) \ XX(recvfrom) \ XX(recvmsg) \ XX(write) \ XX(writev) \ XX(send) \ XX(sendto) \ XX(sendmsg) \ XX(close) \ XX(fcntl) \ XX(ioctl) \ XX(getsockopt) \ XX(setsockopt) #define XX(name) name##_f = (name##_fun)dlsym(RTLD_NEXT, #name); HOOK_FUN(XX) #undef XX // 具体展开过程: 1、HOOK_FUN(XX)都替换成: XX(sleep) XX(usleep) XX(nanosleep) XX(socket) XX(connect) XX(accept) XX(read) XX(readv) XX(recv) XX(recvfrom) XX(recvmsg) XX(write) XX(writev) XX(send) XX(sendto) XX(sendmsg) XX(close) XX(fcntl) XX(ioctl) XX(getsockopt) XX(setsockopt) 2、每一个XX(name)的宏进一步展开为: sleep_f=(sleep_fun)dlsym(RTLD_NEXT,"sleep"); usleep_f=(usleep_fun)dlsym(RTLD_NECT,"usleep"); ......fdManager.h:
#pragma once #include <memory> #include <shared_mutex> #include "thread.h" namespace nsCoroutine { // FdCtx类用于管理与文件描述符相关的状态和操作 // FdCtx类在用户态记录了fd的读写超时和非阻塞信息,其中非阻塞包括用户显示设置的非阻塞和hook内部设置的非阻塞,区分这两种非阻塞可以有效应对用户对fd设置/获取NONBLOCK模式的情形。 class FdCtx : public std::enable_shared_from_this<FdCtx> { private: bool m_isInit = false; //标记文件描述符是否已初始化 bool m_isSocket = false; //标记文件描述符是否是一个套接字 bool m_sysNonblock = false; //标记文件描述符是否设置为系统非阻塞模式 bool m_userNonblock = false; //标记文件描述符是否设置为用户非阻塞模式 bool m_isClosed = false; //标记文件描述符是否已关闭 int m_fd; //文件描述符 // 读事件的超时时间,默认为-1表示没有超时限制 uint64_t m_recvTimeout = (uint64_t)-1; // 写事件的超时时间,默认为-1表示没有超时限制 uint64_t m_sendTimeout = (uint64_t)-1; public: FdCtx(int fd); ~FdCtx(); //初始化FdCtx对象 bool init(); bool isInit() const { return m_isInit; } bool isSocket() const { return m_isSocket; } bool isClosed() const { return m_isClosed; } // 设置和获取用户层面的非阻塞状态 void setUserNonblock(bool v) { m_userNonblock = v; } bool getUserNonblock() const { return m_userNonblock; } // 设置和获取系统层面的非阻塞状态 void setSysNonblock(bool v) { m_sysNonblock = v; } bool getSysNonblock() const { return m_sysNonblock; } // 设置和获取超时时间,type用于区分读事件和写事件的超时设置,v表示时间毫秒。 void setTimeout(int type, uint64_t v); uint64_t getTimeout(int type); }; // 用于管理FdCtx对象的集合,提供了对文件描述符上下文的访问和管理功能 class FdManager { public: FdManager(); // 获取指定文件描述符的FdCtx对象,如果auto_create为true,在不存在的时候自动创建新的FdCtx对象 std::shared_ptr<FdCtx> get(int fd, bool auto_create = false); // 删除指定文件描述符的FdCtx对象 void del(int fd); private: //用于保护对m_datas的访问,支持共享读锁和独占写锁。 std::shared_mutex m_mutex; //存储所有FdCtx对象的共享指针 std::vector<std::shared_ptr<FdCtx>> m_datas; }; // 实现单例模式,确保一个类只有一个实例,并提供全局访问点 // 使用懒汉模式 + 互斥锁维持线程安全 template <typename T> class Singleton { private: static T *instance; //对外提供的实例 static std::mutex mutex; //锁 protected: Singleton() {} public: // 删除拷贝构造函数和赋值运算符 Singleton(const Singleton &) = delete; Singleton &operator=(const Singleton &) = delete; static T *GetInstance() { std::lock_guard<std::mutex> lock(mutex); // 确保线程安全 // 这里还能锁优化 if (instance == nullptr) { instance = new T(); } return instance; } static void DestroyInstance() { std::lock_guard<std::mutex> lock(mutex); if(instance) { delete instance; instance = nullptr; } } }; typedef Singleton<FdManager> FdMgr; }fdManager.cc:
#include "fdManager.h" #include "hook.h" #include <sys/types.h> #include <sys/stat.h> #include <unistd.h> namespace nsCoroutine { // 显示实例化,FdManager类有一个全局唯一的单例实例 template class Singleton<FdManager>; // 初始化 template <typename T> T *Singleton<T>::instance = nullptr; template <typename T> std::mutex Singleton<T>::mutex; FdCtx::FdCtx(int fd) : m_fd(fd) { init(); } FdCtx::~FdCtx() { } bool FdCtx::init() { if (m_isInit) { return true; } struct stat statbuf; // fstat 函数用于获取与文件描述符 m_fd 关联的文件状态信息存放到 statbuf 中。如果 fstat() 返回 -1,表示文件描述符无效或出现错误。 if (-1 == fstat(m_fd, &statbuf)) { m_isInit = false; m_isSocket = false; } else { m_isInit = true; // S_ISSOCK(statbuf.st_mode) SISSOCK是一个宏,用于检查'st_mode'中的位,以确定文件是否是一个套接字(socket)。该宏定义在<sys/stat.h>头文件中。 m_isSocket = S_ISSOCK(statbuf.st_mode); } // 如果是套接字就设置为非阻塞 if (m_isSocket) { // 获取文件描述符的状态 int flags = fcntl_f(m_fd, F_GETFL, 0); if (!(flags & O_NONBLOCK)) { // 检查当前标志中是否设置了非阻塞标志,如果没有就设置 fcntl_f(m_fd, F_SETFL, flags | O_NONBLOCK); } // hook 非阻塞设置成功 m_sysNonblock = true; } else { // 如果不是套接字就没必要设置非阻塞 m_sysNonblock = false; } return m_isInit; } //type指定超时类型的标志。可能的值包括 SO_RCVTIMEO 和 SO_SNDTIMEO,分别用于接收超时和发送超时。v代表设置的超时时间,单位是毫秒或者其他。 void FdCtx::setTimeout(int type, uint64_t v) { if (type == SO_RCVTIMEO) { m_recvTimeout = v; } else if(type == SO_SNDTIMEO) { m_sendTimeout = v; } //type无效 else { m_recvTimeout = -1; m_sendTimeout = -1; std::cout << "type error" << std::endl; } } uint64_t FdCtx::getTimeout(int type) { if (type == SO_RCVTIMEO) { return m_recvTimeout; } else if(type == SO_SNDTIMEO) { return m_sendTimeout; } //type无效 else { return -1; std::cout << "type error" << std::endl; } } FdManager::FdManager() { m_datas.resize(64); } std::shared_ptr<FdCtx> FdManager::get(int fd, bool auto_create) { if (fd == -1) { return nullptr; } // 读锁 std::shared_lock<std::shared_mutex> read_lock(m_mutex); if (m_datas.size() <= fd) { if (auto_create == false) { return nullptr; } } else { if (m_datas[fd] || !auto_create) { return m_datas[fd]; } } read_lock.unlock(); // 写锁 std::unique_lock<std::shared_mutex> write_lock(m_mutex); if (m_datas.size() <= fd) { m_datas.resize(fd * 1.5); } m_datas[fd] = std::make_shared<FdCtx>(fd); return m_datas[fd]; } // 删除指定文件描述符的FdCtx对象 void FdManager::del(int fd) { std::unique_lock<std::shared_mutex> write_lock(m_mutex); if (m_datas.size() <= fd) { return; } // reset()用于是否std::shared_ptr所管理的对象,并将智能指针重新置为nullptr(即空指针),如果此时执行reset的智能指针是最后一个,那么其对象会被销毁。 // 智能指针share调用reset()减少其对对象的引用计数,当引用计数为0销毁对象。 m_datas[fd].reset(); } }性能测试
测试内容
以本项目、原始epoll、libevent网络库分别编写单线程回声服务器(内置100000次循环模拟复杂业务场景),使用ApacheBench测试工具分别进行压力测试。
运行环境
- CPU:13th Gen Intel(R)core(TM)i7-13620H 10核16线程,虚拟机使用CPU数量4,CPU内核数量2
- 内存:本机16g,虚拟机4g
- 带宽:利用iperf3测试回环的总体传输速率为88.7 Gbits/sec
- Ubuntu版本:25.04
本项目测试
#include "ioManager.h" #include "hook.h" #include <unistd.h> #include <sys/types.h> #include <sys/socket.h> #include <arpa/inet.h> #include <fcntl.h> #include <iostream> #include <stack> #include <cstring> #include <chrono> #include <thread> static int sock_listen_fd = -1; void test_accept(); void error(const char *msg) { perror(msg); printf("erreur...\n"); exit(1); } void watch_io_read() { nsCoroutine::IOManager::GetThis()->addEvent(sock_listen_fd, nsCoroutine::IOManager::READ, test_accept); } void test_accept() { struct sockaddr_in addr; memset(&addr, 0, sizeof(addr)); socklen_t len = sizeof(addr); int fd = accept(sock_listen_fd, (struct sockaddr *)&addr, &len); if (fd < 0) { std::cout << "accept failed, fd = " << fd << ", errno = " << errno << std::endl; } else { std::cout << "accepted connection, fd = " << fd << std::endl; fcntl(fd, F_SETFL, O_NONBLOCK); nsCoroutine::IOManager::GetThis()->addEvent(fd, nsCoroutine::IOManager::READ, [fd]() { char buffer[1024]; memset(buffer, 0, sizeof(buffer)); while (true) { int ret = recv(fd, buffer, sizeof(buffer), 0); if (ret > 0) { // 打印接收到的数据 //std::cout << "received data, fd = " << fd << ", data = " << buffer << std::endl; // 构建HTTP响应 const char *response = "HTTP/1.1 200 OK\r\n" "Content-Type: text/plain\r\n" "Content-Length: 1\r\n" "\r\n" "1"; // 模拟复杂业务场景 for(int i = 0; i < 100000; ++i); // 发送HTTP响应 ret = send(fd, response, strlen(response), 0); std::cout << "sent data, fd = " << fd << ", ret = " << ret << std::endl; // 关闭连接 close(fd); break; } if (ret <= 0) { if (ret == 0 || errno != EAGAIN) { std::cout << "closing connection, fd = " << fd << std::endl; close(fd); break; } else if (errno == EAGAIN) { std::cout << "recv returned EAGAIN, fd = " << fd << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(50)); // 延长睡眠时间,避免繁忙等待 } } } }); } nsCoroutine::IOManager::GetThis()->addEvent(sock_listen_fd, nsCoroutine::IOManager::READ, test_accept); } void test_iomanager() { int portno = 8080; struct sockaddr_in server_addr, client_addr; socklen_t client_len = sizeof(client_addr); // 设置套接字 sock_listen_fd = socket(AF_INET, SOCK_STREAM, 0); if (sock_listen_fd < 0) { error("Error creating socket..\n"); } int yes = 1; // 解决 "address already in use" 错误 setsockopt(sock_listen_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); memset((char *)&server_addr, 0, sizeof(server_addr)); server_addr.sin_family = AF_INET; server_addr.sin_port = htons(portno); server_addr.sin_addr.s_addr = INADDR_ANY; // 绑定套接字并监听连接 if (bind(sock_listen_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) error("Error binding socket..\n"); if (listen(sock_listen_fd, 1024) < 0) { error("Error listening..\n"); } printf("epoll echo server listening for connections on port: %d\n", portno); fcntl(sock_listen_fd, F_SETFL, O_NONBLOCK); nsCoroutine::IOManager iom(4); iom.addEvent(sock_listen_fd, nsCoroutine::IOManager::READ, test_accept); } int main(int argc, char *argv[]) { test_iomanager(); return 0; }
原生epoll测试
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <sys/socket.h> #include <arpa/inet.h> #include <sys/epoll.h> #define MAX_EVENTS 10 #define PORT 8080 int main() { int listen_fd, conn_fd, epoll_fd, event_count; struct sockaddr_in server_addr, client_addr; socklen_t addr_len = sizeof(client_addr); struct epoll_event events[MAX_EVENTS], event; // 创建监听套接字 if ((listen_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { perror("socket"); return -1; } int yes = 1; // 解决 "address already in use" 错误 setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); // 设置服务器地址和端口 memset(&server_addr, 0, sizeof(server_addr)); server_addr.sin_family = AF_INET; server_addr.sin_port = htons(PORT); server_addr.sin_addr.s_addr = INADDR_ANY; // 绑定监听套接字到服务器地址和端口 if (bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) { perror("bind"); return -1; } // 监听连接 if (listen(listen_fd, 1024) == -1) { perror("listen"); return -1; } // 创建 epoll 实例 if ((epoll_fd = epoll_create1(0)) == -1) { perror("epoll_create1"); return -1; } // 添加监听套接字到 epoll 实例中 event.events = EPOLLIN; event.data.fd = listen_fd; if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &event) == -1) { perror("epoll_ctl"); return -1; } while (1) { // 等待事件发生 event_count = epoll_wait(epoll_fd, events, MAX_EVENTS, -1); if (event_count == -1) { perror("epoll_wait"); return -1; } // 处理事件 for (int i = 0; i < event_count; i++) { if (events[i].data.fd == listen_fd) { // 有新连接到达 conn_fd = accept(listen_fd, (struct sockaddr *)&client_addr, &addr_len); if (conn_fd == -1) { perror("accept"); continue; } // 将新连接的套接字添加到 epoll 实例中 event.events = EPOLLIN; event.data.fd = conn_fd; if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, conn_fd, &event) == -1) { perror("epoll_ctl"); return -1; } } else { // 有数据可读 char buf[1024]; int len = read(events[i].data.fd, buf, sizeof(buf) - 1); if (len <= 0) { // 发生错误或连接关闭,关闭连接 close(events[i].data.fd); } else { // 发送HTTP响应 const char *response = "HTTP/1.1 200 OK\r\n" "Content-Type: text/plain\r\n" "Content-Length: 1\r\n" "\r\n" "1"; // 模拟复杂业务场景 for(int i = 0; i < 100000; ++i); write(events[i].data.fd, response, strlen(response)); // epoll_ctl(epoll_fd,EPOLL_CTL_DEL,events[i].data.fd,NULL);//出现70007的错误再打开,或者试试-r命令 // 关闭连接 close(events[i].data.fd); } } } } // 关闭监听套接字和 epoll 实例 close(listen_fd); close(epoll_fd); return 0; }
libevent网络库测试
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <event2/event.h> #include <event2/listener.h> #include <event2/bufferevent.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <unistd.h> #define PORT 8080 // 处理读事件的回调函数 void http_read_cb(evutil_socket_t fd, short events, void *arg) { char buf[1024]; int len = recv(fd, buf, sizeof(buf) - 1, 0); if (len <= 0) { // 发生错误或连接关闭,关闭连接并释放事件资源 close(fd); event_free((struct event *)arg); return; } buf[len] = '\0'; printf("接收到消息:%s\n", buf); // 构建HTTP响应 const char *response = "HTTP/1.1 200 OK\r\n" "Content-Type: text/plain\r\n" "Content-Length: 13\r\n" "\r\n" "1"; // 模拟复杂业务场景 for(int i = 0; i < 100000; ++i); send(fd, response, strlen(response), 0); // 发送响应后关闭连接 close(fd); event_free((struct event *)arg); } // 接受连接的回调函数 void accept_conn_cb(evutil_socket_t listener, short event, void *arg) { struct event_base *base = (struct event_base *)arg; struct sockaddr_storage ss; socklen_t slen = sizeof(ss); int fd = accept(listener, (struct sockaddr *)&ss, &slen); if (fd < 0) { perror("accept"); } else if (fd > FD_SETSIZE) { close(fd); } else { // 创建一个新的事件结构体 struct event *ev = event_new(NULL, -1, 0, NULL, NULL); // 将新的事件添加到事件循环中 event_assign(ev, base, fd, EV_READ | EV_PERSIST, http_read_cb, (void *)ev); event_add(ev, NULL); } } int main() { struct event_base *base; struct event *listener_event; struct sockaddr_in sin; // 初始化监听地址和端口 memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_addr.s_addr = htonl(INADDR_ANY); sin.sin_port = htons(PORT); // 创建监听套接字 int listener = socket(AF_INET, SOCK_STREAM, 0); if (listener < 0) { perror("socket"); return -1; } // 设置套接字选项,允许地址复用 evutil_make_socket_nonblocking(listener); int reuse = 1; setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)); // 绑定地址和端口 if (bind(listener, (struct sockaddr *)&sin, sizeof(sin)) < 0) { perror("bind"); return -1; } // 监听端口 if (listen(listener, 1024) < 0) { perror("listen"); return -1; } // 初始化Libevent库 base = event_base_new(); // 创建一个监听事件 listener_event = event_new(base, listener, EV_READ | EV_PERSIST, accept_conn_cb, (void *)base); // 将监听事件添加到事件循环中 event_add(listener_event, NULL); // 开始事件循环 event_base_dispatch(base); // 清理资源 event_free(listener_event); event_base_free(base); close(listener); return 0; }
项目存在问题与适用场景
多次测试本项目和其他测试对比发现问题:在2核情况下,并且业务简单的情况下,本项目的性能比原生epoll差。原因:
- 因为项目使用了线程池,多核情况下性能更好。
- 如果业务场景太简单的情况会导致多线程对任务队列的竞态访问过于激烈,新切换的线程得不到锁就会被挂起(多核环境也受制约),频繁的线程切换开销严重影响性能测试。所以在所有测试代码中加一个for(int i=0;i<100000;++i);来模拟复杂业务处理,让任务不那么轻量。
所以项目适合在多核支持且业务复杂的情况下使用。