仿 RabbitMQ 实现消息队列项目:交换机路由、队列隔离与消息可靠投递
介绍基于 C++ 实现的仿 RabbitMQ 消息队列项目。涵盖交换机、队列、绑定信息及消息四大核心模块的设计与实现。支持直连、扇出、主题三种路由模式,具备消息持久化及消费者确认机制。通过 GTest 单元测试验证各模块功能,包括增删查及文件恢复逻辑,旨在构建低耦合、高可靠的异步通信系统。

介绍基于 C++ 实现的仿 RabbitMQ 消息队列项目。涵盖交换机、队列、绑定信息及消息四大核心模块的设计与实现。支持直连、扇出、主题三种路由模式,具备消息持久化及消费者确认机制。通过 GTest 单元测试验证各模块功能,包括增删查及文件恢复逻辑,旨在构建低耦合、高可靠的异步通信系统。

我们按照 AMQP 形式设计,下面先简单认识下:
**AMQP(高级消息队列协议)**是消息队列的'交通规则',定义了生产者、交换机、队列、消费者如何协作,确保消息可靠传递,解决异步通信、系统解耦等问题。
| 组件 | 作用 | 快递站 |
|---|---|---|
| 生产者 | 发送消息(如订单系统) | 寄快递的用户 |
| 消费者 | 接收消息(如库存系统) | 取快递的用户 |
| 交换机 | 按规则(路由键)分发消息到队列 | 快递分拣员(看标签分区域) |
| 队列 | 存储消息的缓冲区 | 快递货架(存不同收件人包裹) |
首先需要消费客户端先进行队列申明以及订阅(此时有个自己的信道),然后服务端为它开一个对应的信道,然后消费客户就可以通过信道进行消息订阅(先进入订阅队列,等发布客户端发消息),发布客户端同理,进行(通过对应的信道底层依次调用服务端对应模块:交换机模块 队列模块等等,进行提供对应请求 protobuf,等待服务端对应的接收请求的 channel 进行解析依次调用底层封装的对应模块提供接口进行操作)最后发布给消费客户端进行处理。
整体过程图展示:

当客户端发布一条消息到交换机后,这条消息,应该被入队到该交换机绑定的哪些队列中?交换路由模块就是决定这件事情的。
首先对于交换机有三个模式(对于绑定的队列递送而言):
因此需要定义不同的枚举来表示(这里我们定义在 proto 里):
enum ExchangeType {
UNKNOWTYPE = 0;
DIRECT = 1;
FANOUT = 2;
TOPIC = 3;
};
模块设计图:

下面基于 gtest 的全局模式的测试对 exchange 模块的增删查进行测试:
测试代码:
#include "../mqserver/exchange.hpp"
#include <gtest/gtest.h>
ExchangeManager::ptr emp;
class ExchangeTest : public testing::Environment {
public:
virtual void SetUp() override {
emp = std::make_shared<ExchangeManager>("./data/meta.db");
}
virtual void TearDown() override {
emp->Clear();
std::cout << "最后的清理!!\n";
}
};
//因为涉及文件的永久删除故测试应联合 TEST(queue_test, insert_test) 这个模块一起进行!
TEST(exchange_test, insert_test) {
google::protobuf::Map<std::string, std::string> map;
map.insert({"k1","v1"});
emp->DeclareExchange("exchange1", msg::ExchangeType::DIRECT,true,false, map);
emp->DeclareExchange("exchange2", msg::ExchangeType::DIRECT,true,false, map);
emp->DeclareExchange("exchange3", msg::ExchangeType::DIRECT,true,false, map);
emp->DeclareExchange("exchange4", msg::ExchangeType::DIRECT,true,false, map);
ASSERT_EQ(emp->Size(),);
}
(exchange_test, select_test) {
(emp->(),);
(emp->(),);
(emp->(),);
(emp->(),);
Exchange::ptr exp = emp->();
(exp.(),);
(exp->name,);
(exp->durable,);
(exp->auto_delete,);
(exp->type, msg::ExchangeType::DIRECT);
(exp->(), std::());
}
(exchange_test, remove_test) {
emp->();
Exchange::ptr exp = emp->();
(exp.(),);
(emp->(),);
}
{
testing::(&argc, argv);
testing::( ());
ret = ();
(!ret) std::cout << ;
;
}
测试效果:

对应 SQLite3 数据库查询(gtest 测试最后不进行整体 Clear 操作):

全局视角看待大致过程:

