连接管理模块和搭建客户端
1. 连接管理
在客户端这边,RabbitMQ 弱化了客户端的概念,因为用户所需的服务都是通过信道来提供的,因此操作思想转换为先创建连接,通过连接创建信道,通过信道提供服务这一流程。
这个模块同样是针对 muduo 库客户端连接的二次封装,向用户提供创建 channel 信道的接口,创建信道后,可以通过信道来获取指定服务。
#ifndef__M_CONNECTION_H__#define__M_CONNECTION_H__#include"muduo/proto/dispatcher.h"#include"muduo/proto/codec.h"#include"muduo/base/Logging.h"#include"muduo/base/Mutex.h"#include"muduo/net/EventLoop.h"#include"muduo/net/TcpClient.h"#include"muduo/net/EventLoopThread.h"#include"muduo/base/CountDownLatch.h"#include"channel.hpp"#include"worker.hpp"namespace rabbitmq {classConnection{public:using ptr = std::shared_ptr<Connection>;Connection(const std::string server_ip,int server_port,const AsyncWorker::ptr& worker):_latch(1),_client(worker->_loopthread.startLoop(), muduo::net::InetAddress(server_ip, server_port),"Client"),_dispatcher(std::bind(&Connection::onUnknownMessage,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),_codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage,&_dispatcher, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))),_worker(worker),_channel_manager(std::make_shared<ChannelManager>()){ _dispatcher.registerMessageCallback<rabbitmq::basicCommonResponse>(std::bind(&Connection::basicResponse,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); _dispatcher.registerMessageCallback<rabbitmq::basicConsumeResponse>(std::bind(&Connection::consumeResponse,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); _client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get(), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); _client.setConnectionCallback(std::bind(&Connection::onConnection,this, std::placeholders::_1)); _client.connect(); _latch.wait();//阻塞等待,直到连接建立成功} Channel::ptr openChannel(){ Channel::ptr channel = _channel_manager->create(_conn, _codec);bool ret = channel->openChannel();if(ret ==false){DLOG("打开信道失败!");returnChannel::ptr();}return channel;}voidcloseChannel(const Channel::ptr& channel){ channel->closeChannel(); _channel_manager->remove(channel->cid());}private:voidbasicResponse(const muduo::net::TcpConnectionPtr &conn,const basicCommonResponsePtr &message, muduo::Timestamp){//1. 找到信道 Channel::ptr channel = _channel_manager->get(message->cid());if(channel.get()==nullptr){DLOG("未找到信道信息!");return;}//2. 将得到的响应对象,添加到信道的基础响应hash_map中 channel->putBasicResponse(message);}voidconsumeResponse(const muduo::net::TcpConnectionPtr &conn,const basicConsumeResponsePtr &message, muduo::Timestamp){//1. 找到信道 Channel::ptr channel = _channel_manager->get(message->cid());if(channel.get()==nullptr){DLOG("未找到信道信息!");return;}//2. 封装异步任务(消息处理任务),抛入线程池 _worker->_pool.push([channel, message](){ channel->consume(message);});}voidonUnknownMessage(const muduo::net::TcpConnectionPtr &conn,const MessagePtr &message, muduo::Timestamp){ LOG_INFO <<"onUnknownMessage: "<< message->GetTypeName(); conn->shutdown();}// 连接建立成功时候的回调函数,连接建立成功后,唤醒上边的阻塞voidonConnection(const muduo::net::TcpConnectionPtr &conn){if(conn->connected()){ _latch.countDown();// 唤醒主线程中的阻塞 _conn = conn;}else{// 连接关闭时的操作 _conn.reset();}}private: muduo::CountDownLatch _latch;// 实现同步的 muduo::net::TcpConnectionPtr _conn;// 客户端对应的连接 muduo::net::TcpClient _client;// 客户端 ProtobufDispatcher _dispatcher;// 请求分发器对象--要向其中注册请求处理函数 ProtobufCodecPtr _codec;// protobuf协议处理器--针对收到的请求数据进行protobuf协议处理 AsyncWorker::ptr _worker; ChannelManager::ptr _channel_manager;};}#endif这段代码是RabbitMQ客户端的连接管理模块,它负责与RabbitMQ服务器建立连接,并管理信道(Channel)的创建和关闭。主要功能包括:
- 封装muduo库的TcpClient,建立与服务器的TCP连接。
- 使用ProtobufCodec进行消息的编解码,使用ProtobufDispatcher进行消息的分发。
- 管理多个信道(Channel),每个信道可以执行不同的AMQP操作(如声明交换机、队列,发布消息等)。
- 异步处理服务器推送的消息(如basicConsumeResponse),通过线程池处理消息消费。
关键点:
- 连接建立:在构造函数中,通过TcpClient连接服务器,并使用CountDownLatch等待连接建立成功。连接建立成功后,onConnection回调函数会被调用,唤醒主线程。
- 消息分发:通过ProtobufDispatcher注册了两类消息的回调函数:basicCommonResponse(普通响应)和basicConsumeResponse(消费消息响应)。当收到服务器消息时,根据消息类型分发给对应的处理函数。
- 信道管理:通过ChannelManager管理信道的创建和删除。每个信道有一个唯一的cid(信道ID),在创建信道时生成。当收到响应消息时,根据cid找到对应的信道,将响应放入信道的响应映射表中,等待信道处理。
- 异步消息处理:对于消费消息(basicConsumeResponse),将其封装成任务提交到线程池中,由工作线程调用信道的consume方法处理消息(即调用用户注册的回调函数)。
- 连接关闭:当收到未知消息时,会关闭连接。另外,当服务器断开连接时,也会触发onConnection回调,重置连接指针。
响应处理
- 基础响应(basicCommonResponse):直接找到对应的信道,并将响应放入信道的响应映射表中,由信道中等待响应的线程处理(通过条件变量唤醒)。
- 消费响应(basicConsumeResponse):找到对应的信道,然后将消息处理任务提交到线程池,由工作线程执行信道的
consume方法(最终调用用户注册的回调函数)。
2. 搭建客户端

