【即时通讯项目】环境搭建8——RabbitMQ,AMQP-CPP

【即时通讯项目】环境搭建8——RabbitMQ,AMQP-CPP

目录

一.RabbitMQ和AMQP-CPP

1.1.功能介绍

1.2.RabbitMQ和AMQP-CPP的安装

1.3.示例

1.3.1.示例1

1.3.2.示例2

1.3.3.示例3

二.二次封装

2.1.封装过程

2.2.测试


一.RabbitMQ和AMQP-CPP

1.1.功能介绍

RabbitMQ

RabbitMQ 是一个消息中间件,你可以把它理解成一个专门负责接收、存储和转发消息的程序。它让不同的软件系统或者同一个系统的不同模块之间可以相互通信,但不需要直接连接对方。

它的工作方式很简单:

  • 有一个发送消息的程序,我们叫它“生产者”。生产者把消息发给 RabbitMQ。
  • RabbitMQ 收到消息后,会把消息保存在一个叫“队列”的地方。
  • 另一个接收消息的程序,我们叫它“消费者”。消费者从 RabbitMQ 的队列里取走消息进行处理。

这样做的好处是:

  1. 解耦:生产者和消费者不需要知道对方的存在,也不需要同时在线。生产者只管发消息,消费者只管处理消息,它们之间通过 RabbitMQ 间接联系。
  2. 异步:生产者发完消息就可以继续做其他事,不用等消费者处理完。消费者可以在自己方便的时候去取消息。
  3. 削峰填谷:如果短时间内有大量消息涌入,RabbitMQ 可以先存起来,然后让消费者慢慢处理,避免系统被冲垮。
  4. 可靠:RabbitMQ 可以确保消息不丢失,即使消费者暂时宕机,消息也会留在队列里,等消费者恢复后再发送。

所以,RabbitMQ 常被用在需要可靠通信的场景,比如电商订单处理、日志收集、任务调度等。它就像是系统之间的“信使”,帮忙传递信息,让整个系统更灵活、更稳定。

AMQP-CPP

AMQP-CPP 是一个用于与 RabbitMQ 消息中间件通信的 C++ 库。RabbitMQ 是一个广泛使用的开源消息代理,它实现了 AMQP(高级消息队列协议)。

简单来说,RabbitMQ 负责在不同应用之间传递消息,而 AMQP-CPP 则帮助 C++ 程序与 RabbitMQ 进行交互。

AMQP-CPP 的核心设计理念

  • AMQP-CPP 的核心职责是处理 AMQP 协议本身,即解析从 RabbitMQ 接收到的数据和构造要发送给 RabbitMQ 的数据包。但它并不负责建立和维护网络连接——这意味着实际的 TCP 连接管理需要由你(或你选择的网络库)来完成。
  • 这种设计将协议处理与网络 I/O 分离,带来了极大的灵活性:你可以将 AMQP-CPP 集成到任何已有的异步网络框架中(如 libevent、libuv、Boost.Asio 等),也可以直接使用库自带的简易 TCP 模块快速上手。

异步与高性能

  • AMQP-CPP 完全采用异步设计,内部没有阻塞式的系统调用,也不依赖多线程。它通过事件驱动的方式工作:当网络数据到达时,你将其喂给 AMQP-CPP,库解析后通过回调通知你的业务逻辑;当需要发送数据时,库生成相应的 AMQP 帧,你负责通过 TCP 连接发送出去。这种模型非常适合构建高性能、低延迟的消息应用,且能很好地与单线程事件循环配合。

对 C++ 版本的要求

  • AMQP-CPP 需要编译器支持 C++17 标准。这是因为库内部使用了现代 C++ 的特性(如 std::variant、std::optional 等)来实现类型安全和高效的接口。

1.2.RabbitMQ和AMQP-CPP的安装

RabbitMQ安装

安装RabbitMQ

sudo apt install -y rabbitmq-server
# 启动 RabbitMQ 服务 sudo systemctl start rabbitmq-server.service # 查看 RabbitMQ 服务状态,确认是否正常运行 sudo systemctl status rabbitmq-server.service # 设置为开机自启动 sudo systemctl enable rabbitmq-server

安装完成后默认有一个 guest 用户,但权限不足,无法用于远程登录和消息收发

因此需要创建一个具有管理员权限的用户

# 添加一个新用户,用户名为 root,密码为 123456 sudo rabbitmqctl add_user root 123456 # 将 root 用户标记为 administrator,使其拥有管理权限 sudo rabbitmqctl set_user_tags root administrator # 为 root 用户设置权限,允许其在默认虚拟主机 "/" 上执行所有操作 # 参数依次为:虚拟主机、配置权限、写权限、读权限 sudo rabbitmqctl set_permissions -p / root "." "." ".*" # RabbitMQ 自带 Web 管理界面,需要启用管理插件才能访问 sudo rabbitmq-plugins enable rabbitmq_management

我们去浏览器访问webUI界面, 默认端口为15672.

你的主机IP:15672

输入我们设置的root的账号密码

登陆进去了

至此rabbitmq安装成功。

AMQP-CPP的安装

官网:https://github.com/CopernicaMarketingSoftware/AMQP-CPP/tree/master

我们这里使用AMQP-CPP库来编写客户端程序。

# 安装 libev 开发库,这是 AMQP-CPP 依赖的网络库组件 sudo apt install libev-dev # 从 GitHub 克隆 AMQP-CPP 库的源代码 git clone https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git # 进入克隆下来的源代码目录 cd AMCPP/ # 编译源代码,生成库文件 make # 将编译好的库文件安装到系统目录中 sudo make install
注意:如果你在make的时候遇到了下面这种错误



这种错误,表示ssl版本出现问题。



解决方案:卸载当前的ssl库,重新进行修复安装

然后我们就重新执行make即可

1.3.示例

1.3.1.示例1

test.cpp

