非阻塞connect
阅读Redis主从复制部分的源码时,对其中某些逻辑不是很确定。梳理了Redis非阻塞connect的大致实现之后,自己写了一个简单的版本。
需要理解的是:非阻塞connect的完成被认为是使相应套接字可写。(UNP Ch6.10)
一、主要流程:
- 创建非阻塞socket,socket(..., SOCK_STREAM | SOCK_NONBLOCK, ...)
- 检查connect(fd, ...)返回是否为0
- 如果为-1,检查errno是否为EINPROGRESS,如果connect失败且错误不为EINPROGRESS,返回错误。
- 返回fd,并利用IO多路复用阻塞,监听POLLOUT事件。
- 事件触发后,用getsockopt(fd, SOL_SOCKET, SO_ERROR, ...)检查socket状态
- 若取得的SO_ERROR值为0,说明连接成功;否则该值就是connect失败的错误码
static void epoll_ctl_add(int epfd, int fd, int evts) { |
int err = epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev); |
void connectionEstablished(int fd) { |
socklen_t errlen = sizeof(sockerr); |
getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen); |
printf("connection done.\n"); |
void handle_events(struct epoll_event* e, int epfd) { |
printf("events %d: ", e->data.fd); |
if (e->events & EPOLLOUT) { |
connectionEstablished(e->data.fd); |
/* non-blocking-connect */ |
int connect(const char* ip, int port) { |
struct sockaddr_in address; |
bzero(&address, sizeof(address)); |
address.sin_family = AF_INET; |
inet_pton(AF_INET, ip, &address.sin_addr); |
address.sin_port = htons(port); |
int s = socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0); |
if (connect(s, (struct sockaddr*)&address, sizeof(address)) == -1) { |
if (errno == EINPROGRESS) { |
int fd = connect("127.0.0.1", 8888); |
printf("connect failed\n"); |
struct epoll_event events[MAX_EVENTS]; |
epoll_ctl_add(epfd, fd, EPOLLOUT); |
int n = epoll_wait(epfd, events, MAX_EVENTS, -1); |
for (int i = 0; i < n; i++) { |
handle_events(&events[i], epfd); |
使用nc -l 8888当服务端,测试发现确实是可以通过监听POLLOUT事件来判断connect完成的。需要注意的是,连接失败时套接字同样会变为可写,因此可写事件触发后仍要用SO_ERROR确认连接是否真正成功。
二、Redis源码:
int connectWithMaster(void) { |
/* 从服务器作为client,执行connect(2)连接到master */ |
fd = anetTcpNonBlockBindConnect(NULL, |
server.masterhost,server.masterport,REDIS_BIND_ADDR); |
redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s", |
/* 监听读写事件,设置事件处理回调函数为syncWithMaster */ |
if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) == |
redisLog(REDIS_WARNING,"Can't create readable event for SYNC"); |
server.repl_transfer_lastio = server.unixtime; |
server.repl_transfer_s = fd; |
server.repl_state = REDIS_REPL_CONNECTING; |
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { |
int sockerr = 0, psync_result; |
socklen_t errlen = sizeof(sockerr); |
/* If this event fired after the user turned the instance into a master |
* with SLAVEOF NO ONE we must just return ASAP. */ |
if (server.repl_state == REDIS_REPL_NONE) { |
/* Check for errors in the socket. */ |
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1) |
aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE); |
redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s", |
/* If we were connecting, it's time to send a non blocking PING, we want to |
* make sure the master is able to reply before going into the actual |
* replication process where we have long timeouts in the order of |
* seconds (in the meantime the slave would block). */ |
/* 建立连接后首先给master发PING,确保两端读写正常和master可以正确处理命令 |
因为从服务器注册了RD and WR,而非阻塞connect(2)会触发EPOLLOUT,所以会执行第一步 |
if (server.repl_state == REDIS_REPL_CONNECTING) { |
redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event."); |
/* Delete the writable event so that the readable event remains |
* registered and we can wait for the PONG reply. */ |
aeDeleteFileEvent(server.el,fd,AE_WRITABLE); |
server.repl_state = REDIS_REPL_RECEIVE_PONG; |
/* Send the PING, don't check for errors at all, we have the timeout |
* that will take care about this. */ |
syncWrite(fd,"PING\r\n",6,100); |
/* Receive the PONG command. */ |
if (server.repl_state == REDIS_REPL_RECEIVE_PONG) { |
继续看一下Redis非阻塞connect的实现:
#define ANET_CONNECT_NONE 0 |
#define ANET_CONNECT_NONBLOCK 1 |
static int anetTcpGenericConnect(char *err, char *addr, int port, |
char *source_addr, int flags) |
for (p = servinfo; p != NULL; p = p->ai_next) { |
if (connect(s,p->ai_addr,p->ai_addrlen) == -1) { |
/* If the socket is non-blocking, it is ok for connect() to |
* return an EINPROGRESS error here. */ |
if (errno == EINPROGRESS && flags & ANET_CONNECT_NONBLOCK) |