下面基于 gtest 的全局模式的测试对 queue 模块的增删查进行测试:
测试代码:
#include "../mqserver/msgqueue.hpp"
#include <gtest/gtest.h>
MsgqueueManager::ptr emp;
class MsgqueueTest : public testing::Environment {
public:
virtual void SetUp() override {
emp = std::make_shared<MsgqueueManager>("./data/meta.db");
}
virtual void TearDown() override {
// emp->Clear();
std::cout << "最后的清理!!\n";
}
};
//因为涉及文件的永久删除故测试应联合 TEST(queue_test, insert_test) 这个模块一起进行!
TEST(msgqueue_test, insert_test) {
google::protobuf::Map<std::string, std::string> map;
map.insert({"k1","v1"});
emp->DeclareMsgqueue("mq1",1,true,false, map);
emp->DeclareMsgqueue("mq2",1,true,false, map);
emp->DeclareMsgqueue("mq3",1,true,false, map);
emp->DeclareMsgqueue("mq4",1,true,false, map);
ASSERT_EQ(emp->(),);
}
(msgqueue_test, select_test) {
(emp->(),);
(emp->(),);
(emp->(),);
(emp->(),);
Msgqueue::ptr exp = emp->();
(exp.(),);
(exp->name,);
(exp->exclusive,);
(exp->auto_delete,);
(exp->(), std::());
}
(msgqueue_test, remove_test) {
emp->();
Msgqueue::ptr exp = emp->();
(exp.(),);
(emp->(),);
}
{
testing::(&argc, argv);
testing::( ());
ret = ();
(!ret) std::cout << ;
;
}
测试效果:

对应 SQLite3 数据库查询(gtest 测试最后不进行整体 Clear 操作):

这里我们设计的是绑定信息(一个交换机对应有一个和很多队列的一个绑定信息集合)。
绑定信息具体成员:
Binding_key(进行对应交换机路由匹配找队列有关)。如下:
struct Binding {
using ptr = std::shared_ptr<Binding>;
// 一组绑定关系
std::string exchange_name;
std::string msgqueue_name;
std::string binding_key;
Binding(const std::string &ename,const std::string &qname,const std::string &key)
:exchange_name(ename),msgqueue_name(qname),binding_key(key){}
};
下面还是基于上面的那俩模块进行的基于文件与内存级别实现(这里只有对应的交换机和队列都是持久化的,绑定信息才能持久化并存储文件中)。
其次就是这里我们实现的是一个交换机对应的是一个 map(绑定的队列与对应 binding 的映射);但是为什么这么实现?
这里绑定采取这种映射而不是直接拿交换机映射 binding(这样每次删除指定队列先关绑定需要遍历 1 次交换机数组 + 每次对应的 binding)而采取交换机–Queue+binding 映射方式,只需要遍历一遍交换机直接拿到 second 进行删除 second[Queue]即可。
如下:
using MsgqueueBindingMap = std::unordered_map<std::string, Binding::ptr>; // MsgQueue---binding
using ExchangeMBMap = std::unordered_map<std::string, MsgqueueBindingMap>; // Exchange---mb
下面剩下的模式几乎和上面他俩相同:
**基于内存 + 文件管理:**内存管理模块就是进行绑定信息的增删,查取并恢复进内存,根据队列名/交换机名,进行相关绑定信息删除。管理者就是对外提供调用接口:绑定,解绑,根据交换机名/队列名移除绑定信息,获取交换机所有相关绑定信息 map,获取特定绑定信息等。
全局视角看待大致过程:

下面基于 gtest 的全局模式的测试对 binding 模块的增删查进行测试:
测试代码:
#include "../mqserver/binding.hpp"
#include <gtest/gtest.h>
BindingManager::ptr bmp;
class bindingTest : public testing::Environment {
public:
virtual void SetUp() override {
bmp = std::make_shared<BindingManager>("./data/meta.db");
}
virtual void TearDown() override {
// bmp->Clear();
}
};
//因为涉及文件的永久删除故测试应联合 TEST(queue_test, insert_test) 这个模块一起进行!
TEST(queue_test, insert_test) {
bmp->Bind("exchange1","queue1","news.music.#",true);
bmp->Bind("exchange1","queue2","news.sport.#",true);
bmp->Bind("exchange1","queue3","news.gossip.#",true);
bmp->Bind("exchange2","queue1","news.music.pop",true);
bmp->Bind("exchange2","queue2","news.sport.football",true);
bmp->Bind(,,,);
(bmp->(),);
}
(queue_test, select_test) {
(bmp->(,),);
(bmp->(,),);
(bmp->(,),);
(bmp->(,),);
(bmp->(,),);
(bmp->(,),);
(bmp->(,),);
Binding::ptr bp = bmp->(,);
(bp.(),);
(bp->exchange_name, std::());
(bp->msgqueue_name, std::());
(bp->binding_key, std::());
}
(queue_test, remove_exchange_test) {
bmp->();
(bmp->(,),);
(bmp->(,),);
(bmp->(,),);
}
{
testing::( bindingTest);
testing::(&argc, argv);
ret = ();
(!ret) std::cout << ;
std::<bindingTest>();
;
}
基于插入,查找,操作测试:


