跳到主要内容RabbitMQ 客户端连接管理与信道搭建实践 | 极客日志C++
RabbitMQ 客户端连接管理与信道搭建实践
阐述了基于 muduo 库构建的 RabbitMQ 客户端连接管理模块,涵盖 TCP 连接建立、信道(Channel)生命周期管理及 Protobuf 消息编解码。通过示例代码演示了生产者与消费者的搭建流程,包括交换机声明、队列绑定及消息发布订阅。重点记录了调试过程中发现的响应哈希表键值匹配错误问题及其修复方案,最终验证了广播、直接及主题三种交换模式下的消息投递功能。项目整合了 muduo 网络层、Protobuf 应用层协议及 SQLite 数据持久化,实现了简化版消息队列组件。
RefactorPro1 浏览 1. 连接管理
在客户端这边,RabbitMQ 弱化了客户端的概念,因为用户所需的服务都是通过信道来提供的,因此操作思想转换为先创建连接,通过连接创建信道,通过信道提供服务这一流程。
这个模块同样是针对 muduo 库客户端连接的二次封装,向用户提供创建 channel 信道的接口,创建信道后,可以通过信道来获取指定服务。
#ifndef __M_CONNECTION_H__
#define __M_CONNECTION_H__
#include "muduo/proto/dispatcher.h"
#include "muduo/proto/codec.h"
#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpClient.h"
#include "muduo/net/EventLoopThread.h"
#include "muduo/base/CountDownLatch.h"
#include "channel.hpp"
#include "worker.hpp"
namespace rabbitmq {
class Connection {
public:
using ptr = std::shared_ptr<Connection>;
Connection(const std::string server_ip, int server_port, const AsyncWorker::ptr& worker)
: _latch(1),
_client(worker->_loopthread.startLoop(), muduo::net::InetAddress(server_ip, server_port), ),
_dispatcher(std::(&Connection::onUnknownMessage, , std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),
_codec(std::<ProtobufCodec>(std::(&ProtobufDispatcher::onProtobufMessage, &_dispatcher, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))),
_worker(worker),
_channel_manager(std::<ChannelManager>()) {
_dispatcher.<rabbitmq::basicCommonResponse>(std::(&Connection::basicResponse, , std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
_dispatcher.<rabbitmq::basicConsumeResponse>(std::(&Connection::consumeResponse, , std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
_client.(std::(&ProtobufCodec::onMessage, _codec.(), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
_client.(std::(&Connection::onConnection, , std::placeholders::_1));
_client.();
_latch.();
}
{
Channel::ptr channel = _channel_manager->(_conn, _codec);
ret = channel->();
(ret == ) {
();
Channel::();
}
channel;
}
{
channel->();
_channel_manager->(channel->());
}
:
{
Channel::ptr channel = _channel_manager->(message->());
(channel.() == ) {
();
;
}
channel->(message);
}
{
Channel::ptr channel = _channel_manager->(message->());
(channel.() == ) {
();
;
}
_worker->_pool.([channel, message]() { channel->(message); });
}
{
LOG_INFO << << message->();
conn->();
}
{
(conn->()) {
_latch.();
_conn = conn;
} {
_conn.();
}
}
:
muduo::CountDownLatch _latch;
muduo::net::TcpConnectionPtr _conn;
muduo::net::TcpClient _client;
ProtobufDispatcher _dispatcher;
ProtobufCodecPtr _codec;
AsyncWorker::ptr _worker;
ChannelManager::ptr _channel_manager;
};
}
微信扫一扫,关注极客日志
微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
相关免费在线工具
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
- Base64 文件转换器
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
- Markdown 转 HTML
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML 转 Markdown 互为补充。 在线工具,Markdown 转 HTML在线工具,online
- HTML 转 Markdown
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML 转 Markdown在线工具,online
- JSON 压缩
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online
- JSON美化和格式化
将JSON字符串修饰为友好的可读格式。 在线工具,JSON美化和格式化在线工具,online
"Client"
bind
this
make_shared
bind
make_shared
registerMessageCallback
bind
this
registerMessageCallback
bind
this
setMessageCallback
bind
get
setConnectionCallback
bind
this
connect
wait
Channel::ptr openChannel()
create
bool
openChannel
if
false
DLOG
"打开信道失败!"
return
ptr
return
void closeChannel(const Channel::ptr& channel)
closeChannel
remove
cid
private
void basicResponse(const muduo::net::TcpConnectionPtr &conn, const basicCommonResponsePtr &message, muduo::Timestamp)
get
cid
if
get
nullptr
DLOG
"未找到信道信息!"
return
putBasicResponse
void consumeResponse(const muduo::net::TcpConnectionPtr &conn, const basicConsumeResponsePtr &message, muduo::Timestamp)
get
cid
if
get
nullptr
DLOG
"未找到信道信息!"
return
push
consume
void onUnknownMessage(const muduo::net::TcpConnectionPtr &conn, const MessagePtr &message, muduo::Timestamp)
"onUnknownMessage: "
GetTypeName
shutdown
void onConnection(const muduo::net::TcpConnectionPtr &conn)
if
connected
countDown
else
reset
private
#endif
这段代码是 RabbitMQ 客户端的连接管理模块,它负责与 RabbitMQ 服务器建立连接,并管理信道(Channel)的创建和关闭。主要功能包括:
- 封装 muduo 库的 TcpClient,建立与服务器的 TCP 连接。
- 使用 ProtobufCodec 进行消息的编解码,使用 ProtobufDispatcher 进行消息的分发。
- 管理多个信道(Channel),每个信道可以执行不同的 AMQP 操作(如声明交换机、队列,发布消息等)。
- 异步处理服务器推送的消息(如 basicConsumeResponse),通过线程池处理消息消费。
- 连接建立:在构造函数中,通过 TcpClient 连接服务器,并使用 CountDownLatch 等待连接建立成功。连接建立成功后,onConnection 回调函数会被调用,唤醒主线程。
- 消息分发:通过 ProtobufDispatcher 注册了两类消息的回调函数:basicCommonResponse(普通响应)和 basicConsumeResponse(消费消息响应)。当收到服务器消息时,根据消息类型分发给对应的处理函数。
- 信道管理:通过 ChannelManager 管理信道的创建和删除。每个信道有一个唯一的 cid(信道 ID),在创建信道时生成。当收到响应消息时,根据 cid 找到对应的信道,将响应放入信道的响应映射表中,等待信道处理。
- 异步消息处理:对于消费消息(basicConsumeResponse),将其封装成任务提交到线程池中,由工作线程调用信道的 consume 方法处理消息(即调用用户注册的回调函数)。
- 连接关闭:当收到未知消息时,会关闭连接。另外,当服务器断开连接时,也会触发 onConnection 回调,重置连接指针。
- 基础响应(basicCommonResponse):直接找到对应的信道,并将响应放入信道的响应映射表中,由信道中等待响应的线程处理(通过条件变量唤醒)。
- 消费响应(basicConsumeResponse):找到对应的信道,然后将消息处理任务提交到线程池,由工作线程执行信道的
consume 方法(最终调用用户注册的回调函数)。
2. 搭建客户端
发布消息的生产者客户端
#include "connection.hpp"
int main() {
rabbitmq::AsyncWorker::ptr awp = std::make_shared<rabbitmq::AsyncWorker>();
rabbitmq::Connection::ptr conn = std::make_shared<rabbitmq::Connection>("127.0.0.1", 8085, awp);
rabbitmq::Channel::ptr channel = conn->openChannel();
google::protobuf::Map<std::string, std::string> tmp_map;
channel->declareExchange("exchange1", rabbitmq::ExchangeType::FANOUT, true, false, tmp_map);
channel->declareQueue("queue1", true, false, false, tmp_map);
channel->declareQueue("queue2", true, false, false, tmp_map);
channel->queueBind("exchange1", "queue1", "queue1");
for (int i = 0; i < 10; i++) {
channel->basicPublish("exchange1", nullptr, "Hello World-" + std::to_string(i));
}
conn->closeChannel(channel);
return 0;
}
订阅消息的消费者客户端
#include "connection.hpp"
void cb(rabbitmq::Channel::ptr &channel, const std::string consumer_tag, const rabbitmq::BasicProperties *bp, const std::string &body) {
std::cout << consumer_tag << "消费了消息:" << body << std::endl;
channel->basicAck(bp->id());
}
int main(int argc, char* argv[]) {
if (argc != 2) {
std::cout << "usage: ./consume_client queue1\n";
return -1;
}
rabbitmq::AsyncWorker::ptr awp = std::make_shared<rabbitmq::AsyncWorker>();
rabbitmq::Connection::ptr conn = std::make_shared<rabbitmq::Connection>("127.0.0.1", 8085, awp);
rabbitmq::Channel::ptr channel = conn->openChannel();
google::protobuf::Map<std::string, std::string> tmp_map;
channel->declareExchange("exchange1", rabbitmq::ExchangeType::FANOUT, true, false, tmp_map);
channel->declareQueue("queue1", true, false, false, tmp_map);
channel->declareQueue("queue2", true, false, false, tmp_map);
channel->queueBind("exchange1", "queue1", "queue1");
channel->queueBind("exchange1", "queue2", "news.music.#");
auto functor = std::bind(cb, channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
channel->basicConsume("consumer1", argv[1], false, functor);
while (1) std::this_thread::sleep_for(std::chrono::seconds(3));
conn->closeChannel(channel);
return 0;
}
一、生产者客户端流程
生产者客户端的主要流程是建立连接、创建信道、声明交换机和队列、绑定队列到交换机,然后向交换机发布消息。
-
实例化异步工作线程对象:
rabbitmq::AsyncWorker::ptr awp = std::make_shared<rabbitmq::AsyncWorker>();
创建异步工作线程对象,该对象包含一个 EventLoopThread(用于网络 IO)和一个线程池(用于处理消息)。
-
实例化连接对象:
rabbitmq::Connection::ptr conn = std::make_shared<rabbitmq::Connection>("127.0.0.1", 8085, awp);
创建 Connection 对象,传入服务器 IP 和端口以及异步工作线程对象。在构造函数中,会建立 TCP 连接,并且阻塞直到连接成功。
-
通过连接创建信道:
rabbitmq::Channel::ptr channel = conn->openChannel();
通过连接对象创建一个信道。在 openChannel 方法中,会通过 ChannelManager 创建 Channel 对象,并向服务器发送打开信道的请求,等待响应。
-
通过信道提供的服务完成所需操作:
声明一个交换机 exchange1,类型为广播模式(FANOUT):
channel->declareExchange("exchange1", rabbitmq::ExchangeType::FANOUT, true, false, tmp_map);
声明两个队列 queue1 和 queue2:
channel->declareQueue("queue1", true, false, false, tmp_map);
channel->declareQueue("queue2", true, false, false, tmp_map);
绑定队列到交换机,并设置绑定键(binding key):
channel->queueBind("exchange1", "queue1", "queue1");
channel->queueBind("exchange1", "queue2", "news.music.#");
-
循环向交换机发布消息:
for (int i = 0; i < 10; i++) {
channel->basicPublish("exchange1", nullptr, "Hello World-" + std::to_string(i));
}
向交换机 exchange1 发布 10 条消息。由于交换机类型是 FANOUT,消息会被广播到所有绑定的队列(queue1 和 queue2)。
-
关闭信道:
conn->closeChannel(channel);
注意:这里调用的是 channel 的 closeChannel 方法,实际上在 closeChannel 方法中会向服务器发送关闭信道的请求,并且从 ChannelManager 中移除该信道。
-
程序结束:Connection 对象和 AsyncWorker 对象会随着智能指针的释放而自动销毁,连接断开。
二、消费者客户端流程
消费者客户端的主要流程是建立连接、创建信道、声明交换机和队列、绑定队列到交换机,然后订阅队列中的消息,并处理消息。
-
实例化异步工作线程对象:同生产者。
-
实例化连接对象:同生产者。
-
通过连接创建信道:同生产者。
-
通过信道提供的服务完成所需操作:声明交换机和队列、绑定队列,与生产者相同。
-
订阅队列消息:
auto functor = std::bind(cb, channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
channel->basicConsume("consumer1", argv[1], false, functor);
创建一个回调函数,然后调用 channel 的 basicConsume 方法订阅队列。这里传入消费者标签为 "consumer1",队列名由命令行参数指定,自动确认为 false(即手动确认),以及回调函数。
-
循环等待消息:
while (1) std::this_thread::sleep_for(std::chrono::seconds(3));
主线程循环等待,因为消息的处理是异步的(在工作线程池中),所以这里需要保持程序运行。
-
关闭信道:
conn->closeChannel(channel);
注意:这里是通过连接对象来关闭信道,实际上内部会调用 channel 的 closeChannel 方法,并从 ChannelManager 中移除。
-
程序结束:同样,连接和异步工作线程对象会随着智能指针的释放而自动清理。
三、消息处理流程
当消费者客户端订阅队列后,服务器会推送消息到客户端。客户端的网络线程(EventLoopThread)接收到消息后,通过 ProtobufCodec 解码,然后由 ProtobufDispatcher 根据消息类型分发。
对于消费消息(basicConsumeResponse),会调用 Connection::consumeResponse,然后在线程池中执行 Channel::consume 方法,最终调用用户注册的回调函数。
在回调函数中,用户处理消息,并手动确认消息(basicAck)。确认消息会发送一个请求到服务器,服务器收到后从队列中删除消息。
3. 功能联调
广播模式下的测试
分别编译运行:server、consume_client queue1、consume_client queue2、publish_client
user@mq-server:~/rabbit-mq/mqserver$ ./server
2026020314:42:57.353717Z 2752283 INFO TcpServer::newConnection [Server] - new connection [Server-0.0.0.0:8085#1] from 127.0.0.1:36086 - TcpServer.cc:80
[DBG][22:42:57][channel.hpp:43] new Channel: 0x5580b78cdfd0
[DBG][22:42:57][connection.hpp:26] b34ba8ee-60be-beb1-0000-000000000001 信道创建成功!
...
user@mq-client:~/rabbit-mq/mqclient$ ./consume_client queue1
2026020314:42:57.353236Z 2757934 INFO TcpClient::TcpClient[Client] - connector 0x56352AF5F770 - TcpClient.cc:69
2026020314:42:57.353557Z 2757934 INFO TcpClient::connect[Client] - connecting to 127.0.0.1:8085 - TcpClient.cc:107
在运行发布消息的生产者客户端(publish_client)时,我们应该可以看到消费者客户端会消费消息,并通过回调函数将消息打印出来,但是我们在消费者客户端没有看到任何消息,说明我们代码中存在一点小问题。
经过调试发现,在服务端的连接管理模块实现时,我们在调用打开信道的接口时的传参应该插入信道 id,结果因为写错了,写成了请求 id。再次编译运行,发现还是和刚刚一样。
这里我们在对网络通信进行二次封装时(信道管理模块和连接管理模块),我们是没有进行测试的,所以我们大致方向应该就是在网络通信时出现了问题,在服务器的日志信息中可以看到服务器在创建信道时,成功将信道创建,但是客户端我们不知道有没有在收到服务器响应,并且在收到响应后有没有将其加入到 hash_map 中管理起来,所以我们可以在客户端添加一些日志。
我们可以在客户端创建信道,声明交换机/队列,删除交换机/队列等请求时添加日志,看客户端是否正常发送请求以及接收响应。
同样在服务端我们也添加一些日志,看看信道创建成功之后到底有没有给客户端响应。
user@mq-server:~/rabbit-mq/mqserver$ ./server
20260204 06:19:33.333944Z 3906265 INFO TcpServer::newConnection [Server] - new connection [Server-0.0.0.0:8085#1] from 127.0.0.1:35632 - TcpServer.cc:80
[DBG][14:19:33][channel.hpp:43] new Channel: 0x55aedc750ad0
[DBG][14:19:33][connection.hpp:26] d5ba2ba1-5824-a417-0000-000000000001 信道创建成功!
[DBG][14:19:33][connection.hpp:29] 客户端创建信道已响应
...
user@mq-client:~/rabbit-mq/mqclient$ ./consume_client queue1
20260204 06:19:33.333532Z 3906405 INFO TcpClient::TcpClient[Client] - connector 0x56242E8A9770 - TcpClient.cc:69
20260204 06:19:33.333756Z 3906405 INFO TcpClient::connect[Client] - connecting to 127.0.0.1:8085 - TcpClient.cc:107
[DBG][14:19:33][channel.hpp:39] 客户端声明信道请求已发送
可以看到服务端响应了客户端创建信道的请求,但是客户端在接收到服务端响应后应该打印一条日志 '客户端声明信道请求,服务器已响应',但是我们并没有在客户端中看到这条日志消息,说明我们在等待服务器响应时出现了问题。
但是我们发现等待响应的接口并没有问题,那就有可能是客户端在接收服务器响应时,有没有将其加入到 hash_map 中管理起来。
客户端在接收到服务器的普通响应时会调用 basicResponse 接口去处理这个响应(因为我们在构造函数中通过分发器注册了这个接口),那么我们就需要查看一下这个处理响应的接口有没有问题。
可以看到处理服务器响应的接口也没有问题,但是内部调用了将响应对象添加到 hash_map 中管理起来的接口,所以我们再继续往下看,看也没有正确管理。
我们发现原来将服务器的响应添加到哈希表中管理起来时,我们不是通过信道 id 来查找响应的,应该是通过请求 id 来查找,应该是当时不小心敲错了代码造成的。
因此,在等待响应时由于我们添加到哈希表中的键值是 [信道 id,响应对象],但是我们在条件变量处阻塞的条件是通过 请求 id 来查找对应的响应对象,所以不可能找得到,于是条件不满足就会一直阻塞在这。
user@mq-server:~/rabbit-mq/mqserver$ ./server
20260204 07:17:54.963281Z 3961554 INFO TcpServer::newConnection [Server] - new connection [Server-0.0.0.0:8085#1] from 127.0.0.1:48728 - TcpServer.cc:80
[DBG][15:17:54][channel.hpp:43] new Channel: 0x55badb5aaad0
[DBG][15:17:54][connection.hpp:26] 462a9d0e-e95b-e896-0000-000000000001 信道创建成功!
[DBG][15:17:54][connection.hpp:29] 客户端创建信道已响应
[DBG][15:17:54][consumer.hpp:32] new Consumer: 0x55badb602f30
[DBG][15:17:54][channel.hpp:153] 创建消费者成功
...
user@mq-client:~/rabbit-mq/mqclient$ ./consume_client queue1
20260204 07:17:54.962859Z 3961827 INFO TcpClient::TcpClient[Client] - connector 0x55648BB74770 - TcpClient.cc:69
20260204 07:17:54.963116Z 3961827 INFO TcpClient::connect[Client] - connecting to 127.0.0.1:8085 - TcpClient.cc:107
[DBG][15:17:54][channel.hpp:39] 客户端声明信道请求已发送
[DBG][15:17:54][channel.hpp:42] 客户端声明信道请求,服务器已响应
[DBG][15:17:54][channel.hpp:79] 客户端声明交换机请求,服务器已响应
[DBG][15:17:54][channel.hpp:117] 客户端声明队列请求,服务器已响应
[DBG][15:17:54][channel.hpp:117] 客户端声明队列请求,服务器已响应
[DBG][15:17:54][channel.hpp:151] 客户端队列绑定请求,服务器已响应
[DBG][15:17:54][channel.hpp:151] 客户端队列绑定请求,服务器已响应
[DBG][15:17:54][consumer.hpp:32] new Consumer: 0x55648bb6c500
consumer1 消费了消息:Hello World-0
consumer1 消费了消息:Hello World-1
...
consumer1 消费了消息:Hello World-9
直接交换模式下的测试
#include "connection.hpp"
int main() {
rabbitmq::AsyncWorker::ptr awp = std::make_shared<rabbitmq::AsyncWorker>();
rabbitmq::Connection::ptr conn = std::make_shared<rabbitmq::Connection>("127.0.0.1", 8085, awp);
rabbitmq::Channel::ptr channel = conn->openChannel();
google::protobuf::Map<std::string, std::string> tmp_map;
channel->declareExchange("exchange1", rabbitmq::ExchangeType::DIRECT, true, false, tmp_map);
channel->declareQueue("queue1", true, false, false, tmp_map);
channel->declareQueue("queue2", true, false, false, tmp_map);
channel->queueBind("exchange1", "queue1", "queue1");
channel->queueBind("exchange1", "queue2", "news.music.#");
for (int i = 0; i < 10; i++) {
rabbitmq::BasicProperties bp;
bp.set_id(rabbitmq::UUIDHelper::uuid());
bp.set_delivery_mode(rabbitmq::DeliveryMode::DURABLE);
bp.set_routing_key("queue1");
channel->basicPublish("exchange1", &bp, "Hello World-" + std::to_string(i));
}
conn->closeChannel(channel);
return 0;
}
可以看到直接交换模式下 routing_key 和 queue1 的 binding_key 匹配才能消费消息。
主题交换模式下的测试
#include "connection.hpp"
int main() {
rabbitmq::AsyncWorker::ptr awp = std::make_shared<rabbitmq::AsyncWorker>();
rabbitmq::Connection::ptr conn = std::make_shared<rabbitmq::Connection>("127.0.0.1", 8085, awp);
rabbitmq::Channel::ptr channel = conn->openChannel();
google::protobuf::Map<std::string, std::string> tmp_map;
channel->declareExchange("exchange1", rabbitmq::ExchangeType::TOPIC, true, false, tmp_map);
channel->declareQueue("queue1", true, false, false, tmp_map);
channel->declareQueue("queue2", true, false, false, tmp_map);
channel->queueBind("exchange1", "queue1", "queue1");
channel->queueBind("exchange1", "queue2", "news.music.#");
for (int i = 0; i < 10; i++) {
rabbitmq::BasicProperties bp;
bp.set_id(rabbitmq::UUIDHelper::uuid());
bp.set_delivery_mode(rabbitmq::DeliveryMode::DURABLE);
bp.set_routing_key("news.music.pop");
channel->basicPublish("exchange1", &bp, "Hello World-" + std::to_string(i));
}
rabbitmq::BasicProperties bp;
bp.set_id(rabbitmq::UUIDHelper::uuid());
bp.set_delivery_mode(rabbitmq::DeliveryMode::DURABLE);
bp.set_routing_key("news.music.sport");
channel->basicPublish("exchange1", &bp, "Hello ltx");
bp.set_routing_key("news.sport");
channel->basicPublish("exchange1", &bp, "Hello");
conn->closeChannel(channel);
return 0;
}
4. 项目总结
首先明确我们所实现的项目:仿 RabbitMQ 实现一个简化版的消息队列组件,其内部实现了消息队列服务器以及客户端的搭建,并支持不同主机间消息的发布与订阅及消息推送功能。其次项目中所用到的技术:基于 muduo 库实现底层网络通信服务器和客户端的搭建,在应用层基于 protobuf 协议设计应用层协议接口,在数据管理上使用了轻量数据库 sqlite 来进行数据的持久化管理,以及基于 AMQP 模型的理解,实现整个消息队列项目技术的整合,并在项目的实现过程中使用 gtest 框架进行单元测试,完成项目的最终实现。