跳到主要内容仿 RabbitMQ 实现消息队列项目:交换机路由、队列隔离与消息可靠投递 | 极客日志C++算法
仿 RabbitMQ 实现消息队列项目:交换机路由、队列隔离与消息可靠投递
介绍基于 C++ 实现的仿 RabbitMQ 消息队列项目。涵盖交换机、队列、绑定信息及消息四大核心模块的设计与实现。支持直连、扇出、主题三种路由模式,具备消息持久化及消费者确认机制。通过 GTest 单元测试验证各模块功能,包括增删查及文件恢复逻辑,旨在构建低耦合、高可靠的异步通信系统。
雾岛听风23 浏览 一。项目背景
我们按照 AMQP 形式设计,下面先简单认识下:
AMQP 模型:消息队列的'快递规则'
1. AMQP 是什么?
**AMQP(高级消息队列协议)**是消息队列的'交通规则',定义了生产者、交换机、队列、消费者如何协作,确保消息可靠传递,解决异步通信、系统解耦等问题。
2. 核心组件(类比快递站)
| 组件 | 作用 | 快递站 |
|---|
| 生产者 | 发送消息(如订单系统) | 寄快递的用户 |
| 消费者 | 接收消息(如库存系统) | 取快递的用户 |
| 交换机 | 按规则(路由键)分发消息到队列 | 快递分拣员(看标签分区域) |
| 队列 | 存储消息的缓冲区 | 快递货架(存不同收件人包裹) |
3. 消息流转流程
- 生产者发消息(包裹 + 收件标签)→ 交换机根据标签(路由键)分拣→ 放入对应队列(货架)→ 消费者从队列取包裹处理。*比如:标签'支付成功'→ 订单队列;标签'物流更新'→ 物流队列。
仿 RabbitMQ 项目:简易消息队列实现
1. 核心功能(对标 RabbitMQ)
- 多类型交换机:直连(Direct)、扇出(Fanout)、主题(Topic),支持灵活路由规则。
- 消息持久化:消息存磁盘,重启不丢失(像快递站存包裹到仓库)。
- 消费者确认:消费者处理完消息后反馈'已签收',避免丢件。
- 队列绑定:队列通过路由键绑定到交换机,决定接收哪些消息。
2. 关键设计亮点
- 轻量级协议:基于 TCP 自定义简单协议(类似 AMQP 但更简化),支持生产者/消费者通信。
- 高可用模拟:通过本地文件或简易数据库存储消息,模拟分布式持久化(实际项目可扩展为 Redis/MySQL)。
- 灵活路由:主题交换机支持通配符(如'订单.*'匹配所有订单相关消息),像快递站按'省份 + 类型'分拣。
3. 适用场景
- 异步解耦:订单系统生成订单后,通过消息队列通知库存、物流系统,避免直接调用阻塞。
- 流量削峰:秒杀活动时,请求先入队列,后端服务按能力消费,防止系统崩溃。
- 日志收集:多个服务将日志发到队列,由日志服务统一处理存储。
二。全局设计思想
首先需要消费客户端先进行队列申明以及订阅(此时有个自己的信道),然后服务端为它开一个对应的信道,然后消费客户就可以通过信道进行消息订阅(先进入订阅队列,等发布客户端发消息),发布客户端同理,进行(通过对应的信道底层依次调用服务端对应模块:交换机模块 队列模块等等,进行提供对应请求 protobuf,等待服务端对应的接收请求的 channel 进行解析依次调用底层封装的对应模块提供接口进行操作)最后发布给消费客户端进行处理。
注:先声明 + 订阅,然后再进行对应对列的消息的发布工作(中间的服务端相当于一个中转站的身份)
- 这里可能还是有点蒙,下面我们展开一步步展示设计,当整个项目疏通后就秒懂了。
三。基于交换机模块设计及实现
模块介绍
当客户端发布一条消息到交换机后,这条消息,应该被入队到该交换机绑定的哪些队列中?交换路由模块就是决定这件事情的。
首先对于交换机有三个模式(对于绑定的队列递送而言):
- 广播:将消息入队到该交换机的所有绑定队列中。(也就是全发送)
- 直接:将消息入队到绑定信息中 binding_key 与消息 routing_key 一致的队列中。(也就是直接匹配)
- 主题:将消息入队到绑定信息中 binding_key 与 routing_key 是匹配成功的队列中。(也就是符合规则就发送)
因此需要定义不同的枚举来表示(这里我们定义在 proto 里):
enum ExchangeType {
UNKNOWTYPE = 0;
DIRECT = 1;
FANOUT = 2;
TOPIC = 3;
};
详细设计代码
基于交换机模块测试
下面基于 gtest 的全局模式的测试对 exchange 模块的增删查进行测试:
#include "../mqserver/exchange.hpp"
#include <gtest/gtest.h>
ExchangeManager::ptr emp;
class ExchangeTest : public testing::Environment {
public:
virtual void SetUp() override {
emp = std::make_shared<ExchangeManager>("./data/meta.db");
}
virtual void TearDown() override {
emp->Clear();
std::cout << "最后的清理!!\n";
}
};
TEST(exchange_test, insert_test) {
google::protobuf::Map<std::string, std::string> map;
map.insert({"k1","v1"});
emp->DeclareExchange("exchange1", msg::ExchangeType::DIRECT,true,false, map);
emp->DeclareExchange("exchange2", msg::ExchangeType::DIRECT,true,false, map);
emp->DeclareExchange("exchange3", msg::ExchangeType::DIRECT,true,false, map);
emp->DeclareExchange("exchange4", msg::ExchangeType::DIRECT,true,false, map);
ASSERT_EQ(emp->Size(),4);
}
TEST(exchange_test, select_test) {
ASSERT_EQ(emp->ExchangeIsExists("exchange1"),true);
ASSERT_EQ(emp->ExchangeIsExists("exchange2"),true);
ASSERT_EQ(emp->ExchangeIsExists("exchange3"),true);
ASSERT_EQ(emp->ExchangeIsExists("exchange4"),true);
Exchange::ptr exp = emp->SelectExchange("exchange2");
ASSERT_NE(exp.get(),nullptr);
ASSERT_EQ(exp->name,"exchange2");
ASSERT_EQ(exp->durable,true);
ASSERT_EQ(exp->auto_delete,false);
ASSERT_EQ(exp->type, msg::ExchangeType::DIRECT);
ASSERT_EQ(exp->GetArgs(), std::string("k1=v1&"));
}
TEST(exchange_test, remove_test) {
emp->DeleteExchange("exchange2");
Exchange::ptr exp = emp->SelectExchange("exchange2");
ASSERT_EQ(exp.get(),nullptr);
ASSERT_EQ(emp->ExchangeIsExists("exchange2"),false);
}
int main(int argc,char*argv[]) {
testing::InitGoogleTest(&argc, argv);
testing::AddGlobalTestEnvironment(new ExchangeTest());
int ret = RUN_ALL_TESTS();
if(!ret) std::cout << "成功的一次全局测试\n";
return 0;
}
对应 SQLite3 数据库查询(gtest 测试最后不进行整体 Clear 操作):
四。基于队列模块设计及实现
模块介绍
- 这里与上面的交换机模块类似,即通过队列管理者文件内的队列集合以及内存中的队列集合(提供的接口也差不多)。
详细设计代码
基于队列模块测试
下面基于 gtest 的全局模式的测试对 queue 模块的增删查进行测试:
#include "../mqserver/msgqueue.hpp"
#include <gtest/gtest.h>
MsgqueueManager::ptr emp;
class MsgqueueTest : public testing::Environment {
public:
virtual void SetUp() override {
emp = std::make_shared<MsgqueueManager>("./data/meta.db");
}
virtual void TearDown() override {
std::cout << "最后的清理!!\n";
}
};
TEST(msgqueue_test, insert_test) {
google::protobuf::Map<std::string, std::string> map;
map.insert({"k1","v1"});
emp->DeclareMsgqueue("mq1",1,true,false, map);
emp->DeclareMsgqueue("mq2",1,true,false, map);
emp->DeclareMsgqueue("mq3",1,true,false, map);
emp->DeclareMsgqueue("mq4",1,true,false, map);
ASSERT_EQ(emp->Size(),4);
}
TEST(msgqueue_test, select_test) {
ASSERT_EQ(emp->MsgqueueIsExists("mq1"),true);
ASSERT_EQ(emp->MsgqueueIsExists("mq2"),true);
ASSERT_EQ(emp->MsgqueueIsExists("mq3"),true);
ASSERT_EQ(emp->MsgqueueIsExists("mq4"),true);
Msgqueue::ptr exp = emp->SelectMsgqueue("mq2");
ASSERT_NE(exp.get(),nullptr);
ASSERT_EQ(exp->name,"mq2");
ASSERT_EQ(exp->exclusive,1);
ASSERT_EQ(exp->auto_delete,false);
ASSERT_EQ(exp->GetArgs(), std::string("k1=v1&"));
}
TEST(msgqueue_test, remove_test) {
emp->DeleteMsgqueue("mq2");
Msgqueue::ptr exp = emp->SelectMsgqueue("mq2");
ASSERT_EQ(exp.get(),nullptr);
ASSERT_EQ(emp->MsgqueueIsExists("mq2"),false);
}
int main(int argc,char*argv[]) {
testing::InitGoogleTest(&argc, argv);
testing::AddGlobalTestEnvironment(new MsgqueueTest());
int ret = RUN_ALL_TESTS();
if(!ret) std::cout << "成功的一次全局测试\n";
return 0;
}
对应 SQLite3 数据库查询(gtest 测试最后不进行整体 Clear 操作):
五。基于绑定信息模块设计及实现
模块介绍:
这里我们设计的是绑定信息(一个交换机对应有一个和很多队列的一个绑定信息集合)。
- 交换机名称。
- 队列名称。
Binding_key(进行对应交换机路由匹配找队列有关)。
struct Binding {
using ptr = std::shared_ptr<Binding>;
std::string exchange_name;
std::string msgqueue_name;
std::string binding_key;
Binding(const std::string &ename,const std::string &qname,const std::string &key)
:exchange_name(ename),msgqueue_name(qname),binding_key(key){}
};
下面还是基于上面的那俩模块进行的基于文件与内存级别实现(这里只有对应的交换机和队列都是持久化的,绑定信息才能持久化并存储文件中)。
其次就是这里我们实现的是一个交换机对应的是一个 map(绑定的队列与对应 binding 的映射);但是为什么这么实现?
这里绑定采取这种映射而不是直接拿交换机映射 binding(这样每次删除指定队列先关绑定需要遍历 1 次交换机数组 + 每次对应的 binding)而采取交换机–Queue+binding 映射方式,只需要遍历一遍交换机直接拿到 second 进行删除 second[Queue]即可。
using MsgqueueBindingMap = std::unordered_map<std::string, Binding::ptr>;
using ExchangeMBMap = std::unordered_map<std::string, MsgqueueBindingMap>;
**基于内存 + 文件管理:**内存管理模块就是进行绑定信息的增删,查取并恢复进内存,根据队列名/交换机名,进行相关绑定信息删除。管理者就是对外提供调用接口:绑定,解绑,根据交换机名/队列名移除绑定信息,获取交换机所有相关绑定信息 map,获取特定绑定信息等。
详细设计代码
基于绑定信息模块测试
下面基于 gtest 的全局模式的测试对 binding 模块的增删查进行测试:
测试代码:
#include "../mqserver/binding.hpp"
#include <gtest/gtest.h>
BindingManager::ptr bmp;
class bindingTest : public testing::Environment {
public:
virtual void SetUp() override {
bmp = std::make_shared<BindingManager>("./data/meta.db");
}
virtual void TearDown() override {
}
};
TEST(queue_test, insert_test) {
bmp->Bind("exchange1","queue1","news.music.#",true);
bmp->Bind("exchange1","queue2","news.sport.#",true);
bmp->Bind("exchange1","queue3","news.gossip.#",true);
bmp->Bind("exchange2","queue1","news.music.pop",true);
bmp->Bind("exchange2","queue2","news.sport.football",true);
bmp->Bind("exchange2","queue3","news.gossip.#",true);
ASSERT_EQ(bmp->Size(),6);
}
TEST(queue_test, select_test) {
ASSERT_EQ(bmp->Exists("exchange1","queue1"),true);
ASSERT_EQ(bmp->Exists("exchange1","queue2"),true);
ASSERT_EQ(bmp->Exists("exchange1","queue3"),true);
ASSERT_EQ(bmp->Exists("exchange2","queue1"),true);
ASSERT_EQ(bmp->Exists("exchange2","queue2"),true);
ASSERT_EQ(bmp->Exists("exchange2","queue3"),true);
ASSERT_EQ(bmp->Exists("exchange3","queue3"),false);
Binding::ptr bp = bmp->GetBinding("exchange1","queue1");
ASSERT_NE(bp.get(),nullptr);
ASSERT_EQ(bp->exchange_name, std::string("exchange1"));
ASSERT_EQ(bp->msgqueue_name, std::string("queue1"));
ASSERT_EQ(bp->binding_key, std::string("news.music.#"));
}
TEST(queue_test, remove_exchange_test) {
bmp->RemoveExchangeBindings("exchange1");
ASSERT_EQ(bmp->Exists("exchange1","queue1"),false);
ASSERT_EQ(bmp->Exists("exchange1","queue2"),false);
ASSERT_EQ(bmp->Exists("exchange1","queue3"),false);
}
int main(int argc,char*argv[]) {
testing::AddGlobalTestEnvironment(new bindingTest);
testing::InitGoogleTest(&argc, argv);
int ret = RUN_ALL_TESTS();
if(!ret) std::cout << "成功的一次全局测试\n";
std::shared_ptr<bindingTest>();
return 0;
}
六。基于消息模块设计及实现
模块介绍
- 消息包含:有效载荷(属性 内容 持久化标志),长度(在文件中的长度),偏移量(在文件中)。
enum DeliveryMode {
UNKNOWMODE = 0;
UNDURABLE = 1;
DURABLE = 2;
};
//这里 BasicProperties 在外面而 Payload 进行嵌套:
//BasicProperties 需要外界单独创建穿进去进行消息插入故能外界直接创建,但是 payload 只能通过先创建 Message 对象在进行创建更合乎情理
message BasicProperties {
string id = 1;
DeliveryMode delivery_mode = 2;
string routing_key = 3;
};
message Message {
message Payload {
BasicProperties properties = 1;
string body = 2;
string valid = 3;
};
Payload payload = 1;
uint32 offset = 2; //对应有效载荷的文件偏移量(方便移除写 0 覆盖)
uint32 length = 3; //有效载荷长度 (方便移除写 0 覆盖)
};
本模块相对于前面三个确实有点复杂(代码也是比较多的),下面梳理下一些要点:
- 每个队列有个文件用于存储消息(四字节长度 + 有效载荷(属性 内容 是否有效))。
- 删除操作:这里只是往文件对应位置标记
"0"为无效(把传进来的 message 对应位置标记无效再写回去),而有恢复操作:就是把文件有效的数据(写入文件后对应位置是"1"的消息)转移到临时文件再重命名成对应消息文件即可),这种方式避免了文件频繁被操作带来不便(这里恢复操作是符合特定要求(持久化的消息总量大于 2000,且其中有效比例低于 50% 则需要持久化))。
- 其他细节见代码。
- 这里提供插入 删除 恢复 回收 取出消息 等操作(持久化的话,文件也跟着操作)。
- 对于删除:每次删除完(也就是标记完),都要去检查对应的文件有无效消息占比,进行是否回收等(
这里恢复不仅要文件,还要与内存同步(比如文件变了对应的 msg 的偏移量等就变了,需要及时同步更新内存的。))
- 其他细节见代码。
- 提供初始化与销毁队列,插入队列消息 取出队列消息 对队列消息进行应答等操作。
- 这里接口其实就是对之前封装的进行调用即可,具体见代码。
详细设计代码
基于消息模块测试
下面基于 gtest 的全局模式的测试对 message 模块的增删查进行测试:
测试代码:
#include "../mqserver/message.hpp"
#include <gtest/gtest.h>
MessageManager::ptr mp;
class MsgqueueTest : public testing::Environment {
public:
virtual void SetUp() override {
mp = std::make_shared<MessageManager>("./message");
}
virtual void TearDown() override {
std::cout << "最后的清理!!\n";
}
};
TEST(message_test2, recovery_test) {
mp->InitQueueMessage("queue1");
ASSERT_EQ(mp->GetAll_count("queue1"),4);
Message::ptr msg1 = mp->Front("queue1");
ASSERT_NE(msg1.get(),nullptr);
ASSERT_EQ(msg1->payload().body(), std::string("Hello World-1"));
ASSERT_EQ(mp->GetAll_count("queue1"),3);
ASSERT_EQ(mp->Waitack_count("queue1"),1);
Message::ptr msg2 = mp->Front("queue1");
ASSERT_NE(msg2.get(),nullptr);
ASSERT_EQ(msg2->payload().body(), std::string("Hello World-2"));
ASSERT_EQ(mp->GetAll_count("queue1"),2);
ASSERT_EQ(mp->Waitack_count("queue1"),2);
Message::ptr msg3 = mp->Front("queue1");
ASSERT_NE(msg3.get(),nullptr);
ASSERT_EQ(msg3->payload().body(), std::string("Hello World-3"));
ASSERT_EQ(mp->GetAll_count("queue1"),1);
ASSERT_EQ(mp->Waitack_count("queue1"),3);
Message::ptr msg4 = mp->Front("queue1");
ASSERT_NE(msg4.get(),nullptr);
ASSERT_EQ(msg4->payload().body(), std::string("Hello World-4"));
ASSERT_EQ(mp->GetAll_count("queue1"),0);
ASSERT_EQ(mp->Waitack_count("queue1"),4);
}
int main(int argc,char*argv[]) {
testing::InitGoogleTest(&argc, argv);
testing::AddGlobalTestEnvironment(new MsgqueueTest());
int ret = RUN_ALL_TESTS();
if(!ret) std::cout << "成功的一次全局测试\n";
return 0;
}
基于取出 + 顺序的测试(这里没有确认以及更新故文件消息不变):
下面测试下从文件中恢复这四条消息,然后判断下对应取出的顺序等是否合理:
- 因为取出后没确认应答,故文件中还是标记的 1,符合预期。
七。本篇小结
本文介绍了这个项目的 server 大模块的几个分支,基于书写到测试并未发现问题,后续将更新对应的 路由匹配 consumer channel connection broker(服务器)等模块。
相关免费在线工具
- 加密/解密文本
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
- Gemini 图片去水印
基于开源反向 Alpha 混合算法去除 Gemini/Nano Banana 图片水印,支持批量处理与下载。 在线工具,Gemini 图片去水印在线工具,online
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
- Base64 文件转换器
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
- Markdown转HTML
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
- HTML转Markdown
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online