数据库内:

文件恢复到内存的操作测试:


解除绑定测试:


根据交换机与队列分别进行移除测试:



具体如下(proto 文件):
enum DeliveryMode {
UNKNOWMODE = 0;
UNDURABLE = 1;
DURABLE = 2;
};
//这里 BasicProperties 在外面而 Payload 进行嵌套:
//BasicProperties 需要外界单独创建穿进去进行消息插入故能外界直接创建,但是 payload 只能通过先创建 Message 对象在进行创建更合乎情理
message BasicProperties {
string id = 1;
DeliveryMode delivery_mode = 2;
string routing_key = 3;
};
message Message {
message Payload {
BasicProperties properties = 1;
string body = 2;
string valid = 3;
};
Payload payload = 1;
uint32 offset = 2; //对应有效载荷的文件偏移量(方便移除写 0 覆盖)
uint32 length = 3; //有效载荷长度 (方便移除写 0 覆盖)
};
本模块相对于前面三个确实有点复杂(代码也是比较多的),下面梳理下一些要点:
以队列形式管理该队列的内存消息 + 文件消息:
消息是否持久化与它要插入的队列是否持久化有关。
1·文件操作:
"0"为无效(把传进来的 message 对应位置标记无效再写回去),而有恢复操作:就是把文件有效的数据(写入文件后对应位置是"1"的消息)转移到临时文件再重命名成对应消息文件即可),这种方式避免了文件频繁被操作带来不便(这里恢复操作是符合特定要求(持久化的消息总量大于 2000,且其中有效比例低于 50% 则需要持久化))。2·queuemessage 部分:
这里恢复不仅要文件,还要与内存同步(比如文件变了对应的 msg 的偏移量等就变了,需要及时同步更新内存的。))3·消息总的管理者:
下图展示了关系图:

下面基于 gtest 的全局模式的测试对 message 模块的增删查进行测试:
测试代码:
#include "../mqserver/message.hpp"
#include <gtest/gtest.h>
MessageManager::ptr mp;
class MsgqueueTest : public testing::Environment {
public:
virtual void SetUp() override {
mp = std::make_shared<MessageManager>("./message");
}
virtual void TearDown() override {
// mp->Clear();
std::cout << "最后的清理!!\n";
}
};
//先要 insert 让他写入 queue1 文件,再次跑动加载:
TEST(message_test2, recovery_test) {
mp->InitQueueMessage("queue1");
ASSERT_EQ(mp->GetAll_count("queue1"),4);
Message::ptr msg1 = mp->Front("queue1");
ASSERT_NE(msg1.get(),nullptr);
ASSERT_EQ(msg1->payload().body(), std::string("Hello World-1"));
ASSERT_EQ(mp->GetAll_count("queue1"),3);
ASSERT_EQ(mp->Waitack_count("queue1"),);
Message::ptr msg2 = mp->();
(msg(),);
(msg2->().(), std::());
(mp->(),);
(mp->(),);
Message::ptr msg3 = mp->();
(msg(),);
(msg3->().(), std::());
(mp->(),);
(mp->(),);
Message::ptr msg4 = mp->();
(msg(),);
(msg4->().(), std::());
(mp->(),);
(mp->(),);
}
{
testing::(&argc, argv);
testing::( ());
ret = ();
(!ret) std::cout << ;
;
}
测试效果:
基于插入的测试:



基于取出 + 顺序的测试(这里没有确认以及更新故文件消息不变):



基于确认应答后文件对应被标记操作测试:



下面测试下从文件中恢复这四条消息,然后判断下对应取出的顺序等是否合理:



本文介绍了这个项目的 server 大模块的几个分支,基于书写到测试并未发现问题,后续将更新对应的 路由匹配 consumer channel connection broker(服务器)等模块。

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML 转 Markdown 互为补充。 在线工具,Markdown 转 HTML在线工具,online
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML 转 Markdown在线工具,online
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online