// 包含 libev 事件循环库的头文件,用于事件驱动 #include <ev.h> // 包含 AMQP-CPP 核心库,提供 AMQP 客户端功能 #include <amqpcpp.h> // 包含 AMQP-CPP 与 libev 的集成头文件,使 AMQP 事件能够集成到 libev 循环中 #include <amqpcpp/libev.h> // 包含标准输入输出流,用于打印信息 #include <iostream> int main() { // 获取默认的 libev 事件循环实例(EV_DEFAULT 是一个宏,返回默认事件循环指针) auto *loop = EV_DEFAULT; // 创建 AMQP 事件处理器,将 AMQP 的事件回调集成到 libev 循环中 // LibEvHandler 接受一个 libev 事件循环指针,并负责将 AMQP 底层 socket 事件注册到该循环 AMQP::LibEvHandler handler(loop); // 创建 AMQP TCP 连接,指定 RabbitMQ 服务器地址 // 地址格式:amqp://用户名:密码@主机:端口/虚拟主机 // 这里使用 root:123456 登录本机的默认虚拟主机 "/" AMQP::TcpConnection connection(&handler, AMQP::Address("amqp://root:[email protected]:5672/")); // 在连接上创建通道,几乎所有 AMQP 操作(声明队列、发布、消费等)都通过通道进行 AMQP::TcpChannel channel(&connection); // 设置通道就绪时的回调函数 // 当 TCP 连接成功建立且通道可操作时,该回调被触发 channel.onReady([&channel, &connection]() { // 现在通道已就绪,可以执行 AMQP 命令 // 声明一个名为 "test_queue" 的队列 // 如果队列不存在,RabbitMQ 会自动创建它;如果存在,则直接使用现有队列 // declareQueue() 返回一个代表异步操作的对象,我们可以通过 onSuccess() 设置成功回调 channel.declareQueue("test_queue") .onSuccess([&channel, &connection]() { // 队列声明成功后的回调 // 向默认交换机(空字符串表示默认交换机)发送一条消息 // 路由键为 "test_queue",消息将进入同名队列 // publish() 参数:交换机、路由键、消息体(可附加属性,这里省略) channel.publish("", "test_queue", "Hello from AMQP-CPP with lambda!"); // 输出提示信息 std::cout << "[x] Sent a message to 'test_queue'" << std::endl; // 现在开始消费队列 "test_queue",当消息到达时自动调用回调 // consume() 返回一个代表消费操作的对象,通过 onReceived() 设置消息到达时的回调 channel.consume("test_queue") .onReceived([&channel, &connection](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { // 消息到达时的回调,参数包含消息内容、投递标签和重传标志 // 输出接收到的消息内容(message.body() 返回消息体的字符串) std::cout << "[x] Received '" << message.body() << "'" << std::endl; // 确认消息已被处理,告知 RabbitMQ 可以从队列中删除该消息 // 参数 deliveryTag 是服务器分配的唯一标识 channel.ack(deliveryTag); // 关闭连接,程序将退出(事件循环会因连接关闭而结束) connection.close(); }); }) .onError([&channel, &connection](const char *message) { // 如果队列声明过程中发生错误,该回调被调用 std::cerr << "Queue declaration error: " << message << std::endl; // 发生错误时也关闭连接,避免程序一直挂起 connection.close(); }); }); // 启动 libev 事件循环,程序将阻塞在这里,直到连接关闭或出现错误 // 第二个参数 0 表示正常运行,直到 ev_break 被调用 ev_run(loop, 0); return 0; }

makefile

publish_and_consume : test.cpp g++ -std=c++17 $^ -o $@ -lamqpcpp -lev .PHONY : clean clean : rm -rf publish_and_consume

可以看到,还是比较正常的,虽然这个末尾出现了一些乱码,但是不要紧的

1.3.2.示例2

send.cpp

#include <ev.h> // libev 事件循环库 #include <amqpcpp.h> // AMQP-CPP 核心库 #include <amqpcpp/libev.h> // AMQP-CPP 与 libev 的集成 #include <iostream> // 标准输入输出 int main() { // 获取默认的 libev 事件循环实例 auto *loop = EV_DEFAULT; // 创建 AMQP 事件处理器,将 AMQP 事件集成到 libev 循环中 AMQP::LibEvHandler handler(loop); // 建立 TCP 连接(本地 RabbitMQ,默认端口 5672) AMQP::TcpConnection connection(&handler, AMQP::Address("amqp://root:[email protected]:5672/")); // 在连接上创建通道,几乎所有操作都在通道上进行 AMQP::TcpChannel channel(&connection); // 设置通道就绪时的回调函数:当 TCP 连接成功且通道可使用时,执行 Lambda channel.onReady([&channel, &connection]() { // 声明一个名为 "hello" 的队列(如果队列不存在则自动创建) // declareQueue 返回一个异步操作对象,通过 onSuccess 设置成功回调 channel.declareQueue("hello") .onSuccess([&channel, &connection]() { // 队列声明成功后,向默认交换机发送一条消息 channel.publish("", "hello", "Hello World!"); std::cout << "[x] Sent 'Hello World!'" << std::endl; // 关闭连接,程序将退出(连接关闭会触发事件循环结束) connection.close(); }); }); // 启动 libev 事件循环,该循环会阻塞,直到所有事件处理完毕 ev_run(loop, 0); return 0; }

receive.cpp

#include <ev.h> // libev 事件循环库 #include <amqpcpp.h> // AMQP-CPP 核心库 #include <amqpcpp/libev.h> // AMQP-CPP 与 libev 的集成 #include <iostream> // 标准输入输出 int main() { // 获取默认的 libev 事件循环实例 auto *loop = EV_DEFAULT; // 创建 AMQP 事件处理器,将 AMQP 事件集成到 libev 循环中 AMQP::LibEvHandler handler(loop); // 建立 TCP 连接(本地 RabbitMQ,默认端口 5672) AMQP::TcpConnection connection(&handler, AMQP::Address("amqp://root:[email protected]:5672/")); // 在连接上创建通道,几乎所有操作都在通道上进行 AMQP::TcpChannel channel(&connection); // 设置通道就绪时的回调函数:当 TCP 连接成功且通道可使用时,执行 Lambda channel.onReady([&channel, &connection]() { // 声明一个名为 "hello" 的队列(如果队列不存在则自动创建) // declareQueue 返回一个异步操作对象,通过 onSuccess 设置成功回调 channel.declareQueue("hello") .onSuccess([&channel, &connection]() { // 开始消费队列 "hello",当有消息到达时,调用 Lambda 处理 channel.consume("hello") .onReceived([&channel, &connection](const AMQP::Message& message, uint64_t deliveryTag, bool redelivered) { // 输出接收到的消息内容 std::cout << "[x] Received '" << message.body() << "'" << std::endl; // 确认消息已被处理,告知 RabbitMQ 可以删除该消息 channel.ack(deliveryTag); // 关闭连接,程序将退出(连接关闭会触发事件循环结束) connection.close(); }); }); }); // 启动 libev 事件循环,该循环会阻塞,直到所有事件处理完毕 ev_run(loop, 0); return 0; }

