【Linux | 网络】多路转接IO之poll
一、poll函数
#include<poll.h>intpoll(structpollfd*fds,nfds_t nfds,int timeout);功能:poll 是 Linux 系统中的一种 I/O 多路复用机制,主要用于同时监控多个文件描述符的状态。
参数:
- fds:指向 struct pollfd 数组的指针,每个元素指定一个要监控的文件描述符及其关注的事件
- nfds:数组中元素的数量(即监控的文件描述符总数)
- timeout:超时时间(毫秒):
- -1:永久阻塞,直到有事件发生
- 0:立即返回(非阻塞模式)
- >0:指定超时时间,超时后返回
返回值:
- 正数:表示就绪的文件描述符总数
- 0:表示超时(无文件描述符就绪)
- -1:表示错误,并设置 errno
struct pollfd 结构
structpollfd{int fd;// 文件描述符short events;// 关注的事件(输入掩码,如 POLLIN、POLLOUT)short revents;// 实际发生的事件(输出掩码,由内核填充)};
poll 函数支持的标准事件类型,本质上是宏,只有一个比特位为1,通过与events和revents异或分为两种情况:
- 调用时:用户告诉内核,需要关注文件描述符中的events事件
- 返回时:内核告诉用户,用户关注的文件描述符,有revents中的事件准备就绪
| 事件 | 描述 | 是否可以作为输入 | 是否可以作为输出 |
|---|---|---|---|
| POLLIN | 有普通数据或优先数据可读 | 是 | 是 |
| POLLRDNORM | 有普通数据可读 | 是 | 是 |
| POLLRDBAND | 有优先级带数据可读 | 是 | 是 |
| POLLPRI | 有高优先级带数据可读 | 是 | 是 |
| POLLOUT | 有普通数据或优先数据可写 | 是 | 是 |
| POLLWRNORM | 有普通数据可写 | 是 | 是 |
| POLLWRBAND | 有优先级带数据可写 | 是 | 是 |
| POLLRDHUP | TCP连接的对端关闭连接,或关闭了写操作 | 是 | 是 |
| POLLHUP | 挂起 | 否 | 是 |
| POLLERR | 错误 | 否 | 是 |
| POLLNVAL | 文件描述符未打开 | 否 | 是 |
二、poll的优缺点
- 优点
- poll 只负责等待,可以等待多个文件描述符,在IO的时候效率会比较高
- 输入和输出参数进行分离,events和revents,不需要再对poll的参数进行频繁的重置了
- poll使用了动态数组,所以 poll 能够检测文件描述符的个数也是没有有限的
- 缺点
- 用户和内核之间,需要一直进行数据拷贝
- 在编写代码的时候,需要遍历动态数组,可能会影响select的效率
- poll 会让操作系统在底层遍历要关心的所有文件描述符,会导致效率降低
三、实现poll服务器(只关心读事件)
3.1 Log.hpp(日志)
#pragmaonce#include"LockGuard.hpp"#include<iostream>#include<string>#include<stdarg.h>#include<stdio.h>#include<pthread.h>#include<time.h>#include<unistd.h>#include<sys/types.h>#include<fcntl.h>#include<sys/stat.h> using namespace std;// 日志等级enum{ Debug =0,// 调试 Info,// 正常 Warning,// 警告 Error,// 错误,但程序并未直接退出 Fatal // 程序直接挂掉};enum{ Screen =10,// 打印到显示器上 OneFile,// 打印到一个文件中 ClassFile // 按照日志等级打印到不同的文件中}; string LevelToString(int level){switch(level){case Debug:return"Debug";case Info:return"Info";case Warning:return"Warning";case Error:return"Error";case Fatal:return"Fatal";default:return"Unknow";}}constchar*default_filename ="log.";constint default_style = Screen;constchar*defaultdir ="log"; class Log { public:Log():style(default_style),filename(default_filename){// mkdir(defaultdir,0775);pthread_mutex_init(&_log_mutex, nullptr);}voidSwitchStyle(int sty){ style = sty;}voidWriteLogToOneFile(const string &logname,const string &logmessage){int fd =open(logname.c_str(), O_CREAT | O_WRONLY | O_APPEND,0666);if(fd ==-1)return;{ LockGuard lockguard(&_log_mutex);write(fd, logmessage.c_str(), logmessage.size());}close(fd);}voidWriteLogToClassFile(const string &levelstr,const string &logmessage){mkdir(defaultdir,0775); string name = defaultdir; name +="/"; name += filename; name += levelstr;WriteLogToOneFile(name, logmessage);}voidWriteLog(int level,const string &logmessage){switch(style){case Screen:{ LockGuard lockguard(&_log_mutex); cout << logmessage;}break;case OneFile:WriteLogToClassFile("All", logmessage);break;case ClassFile:WriteLogToClassFile(LevelToString(level), logmessage);break;default:break;}} string GetTime(){time_t CurrentTime =time(nullptr);structtm*curtime =localtime(&CurrentTime);char time[128];// localtime 的年是从1900开始的,所以要加1900, 月是从0开始的所以加1snprintf(time,sizeof(time),"%d-%d-%d %d:%d:%d", curtime->tm_year +1900, curtime->tm_mon +1, curtime->tm_mday, curtime->tm_hour, curtime->tm_min, curtime->tm_sec);return time;return"";}voidLogMessage(int level,constchar*format,...){char left[1024]; string Levelstr =LevelToString(level).c_str(); string Timestr =GetTime().c_str(); string Idstr =to_string(getpid());snprintf(left,sizeof(left),"[%s][%s][%s] ", Levelstr.c_str(), Timestr.c_str(), Idstr.c_str()); va_list args;va_start(args, format);char right[1024];vsnprintf(right,sizeof(right), format, args); string logmessage = left; logmessage += right;WriteLog(level, logmessage);va_end(args);}~Log(){pthread_mutex_destroy(&_log_mutex);}; private:int style; string filename;pthread_mutex_t _log_mutex;}; Log lg; class Conf { public:Conf(){ lg.SwitchStyle(Screen);}~Conf(){}}; Conf conf;3.2 Lockguard.hpp(自动管理锁)
#pragmaonce#include<iostream> class Mutex { public:Mutex(pthread_mutex_t* lock):pmutex(lock){}voidLock(){pthread_mutex_lock(pmutex);}voidUnlock(){pthread_mutex_unlock(pmutex);}~Mutex(){} public:pthread_mutex_t* pmutex;}; class LockGuard { public:LockGuard(pthread_mutex_t* lock):mutex(lock){ mutex.Lock();}~LockGuard(){ mutex.Unlock();} public: Mutex mutex;};3.3 Socket.hpp(封装套接字)
#pragmaonce#include<iostream>#include<string>#include<string.h>#include<unistd.h>#include<sys/types.h>#include<sys/socket.h>#include<netinet/in.h>#include<arpa/inet.h>#include<errno.h>#defineCONV(addrptr)(structsockaddr*)addrptrenum{ Socket_err =1, Bind_err, Listen_err };conststaticint defalutsockfd =-1;constint defalutbacklog =5; class Socket { public: virtual ~Socket(){}; virtual voidCreateSocketOrDie()=0; virtual voidBindSocketOrDie(uint16_t port)=0; virtual voidListenSocketOrDie(int backlog)=0; virtual intAcceptConnection(std::string* ip ,uint16_t* port)=0; virtual bool ConnectServer(const std::string& serverip ,uint16_t serverport)=0; virtual intGetSockFd()=0; virtual voidSetSockFd(int sockfd)=0; virtual voidCloseSockFd()=0; virtual bool Recv(std::string& buffer,int size)=0; virtual voidSend(const std::string& send_string)=0; public:voidBuildListenSocketMethod(uint16_t port,int backlog = defalutbacklog){CreateSocketOrDie();BindSocketOrDie(port);ListenSocketOrDie(backlog);} bool BuildConnectSocketMethod(const std::string& serverip ,uint16_t serverport){CreateSocketOrDie();returnConnectServer(serverip,serverport);}voidBuildNormalSocketMethod(int sockfd){SetSockFd(sockfd);}}; class TcpSocket : public Socket { public:TcpSocket(int sockfd = defalutsockfd):_sockfd(sockfd){}~TcpSocket(){};voidCreateSocketOrDie() override { _sockfd =::socket(AF_INET,SOCK_STREAM,0);if(_sockfd <0)exit(Socket_err);}voidBindSocketOrDie(uint16_t port) override {structsockaddr_in addr;memset(&addr,0,sizeof(addr)); addr.sin_family = AF_INET; addr.sin_addr.s_addr = INADDR_ANY; addr.sin_port =htons(port);socklen_t len =sizeof(addr);int n =::bind(_sockfd,CONV(&addr),len);if(n <0)exit(Bind_err);}voidListenSocketOrDie(int backlog) override {int n =::listen(_sockfd,backlog);if(n <0)exit(Listen_err);}intAcceptConnection(std::string* clientip ,uint16_t* clientport) override {structsockaddr_in client;memset(&client,0,sizeof(client));socklen_t len =sizeof(client);int fd =::accept(_sockfd,CONV(&client),&len);if(fd <0)return-1;char buffer[64];inet_ntop(AF_INET,&client.sin_addr,buffer,len);*clientip = buffer;*clientport =ntohs(client.sin_port);return fd;} bool ConnectServer(const std::string& serverip ,uint16_t serverport) override {structsockaddr_in server;memset(&server,0,sizeof(server)); server.sin_family = AF_INET;// server.sin_addr.s_addr = inet_addr(serverip.c_str());inet_pton(AF_INET,serverip.c_str(),&server.sin_addr); server.sin_port =htons(serverport);socklen_t len =sizeof(server);int n =connect(_sockfd,CONV(&server),len);if(n <0)return false;elsereturn true;}intGetSockFd() override {return _sockfd;}voidSetSockFd(int sockfd) override { _sockfd = sockfd;}voidCloseSockFd() override {if(_sockfd > defalutsockfd){close(_sockfd);}} bool Recv(std::string& buffer ,int size)override {char inbuffer[size];int n =recv(_sockfd,inbuffer,sizeof(inbuffer)-1,0);if(n >0){ inbuffer[n]=0;}else{return false;} buffer += inbuffer;return true;}voidSend(const std::string& send_string){send(_sockfd,send_string.c_str(),send_string.size(),0);} private:int _sockfd;};3.4 PollServer.hpp(服务端封装)
#pragmaonce#include<iostream>#include<string>#include<poll.h>#include"Socket.hpp"#include"Log.hpp"#include<memory> using namespace std;conststaticuint16_t defalutport =8888;conststaticint gbacklog =8;conststaticint num =1024; class PollServer { private:voidHandlerEvent(){for(int i =0; i < _num; i++){// 是否监控if(_rfds[i].fd ==-1)continue;// 是否就绪int fd = _rfds[i].fd;if(_rfds[i].revents & POLLIN){// 是新连接到来,还是新数据到来// 新连接到来if(fd == _listensock->GetSockFd()){ lg.LogMessage(Info,"get a new link\n"); string clientip;uint16_t cilentport;// 由于select已经检测到listensock已经就绪了,这里不会阻塞int sockfd = _listensock->AcceptConnection(&clientip,&cilentport);if(sockfd ==-1){ lg.LogMessage(Error,"accept error\n");continue;} lg.LogMessage(Info,"get a client , client info# %s %d , fd:%d\n", clientip.c_str(), cilentport, sockfd);// 这里已经获取连接成功,由于底层数据不一定就绪// 所以这里需要将新连接的文件描述符交给poll托管// 只需将文件描述符加入到_rfds即可int pos =0;for(; pos < _num; pos++){if(_rfds[pos].fd ==-1){ _rfds[pos].fd = sockfd; _rfds[pos].events |= POLLIN;break;}}// 当存储上限时,可以选择扩容,由于poll并不是很重要,这里我为了方便就直接关闭文件描述符if(pos == _num){close(sockfd); lg.LogMessage(Warning,"server is full...!\n");}}else{// 是新数据来了// 这里读是有问题的char buffer[1024]; bool flag =recv(fd,buffer,1024,0);if(flag)// 读取成功{ lg.LogMessage(Info,"client say# %s\n",buffer);}else// 读取失败{ lg.LogMessage(Warning,"cilent quit !! close fd : %d\n",fd);close(fd); _rfds[i].fd =-1; _rfds[i].events =0; _rfds[i].revents =0;}}}}} public:PollServer(uint16_t port = defalutport):_port(port),_listensock(new TcpSocket()),_isrunning(false),_num(num),_rfds(new pollfd[_num]){}voidInit(){ _listensock->BuildListenSocketMethod(_port, gbacklog);for(int i =0; i < _num; i++){ _rfds[i].fd =-1; _rfds[i].events =0; _rfds[i].revents =0;} _rfds[0].fd = _listensock.get()->GetSockFd(); _rfds[0].events |= POLLIN;}voidLoop(){ _isrunning = true;while(_isrunning){PrintDebug();int timeout =1000;ssize_t n =poll(_rfds,_num,timeout);switch(n){case-1:{ lg.LogMessage(Fatal,"select Error\n");break;}case0:{ lg.LogMessage(Info,"select timeout...");break;}default:{ lg.LogMessage(Info,"select success , begin handler event\n");HandlerEvent();break;}}} _isrunning = false;}voidStop(){ _isrunning = false;}// 查看当前哪些文件描述符需要被监控voidPrintDebug(){ std::cout <<"current select rfds list is : ";for(int i =0; i < _num; i++){if(_rfds[i].fd ==-1)continue;else std::cout << _rfds[i].fd <<" ";} std::cout << std::endl;}~PollServer(){ delete[] _rfds;} private: unique_ptr<Socket> _listensock;uint16_t _port; bool _isrunning;int _num;structpollfd* _rfds;};3.5 Main.cpp(服务端)
#include<iostream>#include<memory>#include"PollServer.hpp" using namespace std;// ./pollServer portintmain(int argc ,char* argv[]){if(argc !=2){ cout <<"Usage : "<< argv[0]<<" port"<< endl;exit(0);}uint16_t localport =stoi(argv[1]); unique_ptr<PollServer> svr = make_unique<PollServer>(localport); svr->Init(); svr->Loop();return0;}结尾
如果有什么建议和疑问,或是有什么错误,大家可以在评论区中提出。
希望大家以后也能和我一起进步!!🌹🌹
如果这篇文章对你有用的话,希望大家给一个三连支持一下!!🌹🌹