Redis 同步与异步连接代码实现及接口源码分析
了基于 hiredis 库的 Redis 同步与异步连接实现。对比了阻塞 IO 与非阻塞 IO 的特性差异,列出了核心 API 接口及内存管理规范。重点分析了异步连接的事件循环机制,演示了如何将其集成至自定义 Reactor 框架,并通过源码剖析揭示了连接建立、命令发送及回调处理的底层逻辑,适用于高并发网络服务开发参考。

了基于 hiredis 库的 Redis 同步与异步连接实现。对比了阻塞 IO 与非阻塞 IO 的特性差异,列出了核心 API 接口及内存管理规范。重点分析了异步连接的事件循环机制,演示了如何将其集成至自定义 Reactor 框架,并通过源码剖析揭示了连接建立、命令发送及回调处理的底层逻辑,适用于高并发网络服务开发参考。

同步连接方案采用阻塞 io 来实现;优点是代码书写是同步的,业务逻辑没有割裂;缺点是阻塞当前线程,直至 redis 返回结果;通常用多个线程来实现线程池来解决效率问题;
异步连接方案采用非阻塞 io 来实现;优点是没有阻塞当前线程,redis 没有返回,依然可以往 redis 发送命令;缺点是代码书写是异步的(回调函数),业务逻辑割裂,可以通过协程解决(openresty,skynet);配合 redis6.0 以后的 io 多线程(前提是有大量并发请求),异步连接池,能更好解决应用层的数据访问性能;
| 接口名称 | 功能描述 | 核心参数说明 | 返回值说明 | 典型使用场景 |
|---|---|---|---|---|
redisConnect | 创建 Redis 同步连接(无连接超时限制),建立 TCP 连接 | -const char *host:Redis 服务 IP / 域名;-int port:Redis 服务端口(默认 6379) | redisContext*:连接上下文指针。NULL = 内存分配失败;非 NULL 需检查 ctx->err 判断连接是否成功 | 简单测试场景,对连接超时无要求,快速建立连接 |
redisConnectWithTimeout | 创建 Redis 同步连接(带连接超时限制),避免连接阶段无限阻塞 | -const char *host;-int port;-const struct timeval *timeout:连接超时结构体 | redisContext*:同 redisConnect,需检查 ctx->err 判断连接结果 | 生产环境推荐,需要控制连接超时时间(如网络不稳定场景) |
redisCommand | 同步发送 Redis 命令(格式化输入,类似 printf),阻塞等待执行结果返回 | -redisContext *c:连接上下文;-const char *format:命令格式化字符串;-...:可变参数 | redisReply*:命令执行结果指针。NULL = 执行失败; 非 NULL 需通过 reply->type 解析结果 | 大多数同步场景,命令参数较少、无特殊字符(空格、引号),使用便捷 |
redisCommandArgv | 同步发送 Redis 命令(批量参数传入),避免格式化字符串的转义 / 注入问题 | -redisContext *c;-int argc:参数个数;-const char **argv:参数数组;-const size_t *argvlen:参数长度数组 | redisReply*:同 redisCommand,需解析 reply->type | 命令参数较多、包含特殊字符,或参数来自外部输入(避免安全问题)的场景 |
freeReplyObject | 释放 redisReply 结果对象占用的内存,避免内存泄漏 | void *reply:redisReply* 类型指针(即 redisCommand 返回值) | 无(void) | 每次获取 redisReply 并处理完成后,必须调用该接口释放内存 |
redisFree | 关闭 Redis 连接,释放 redisContext 连接上下文的所有资源 | redisContext *c:连接上下文指针 | 无(void) | 程序退出、不再使用该 Redis 连接时,清理连接资源 |
redisGetError | 获取 Redis 连接 / 命令执行的错误信息,用于问题排查 | redisContext *c:连接上下文指针 | const char*:错误信息字符串,NULL 表示无错误 | 连接失败或命令执行返回 NULL 时,排查错误原因(如网络中断、命令语法错误) |
使用前需安装 hiredis 库(Ubuntu:apt-get install libhiredis-dev;CentOS:yum install hiredis-devel),编译时需链接库:gcc xxx.c -o xxx -lhiredis。
redisContext 和 redisReply 均由 hiredis 分配内存,必须通过 redisFree 和 freeReplyObject 释放,不可直接用 free()。redisCommand(或 redisCommandArgv)返回的 redisReply,使用完成后都要调用 freeReplyObject,否则会造成内存泄漏。redisReply 常见结果类型(解析结果的核心):REDIS_REPLY_STRING:字符串结果,内容存于 reply->str,长度存于 reply->len。REDIS_REPLY_INTEGER:整数结果,值存于 reply->integer。REDIS_REPLY_STATUS:状态结果(如 OK),内容存于 reply->str。REDIS_REPLY_ERROR:命令执行错误,错误信息存于 reply->str。REDIS_REPLY_ARRAY:数组结果,元素个数存于 reply->elements,元素数组存于 reply->element。REDIS_REPLY_NIL:空结果(对应 Redis 的 nil)。上述接口均为阻塞式同步接口,命令发送后会阻塞当前线程,直到获取 Redis 返回结果或超时,单个 redisContext 不支持并发操作。
#include <stdio.h>
#include <stdlib.h>
#include <strings.h>
#include <hiredis/hiredis.h>
int main() {
// 1. 连接 Redis(参数:地址、端口、超时时间(毫秒))
redisContext *conn = redisConnect("127.0.0.1", 6379);
redisReply *reply;
if (conn == NULL || conn->err) {
if (conn) {
printf("连接失败:%s\n", conn->errstr);
redisFree(conn);
} else {
printf("连接失败:无法分配连接对象\n");
}
return 1;
}
printf("Redis 连接成功!\n");
// 2. 若 Redis 设置了密码,需执行 AUTH 认证(无密码则跳过)
// const char *redis_pwd = "你的 Redis 密码";
// reply = redisCommand(conn, "AUTH %s", redis_pwd);
// if (reply == NULL || reply->type == REDIS_REPLY_ERROR) {
// printf("认证失败:%s\n", reply ? reply->str : "未知错误");
// freeReplyObject(reply);
// redisFree(conn);
// return 1;
// }
// freeReplyObject(reply); // 释放命令返回结果
// 3. 执行 SET 命令:设置键 test_key 的值为 hello_redis
reply = redisCommand(conn, "SET test_key %s", "hello_redis");
if (reply == NULL || reply->type != REDIS_REPLY_STATUS || strcasecmp(reply->str, "OK") != 0) {
printf("SET 命令执行失败:%s\n", reply ? reply->str : "未知错误");
freeReplyObject(reply);
redisFree(conn);
return 1;
}
printf("SET 命令执行成功!\n");
freeReplyObject(reply);
// 4. 执行 GET 命令:获取 test_key 的值
reply = redisCommand(conn, "GET test_key");
if (reply == NULL || reply->type != REDIS_REPLY_STRING) {
printf("GET 命令执行失败:%s\n", reply ? reply->str : "未知错误");
freeReplyObject(reply);
redisFree(conn);
return 1;
}
printf("GET test_key 的结果:%s\n", reply->str);
freeReplyObject(reply);
// 5. 释放资源
redisFree(conn);
return 0;
}
hiredis 的异步接口是非阻塞式的,依赖事件循环(如 hiredis 自带的 ae、或 epoll/kqueue)驱动,所有结果(连接状态、命令执行结果)均通过回调函数返回。
| 接口名称 | 功能描述 | 核心参数说明 | 返回值说明 | 典型使用场景 |
|---|---|---|---|---|
redisAsyncConnect | 创建 Redis 异步连接(无连接超时,非阻塞),仅初始化连接资源,不立即建立 TCP 连接 | -const char *host:Redis 服务 IP / 域名;-int port:Redis 服务端口(默认 6379) | redisAsyncContext*:异步连接上下文指针。-NULL = 内存分配失败; -非 NULL 需检查 ctx->err 判断上下文初始化是否成功(非实际连接结果) | 简单异步场景,无需绑定本地地址,对连接初始化无特殊要求 |
redisAsyncConnectBind | 创建 Redis 异步连接(带本地地址绑定,非阻塞),初始化时绑定本地网卡 / 端口 | -const char *host;-int port;-const char *source_addr:本地绑定 IP;-int source_port:本地绑定端口 | redisAsyncContext*:同 redisAsyncConnect,检查 ctx->err 判断上下文初始化结果 | 需要指定本地出口 IP / 端口的场景(如多网卡服务器、端口映射需求) |
redisAsyncSetConnectCallback | 设置异步连接回调函数,感知连接成功 / 失败的结果(异步核心) | -redisAsyncContext *ac:异步连接上下文;-redisConnectCallback *fn:连接回调函数(格式:-void(redisAsyncContext*, int status)) | int:-0 = 回调设置成功; -非 0 = 设置失败(上下文无效 / 已设置回调) | 所有异步场景必须调用,用于处理连接建立结果(成功初始化资源 / 连接失败) |
redisAsyncSetDisconnectCallback | 设置异步断开连接回调函数,感知连接被动 / 主动断开的事件 | -redisAsyncContext *ac;-redisDisconnectCallback *fn:断开回调函数(格式:-void(redisAsyncContext*, int status)) | int:-0 = 回调设置成功; -非 0 = 设置失败 | 所有异步场景推荐调用,用于清理连接断开后的相关资源(如释放自定义数据) |
redisAsyncCommand | 异步发送 Redis 命令(格式化输入,类似 printf),非阻塞,结果通过回调返回 | -redisAsyncContext *ac;-redisCallbackFn *fn:命令结果回调;-void *privdata:用户自定义参数(传递给回调);-const char *format:命令格式化字符串;...:可变参数 | int:-0 = 命令发送成功(加入事件循环队列); -非 0 = 发送失败(上下文无效 / 连接断开) | 命令参数较少、无特殊字符,需要传递自定义参数给回调的场景 |
redisAsyncCommandArgv | 异步发送 Redis 命令(批量参数传入),避免转义 / 注入问题,非阻塞,结果回调返回 | -redisAsyncContext *ac;-redisCallbackFn *fn;-void *privdata;-int argc:参数个数;-const char **argv:参数数组;-const size_t *argvlen:参数长度数组 | int:-0 = 命令发送成功; -非 0 = 发送失败 | 命令参数较多、含特殊字符,或外部输入参数(安全需求)的生产环境场景 |
redisAsyncDisconnect | 主动断开 Redis 异步连接,非阻塞,断开结果通过 DisconnectCallback 返回。(注意:默认情况下内部隐含 redisAsyncFree) | redisAsyncContext *ac:异步连接上下文 | 无(void) | 程序退出、不再使用该异步连接时,主动触发断开流程 |
redisAeAttach | 将异步连接上下文绑定到 ae 事件循环(hiredis 推荐 / Redis 内置事件库),驱动异步流程 | aeEventLoop *el:ae 事件循环实例;redisAsyncContext *ac:异步连接上下文 | 无(void) | 所有基于 ae 事件循环的异步场景必须调用,用于监听 socket 读写事件,推进异步流程 |
redisAsyncFree | 释放 redisAsyncContext 异步连接上下文的所有资源,清理内存 | redisAsyncContext *ac:异步连接上下文 | 无(void) | 仅在 DisconnectCallback 触发后调用(确认连接已完全断开),避免资源泄漏 |
异步接口本身不具备事件驱动能力,必须依赖事件循环(如 hiredis 自带的 ae、Linux 的 epoll、BSD 的 kqueue)监听 socket 的读写事件,才能完成 TCP 连接建立、命令发送、结果接收的全流程,redisAeAttach 是 hiredis 与 ae 事件循环的绑定入口(ae 是 Redis 源码内置的轻量级事件库,也是 hiredis 的默认推荐)。
redisAsyncContext 异步连接使用独立的上下文,与同步连接的 redisContext 不通用、不可混用接口;创建后仅需检查 ctx->err 判断上下文初始化是否成功(如内存分配、socket 创建),而实际 TCP 连接结果只能通过 redisAsyncSetConnectCallback 的回调函数获取(非阻塞特性,无法立即返回连接结果)。
status 参数为 0 表示成功(连接建立 / 正常断开),非 0 表示失败(连接超时 / 异常断开)。void(redisAsyncContext *ctx, void *reply, void *privdata):
reply:redisReply* 类型的命令执行结果(与同步接口的结果格式一致,支持相同的类型解析)。privdata:用户调用 redisAsyncCommand 时传递的自定义参数(用于回调中传递业务数据)。reply 无需手动调用 freeReplyObject 释放,hiredis 会在回调函数执行完成后自动清理,避免重复释放导致崩溃。所有异步接口均为非阻塞调用,调用后立即返回,不会阻塞当前线程;命令发送后只是将请求加入事件循环队列,等待事件循环触发时才会实际发送到 Redis 服务,结果返回后也会通过事件循环触发回调函数。
redisAsyncFree,用户无需重复调用,避免重复释放。privdata(若为动态分配内存),需在回调函数中手动释放(hiredis 不会处理用户自定义内存)。单个 redisAsyncContext 不支持多线程并发操作,若需多线程处理,需通过互斥锁保护,或为每个线程创建独立的异步上下文。
void eventloop_once(reactor_t * r, int timeout) {
int n = epoll_wait(r->epfd, r->fire, MAX_EVENT_NUM, timeout);
for (int i = 0; i < n; i++) {
struct epoll_event *e = &r->fire[i];
int mask = e->events;
if (e->events & EPOLLERR) mask |= EPOLLIN | EPOLLOUT;
if (e->events & EPOLLHUP) mask |= EPOLLIN | EPOLLOUT;
event_t *et = (event_t*) e->data.ptr;
if (mask & EPOLLIN) {
if (et->read_fn) et->read_fn(et->fd, EPOLLIN, et);
}
if (mask & EPOLLOUT) {
if (et->write_fn) et->write_fn(et->fd, EPOLLOUT, et);
else {
uint8_t * buf = buffer_write_atmost(evbuf_out(et));
event_buffer_write(et, buf, buffer_len(evbuf_out(et)));
}
}
}
}
void stop_eventloop(reactor_t * r) {
r->stop = 1;
}
void eventloop(reactor_t * r) {
while (!r->stop) {
// int timeout = find_nearest_expire_timer();
eventloop_once(r, /*timeout*/ -1);
// expire_timer();
}
}
redis 异步底层采用的是 io 多路复用(事件模型)redis 负责事件的设置,至于事件触发的回调函数由我们来写(即我们提供 event_loop 和 callback_func) redis 负责协议的解析,即我们需要在 callback_func 中,需要调用 redis 提供的 send 和 write 函数:redisAsyncHandleRead(redisAsyncContext* ctx)redisAsyncHandleWrite(redisAsyncContext* ctx) 我们设置的命令响应回调、连接回调、断连回调在上面这两个函数内部调用。
具体步骤:
关键替换 addRead,delRead,addWrite,delWrite,cleanup,scheduleTimer。
可以参照 redis-7.2.4/deps/hiredis/adapters/ae.h 来仿写
以下是我们仿写的
typedef struct {
event_t e;
int mask;
redisAsyncContext *ctx;
} redis_event_t;
static void redisReadHandler(int fd, int events, void *privdata) {
((void)fd);
((void)events);
printf("redisReadHandler %d\n", fd);
event_t *e = (event_t*)privdata;
redis_event_t *re = (redis_event_t *)(char *)e;
redisAsyncHandleRead(re->ctx);
// 读完并解析之后,执行用户设置的回调函数,执行完之后如果判断还有数据没收到或没解析全部,就继续保持读事件,否则删除读事件设置写事件
}
static void redisWriteHandler(int fd, int events, void *privdata) {
((void)fd);
((void)events);
event_t *e = (event_t*)privdata;
redis_event_t *re = (redis_event_t *)(char *)e;
redisAsyncHandleWrite(re->ctx);
// 发送完之后,如果没发送完全,继续保持写事件,否则删除写事件设置读事件
}
// ... 各项事件函数 (redisAddRead、redisDelRead、redisAddWrite、redisDelWrite)
// redisCleanup:释放 re
static void redisCleanup(void *privdata) {
redis_event_t *re = (redis_event_t *)privdata;
reactor_t *r = re->e.r;
del_event(r, &re->e);
hi_free(re);
}
static int redisAttach(reactor_t *r, redisAsyncContext *ac) {
redisContext *c = &(ac->c);
redis_event_t *re;
/* Nothing should be attached when something is already attached */
if (ac->ev.data != NULL) return REDIS_ERR;
/* Create container for ctx and r/w events */
re = (redis_event_t*)hi_malloc(sizeof(*re));
if (re == NULL) return REDIS_ERR;
re->ctx = ac;
re->e.fd = c->fd;
re->e.r = r;
// dont use event buffer, using hiredis's buffer
re->e.in = NULL;
re->e.out = NULL;
re->mask = 0;
ac->ev.addRead = redisAddRead;
ac->ev.delRead = redisDelRead;
ac->ev.addWrite = redisAddWrite;
ac->ev.delWrite = redisDelWrite;
ac->ev.cleanup = redisCleanup;
ac->ev.data = re;
return REDIS_OK;
}
给回调指针赋值并添加写事件,写事件会在连接首次建立的时候触发
int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn) {
return redisAsyncSetConnectCallbackImpl(ac, fn, NULL);
}
int redisAsyncSetConnectCallbackImpl(redisAsyncContext *ac, redisConnectCallback *fn, redisConnectCallbackNC *fn_nc) {
/* If either are already set, this is an error */
if (ac->onConnect || ac->onConnectNC) return REDIS_ERR;
if (fn) {
ac->onConnect = fn;
} else if (fn_nc) {
ac->onConnectNC = fn_nc;
}
/* The common way to detect an established connection is to wait for *
the first write event to be fired. This assumes the related event *
library functions are already set. */
_EL_ADD_WRITE(ac);
return REDIS_OK;
}
给回调指针赋值
int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn) {
if (ac->onDisconnect == NULL) {
ac->onDisconnect = fn;
return REDIS_OK;
}
return REDIS_ERR;
}
设置回调,接着往 outbuf 末尾插入新的命令,并设置写事件 redisvAsyncCommand:参数分别对应:上下文,回调函数,回调函数中的第三个参数(自定义私有数据),剩下参数(发送给 redis 的指令)
int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap) {
char *cmd;
int len;
int status;
len = redisvFormatCommand(&cmd, format, ap);
/* We don't want to pass -1 or -2 to future functions as a length. */
if (len < 0) return REDIS_ERR;
status = __redisAsyncCommand(ac, fn, privdata, cmd, len);
hi_free(cmd);
return status;
}
static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) {
redisContext *c = &(ac->c);
redisCallback cb;
...
/* Setup callback */
cb.fn = fn;
cb.privdata = privdata;
cb.pending_subs = 1;
cb.unsubscribe_sent = 0;
/* Don't accept new commands when the connection is about to be closed. */
if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING)) return REDIS_ERR;
/* Find out which command will be appended. */
....
__redisAppendCommand(c, cmd, len);
/* Always schedule a write when the write buffer is non-empty */
_EL_ADD_WRITE(ac);
return REDIS_OK;
oom:
...
}
int __redisAppendCommand(redisContext *c, const char *cmd, size_t len) {
hisds newbuf;
newbuf = hi_sdscatlen(c->obuf, cmd, len);
if (newbuf == NULL) {
__redisSetError(c, REDIS_ERR_OOM, "Out of memory");
return REDIS_ERR;
}
c->obuf = newbuf;
return REDIS_OK;
}
hisds hi_sdscatlen(hisds s, const void *t, size_t len) {
size_t curlen = hi_sdslen(s);
s = hi_sdsMakeRoomFor(len);
if (s == NULL) return NULL;
memcpy(s + curlen, t, len);
hi_sdssetlen(s, curlen + len);
s[curlen + len] = '\0';
return s;
}
补充:
用户定义的命令响应回调函数需匹配:typedef void (redisCallbackFn)(struct redisAsyncContext*, void*, void*);
第一个参数:redisAsyncContext *
第二个参数:redisReply *reply
第三个参数:用户自定义的数据类型
因连接建立触发,则调用用户定义的回调函数不因连接触发的逻辑,则发送 outbuf 里面的数据,如果一次没发送完成就设置写事件和读事件,否则就只设置读事件。如果监测到连接关闭,则触发连接断开的一系列操作 (详细见 redisAsyncHandleRead)
void redisAsyncHandleWrite(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
/* must not be called from a callback */
assert(!(c->flags & REDIS_IN_CALLBACK));
if (!(c->flags & REDIS_CONNECTED)) {
/* Abort connect was not successful. */
if (__redisAsyncHandleConnect(ac) != REDIS_OK) return;
/* Try again later when the context is still not connected. */
if (!(c->flags & REDIS_CONNECTED)) return;
}
c->funcs->async_write(ac); // 调用 redisAsyncWrite(redisAsyncContext *ac)
}
// 1. 不因连接触发对应的逻辑
void redisAsyncWrite(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
int done = 0;
if (redisBufferWrite(c, &done) == REDIS_ERR) {
__redisAsyncDisconnect(ac);
} else {
/* Continue writing when not done, stop writing otherwise */
if (!done) _EL_ADD_WRITE(ac);
else _EL_DEL_WRITE(ac);
/* Always schedule reads after writes */
_EL_ADD_READ(ac);
}
}
// 2. 因连接触发的逻辑
static int __redisAsyncHandleConnect(redisAsyncContext *ac) {
int completed = 0;
redisContext *c = &(ac->c);
...
/* 判断是否真的连接成功了,若是往下走 */
c->flags |= REDIS_CONNECTED;
__redisRunConnectCallback(ac, REDIS_OK); // 调用用户设置的回调函数
...
}
static void __redisRunConnectCallback(redisAsyncContext *ac, int status) {
if (ac->onConnect == NULL && ac->onConnectNC == NULL) return;
if (!(ac->c.flags & REDIS_IN_CALLBACK)) {
ac->c.flags |= REDIS_IN_CALLBACK;
if (ac->onConnect) {
ac->onConnect(ac, status);
} else {
ac->onConnectNC(ac, status);
}
ac->c.flags &= ~REDIS_IN_CALLBACK;
} else {
/* already in callback */
if (ac->onConnect) {
ac->onConnect(ac, status);
} else {
ac->onConnectNC(ac, status);
}
}
}
读事件是一直设置的 reply 默认情况下会自动释放,用户无需在回调函数中释放 监测到连接断开:移除事件(cleanup,用户自定义)、触发用户定义的连接断开回调函数、最后释放上下文(无需用户外部手动释放) 监测到是初次连接,会触发连接建立的相关逻辑(详细见 redisAsyncHandleWrite)
void redisAsyncHandleRead(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
/* must not be called from a callback */
assert(!(c->flags & REDIS_IN_CALLBACK));
if (!(c->flags & REDIS_CONNECTED)) {
/* Abort connect was not successful. */
if (__redisAsyncHandleConnect(ac) != REDIS_OK) return;
/* Try again later when the context is still not connected. */
if (!(c->flags & REDIS_CONNECTED)) return;
}
c->funcs->async_read(ac); // 底层调用 redisAsyncRead
}
void redisAsyncRead(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
if (redisBufferRead(c) == REDIS_ERR) {
__redisAsyncDisconnect(ac);
} else {
/* Always re-schedule reads */
_EL_ADD_READ(ac);
redisProcessCallbacks(ac);
}
}
// 1. 连接未断开的逻辑
void redisProcessCallbacks(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
void *reply = NULL;
int status;
while((status = redisGetReply(c, &reply)) == REDIS_OK) {
...
/* 协议解析 */
if (cb.fn != NULL) {
__redisRunCallback(ac, &cb, reply); // 用户的回调函数
if (!(c->flags & REDIS_NO_AUTO_FREE_REPLIES)){
c->reader->fn->freeObject(reply); // 自动释放 reply
}
/* Proceed with free'ing when redisAsyncFree() was called. */
if (c->flags & REDIS_FREEING) {
__redisAsyncFree(ac);
return;
}
} else {
/* No callback for this reply. This can either be a NULL callback,
* or there were no callbacks to begin with. Either way, don't
* abort with an error, but simply ignore it because the client
* doesn't know what the server will spit out over the wire.
*/
c->reader->fn->freeObject(reply);
}
/* If in monitor mode, repush the callback */
if (c->flags & REDIS_MONITORING) {
__redisPushCallback(&ac->replies, &cb);
}
}
/* Disconnect when there was an error reading the reply */
if (status != REDIS_OK) __redisAsyncDisconnect(ac);
}
static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisReply *reply) {
redisContext *c = &(ac->c);
if (cb->fn != NULL) {
c->flags |= REDIS_IN_CALLBACK;
cb->fn(ac, reply, cb->privdata);
c->flags &= ~REDIS_IN_CALLBACK;
}
}
// 2. 连接断开的逻辑
void __redisAsyncDisconnect(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
....
/* cleanup event library on disconnect.
* this is safe to call multiple times */
_EL_CLEANUP(ac); // 删除事件,cleanup,由用户外部适配 redis 的接口
/* For non-clean disconnects, __redisAsyncFree() will execute pending
* callbacks with a NULL-reply. */
if (!(c->flags & REDIS_NO_AUTO_FREE)) {
__redisAsyncFree(ac); // 调用用户定义的回调函数
}
}
static void __redisAsyncFree(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
...
/* Signal event lib to clean up */
_EL_CLEANUP(ac);
/* Execute disconnect callback. When redisAsyncFree() initiated destroying
* this context, the status will always be REDIS_OK. */
if (c->flags & REDIS_CONNECTED) {
int status = ac->err == 0 ? REDIS_OK : REDIS_ERR;
if (c->flags & REDIS_FREEING) status = REDIS_OK;
__redisRunDisconnectCallback(ac, status); // 执行用户定义的回调函数
}
if (ac->dataCleanup) {
ac->dataCleanup(ac->data);
}
/* Cleanup self */
redisFree(c);
}
static void __redisRunDisconnectCallback(redisAsyncContext *ac, int status) {
if (ac->onDisconnect) {
if (!(ac->c.flags & REDIS_IN_CALLBACK)) {
ac->c.flags |= REDIS_IN_CALLBACK;
ac->onDisconnect(ac, status);
ac->c.flags &= ~REDIS_IN_CALLBACK;
} else {
/* already in callback */
ac->onDisconnect(ac, status);
}
}
}
#include <hiredis/hiredis.h>
#include <hiredis/async.h>
#include <time.h>
#include "reactor.h"
#include "adapter_async.h"
static reactor_t *R;
static int cnt, before, num;
int current_tick() {
int t = 0;
struct timespec ti;
clock_gettime(CLOCK_MONOTONIC, &ti);
t = (int)ti.tv_sec * 1000;
t += ti.tv_nsec / 1000000;
return t;
}
void getCallback(redisAsyncContext *c, void *r, void *privdata) {
redisReply *reply = r;
if (reply == NULL) return;
// printf("argv[%s]: %lld\n", (char *)privdata, reply->integer);
// ========== 打印 outbuf 长度 ==========
// redisContext *ctx = &(c->c);
// if (ctx->obuf) {
// printf("connectCallback: outbuf len = %zu (bytes)\n", strlen(ctx->obuf));
// } else {
// printf("connectCallback: outbuf is NULL\n");
// }
/* Disconnect after receiving the reply to GET */
cnt++;
if (cnt == num) {
int used = current_tick() - before;
printf("after %d exec redis command, used %d ms\n", num, used);
redisAsyncDisconnect(c);
}
}
void connectCallback(const redisAsyncContext *c, int status) {
if (status != REDIS_OK) {
printf("Error: %s\n", c->errstr);
stop_eventloop(R);
return;
}
printf("Connected...\n");
for (int i = 0; i < num; i++) {
if (redisAsyncCommand((redisAsyncContext *)c, getCallback, "count", "INCR counter") != REDIS_OK) {
printf("wrong\n");
}
}
}
void disconnectCallback(const redisAsyncContext *c, int status) {
if (status != REDIS_OK) {
printf("Error: %s\n", c->errstr);
stop_eventloop(R);
return;
}
printf("Disconnected...\n");
stop_eventloop(R);
}
int main(int argc, char **argv) {
redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379);
if (c->err) {
/* Let *c leak for now... */
printf("Error: %s\n", c->errstr);
return 1;
}
// int sndbuf_size = 1024;
// 64 * 1024
// setsockopt(c->c.fd, SOL_SOCKET, SO_SNDBUF, &sndbuf_size, sizeof(sndbuf_size));
R = create_reactor();
redisAttach(R, c);
redisAsyncSetConnectCallback(c, connectCallback);
redisAsyncSetDisconnectCallback(c, disconnectCallback);
before = current_tick();
num = (argc > 1) ? atoi(argv[1]) : 1000;
eventloop(R);
release_reactor(R);
return 0;
}
void __redisAsyncDisconnect(redisAsyncContext *ac) {
...
/* For non-clean disconnects, __redisAsyncFree() will execute pending
* callbacks with a NULL-reply. */
if (!(c->flags & REDIS_NO_AUTO_FREE)) {
__redisAsyncFree(ac);
}
}
void redisProcessCallbacks(redisAsyncContext *ac) {
...
if (cb.fn != NULL) {
__redisRunCallback(ac, &cb, reply);
if (!(c->flags & REDIS_NO_AUTO_FREE_REPLIES)){
c->reader->fn->freeObject(reply);
}
/* Proceed with free'ing when redisAsyncFree() was called. */
if (c->flags & REDIS_FREEING) {
__redisAsyncFree(ac);
return;
}
} else {
/* No callback for this reply. This can either be a NULL callback,
* or there were no callbacks to begin with. Either way, don't
* abort with an error, but simply ignore it because the client
* doesn't know what the server will spit out over the wire.
*/
c->reader->fn->freeObject(reply);
}
...
}
redisAsyncDisconnect 断开连接,注意 redisAsyncDisconnect 内部隐含 redisAsyncFree,所以千万不要重复调用 redisAsyncFree| 对比维度 | 同步连接(redisContext) | 异步连接(redisAsyncContext) |
|---|---|---|
| 连接方式 | 调用 redisConnect/redisConnectWithTimeout,直接建立 TCP 连接 | 调用 redisAsyncConnect 初始化上下文,TCP 连接为非阻塞式(需事件循环驱动) |
| 连接结果获取 | 直接检查 ctx->err 判断连接是否成功(阻塞返回结果) | 需通过 redisAsyncSetConnectCallback 设置回调,由回调返回连接状态(非阻塞,异步通知) |
| 命令执行方式 | 调用 redisCommand/redisCommandArgv,阻塞等待 Redis 返回结果 | 调用 redisAsyncCommand/redisAsyncCommandArgv,非阻塞调用(命令加入事件队列,由事件循环触发发送) |
| 命令结果获取 | 直接返回 redisReply* 结果对象,同步拿到执行结果 | 需为命令设置回调函数,由回调函数接收 redisReply* 结果(异步通知) |
| 结果内存释放 | 必须手动调用 freeReplyObject(reply) 释放结果,否则内存泄漏 | 默认由 hiredis 自动释放 reply;仅开启 REDIS_NO_AUTO_FREE_REPLIES 时需手动处理 |
| 连接上下文释放 | 必须手动调用 redisFree(ctx) 释放连接资源 | 默认由 hiredis 在断开时自动调用 __redisAsyncFree 释放;仅开启 REDIS_NO_AUTO_FREE 时需在断开回调中手动调用 |
| 事件循环依赖 | 无依赖,单步阻塞式执行 | 必须依赖事件循环(如 hiredis 自带 ae、epoll),需通过 redisAeAttach(或用户仿照 ae.h 自己自定义书写一个) 绑定上下文与事件循环 |
| 线程 / 并发特性 | 单个 redisContext 不可多线程并发操作;单连接阻塞会占用线程 | 单个 redisAsyncContext 不可多线程并发操作,但单线程可通过事件循环管理多个异步连接 |
| 典型适用场景 | 简单逻辑、低并发场景;对响应速度要求不高的同步任务 | 高并发场景、多连接管理;需要非阻塞 IO 的服务(如网络服务器) |

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online
将JSON字符串修饰为友好的可读格式。 在线工具,JSON美化和格式化在线工具,online