makefile

all : send receive send : send.cpp g++ -std=c++17 $^ -o $@ -lamqpcpp -lev receive : receive.cpp g++ -std=c++17 $^ -o $@ -lamqpcpp -lev .PHONY : clean clean : rm -rf send receive

这个是因为我们使用了std::cout来打印我们的数据,但是我们的数据是二进制的,所以可能这些字节在终端显示时就会变成  之类的乱码。

我们注意一下即可。也没有什么大问题,只要不去打印即可

1.3.3.示例3

publish.cc

// 包含 libev 事件循环库的头文件,用于事件驱动 #include <ev.h> // 包含 AMQP-CPP 核心库,提供 AMQP 客户端功能 #include <amqpcpp.h> // 包含 AMQP-CPP 与 libev 的集成头文件,使 AMQP 事件能够集成到 libev 循环中 #include <amqpcpp/libev.h> // 包含 OpenSSL 的 SSL 功能头文件(本例中未直接使用,可能为后续扩展预留) #include <openssl/ssl.h> // 包含 OpenSSL 的版本信息头文件 #include <openssl/opensslv.h> int main() { // 1. 获取默认的 libev 事件循环实例(EV_DEFAULT 宏返回默认事件循环指针) auto *loop = EV_DEFAULT; // 2. 创建 AMQP 事件处理器,将 AMQP 的事件回调集成到 libev 循环中 AMQP::LibEvHandler handler(loop); // 2.5. 创建 AMQP 服务器地址对象,指定 RabbitMQ 连接信息(用户名:密码@主机:端口/虚拟主机) AMQP::Address address("amqp://root:[email protected]:5672/"); // 创建 TCP 连接对象,传入事件处理器和地址,负责底层网络通信 AMQP::TcpConnection connection(&handler, address); // 3. 在连接上创建通道,几乎所有 AMQP 操作都通过通道进行 AMQP::TcpChannel channel(&connection); // 4. 声明一个直连类型的交换机(Direct Exchange),名称为 "test-exchange" // declareExchange 返回一个 Deferred 对象,用于设置成功/失败回调 channel.declareExchange("test-exchange", AMQP::ExchangeType::direct) .onError([](const char *message) { // 如果声明交换机失败,输出错误信息并退出程序 std::cout << "声明交换机失败:" << message << std::endl; exit(0); }) .onSuccess([](){ // 声明交换机成功时的提示 std::cout << "test-exchange 交换机创建成功!" << std::endl; }); // 5. 声明一个队列,名称为 "test-queue" channel.declareQueue("test-queue") .onError([](const char *message) { // 如果声明队列失败,输出错误信息并退出 std::cout << "声明队列失败:" << message << std::endl; exit(0); }) .onSuccess([](){ // 声明队列成功时的提示 std::cout << "test-queue 队列创建成功!" << std::endl; }); // 6. 将交换机和队列通过路由键 "test-queue-key" 进行绑定 channel.bindQueue("test-exchange", "test-queue", "test-queue-key") .onError([](const char *message) { // 如果绑定失败,输出错误信息并退出 std::cout << "test-exchange - test-queue 绑定失败:" << message << std::endl; exit(0); }) .onSuccess([](){ // 绑定成功时的提示 std::cout << "test-exchange - test-queue 绑定成功!" << std::endl; }); // 7. 向交换机发布消息,循环发送10条消息 for (int i = 0; i < 10; i++) { // 构造消息内容,包含循环计数 std::string msg = "Hello Bite-" + std::to_string(i); // 发布消息到指定交换机,使用路由键 "test-queue-key",消息将路由到绑定的队列 bool ret = channel.publish("test-exchange", "test-queue-key", msg); if (ret == false) { // 如果 publish 返回 false,表示消息未能立即写入发送缓冲区(可能连接未就绪) std::cout << "publish 失败!\n"; } } // 8. 启动 libev 事件循环,程序将阻塞在这里,直到连接关闭或出现错误 // 注意:由于没有显式关闭连接,程序将一直运行,等待事件发生 ev_run(loop, 0); return 0; }

consume.cc