发布消息的生产者客户端:
#include"connection.hpp"intmain(){//1.实例化异步工作线程对象 rabbitmq::AsyncWorker::ptr awp = std::make_shared<rabbitmq::AsyncWorker>();//2. 实例化连接对象 rabbitmq::Connection::ptr conn = std::make_shared<rabbitmq::Connection>("127.0.0.1",8085, awp);//3,通过连接创建信道 rabbitmq::Channel::ptr channel = conn->openChannel();//4.通过信道提供的服务完成所需// 4.1.声明一个交换机exchange1,交换机类型为广播模式 google::protobuf::Map<std::string, std::string> tmp_map; channel->declareExchange("exchange1", rabbitmq::ExchangeType::FANOUT,true,false, tmp_map);// 4.2. 声明一个队列queue1 channel->declareQueue("queue1",true,false,false, tmp_map);// 4.3. 声明一个队列queue2 channel->declareQueue("queue2",true,false,false, tmp_map);// 4.4。绑定queue1-exchange1,且binding_key设置为queue1 channel->queueBind("exchange1","queue1","queue1");// 4.5. 绑定queue2-exchange1, 且binding_key设置为news.music.# channel->queueBind("exchange1","queue2","news.music.#");//5. 循环向交换机发布消息for(int i =0; i <10; i++){ channel->basicPublish("exchange1",nullptr,"Hello World-"+ std::to_string(i));}//6.关闭信道 conn->closeChannel(channel);return0;}订阅消息的消费者客户端:
#include"connection.hpp"voidcb(rabbitmq::Channel::ptr &channel,const std::string consumer_tag,const rabbitmq::BasicProperties *bp,const std::string &body){ std::cout << consumer_tag <<"消费了消息:"<< body << std::endl; channel->basicAck(bp->id());}intmain(int argc,char* argv[]){if(argc !=2){ std::cout <<"usage: ./consume_client queue1\n";return-1;}//1.实例化异步工作线程对象 rabbitmq::AsyncWorker::ptr awp = std::make_shared<rabbitmq::AsyncWorker>();//2. 实例化连接对象 rabbitmq::Connection::ptr conn = std::make_shared<rabbitmq::Connection>("127.0.0.1",8085, awp);//3,通过连接创建信道 rabbitmq::Channel::ptr channel = conn->openChannel();//4.通过信道提供的服务完成所需// 4.1.声明一个交换机exchange1,交换机类型为广播模式 google::protobuf::Map<std::string, std::string> tmp_map; channel->declareExchange("exchange1", rabbitmq::ExchangeType::FANOUT,true,false, tmp_map);// 4.2. 声明一个队列queue1 channel->declareQueue("queue1",true,false,false, tmp_map);// 4.3. 声明一个队列queue2 channel->declareQueue("queue2",true,false,false, tmp_map);// 4.4。绑定queue1-exchange1,且binding_key设置为queue1 channel->queueBind("exchange1","queue1","queue1");// 4.5. 绑定queue2-exchange1, 且binding_key设置为news.music.# channel->queueBind("exchange1","queue2","news.music.#");auto functor = std::bind(cb, channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); channel->basicConsume("consumer1", argv[1],false, functor);while(1) std::this_thread::sleep_for(std::chrono::seconds(3)); conn->closeChannel(channel);return0;}一、生产者客户端流程
生产者客户端的主要流程是建立连接、创建信道、声明交换机和队列、绑定队列到交换机,然后向交换机发布消息。
步骤详解:
- 实例化异步工作线程对象:
rabbitmq::AsyncWorker::ptr awp = std::make_shared<rabbitmq::AsyncWorker>();创建异步工作线程对象,该对象包含一个EventLoopThread(用于网络IO)和一个线程池(用于处理消息)。
- 实例化连接对象:
rabbitmq::Connection::ptr conn = std::make_shared<rabbitmq::Connection>("127.0.0.1",8085, awp);创建Connection对象,传入服务器IP和端口以及异步工作线程对象。在构造函数中,会建立TCP连接,并且阻塞直到连接成功。
- 通过连接创建信道:
rabbitmq::Channel::ptr channel = conn->openChannel();通过连接对象创建一个信道。在openChannel方法中,会通过ChannelManager创建Channel对象,并向服务器发送打开信道的请求,等待响应。
- 通过信道提供的服务完成所需操作:
声明一个交换机exchange1,类型为广播模式(FANOUT):
channel->declareExchange("exchange1", rabbitmq::ExchangeType::FANOUT,true,false, tmp_map);声明两个队列queue1和queue2:
channel->declareQueue("queue1",true,false,false, tmp_map); channel->declareQueue("queue2",true,false,false, tmp_map);绑定队列到交换机,并设置绑定键(binding key):
channel->queueBind("exchange1","queue1","queue1"); channel->queueBind("exchange1","queue2","news.music.#");- 循环向交换机发布消息:
for(int i = 0; i < 10; i++) { channel->basicPublish("exchange1", nullptr, "Hello World-" + std::to_string(i)); } 向交换机exchange1发布10条消息。由于交换机类型是FANOUT,消息会被广播到所有绑定的队列(queue1和queue2)。
- 关闭信道:
conn->closeChannel(channel);注意:这里调用的是channel的closeChannel方法,实际上在closeChannel方法中会向服务器发送关闭信道的请求,并且从ChannelManager中移除该信道。
- 程序结束:Connection对象和AsyncWorker对象会随着智能指针的释放而自动销毁,连接断开。
二、消费者客户端流程
消费者客户端的主要流程是建立连接、创建信道、声明交换机和队列、绑定队列到交换机,然后订阅队列中的消息,并处理消息。
步骤详解:
- 实例化异步工作线程对象:同生产者。
- 实例化连接对象:同生产者。
- 通过连接创建信道:同生产者。
- 通过信道提供的服务完成所需操作:声明交换机和队列、绑定队列,与生产者相同。
- 订阅队列消息:
auto functor = std::bind(cb, channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); channel->basicConsume("consumer1", argv[1],false, functor);创建一个回调函数,然后调用channel的basicConsume方法订阅队列。这里传入消费者标签为"consumer1",队列名由命令行参数指定,自动确认为false(即手动确认),以及回调函数。
- 循环等待消息:
while(1) std::this_thread::sleep_for(std::chrono::seconds(3));主线程循环等待,因为消息的处理是异步的(在工作线程池中),所以这里需要保持程序运行。
- 关闭信道:
conn->closeChannel(channel);注意:这里是通过连接对象来关闭信道,实际上内部会调用channel的closeChannel方法,并从ChannelManager中移除。
- 程序结束:同样,连接和异步工作线程对象会随着智能指针的释放而自动清理。
三、消息处理流程
当消费者客户端订阅队列后,服务器会推送消息到客户端。客户端的网络线程(EventLoopThread)接收到消息后,通过ProtobufCodec解码,然后由ProtobufDispatcher根据消息类型分发。
对于消费消息(basicConsumeResponse),会调用Connection::consumeResponse,然后在线程池中执行Channel::consume方法,最终调用用户注册的回调函数。
在回调函数中,用户处理消息,并手动确认消息(basicAck)。确认消息会发送一个请求到服务器,服务器收到后从队列中删除消息。
3. 功能联调
广播模式下的测试:
分别编译运行:server、consume_client queue1、consume_client queue2、publish_client
结果如下:
ltx@My-Xshell-8-Pro-Max-Ultra:~/rabbit-mq/mqserver$ ./server 2026020314:42:57.353717Z 2752283 INFO TcpServer::newConnection [Server] - new connection [Server-0.0.0.0:8085#1] from 127.0.0.1:36086 - TcpServer.cc:80[DBG][22:42:57][channel.hpp:43] new Channel: 0x5580b78cdfd0 [DBG][22:42:57][connection.hpp:26] b34ba8ee-60be-beb1-0000-000000000001 信道创建成功! 2026020314:43:38.563389Z 2752283 INFO TcpServer::newConnection [Server] - new connection [Server-0.0.0.0:8085#2] from 127.0.0.1:50796 - TcpServer.cc:80[DBG][22:43:38][channel.hpp:43] new Channel: 0x5580b78d76a0 [DBG][22:43:38][connection.hpp:26] a7fbd957-40ba-488c-0000-000000000001 信道创建成功! 2026020314:46:48.488416Z 2752283 INFO TcpServer::newConnection [Server] - new connection [Server-0.0.0.0:8085#3] from 127.0.0.1:50976 - TcpServer.cc:80[DBG][22:46:48][channel.hpp:43] new Channel: 0x5580b78d7260 [DBG][22:46:48][connection.hpp:26] a48a0436-beeb-b767-0000-000000000001 信道创建成功! ltx@My-Xshell-8-Pro-Max-Ultra:~/rabbit-mq/mqclient$ ./consume_client queue1 2026020314:42:57.353236Z 2757934 INFO TcpClient::TcpClient[Client] - connector 0x56352AF5F770 - TcpClient.cc:69 2026020314:42:57.353557Z 2757934 INFO TcpClient::connect[Client] - connecting to 127.0.0.1:8085 - TcpClient.cc:107 ltx@My-Xshell-8-Pro-Max-Ultra:~/rabbit-mq/mqclient$ ./consume_client queue2 2026020314:43:38.562948Z 2758411 INFO TcpClient::TcpClient[Client] - connector 0x55CDC2641770 - TcpClient.cc:69 2026020314:43:38.563187Z 2758411 INFO TcpClient::connect[Client] - connecting to 127.0.0.1:8085 - TcpClient.cc:107 ltx@My-Xshell-8-Pro-Max-Ultra:~/rabbit-mq/mqclient$ ./publish_client 2026020314:46:48.488012Z 2760240 INFO TcpClient::TcpClient[Client] - connector 0x55B3CC2E7770 - TcpClient.cc:69 2026020314:46:48.488247Z 2760240 INFO TcpClient::connect[Client] - connecting to 127.0.0.1:8085 - TcpClient.cc:107 在运行发布消息的生产者客户端(publish_client)时,我们应该可以看到消费者客户端会消费消息,并通过回调函数将消息打印出来,但是我们在消费者客户端没有看到任何消息,说明我们代码中存在一点小问题

经过调试发现,在服务端的连接管理模块实现时,我们在调用打开信道的接口时的传参应该插入信道id,结果因为写错了,写成了请求id。再次编译运行,发现还是和刚刚一样。
这里我们在对网络通信进行二次封装时(信道管理模块和连接管理模块),我们是没有进行测试的,所以我们大致方向应该就是在网络通信时出现了问题,在服务器的日志信息中可以看到服务器在创建信道时,成功将信道创建,但是客户端我们不知道有没有在收到服务器响应,并且在收到响应后有没有将其加入到hash_map中管理起来,所以我们可以在客户端添加一些日志
我们可以在客户端创建信道,声明交换机/队列,删除交换机/队列等请求时添加日志,看客户端是否正常发送请求以及接收响应

同样在服务端我们也添加一些日志,看看信道创建成功之后到底有没有给客户端响应

运行结果如下:
ltx@My-Xshell-8-Pro-Max-Ultra:~/rabbit-mq/mqserver$ ./server 20260204 06:19:33.333944Z 3906265 INFO TcpServer::newConnection [Server] - new connection [Server-0.0.0.0:8085#1] from 127.0.0.1:35632 - TcpServer.cc:80[DBG][14:19:33][channel.hpp:43] new Channel: 0x55aedc750ad0 [DBG][14:19:33][connection.hpp:26] d5ba2ba1-5824-a417-0000-000000000001 信道创建成功! [DBG][14:19:33][connection.hpp:29] 客户端创建信道已响应 20260204 06:20:01.535511Z 3906265 INFO TcpServer::newConnection [Server] - new connection [Server-0.0.0.0:8085#2] from 127.0.0.1:43362 - TcpServer.cc:80[DBG][14:20:01][channel.hpp:43] new Channel: 0x55aedc7b5540 [DBG][14:20:01][connection.hpp:26] ac024dbb-11dc-1bc6-0000-000000000001 信道创建成功! [DBG][14:20:01][connection.hpp:29] 客户端创建信道已响应 20260204 06:28:57.219099Z 3906265 INFO TcpServer::newConnection [Server] - new connection [Server-0.0.0.0:8085#3] from 204.76.203.50:39048 - TcpServer.cc:8020260204 06:28:58.693862Z 3906265 ERROR ProtobufCodec::defaultErrorCallback - InvalidLength - codec.cc:121 20260204 06:29:00.053466Z 3906265 INFO TcpServer::removeConnectionInLoop [Server] - connection Server-0.0.0.0:8085#3 - TcpServer.cc:109ltx@My-Xshell-8-Pro-Max-Ultra:~/rabbit-mq/mqclient$ ./consume_client queue1 20260204 06:19:33.333532Z 3906405 INFO TcpClient::TcpClient[Client] - connector 0x56242E8A9770 - TcpClient.cc:69 20260204 06:19:33.333756Z 3906405 INFO TcpClient::connect[Client] - connecting to 127.0.0.1:8085 - TcpClient.cc:107 [DBG][14:19:33][channel.hpp:39] 客户端声明信道请求已发送 ltx@My-Xshell-8-Pro-Max-Ultra:~/rabbit-mq/mqclient$ ./consume_client queue2 20260204 06:20:01.535122Z 3906997 INFO TcpClient::TcpClient[Client] - connector 0x55B5470E4770 - TcpClient.cc:69 20260204 06:20:01.535371Z 3906997 INFO TcpClient::connect[Client] - connecting to 127.0.0.1:8085 - TcpClient.cc:107 [DBG][14:20:01][channel.hpp:39] 客户端声明信道请求已发送 可以看到服务端响应了客户端创建信道的请求,但是客户端在接收到服务端响应后应该打印一条日志 ”客户端声明信道请求,服务器已响应“,但是我们并没有在客户端中看到这条日志消息,说明我们在等待服务器响应时出现了问题。

但是我们发现等待响应的接口并没有问题,那就有可能是客户端在接收服务器响应时,有没有将其加入到hash_map中管理起来

客户端在接收到服务器的普通响应时会调用 basicResponse 接口去处理这个响应(因为我们在构造函数中通过分发器注册了这个接口),那么我们就需要查看一下这个处理响应的接口有没有问题

可以看到处理服务器响应的接口也没有问题,但是内部调用了将响应对象添加到hash_map中管理起来的接口,所以我们再继续往下看,看也没有正确管理

我们发现原来将服务器的响应添加到哈希表中管理起来时,我们不是通过信道id来查找响应的,应该是通过请求id来查找,应该是当时不小心敲错了代码造成的。
因此,在等待响应时由于我们添加到哈希表中的键值是 [信道id,响应对象] ,但是我们在条件变量处阻塞的条件是通过 请求id 来查找对应的响应对象,所以不可能找得到,于是条件不满足就会一直阻塞在这
下面我们修改好了这个问题后再来编译运行一下:
ltx@My-Xshell-8-Pro-Max-Ultra:~/rabbit-mq/mqserver$ ./server 20260204 07:17:54.963281Z 3961554 INFO TcpServer::newConnection [Server] - new connection [Server-0.0.0.0:8085#1] from 127.0.0.1:48728 - TcpServer.cc:80[DBG][15:17:54][channel.hpp:43] new Channel: 0x55badb5aaad0 [DBG][15:17:54][connection.hpp:26] 462a9d0e-e95b-e896-0000-000000000001 信道创建成功! [DBG][15:17:54][connection.hpp:29] 客户端创建信道已响应 [DBG][15:17:54][consumer.hpp:32] new Consumer: 0x55badb602f30 [DBG][15:17:54][channel.hpp:153] 创建消费者成功 20260204 07:18:00.189887Z 3961554 INFO TcpServer::newConnection [Server] - new connection [Server-0.0.0.0:8085#2] from 127.0.0.1:52726 - TcpServer.cc:80[DBG][15:18:00][channel.hpp:43] new Channel: 0x55badb602ab0 [DBG][15:18:00][connection.hpp:26] 6dbda6fd-8cf1-088f-0000-000000000001 信道创建成功! [DBG][15:18:00][connection.hpp:29] 客户端创建信道已响应 [DBG][15:18:00][consumer.hpp:32] new Consumer: 0x55badb6027b0 [DBG][15:18:00][channel.hpp:153] 创建消费者成功 20260204 07:18:09.545793Z 3961554 INFO TcpServer::newConnection [Server] - new connection [Server-0.0.0.0:8085#3] from 127.0.0.1:58468 - TcpServer.cc:80[DBG][15:18:09][channel.hpp:43] new Channel: 0x55badb60d630 [DBG][15:18:09][connection.hpp:26] a3c4b9db-0ed5-6e7e-0000-000000000001 信道创建成功! [DBG][15:18:09][connection.hpp:29] 客户端创建信道已响应 [DBG][15:18:09][channel.hpp:51] del Channel: 0x55badb60d630 20260204 07:18:09.553509Z 3961554 INFO TcpServer::removeConnectionInLoop [Server] - connection Server-0.0.0.0:8085#3 - TcpServer.cc:109ltx@My-Xshell-8-Pro-Max-Ultra:~/rabbit-mq/mqclient$ ./consume_client queue1 20260204 07:17:54.962859Z 3961827 INFO TcpClient::TcpClient[Client] - connector 0x55648BB74770 - TcpClient.cc:69 20260204 07:17:54.963116Z 3961827 INFO TcpClient::connect[Client] - connecting to 127.0.0.1:8085 - TcpClient.cc:107 [DBG][15:17:54][channel.hpp:39] 客户端声明信道请求已发送 [DBG][15:17:54][channel.hpp:42] 客户端声明信道请求, 服务器已响应 [DBG][15:17:54][channel.hpp:79] 客户端声明交换机请求, 服务器已响应 [DBG][15:17:54][channel.hpp:117] 客户端声明队列请求, 服务器已响应 [DBG][15:17:54][channel.hpp:117] 客户端声明队列请求, 服务器已响应 [DBG][15:17:54][channel.hpp:151] 客户端队列绑定请求, 服务器已响应 [DBG][15:17:54][channel.hpp:151] 客户端队列绑定请求, 服务器已响应 [DBG][15:17:54][consumer.hpp:32] new Consumer: 0x55648bb6c500 consumer1消费了消息:Hello World-0 consumer1消费了消息:Hello World-1 consumer1消费了消息:Hello World-2 consumer1消费了消息:Hello World-3 consumer1消费了消息:Hello World-4 consumer1消费了消息:Hello World-5 consumer1消费了消息:Hello World-6 consumer1消费了消息:Hello World-7 consumer1消费了消息:Hello World-8 consumer1消费了消息:Hello World-9 ltx@My-Xshell-8-Pro-Max-Ultra:~/rabbit-mq/mqclient$ ./consume_client queue2 20260204 07:18:00.189418Z 3961994 INFO TcpClient::TcpClient[Client] - connector 0x55F9681CA770 - TcpClient.cc:69 20260204 07:18:00.189681Z 3961994 INFO TcpClient::connect[Client] - connecting to 127.0.0.1:8085 - TcpClient.cc:107 [DBG][15:18:00][channel.hpp:39] 客户端声明信道请求已发送 [DBG][15:18:00][channel.hpp:42] 客户端声明信道请求, 服务器已响应 [DBG][15:18:00][channel.hpp:79] 客户端声明交换机请求, 服务器已响应 [DBG][15:18:00][channel.hpp:117] 客户端声明队列请求, 服务器已响应 [DBG][15:18:00][channel.hpp:117] 客户端声明队列请求, 服务器已响应 [DBG][15:18:00][channel.hpp:151] 客户端队列绑定请求, 服务器已响应 [DBG][15:18:00][channel.hpp:151] 客户端队列绑定请求, 服务器已响应 [DBG][15:18:00][consumer.hpp:32] new Consumer: 0x55f9681c2500 consumer1消费了消息:Hello World-0 consumer1消费了消息:Hello World-1 consumer1消费了消息:Hello World-2 consumer1消费了消息:Hello World-3 consumer1消费了消息:Hello World-4 consumer1消费了消息:Hello World-5 consumer1消费了消息:Hello World-6 consumer1消费了消息:Hello World-7 consumer1消费了消息:Hello World-8 consumer1消费了消息:Hello World-9 ltx@My-Xshell-8-Pro-Max-Ultra:~/rabbit-mq/mqclient$ ./publish_client 20260204 07:18:09.545247Z 3962222 INFO TcpClient::TcpClient[Client] - connector 0x55DA734B6770 - TcpClient.cc:69 20260204 07:18:09.545562Z 3962222 INFO TcpClient::connect[Client] - connecting to 127.0.0.1:8085 - TcpClient.cc:107 [DBG][15:18:09][channel.hpp:39] 客户端声明信道请求已发送 [DBG][15:18:09][channel.hpp:42] 客户端声明信道请求, 服务器已响应 [DBG][15:18:09][channel.hpp:79] 客户端声明交换机请求, 服务器已响应 [DBG][15:18:09][channel.hpp:117] 客户端声明队列请求, 服务器已响应 [DBG][15:18:09][channel.hpp:117] 客户端声明队列请求, 服务器已响应 [DBG][15:18:09][channel.hpp:151] 客户端队列绑定请求, 服务器已响应 [DBG][15:18:09][channel.hpp:151] 客户端队列绑定请求, 服务器已响应 [DBG][15:18:09][channel.hpp:191] 客户端发布消息请求, 服务器已响应 [DBG][15:18:09][channel.hpp:191] 客户端发布消息请求, 服务器已响应 [DBG][15:18:09][channel.hpp:191] 客户端发布消息请求, 服务器已响应 [DBG][15:18:09][channel.hpp:191] 客户端发布消息请求, 服务器已响应 [DBG][15:18:09][channel.hpp:191] 客户端发布消息请求, 服务器已响应 [DBG][15:18:09][channel.hpp:191] 客户端发布消息请求, 服务器已响应 [DBG][15:18:09][channel.hpp:191] 客户端发布消息请求, 服务器已响应 [DBG][15:18:09][channel.hpp:191] 客户端发布消息请求, 服务器已响应 [DBG][15:18:09][channel.hpp:191] 客户端发布消息请求, 服务器已响应 [DBG][15:18:09][channel.hpp:191] 客户端发布消息请求, 服务器已响应 20260204 07:18:09.553382Z 3962222 INFO TcpClient::~TcpClient[Client] - connector 0x55DA734B6770 - TcpClient.cc:75 问题解决,可以看到运行结果符合预期
避免日志太多干扰,后续就将日志给注释掉
直接交换模式下的测试:
生产者客户端:
#include"connection.hpp"intmain(){//1.实例化异步工作线程对象 rabbitmq::AsyncWorker::ptr awp = std::make_shared<rabbitmq::AsyncWorker>();//2. 实例化连接对象 rabbitmq::Connection::ptr conn = std::make_shared<rabbitmq::Connection>("127.0.0.1",8085, awp);//3,通过连接创建信道 rabbitmq::Channel::ptr channel = conn->openChannel();//4.通过信道提供的服务完成所需// 4.1.声明一个交换机exchange1,交换机类型为直接交换模式 google::protobuf::Map<std::string, std::string> tmp_map; channel->declareExchange("exchange1", rabbitmq::ExchangeType::DIRECT,true,false, tmp_map);// 4.2. 声明一个队列queue1 channel->declareQueue("queue1",true,false,false, tmp_map);// 4.3. 声明一个队列queue2 channel->declareQueue("queue2",true,false,false, tmp_map);// 4.4。绑定queue1-exchange1,且binding_key设置为queue1 channel->queueBind("exchange1","queue1","queue1");// 4.5. 绑定queue2-exchange1, 且binding_key设置为news.music.# channel->queueBind("exchange1","queue2","news.music.#");//5. 循环向交换机发布消息for(int i =0; i <10; i++){ rabbitmq::BasicProperties bp; bp.set_id(rabbitmq::UUIDHelper::uuid()); bp.set_delivery_mode(rabbitmq::DeliveryMode::DURABLE); bp.set_routing_key("queue1"); channel->basicPublish("exchange1",&bp,"Hello World-"+ std::to_string(i));}//6.关闭信道 conn->closeChannel(channel);return0;}消费者客户端只需要修改交换机类型为直接交换模式
运行结果:

可以看到直接交换模式下routing_key和queue1的binding_key匹配才能消费消息
主题交换模式下的测试:
生产者客户端:
#include"connection.hpp"intmain(){//1.实例化异步工作线程对象 rabbitmq::AsyncWorker::ptr awp = std::make_shared<rabbitmq::AsyncWorker>();//2. 实例化连接对象 rabbitmq::Connection::ptr conn = std::make_shared<rabbitmq::Connection>("127.0.0.1",8085, awp);//3,通过连接创建信道 rabbitmq::Channel::ptr channel = conn->openChannel();//4.通过信道提供的服务完成所需// 4.1.声明一个交换机exchange1,交换机类型为主题交换模式 google::protobuf::Map<std::string, std::string> tmp_map; channel->declareExchange("exchange1", rabbitmq::ExchangeType::TOPIC,true,false, tmp_map);// 4.2. 声明一个队列queue1 channel->declareQueue("queue1",true,false,false, tmp_map);// 4.3. 声明一个队列queue2 channel->declareQueue("queue2",true,false,false, tmp_map);// 4.4。绑定queue1-exchange1,且binding_key设置为queue1 channel->queueBind("exchange1","queue1","queue1");// 4.5. 绑定queue2-exchange1, 且binding_key设置为news.music.# channel->queueBind("exchange1","queue2","news.music.#");//5. 循环向交换机发布消息for(int i =0; i <10; i++){ rabbitmq::BasicProperties bp; bp.set_id(rabbitmq::UUIDHelper::uuid()); bp.set_delivery_mode(rabbitmq::DeliveryMode::DURABLE);//bp.set_routing_key("queue1"); bp.set_routing_key("news.music.pop"); channel->basicPublish("exchange1",&bp,"Hello World-"+ std::to_string(i));} rabbitmq::BasicProperties bp; bp.set_id(rabbitmq::UUIDHelper::uuid()); bp.set_delivery_mode(rabbitmq::DeliveryMode::DURABLE); bp.set_routing_key("news.music.sport"); channel->basicPublish("exchange1",&bp,"Hello ltx"); bp.set_routing_key("news.sport"); channel->basicPublish("exchange1",&bp,"Hello");//6.关闭信道 conn->closeChannel(channel);return0;}消费者客户端只需要改变交换机类型为主题交换模式

可以看到只有符合匹配规则的队列消息才会被消费
4. 项目总结
首先明确我们所实现的项目:仿 RabbitMQ 实现一个简化版的消息队列组件,其内部实现了消息队列服务器以及客户端的搭建,并支持不同主机间消息的发布与订阅及消息推送功能。其次项目中所用到的技术:基于 muduo 库实现底层网络通信服务器和客户端的搭建,在应用层基于 protobuf 协议设计应用层协议接口,在数据管理上使用了轻量数据库 sqlite 来进行数据的持久化管理,以及基于 AMQP 模型的理解,实现整个消息队列项目技术的整合,并在项目的实现过程中使用 gtest 框架进行单元测试,完成项目的最终实现。