1.介绍
- RabbitMQ:消息队列组件,实现两个客户端主机之间消息传输的功能(发布&订阅)
RabbitMQ 是用于实现客户端主机间消息传输的消息队列组件,核心概念包括交换机、队列、绑定和消息。本文详细介绍了 RabbitMQ 的安装配置及 AMQP-CPP C++ 客户端库的使用方法。内容涵盖交换机类型(广播、直接、主题)、Channel 通道机制、Deferred 异步回调以及 libev 事件循环集成。通过 publish.cc 和 consume.cc 示例展示了声明交换机、队列、绑定关系以及发布和消费消息的完整流程,并提供了 Makefile 构建脚本。

bkey 与绑定的 rkey 对比,一致则放入队列bkey 与绑定的 rkey 进行规则匹配,成功则放入队列sudo apt install rabbitmq-server简单使用:
# 安装完成的时候默认有个用户 guest,但是权限不够,要创建一个 administrator 用户,才可以做为远程登录和发表订阅消息
# 添加用户
sudo rabbitmqctl add_user root <PASSWORD>
# 设置用户 tag
sudo rabbitmqctl set_user_tags root administrator
# 设置用户权限
sudo rabbitmqctl set_permissions -p / root "." "." ".*"
# RabbitMQ 自带了 web 管理界面,执行下面命令开启,默认端口 15672
sudo rabbitmq-plugins enable rabbitmq_management
解决方案:卸载当前的 ssl 库,重新进行修复安装
dpkg -l |grep ssl
sudo dpkg -P --force-all libevent-openssl-2.1-7
sudo dpkg -P --force-all openssl
sudo dpkg -P --force-all libssl-dev
sudo apt --fix-broken install
如果安装时出现以下报错,则表示 ssl 版本出现问题
/usr/include/openssl/macros.h:147:4: error: #error "OPENSSL_API_COMPAT expresses an impossible API compatibility level"
147 | # error "OPENSSL_API_COMPAT expresses an impossible API compatibility level"
| ^~~~~
In file included from /usr/include/openssl/ssl.h:18, from linux_tcp/openssl.h:20, from linux_tcp/openssl.cpp:12:
/usr/include/openssl/bio.h:687:1: error: expected constructor, destructor, or type conversion before 'DEPRECATEDIN_1_1_0'
687 | DEPRECATEDIN_1_1_0(int BIO_get_port(const char *str, unsigned short *port_ptr))
sudo apt install libev-dev #libev 网络库组件
git clone https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git
cd AMQP-CPP/
make
make install
AMQP-CPP 是用于与 RabbitMQ 消息中间件通信的 C++ 库
RabbitMQ 服务发送来的数据,也可以生成发向 RabbitMQ 的数据包AMQP-CPP 库不会向 RabbitMQ 建立网络连接,所有的网络 IO 由用户完成AMQP-CPP 提供了可选的网络层接口,它预定义了 TCP 模块,用户就不用自己实现网络 IO,
libevent、libev、libuv、asio 等异步通信组件,需要手动安装对应的组件AMQP-CPP 完全异步,没有阻塞式的系统调用,不使用线程就能够应用在高性能应用中AMQP-CPP 的使用有两种模式:
TCP 模块进行网络通信libevent、libev、libuv、asio 异步通信组件进行通信libev 为例,不需要自己实现 monitor 函数,可以直接使用 AMQP::LibEvHandlerchannel 是一个虚拟连接,一个连接上可以建立多个通道
RabbitMQ 指令都是通过 channel 传输
channelchannel 上执行指令的返回值并不能作为操作执行结果
Deferred 类,可以使用它安装处理函数namespace AMQP {
/**
* Generic callbacks that are used by many deferred objects
*/
using SuccessCallback = std::function<void()>;
using ErrorCallback = std::function<void(const char *message)>;
using FinalizeCallback = std::function<void()>;
/**
* Declaring and deleting a queue
*/
using QueueCallback = std::function<void(const std::string &name, uint32_t messagecount, uint32_t consumercount)>;
using DeleteCallback = std::function<void(uint32_t deletedmessages)>;
using MessageCallback = std::function<void(const Message &message, uint64_t deliveryTag, bool redelivered)>;
// 当使用发布者确认时,当服务器确认消息已被接收和处理时,将调用 AckCallback
using AckCallback = std::function<void(uint64_t deliveryTag, bool multiple)>;
// 使用确认包裹通道时,当消息被 ack/nacked 时,会调用这些回调
using PublishAckCallback = std::function<void()>;
using PublishNackCallback = std::function<void()>;
using PublishLostCallback = std::function<void()>;
// 信道类
class Channel {
Channel(Connection *connection);
bool connected();
/**
* 声明交换机
* 如果提供了一个空名称,则服务器将分配一个名称。
* 以下 flags 可用于交换机:
* *-durable 持久化,重启后交换机依然有效
* *-autodelete 删除所有连接的队列后,自动删除交换
* *-passive 仅被动检查交换机是否存在
* *-internal 创建内部交换
* @param name 交换机的名称
* @param type 交换类型 enum ExchangeType { fanout, 广播交换,绑定的队列都能拿到消息 direct, 直接交换,只将消息交给 routingkey 一致的队列 topic, 主题交换,将消息交给符合 bindingkey 规则的队列 headers, consistent_hash, message_deduplication };
* @param flags 交换机标志
* @param arguments 其他参数
*
* 此函数返回一个延迟处理程序。可以安装回调 using onSuccess(), onError() and onFinalize() methods.
*/
Deferred &declareExchange(const std::string_view &name, ExchangeType type, int flags, const Table &arguments);
/**
* 声明队列
* 如果不提供名称,服务器将分配一个名称。
* flags 可以是以下值的组合:
* *-durable 持久队列在代理重新启动后仍然有效
* *-autodelete 当所有连接的使用者都离开时,自动删除队列
* *-passive 仅被动检查队列是否存在
* *-exclusive 队列仅存在于此连接,并且在连接断开时自动删除
* @param name 队列的名称
* @param flags 标志组合
* @param arguments 可选参数
*
* 此函数返回一个延迟处理程序。可以安装回调
* 使用 onSuccess()、onError()和 onFinalize()方法。
* Deferred &onError(const char *message)
*
* 可以安装的 onSuccess()回调应该具有以下签名:void myCallback(const std::string &name, uint32_t messageCount, uint32_t consumerCount); 例如:channel.declareQueue("myqueue").onSuccess( [](const std::string &name, uint32_t messageCount, uint32_t consumerCount) { std::cout << "Queue '" << name << "' "; std::cout << "has been declared with "; std::cout << messageCount; std::cout << " messages and "; std::cout << consumerCount; std::cout << " consumers" << std::endl; });
*/
DeferredQueue &declareQueue(const std::string_view &name, int flags, const Table &arguments);
/**
* 将队列绑定到交换机
* @param exchange 源交换机
* @param queue 目标队列
* @param routingkey 路由密钥
* @param arguments 其他绑定参数
*
* 此函数返回一个延迟处理程序。可以安装回调
* 使用 onSuccess()、onError()和 onFinalize()方法。
*/
Deferred &bindQueue(const std::string_view &exchange, const std::string_view &queue, const std::string_view &routingkey, const Table &arguments);
/**
* 将消息发布到 exchange
* 您必须提供交换机的名称和路由密钥。然后,RabbitMQ 将尝试将消息发送到一个或多个队列。使用可选的 flags 参数,可以指定如果消息无法路由到队列时应该发生的情况。默认情况下,不可更改的消息将被静默地丢弃。
*
* 如果设置了'mandatory'或'immediate'标志,则无法处理的消息将返回到应用程序。在开始发布之前,请确保您已经调用了 recall()-方法,并设置了所有适当的处理程序来处理这些返回的消息。
*
* 可以提供以下 flags:
* *-mandatory 如果设置,服务器将返回未发送到队列的消息
* *-immediate 如果设置,服务器将返回无法立即转发给使用者的消息。
* @param exchange 要发布到的交易所
* @param routingkey 路由密钥
* @param envelope 要发送的完整信封
* @param message 要发送的消息
* @param size 消息的大小
* @param flags 可选标志
*/
bool publish(const std::string_view &exchange, const std::string_view &routingKey, const std::string &message, int flags = 0);
/**
* 告诉 RabbitMQ 服务器已准备好使用消息 - 也就是 订阅队列消息
*
* 调用此方法后,RabbitMQ 开始向客户端应用程序传递消息。consumer tag 是一个字符串标识符,如果您以后想通过 channel::cancel()调用停止它,可以使用它来标识使用者。
* 如果您没有指定使用者 tag,服务器将为您分配一个。
*
* 支持以下 flags:
* *-nolocal 如果设置了,则不会同时消耗在此通道上发布的消息
* *-noack 如果设置了,则不必对已消费的消息进行确认
* *-exclusive 请求独占访问,只有此使用者可以访问队列
* @param queue 您要使用的队列
* @param tag 将与此消费操作关联的消费者标记
* @param flags 其他标记
* @param arguments 其他参数
*
* 此函数返回一个延迟处理程序。可以使用 onSuccess()、onError()和 onFinalize()方法安装回调
* 可以安装的 onSuccess()回调应该具有以下格式:void myCallback(const std::string_view&tag);样例:channel.consume("myqueue").onSuccess( [](const std::string_view& tag) { std::cout << "Started consuming under tag ";std::cout << tag << std::endl; });
*/
DeferredConsumer &consume(const std::string_view &queue, const std::string_view &tag, int flags, const Table &arguments);
/**
* 确认接收到的消息
*
* 消费者客户端对收到的消息进行确认应答
*
* 当在 DeferredConsumer::onReceived() 方法中接收到消息时,必须确认该消息,以便 RabbitMQ 将其从队列中删除(除非使用 noack 选项消费)
*
* 支持以下标志:
* *-多条确认多条消息:之前传递的所有未确认消息也会得到确认
* @param deliveryTag 消息的唯一 delivery 标签
* @param flags 可选标志
* @return bool
*/
bool ack(uint64_t deliveryTag, int flags = 0);
};
class DeferredConsumer {
/* 注册一个回调函数,该函数在消费者启动时被调用 void onSuccess(const std::string &consumertag) */
DeferredConsumer &onSuccess(const ConsumeCallback &callback);
/* 注册回调函数,用于接收到一个完整消息的时候被调用 void MessageCallback(const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) */
DeferredConsumer &onReceived(const MessageCallback &callback);
/* Alias for onReceived() */
DeferredConsumer &onMessage(const MessageCallback &callback);
/* 注册要在服务器取消消费者时调用的函数 void CancelCallback(const std::string &tag) */
DeferredConsumer &onCancelled(const CancelCallback &callback);
};
class Message : public Envelope {
const std::string &exchange();
const std::string &routingkey();
};
class Envelope : public MetaData {
const char *body(); // 获取消息正文
uint64_t bodySize(); // 获取消息正文大小
};
}
typedef struct ev_async {
EV_WATCHER(ev_async);
EV_ATOMIC_T sent; /* private */
} ev_async;
// break type
enum {
EVBREAK_CANCEL = 0, /* undo unloop */
EVBREAK_ONE = 1, /* unloop once */
EVBREAK_ALL = 2 /* unloop all loops */
};
// 实例化并获取 IO 事件监控接口句柄
struct ev_loop *ev_default_loop(unsigned int flags EV_CPP(=0));
#define EV_DEFAULT ev_default_loop(0)
// 开始运行 IO 事件监控,这是一个阻塞接口
int ev_run(struct ev_loop *loop);
/* break out of the loop */
// 结束 IO 监控
// 如果在主线程进行 ev_run(),则可以直接调用,
// 如果在其他线程中进行 ev_run(),需要通过异步通知进行
void ev_break(struct ev_loop *loop, int32_t break_type);
void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents);
// 初始化异步事件结构,并设置回调函数
void ev_async_init(ev_async *w, callback cb);
// 启动事件监控循环中的异步任务处理
void ev_async_start(struct ev_loop *loop, ev_async *w);
// 发送当前异步事件到异步线程中执行
void ev_async_send(struct ev_loop *loop, ev_async *w);
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>
int main() {
// 1.实例化底层网络通信框架的 IO 事件监控句柄
auto *loop = EV_DEFAULT;
// 2.实例化 libEvHandler 句柄 -> 将 AMQP 框架与事件监控关联起来
AMQP::LibEvHandler handler(loop);
// 3.实例化连接对象
AMQP::Address address("amqp://root:<PASSWORD>@127.0.0.1:5672/");
AMQP::TcpConnection connection(&handler, address);
// 4.实例化信道对象
AMQP::TcpChannel channel(&connection);
// 5.声明交换机
channel.declareExchange("test-exchange", AMQP::ExchangeType::direct).onError([](const char *message) {
std::cout << "声明交换机失败:" << message << std::endl;
}).onSuccess([]() {
std::cout << "test-exchange 交换机创建成功" << std::endl;
});
// 6.声明队列
channel.declareQueue("test-queue").onError([](const char *message) {
std::cout << "声明队列失败:" << message << std::endl;
}).onSuccess([]() {
std::cout << "test-queue 队列创建成功" << std::endl;
});
// 7.针对交换机和队列进行绑定
channel.bindQueue("test-exchange", "test-queue", "test-queue-key").onError([](const char *message) {
std::cout << "test-exchange - test-queue 绑定失败:" << message << std::endl;
}).onSuccess([]() {
std::cout << "test-exchange - test-queue 绑定成功" << std::endl;
});
// 8.向交换机发布消息
for (int i = 0; i < 5; ++i) {
std::string msg = "Hello SnowK-" + std::to_string(i);
if (channel.publish("test-exchange", "test-queue-key", msg) == false) {
std::cout << "publish 失败" << std::endl;
}
}
// 9.启动底层网络通信框架 -> 开启 IO
ev_run(loop, 0);
return 0;
}
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>
void MessageCB(AMQP::TcpChannel *channel, const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
std::string msg;
msg.assign(message.body(), message.bodySize());
// 不能这样使用,AMQP::Message 后面没有存'\0'
// std::cout << message << std::endl
std::cout << msg << std::endl;
channel->ack(deliveryTag);
}
int main() {
// 1.实例化底层网络通信框架的 IO 事件监控句柄
auto *loop = EV_DEFAULT;
// 2.实例化 libEvHandler 句柄 -> 将 AMQP 框架与事件监控关联起来
AMQP::LibEvHandler handler(loop);
// 3.实例化连接对象
AMQP::Address address("amqp://root:<PASSWORD>@127.0.0.1:5672/");
AMQP::TcpConnection connection(&handler, address);
// 4.实例化信道对象
AMQP::TcpChannel channel(&connection);
// 5.声明交换机
channel.declareExchange("test-exchange", AMQP::ExchangeType::direct).onError([](const char *message) {
std::cout << "声明交换机失败:" << message << std::endl;
}).onSuccess([]() {
std::cout << "test-exchange 交换机创建成功" << std::endl;
});
// 6.声明队列
channel.declareQueue("test-queue").onError([](const char *message) {
std::cout << "声明队列失败:" << message << std::endl;
}).onSuccess([]() {
std::cout << "test-queue 队列创建成功" << std::endl;
});
// 7.针对交换机和队列进行绑定
channel.bindQueue("test-exchange", "test-queue", "test-queue-key").onError([](const char *message) {
std::cout << "test-exchange - test-queue 绑定失败:" << message << std::endl;
}).onSuccess([]() {
std::cout << "test-exchange - test-queue 绑定成功" << std::endl;
});
// 8.订阅消息对垒 -> 设置消息处理回调函数
auto callback = std::bind(MessageCB, &channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
channel.consume("test-queue", "consume-tag").onReceived(callback).onError([](const char *message) {
std::cout << "订阅 test-queue 队列消息失败:" << message << std::endl;
exit(0);
});
// 9.启动底层网络通信框架 -> 开启 IO
ev_run(loop, 0);
return 0;
}
all: publish consume
publish: publish.cc
g++ -o $@ $^ -lamqpcpp -lev -std=c++17
consume: consume.cc
g++ -o $@ $^ -lamqpcpp -lev -std=c++17
.PHONY:clean
clean: rm publish consume

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online
将JSON字符串修饰为友好的可读格式。 在线工具,JSON美化和格式化在线工具,online