// 包含 libev 事件循环库的头文件,用于事件驱动 #include <ev.h> // 包含 AMQP-CPP 核心库,提供 AMQP 客户端功能 #include <amqpcpp.h> // 包含 AMQP-CPP 与 libev 的集成头文件,使 AMQP 事件能够集成到 libev 循环中 #include <amqpcpp/libev.h> // 包含 OpenSSL 的 SSL 功能头文件(本例中未使用,但可能为后续扩展预留) #include <openssl/ssl.h> // 包含 OpenSSL 的版本信息头文件 #include <openssl/opensslv.h> // 消息回调处理函数的实现,当从队列中收到消息时被调用 void MessageCb(AMQP::TcpChannel *channel, const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { // 将消息体(二进制数据)构造为 std::string 以便打印 std::string msg; msg.assign(message.body(), message.bodySize()); // 输出消息内容到标准输出 std::cout << msg << std::endl; // 确认消息已被处理,告知 RabbitMQ 可以删除该消息 channel->ack(deliveryTag); } int main() { // 1. 获取默认的 libev 事件循环实例(EV_DEFAULT 宏返回默认事件循环指针) auto *loop = EV_DEFAULT; // 2. 创建 AMQP 事件处理器,将 AMQP 的事件回调集成到 libev 循环中 AMQP::LibEvHandler handler(loop); // 2.5. 创建 AMQP 服务器地址对象,指定 RabbitMQ 连接信息(用户名:密码@主机:端口/虚拟主机) AMQP::Address address("amqp://root:[email protected]:5672/"); // 创建 TCP 连接对象,传入事件处理器和地址,负责底层网络通信 AMQP::TcpConnection connection(&handler, address); // 3. 在连接上创建通道,几乎所有 AMQP 操作都通过通道进行 AMQP::TcpChannel channel(&connection); // 4. 声明一个直连类型的交换机(Direct Exchange),名称为 "test-exchange" // declareExchange 返回一个 Deferred 对象,用于设置成功/失败回调 channel.declareExchange("test-exchange", AMQP::ExchangeType::direct) .onError([](const char *message) { // 如果声明交换机失败,输出错误信息并退出程序 std::cout << "声明交换机失败:" << message << std::endl; exit(0); }) .onSuccess([](){ // 声明交换机成功时的提示 std::cout << "test-exchange 交换机创建成功!" << std::endl; }); // 5. 声明一个队列,名称为 "test-queue" channel.declareQueue("test-queue") .onError([](const char *message) { // 如果声明队列失败,输出错误信息并退出 std::cout << "声明队列失败:" << message << std::endl; exit(0); }) .onSuccess([](){ // 声明队列成功时的提示 std::cout << "test-queue 队列创建成功!" << std::endl; }); // 6. 将交换机和队列通过路由键 "test-queue-key" 进行绑定 channel.bindQueue("test-exchange", "test-queue", "test-queue-key") .onError([](const char *message) { // 如果绑定失败,输出错误信息并退出 std::cout << "test-exchange - test-queue 绑定失败:" << message << std::endl; exit(0); }) .onSuccess([](){ // 绑定成功时的提示 std::cout << "test-exchange - test-queue 绑定成功!" << std::endl; }); // 7. 订阅队列消息 -- 设置消息处理回调函数 // 使用 std::bind 将 MessageCb 函数与 channel 指针绑定,并预留占位符供实际参数使用 auto callback = std::bind(MessageCb, &channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); // 开始消费队列 "test-queue",消费者标签为 "consume-tag",返回 DeferredConsumer 对象 channel.consume("test-queue", "consume-tag") .onReceived(callback) // 设置消息到达时的回调 .onError([](const char *message){ // 如果订阅过程中发生错误,输出错误信息并退出 std::cout << "订阅 test-queue 队列消息失败:" << message << std::endl; exit(0); }); // 8. 启动 libev 事件循环,程序将阻塞在这里,直到连接关闭或出现错误 ev_run(loop, 0); return 0; }

makefile

all : publish consume publish : publish.cc g++ -std=c++17 $^ -o $@ -lamqpcpp -lev consume : consume.cc g++ -std=c++17 $^ -o $@ -lamqpcpp -lev .PHONY : clean clean : rm publish consume

非常的完美!!

二.二次封装

2.1.封装过程

那么基于上面的例子,我们也懂得了如何使用AMQP-CPP来和我们的RabbitMQ进行沟通,那么后续为了更方便快捷的实现这一过程,我们在这里对AMQP-CPP的接口来进行二次封装。

class MQClient { public: // 定义消息回调函数类型,接收消息指针和大小 using MessageCallback = std::function<void(const char*, size_t)>; // 智能指针类型别名,便于管理 MQClient 对象的生命周期 using ptr = std::shared_ptr<MQClient>; /** * @brief 构造函数,初始化与 RabbitMQ 的连接并启动事件循环线程。 * @param user RabbitMQ 用户名 * @param passwd RabbitMQ 密码 * @param host RabbitMQ 服务器地址(格式:ip:port,例如 "127.0.0.1:5672") */ MQClient(const std::string &user, const std::string passwd, const std::string host) { // 获取默认的 libev 事件循环 _loop = EV_DEFAULT; // 创建 AMQP 的 libev 事件处理器,将其与事件循环关联 _handler = std::make_unique<AMQP::LibEvHandler>(_loop); // 构造 AMQP 连接 URL,格式:amqp://user:password@host/ std::string url = "amqp://" + user + ":" + passwd + "@" + host + "/"; AMQP::Address address(url); // 解析地址 // 创建 TCP 连接对象,使用 handler 处理事件,连接到指定地址 _connection = std::make_unique<AMQP::TcpConnection>(_handler.get(), address); // 创建通道,所有 AMQP 操作都通过通道进行 _channel = std::make_unique<AMQP::TcpChannel>(_connection.get()); // 启动一个线程专门运行 libev 事件循环,使网络事件能够异步处理 _loop_thread = std::thread([this]() { ev_run(_loop, 0); // 开始事件循环,0 表示一直运行直到被停止 }); } …… private: struct ev_async _async_watcher; // libev 异步 watcher,用于退出事件循环 struct ev_loop *_loop; // libev 事件循环指针 std::unique_ptr<AMQP::LibEvHandler> _handler; // AMQP 事件处理器,连接 libev 和 AMQP-CPP std::unique_ptr<AMQP::TcpConnection> _connection; // AMQP TCP 连接 std::unique_ptr<AMQP::TcpChannel> _channel; // AMQP 通道,用于执行命令 std::thread _loop_thread; // 运行事件循环的线程 };

首先,针对构造函数和成员变量的设计,大家对于这个应该是没有什么问题的,可以去看看每个例子,基本上都有这些

那么我们接着看析构函数

