intredisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn) {
return redisAsyncSetConnectCallbackImpl(ac, fn, NULL);
}
intredisAsyncSetConnectCallbackImpl(redisAsyncContext *ac, redisConnectCallback *fn, redisConnectCallbackNC *fn_nc) {
/* If either are already set, this is an error */if (ac->onConnect || ac->onConnectNC) return REDIS_ERR;
if (fn) {
ac->onConnect = fn;
} elseif (fn_nc) {
ac->onConnectNC = fn_nc;
}
/* The common way to detect an established connection is to wait for *
the first write event to be fired. This assumes the related event *
library functions are already set. */
_EL_ADD_WRITE(ac);
return REDIS_OK;
}
voidredisAsyncHandleRead(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
/* must not be called from a callback */
assert(!(c->flags & REDIS_IN_CALLBACK));
if (!(c->flags & REDIS_CONNECTED)) {
/* Abort connect was not successful. */if (__redisAsyncHandleConnect(ac) != REDIS_OK) return;
/* Try again later when the context is still not connected. */if (!(c->flags & REDIS_CONNECTED)) return;
}
c->funcs->async_read(ac); // 底层调用 redisAsyncRead
}
voidredisAsyncRead(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
if (redisBufferRead(c) == REDIS_ERR) {
__redisAsyncDisconnect(ac);
} else {
/* Always re-schedule reads */
_EL_ADD_READ(ac);
redisProcessCallbacks(ac);
}
}
// 1. 连接未断开的逻辑voidredisProcessCallbacks(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
void *reply = NULL;
int status;
while((status = redisGetReply(c, &reply)) == REDIS_OK) {
...
/* 协议解析 */if (cb.fn != NULL) {
__redisRunCallback(ac, &cb, reply); // 用户的回调函数if (!(c->flags & REDIS_NO_AUTO_FREE_REPLIES)){
c->reader->fn->freeObject(reply); // 自动释放 reply
}
/* Proceed with free'ing when redisAsyncFree() was called. */if (c->flags & REDIS_FREEING) {
__redisAsyncFree(ac);
return;
}
} else {
/* No callback for this reply. This can either be a NULL callback,
* or there were no callbacks to begin with. Either way, don't
* abort with an error, but simply ignore it because the client
* doesn't know what the server will spit out over the wire.
*/
c->reader->fn->freeObject(reply);
}
/* If in monitor mode, repush the callback */if (c->flags & REDIS_MONITORING) {
__redisPushCallback(&ac->replies, &cb);
}
}
/* Disconnect when there was an error reading the reply */if (status != REDIS_OK) __redisAsyncDisconnect(ac);
}
staticvoid __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisReply *reply) {
redisContext *c = &(ac->c);
if (cb->fn != NULL) {
c->flags |= REDIS_IN_CALLBACK;
cb->fn(ac, reply, cb->privdata);
c->flags &= ~REDIS_IN_CALLBACK;
}
}
// 2. 连接断开的逻辑void __redisAsyncDisconnect(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
....
/* cleanup event library on disconnect.
* this is safe to call multiple times */
_EL_CLEANUP(ac); // 删除事件,cleanup,由用户外部适配 redis 的接口/* For non-clean disconnects, __redisAsyncFree() will execute pending
* callbacks with a NULL-reply. */if (!(c->flags & REDIS_NO_AUTO_FREE)) {
__redisAsyncFree(ac); // 调用用户定义的回调函数
}
}
staticvoid __redisAsyncFree(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
...
/* Signal event lib to clean up */
_EL_CLEANUP(ac);
/* Execute disconnect callback. When redisAsyncFree() initiated destroying
* this context, the status will always be REDIS_OK. */if (c->flags & REDIS_CONNECTED) {
int status = ac->err == 0 ? REDIS_OK : REDIS_ERR;
if (c->flags & REDIS_FREEING) status = REDIS_OK;
__redisRunDisconnectCallback(ac, status); // 执行用户定义的回调函数
}
if (ac->dataCleanup) {
ac->dataCleanup(ac->data);
}
/* Cleanup self */
redisFree(c);
}
staticvoid __redisRunDisconnectCallback(redisAsyncContext *ac, int status) {
if (ac->onDisconnect) {
if (!(ac->c.flags & REDIS_IN_CALLBACK)) {
ac->c.flags |= REDIS_IN_CALLBACK;
ac->onDisconnect(ac, status);
ac->c.flags &= ~REDIS_IN_CALLBACK;
} else {
/* already in callback */
ac->onDisconnect(ac, status);
}
}
}
5. 示例代码:
#include<hiredis/hiredis.h>#include<hiredis/async.h>#include<time.h>#include"reactor.h"#include"adapter_async.h"staticreactor_t *R;
staticint cnt, before, num;
intcurrent_tick() {
int t = 0;
structtimespecti;
clock_gettime(CLOCK_MONOTONIC, &ti);
t = (int)ti.tv_sec * 1000;
t += ti.tv_nsec / 1000000;
return t;
}
voidgetCallback(redisAsyncContext *c, void *r, void *privdata) {
redisReply *reply = r;
if (reply == NULL) return;
// printf("argv[%s]: %lld\n", (char *)privdata, reply->integer);// ========== 打印 outbuf 长度 ==========// redisContext *ctx = &(c->c);// if (ctx->obuf) {// printf("connectCallback: outbuf len = %zu (bytes)\n", strlen(ctx->obuf));// } else {// printf("connectCallback: outbuf is NULL\n");// }/* Disconnect after receiving the reply to GET */
cnt++;
if (cnt == num) {
int used = current_tick() - before;
printf("after %d exec redis command, used %d ms\n", num, used);
redisAsyncDisconnect(c);
}
}
voidconnectCallback(const redisAsyncContext *c, int status) {
if (status != REDIS_OK) {
printf("Error: %s\n", c->errstr);
stop_eventloop(R);
return;
}
printf("Connected...\n");
for (int i = 0; i < num; i++) {
if (redisAsyncCommand((redisAsyncContext *)c, getCallback, "count", "INCR counter") != REDIS_OK) {
printf("wrong\n");
}
}
}
voiddisconnectCallback(const redisAsyncContext *c, int status) {
if (status != REDIS_OK) {
printf("Error: %s\n", c->errstr);
stop_eventloop(R);
return;
}
printf("Disconnected...\n");
stop_eventloop(R);
}
intmain(int argc, char **argv) {
redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379);
if (c->err) {
/* Let *c leak for now... */printf("Error: %s\n", c->errstr);
return1;
}
// int sndbuf_size = 1024;// 64 * 1024// setsockopt(c->c.fd, SOL_SOCKET, SO_SNDBUF, &sndbuf_size, sizeof(sndbuf_size));
R = create_reactor();
redisAttach(R, c);
redisAsyncSetConnectCallback(c, connectCallback);
redisAsyncSetDisconnectCallback(c, disconnectCallback);
before = current_tick();
num = (argc > 1) ? atoi(argv[1]) : 1000;
eventloop(R);
release_reactor(R);
return0;
}
6. 注意点:
默认情况下异步连接无需手动释放上下文和 reply
void __redisAsyncDisconnect(redisAsyncContext *ac) {
...
/* For non-clean disconnects, __redisAsyncFree() will execute pending
* callbacks with a NULL-reply. */if (!(c->flags & REDIS_NO_AUTO_FREE)) {
__redisAsyncFree(ac);
}
}
voidredisProcessCallbacks(redisAsyncContext *ac) {
...
if (cb.fn != NULL) {
__redisRunCallback(ac, &cb, reply);
if (!(c->flags & REDIS_NO_AUTO_FREE_REPLIES)){
c->reader->fn->freeObject(reply);
}
/* Proceed with free'ing when redisAsyncFree() was called. */if (c->flags & REDIS_FREEING) {
__redisAsyncFree(ac);
return;
}
} else {
/* No callback for this reply. This can either be a NULL callback,
* or there were no callbacks to begin with. Either way, don't
* abort with an error, but simply ignore it because the client
* doesn't know what the server will spit out over the wire.
*/
c->reader->fn->freeObject(reply);
}
...
}