 ~MQClient() { // 初始化一个异步 watcher,用于通知事件循环退出 ev_async_init(&_async_watcher, watcher_callback); // 在事件循环中启动该 watcher ev_async_start(_loop, &_async_watcher); // 发送异步信号,触发 watcher_callback,从而停止事件循环 ev_async_send(_loop, &_async_watcher); // 等待事件循环线程结束 _loop_thread.join(); // 清空事件循环指针(虽然 unique_ptr 会自动释放,但置空是个好习惯) _loop = nullptr; } static void watcher_callback(struct ev_loop *loop, ev_async *watcher, int32_t revents) { ev_break(loop, EVBREAK_ALL); // 停止所有事件循环 }

这段代码主要涉及 libev 事件循环 的异步通知机制,用于在多线程环境中安全地停止事件循环。下面逐行解释其作用:

1. ev_async_init(&_async_watcher, watcher_callback);

  • 作用:初始化一个 异步监视器(async watcher)。
  • _async_watcher 是一个 ev_async 类型的变量,它相当于一个“信号旗”,可以在任意线程中触发。
  • watcher_callback 是回调函数,当异步信号被触发时,libev 会在事件循环所在的线程中调用这个函数。
  • 初始化后,监视器与回调函数绑定,但尚未激活。

2. ev_async_start(_loop, &_async_watcher);

  • 作用:将异步监视器注册到事件循环 _loop 中
  • 注册后,事件循环会开始监听这个监视器,一旦有异步信号发送过来,它就会调用对应的回调函数。
  • 这一步是必需的,否则即使发送信号,事件循环也不会处理。

3. ev_async_send(_loop, &_async_watcher);

  • 作用:向异步监视器发送一个信号,触发其回调,这里的回调函数就是停止所有事件循环。
  • 这个函数是线程安全的,可以在任何线程中调用(包括事件循环线程自身)。
  • 发送信号后,libev 会在事件循环的下一次迭代中执行 watcher_callback。
  • 在回调函数中,通常会调用 ev_break(loop, EVBREAK_ALL) 来停止事件循环。

4. _loop_thread.join();

  • 作用:等待事件循环所在的线程(_loop_thread)结束。
  • 因为事件循环通常运行在一个独立的线程中(例如 std::thread 启动的线程),主线程需要等待它完成清理工作后再退出,否则可能导致资源泄漏或未定义行为。
  • join() 会阻塞,直到目标线程执行完毕。

这样子就形成了一个闭环,大家都明白了吧

创建交换机,队列,并将它们进行绑定
/** * @brief 声明交换机、队列并进行绑定。 * @param exchange 交换机名称 * @param queue 队列名称 * @param routing_key 绑定时的路由键,默认 "routing_key" * @param echange_type 交换机类型,默认 direct */ void declareComponents(const std::string &exchange, const std::string &queue, const std::string &routing_key = "routing_key", AMQP::ExchangeType echange_type = AMQP::ExchangeType::direct) //交换机类型默认是direct { // 声明交换机 _channel->declareExchange(exchange, echange_type) .onError([](const char *message) { LOG_ERROR("声明交换机失败:{}", message); exit(0); // 发生错误时直接退出程序(生产环境可改为异常或重试) }) .onSuccess([exchange]() { LOG_DEBUG("{} 交换机创建成功!", exchange); }); // 声明队列 _channel->declareQueue(queue) .onError([](const char *message) { LOG_ERROR("声明队列失败:{}", message); exit(0); }) .onSuccess([queue]() { LOG_DEBUG("{} 队列创建成功!", queue); }); // 将队列绑定到交换机,使用指定的路由键 _channel->bindQueue(exchange, queue, routing_key) .onError([exchange, queue](const char *message) { LOG_ERROR("{} - {} 绑定失败:", exchange, queue); exit(0); }) .onSuccess([exchange, queue, routing_key]() { LOG_DEBUG("{} - {} - {} 绑定成功!", exchange, queue, routing_key); }); }

这里的思路很简单,就是单纯的创建和绑定

发布消息
bool publish(const std::string &exchange, const std::string &msg, const std::string &routing_key = "routing_key") { LOG_DEBUG("向交换机 {}-{} 发布消息!", exchange, routing_key); // 调用 channel 的 publish 方法,将消息发布到指定交换机和路由键 bool ret = _channel->publish(exchange, routing_key, msg); if (ret == false) { LOG_ERROR("{} 发布消息失败:", exchange); return false; } return true; }

我们就是单纯的发布消息,这里需要指定交换机名称,然后指定消息的routing_key,消息进入交换机后,会拿这个消息的routing_key和这个交换机绑定的所有binding_key进行路由匹配,至于怎么进行路由匹配,完全取决于交换机的类型是啥?

消费消息
 void consume(const std::string &queue, const MessageCallback &cb) { LOG_DEBUG("开始订阅 {} 队列消息!", queue); // 调用 channel 的 consume 方法,指定消费者标签 "consume-tag" _channel->consume(queue, "consume-tag") // 返回 DeferredConsumer 对象 .onReceived([this, cb](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { // 当收到消息时,调用用户提供的回调函数 cb(message.body(), message.bodySize()); // 确认消息已被处理,以便 RabbitMQ 可以将其从队列中移除 _channel->ack(deliveryTag); }) .onError([queue](const char *message) { LOG_ERROR("订阅 {} 队列消息失败: {}", queue, message); exit(0); // 出错时直接退出程序 }); }

这里需要指定队列的名称,并且传递一个消息处理回调函数进来,当消费成功的时候,我们就调用我们传递进去的消息处理回调函数

完整代码
#pragma once // 防止头文件被多次包含 // 包含必要的库头文件 #include <ev.h> // libev 事件循环库 #include <amqpcpp.h> // AMQP-CPP 核心库,用于 RabbitMQ 通信 #include <amqpcpp/libev.h> // AMQP-CPP 与 libev 集成的处理程序 #include <openssl/ssl.h> // OpenSSL 库,用于 SSL/TLS 支持(尽管此处未显式使用) #include <openssl/opensslv.h> // OpenSSL 版本信息 #include <iostream> // 标准输入输出流(可能用于调试,但未直接使用) #include <functional> // std::function,用于回调函数包装 #include <thread> // std::thread,用于运行事件循环的线程 #include <memory> // std::unique_ptr, std::shared_ptr #include "logger.hpp" // 自定义日志记录器 namespace IMS { /** * @brief MQClient 类封装了与 RabbitMQ 服务器的交互功能。 * 使用 AMQP-CPP 库和 libev 事件循环,提供声明交换机/队列、发布消息和消费消息的能力。 */ class MQClient { public: // 定义消息回调函数类型,接收消息指针和大小 using MessageCallback = std::function<void(const char*, size_t)>; // 智能指针类型别名,便于管理 MQClient 对象的生命周期 using ptr = std::shared_ptr<MQClient>; /** * @brief 构造函数,初始化与 RabbitMQ 的连接并启动事件循环线程。 * @param user RabbitMQ 用户名 * @param passwd RabbitMQ 密码 * @param host RabbitMQ 服务器地址(格式:ip:port,例如 "127.0.0.1:5672") */ MQClient(const std::string &user, const std::string passwd, const std::string host) { // 获取默认的 libev 事件循环 _loop = EV_DEFAULT; // 创建 AMQP 的 libev 事件处理器,将其与事件循环关联 _handler = std::make_unique<AMQP::LibEvHandler>(_loop); // 构造 AMQP 连接 URL,格式:amqp://user:password@host/ std::string url = "amqp://" + user + ":" + passwd + "@" + host + "/"; AMQP::Address address(url); // 解析地址 // 创建 TCP 连接对象,使用 handler 处理事件,连接到指定地址 _connection = std::make_unique<AMQP::TcpConnection>(_handler.get(), address); // 创建通道,所有 AMQP 操作都通过通道进行 _channel = std::make_unique<AMQP::TcpChannel>(_connection.get()); // 启动一个线程专门运行 libev 事件循环,使网络事件能够异步处理 _loop_thread = std::thread([this]() { ev_run(_loop, 0); // 开始事件循环,0 表示一直运行直到被停止 }); } /** * @brief 析构函数,停止事件循环并等待线程结束,释放资源。 */ ~MQClient() { // 初始化一个异步 watcher,用于通知事件循环退出 ev_async_init(&_async_watcher, watcher_callback); // 在事件循环中启动该 watcher ev_async_start(_loop, &_async_watcher); // 发送异步信号,触发 watcher_callback,从而停止事件循环 ev_async_send(_loop, &_async_watcher); // 等待事件循环线程结束 _loop_thread.join(); // 清空事件循环指针(虽然 unique_ptr 会自动释放,但置空是个好习惯) _loop = nullptr; } /** * @brief 声明交换机、队列并进行绑定。 * @param exchange 交换机名称 * @param queue 队列名称 * @param routing_key 绑定时的路由键,默认 "routing_key" * @param echange_type 交换机类型,默认 direct */ void declareComponents(const std::string &exchange, const std::string &queue, const std::string &routing_key = "routing_key", AMQP::ExchangeType echange_type = AMQP::ExchangeType::direct) //交换机类型默认是direct { // 声明交换机 _channel->declareExchange(exchange, echange_type) .onError([](const char *message) { LOG_ERROR("声明交换机失败:{}", message); exit(0); // 发生错误时直接退出程序(生产环境可改为异常或重试) }) .onSuccess([exchange]() { LOG_DEBUG("{} 交换机创建成功!", exchange); }); // 声明队列 _channel->declareQueue(queue) .onError([](const char *message) { LOG_ERROR("声明队列失败:{}", message); exit(0); }) .onSuccess([queue]() { LOG_DEBUG("{} 队列创建成功!", queue); }); // 将队列绑定到交换机,使用指定的路由键 _channel->bindQueue(exchange, queue, routing_key) .onError([exchange, queue](const char *message) { LOG_ERROR("{} - {} 绑定失败:", exchange, queue); exit(0); }) .onSuccess([exchange, queue, routing_key]() { LOG_DEBUG("{} - {} - {} 绑定成功!", exchange, queue, routing_key); }); } /** * @brief 向指定交换机发布消息。 * @param exchange 目标交换机名称 * @param msg 消息内容 * @param routing_key 路由键,默认为 "routing_key" * @return true 表示发布成功(实际是消息已投递给连接,但 AMQP-CPP 的 publish 返回 bool 表示是否成功放入缓冲区) * false 表示发布失败 */ bool publish(const std::string &exchange, const std::string &msg, const std::string &routing_key = "routing_key") { LOG_DEBUG("向交换机 {}-{} 发布消息!", exchange, routing_key); // 调用 channel 的 publish 方法,将消息发布到指定交换机和路由键 bool ret = _channel->publish(exchange, routing_key, msg); if (ret == false) { LOG_ERROR("{} 发布消息失败:", exchange); return false; } return true; } /** * @brief 开始消费队列中的消息。 * @param queue 要消费的队列名称 * @param cb 收到消息时的回调函数,参数为消息指针和大小,具体怎么进行消费,还是需要看这个用户传递进来的函数干了啥 */ void consume(const std::string &queue, const MessageCallback &cb) { LOG_DEBUG("开始订阅 {} 队列消息!", queue); // 调用 channel 的 consume 方法,指定消费者标签 "consume-tag" _channel->consume(queue, "consume-tag") // 返回 DeferredConsumer 对象 .onReceived([this, cb](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { // 当收到消息时,调用用户提供的回调函数 cb(message.body(), message.bodySize()); // 确认消息已被处理,以便 RabbitMQ 可以将其从队列中移除 _channel->ack(deliveryTag); }) .onError([queue](const char *message) { LOG_ERROR("订阅 {} 队列消息失败: {}", queue, message); exit(0); // 出错时直接退出程序 }); } private: /** * @brief 静态回调函数,用于异步 watcher,当收到异步通知时停止事件循环。 * @param loop 事件循环指针 * @param watcher 异步 watcher 对象 * @param revents 事件类型(未使用) */ static void watcher_callback(struct ev_loop *loop, ev_async *watcher, int32_t revents) { ev_break(loop, EVBREAK_ALL); // 停止所有事件循环 } private: struct ev_async _async_watcher; // libev 异步 watcher,用于退出事件循环 struct ev_loop *_loop; // libev 事件循环指针 std::unique_ptr<AMQP::LibEvHandler> _handler; // AMQP 事件处理器,连接 libev 和 AMQP-CPP std::unique_ptr<AMQP::TcpConnection> _connection; // AMQP TCP 连接 std::unique_ptr<AMQP::TcpChannel> _channel; // AMQP 通道,用于执行命令 std::thread _loop_thread; // 运行事件循环的线程 }; } 

2.2.测试

我们先来讲讲我们的消息队列是怎么进行路由匹配的

其实最关键的就是下面这2个东西

Routing Key(路由键)

  • 属于每条消息,由生产者发送消息时指定。
  • 一条消息只有一个 Routing Key。

Binding Key(绑定键)

  • 属于每个绑定,而每个绑定连接一个交换机和一个队列。
  • 一个队列可以有多个绑定,因此可以有多个不同的 Binding Key。
  • 一个绑定就是一条规则:"如果消息的 Routing Key 匹配这个 Binding Key,就把消息送到这个队列"。
它们是怎么进行工作的?

现在,我们通过一个消息从生产到消费的完整流程,来理解这些组件如何协同工作,并具体解释 Exchange 和 Queue 的“多对多”关系。

步骤 1:生产者发送

生产者应用创建一个消息,为其指定一个路由键(Routing Key),然后将其发送到 Broker 上一个已知的 Exchange

步骤 2:交换机路由

Exchange 收到消息后,会提取消息中的 Routing Key。然后,它查看所有绑定(Binding) 到自身的规则列表。

步骤 3:绑定匹配与投递

对于每一条绑定规则,Exchange 会根据自身的类型将消息的 Routing Key与绑定的 Binding Key进行匹配。

  • 如果匹配成功,Exchange 就会将消息的一个副本投递到该绑定规则所指向的队列。
  • 如果匹配失败,则跳过该队列。
  • Exchange 会遍历所有绑定到它的规则,可能将消息投递到零个、一个或多个队列。

步骤 4:队列存储与消费

  • 消息被投递到队列后,便存储在队列中。消费者应用从自己订阅的队列(中获取消息进行处理。
那么它们怎么进行匹配呢?

交换机的类型决定了Binding Key和Routing Key的匹配算法。

1. 直连交换机(Direct)

  • 匹配规则:精确、完全相等的字符串匹配。
  • Binding Key:必须是一个明确的字符串,如 "email"、"order.paid"。
  • Routing Key:必须与Binding Key完全一致,消息才会被路由。

示例:

  • 队列Q1绑定了 Binding Key: "error"。
  • 生产者发送消息A (Routing Key: "info") -> 不匹配,消息A不会进入Q1。
  • 生产者发送消息B (Routing Key: "error") -> 精确匹配,消息B进入Q1。

用途:点对点精确路由,常用于任务分发(如将不同的任务类型路由到不同的处理队列)。

2. 主题交换机(Topic)

匹配规则:通配符模式匹配。这是最灵活、最常用的路由方式。

Binding Key:是一个用点号.分隔的单词组成的模式,支持两个通配符:

  • * (星号):匹配恰好一个单词。
  • # (井号):匹配零个或多个单词。

Routing Key:也是一个用点号.分隔的单词组成的字符串(不能包含通配符)。

示例:

  • 队列Q1绑定了 Binding Key: "*.stock.usd" -> 关心所有以.stock.usd结尾,且中间有一个任意单词的消息。
  • 队列Q2绑定了 Binding Key: "nyse.#" -> 关心所有以nyse.开头的消息。
  • 生产者发送消息 (Routing Key: "nyse.stock.usd") -> 同时匹配Q1和Q2,消息会进入两个队列(广播)。
  • 生产者发送消息 (Routing Key: "forex.eur.usd") -> 只匹配Q1,消息进入Q1。
  • 生产者发送消息 (Routing Key: "nyse") -> 只匹配Q2(#可以匹配零个单词),消息进入Q2。

用途:基于多重标准(如来源、严重级别、业务类型)的灵活消息路由,如日志系统、事件通知系统。

3. 扇出交换机(Fanout)

  • 匹配规则:忽略Routing Key和Binding Key。
  • 行为:它会将收到的所有消息无条件地广播到所有与之绑定的队列。
  • Binding Key:在创建绑定时通常设置为空字符串""(但设置什么值都无所谓,因为不会被使用)。
  • 用途:纯粹的广播/发布-订阅场景。

那么在我们这里我们只会去测试这个direct交换机

示例1

direct.cpp

#include <iostream> #include <atomic> #include <thread> #include <chrono> #include "../../common/rabbitmq.hpp" int main() { IMS::init_logger(false, "", 0); // 1. 创建 MQClient 对象,连接到本地 RabbitMQ(请根据实际情况修改认证信息) IMS::MQClient client("root", "123456", "127.0.0.1:5672"); // 2. 声明组件:交换机 "test-exchange"、队列 "test-queue"、绑定键 "test-key" // 此操作是异步的,内部会通过 AMQP-CPP 发送请求,但不会阻塞主线程。 client.declareComponents("test-exchange", "test-queue", "test-key");//默认交换机类型是direct // 3. 设置消费回调:当收到队列 "test-queue" 的消息时打印内容并通知主线程 std::atomic<bool> messageReceived(false); client.consume("test-queue", [&messageReceived](const char* data, size_t size) { std::string msg(data, size); std::cout << "Received message: " << msg << std::endl; messageReceived = true; }); // 4. 短暂等待 std::this_thread::sleep_for(std::chrono::milliseconds(500)); // 5. 发布一条消息到交换机 "test-exchange",路由键 "test-key" bool pubSuccess = client.publish("test-exchange", "Hello, RabbitMQ!", "test-key"); if (!pubSuccess) { std::cerr << "Failed to publish message!" << std::endl; return 1; } std::cout << "Message published." << std::endl; // 6. 等待消息被消费,设置超时避免无限等待 std::this_thread::sleep_for(std::chrono::seconds(3)); // 7. 检查结果 if (messageReceived) { std::cout << "Test passed: message received successfully." << std::endl; return 0; } else { std::cerr << "Test failed: timeout waiting for message." << std::endl; return 1; } }
ubuntu@10-13-52-255:~/cpp-chatsystem/server/example/mq$ ./direct [default-logger][20:35:48][396036][debug ][../../common/rabbitmq.hpp:150] 开始订阅 test-queue 队列消息! [default-logger][20:35:48][396038][debug ][../../common/rabbitmq.hpp:93] test-exchange 交换机创建成功! [default-logger][20:35:48][396038][debug ][../../common/rabbitmq.hpp:105] test-queue 队列创建成功! [default-logger][20:35:48][396038][debug ][../../common/rabbitmq.hpp:117] test-exchange - test-queue - test-key 绑定成功! [default-logger][20:35:49][396036][debug ][../../common/rabbitmq.hpp:133] 向交换机 test-exchange-test-key 发布消息! Message published. Received message: Hello, RabbitMQ!
示例2

publish.cc

#include "../../../../common/rabbitmq.hpp" #include "../../../../common/logger.hpp" #include <gflags/gflags.h> DEFINE_string(user, "root", "rabbitmq访问用户名"); DEFINE_string(pswd, "123456", "rabbitmq访问密码"); DEFINE_string(host, "127.0.0.1:5672", "rabbitmq服务器地址信息 host:port"); DEFINE_bool(run_mode, false, "程序的运行模式,false-调试; true-发布;"); DEFINE_string(log_file, "", "发布模式下,用于指定日志的输出文件"); DEFINE_int32(log_level, 0, "发布模式下,用于指定日志输出等级"); int main(int argc, char *argv[]) { google::ParseCommandLineFlags(&argc, &argv, true); IMS::init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level); IMS::MQClient client(FLAGS_user, FLAGS_pswd, FLAGS_host); client.declareComponents("test-exchange", "test-queue");//默认情况下创建的是direct交换机 for (int i = 0; i < 10; i++) { std::string msg = "Hello Bite-" + std::to_string(i); bool ret = client.publish("test-exchange", msg); if (ret == false) { std::cout << "publish 失败!\n"; } } std::this_thread::sleep_for(std::chrono::seconds(3)); return 0; } 

consume.cc

#include "../../../../common/rabbitmq.hpp" #include "../../../../common/logger.hpp" #include <gflags/gflags.h> DEFINE_string(user, "root", "rabbitmq访问用户名"); DEFINE_string(pswd, "123456", "rabbitmq访问密码"); DEFINE_string(host, "127.0.0.1:5672", "rabbitmq服务器地址信息 host:port"); DEFINE_bool(run_mode, false, "程序的运行模式,false-调试; true-发布;"); DEFINE_string(log_file, "", "发布模式下,用于指定日志的输出文件"); DEFINE_int32(log_level, 0, "发布模式下,用于指定日志输出等级"); void callback(const char *body, size_t sz) { std::string msg; msg.assign(body, sz); std::cout << msg << std::endl; } int main(int argc, char *argv[]) { google::ParseCommandLineFlags(&argc, &argv, true); IMS::init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level); IMS::MQClient client(FLAGS_user, FLAGS_pswd, FLAGS_host); client.declareComponents("test-exchange", "test-queue");//默认情况下创建的是direct交换机 client.consume("test-queue", callback); std::this_thread::sleep_for(std::chrono::seconds(60)); return 0; } 

makfile

all : publish consume publish : publish.cc g++ -g -std=c++17 $^ -o $@ -lamqpcpp -lev -lfmt -lspdlog -lgflags consume : consume.cc g++ -g -std=c++17 $^ -o $@ -lamqpcpp -lev -lfmt -lspdlog -lgflags .PHONY : clean clean : rm publish consume

Read more

算法王冠上的明珠——动态规划之路径问题(第一篇)

算法王冠上的明珠——动态规划之路径问题(第一篇)

目录 1. 什么叫路径类动态规划 一、核心定义(通俗理解) 二、核心特征(识别这类问题的关键) 2. 动态规划步骤 状态表示 状态转移方程 初始化 填表顺序 返回值 3. 例题讲解 3.1 LeetCode62. 不同路径 3.2 LeetCode63. 不同路径 II 3.3 LeetCodeLCR 166. 珠宝的最高价值 今天我们来聊一聊动态规划的路径类问题。 1. 什么叫路径类动态规划 路径类动态规划是 动态规划的一个重要分支,核心解决 “从起点到终点的路径相关问题”—— 比如 “路径总数”“最短路径长度”“路径上的最大 / 最小和” 等,其本质是通过 “状态递推” 避免重复计算,高效求解多阶段决策的路径问题。 一、

By Ne0inhk
【动态规划】数位DP的原理、模板(封装类)

【动态规划】数位DP的原理、模板(封装类)

本文涉及知识点 C++动态规划 复杂但相对容易理解的解法 上界、下界的位数一样都为N。如果不一样,拆分一样。比如:[10,200],拆分[10,99]和[100,200]。由于要枚举到 1 ∼ N 1\sim N 1∼N,故实际复杂度是N倍。 动态规划的状态表示 dp[n][m][m1],n表示已经处理最高n位,m表示上下界状态:0非上下界,1下界,2上界,3上下界。m1是自定义状态。 某题范围是[110,190],处理一位后:1是上下界,无其它合法状态。处理二位后,11是下界,19是上界, 12 ∼ 18 12

By Ne0inhk
【LeetCode_27】移除元素

【LeetCode_27】移除元素

刷爆LeetCode系列 * LeetCode27题: * github地址 * 前言 * 题目描述 * 题目思路分析 * 代码实现 * 算法代码优化 LeetCode27题: github地址 有梦想的电信狗 前言 本文用C++实现LeetCode 第27题 题目描述 题目链接:https://leetcode.cn/problems/remove-element/ 题目思路分析 目标分析: 1. 将数组中等于val的元素移除 2. 原地移除,意味着时间复杂度为O(n),空间复杂度为O(1) 3. 返回nums中与val值不同的元素个数 思路:双指针 * src:用于扫描元素,从待扫描元素的第一个开始,因此初始下标为0 * dst:指向数组中,最后一个位置正确的元素的下标,因此初始值为-1 * count:记录赋值的次数,赋值的次数即为数组中与val值不同的元素个数,初始值为0 操作: * nums[

By Ne0inhk
一文彻底搞清楚数据结构之快速排序和归并排序的深入优化

一文彻底搞清楚数据结构之快速排序和归并排序的深入优化

🔥承渊政道:个人主页 ❄️个人专栏: 《C语言基础语法知识》《数据结构与算法初阶》 ✨逆境不吐心中苦,顺境不忘来时路!🎬 博主简介: 前言:前面小编已经介绍八大排序算法的基本思想和实现方法!但关于其中的快速排序和归并排序还有一些细节可以优化!接下来跟着小编来看看快速排序和归并排序的深入优化,学习一下优化完之后,具体在实际中的应用!废话不多说,下面跟着小编的节奏🎵一起学习吧! 目录 * 1.快速排序性能的关键点分析 * 1.1三路划分算法思想讲解 * 1.2hoare和lomuto和三路划分单趟排序代码分析 * 1.3三种快排单趟排序运⾏结果分析 * 2.排序数组OJ题 * 2.1lomuto的快速排序跑排序数组OJ题 * 2.2hoare的快速排序跑排序数组OJ题 * 2.3三路划分的快速排序跑排序数组OJ题 * 2.4introsort的快速排序跑排序数组OJ题 * 3.外排序介绍 * 3.1创建随机数据⽂件的代码 * 3.2⽂件归并排序思路分析 * 3.3⽂件归并排序代码实现 * 3.4非递归版

By Ne0inhk