摘要
本文围绕 RabbitMQ 展开,涵盖其概念、安装、AMQP-CPP 库使用及客户端 API 封装。阐述消息队列原理、特性,详述 Linux 下安装步骤、库安装,展示简单使用案例与 API 封装思路。
RabbitMQ 介绍
什么是 RabbitMQ?
RabbitMQ 是一个开源的、跨平台的消息队列中间件,基于 AMQP(Advanced Message Queuing Protocol)标准实现,由 Erlang 语言编写,广泛应用于分布式系统中实现异步通信、解耦、流量削峰、任务分发等功能。
简单来说,RabbitMQ 是一个'邮局',负责在不同程序之间可靠地传递消息,让发送方和接收方无需直接交互,只需通过 RabbitMQ 进行中转。
核心功能与特点
1. 核心功能
- 消息存储与转发:生产者将消息发送到 RabbitMQ,RabbitMQ 暂存消息并根据规则将消息路由到对应的消费者。
- 多协议支持:虽然基于 AMQP,但还支持 MQTT、STOMP 等协议,适配不同场景。
- 灵活的路由规则:通过交换机和队列的绑定关系,实现复杂的消息路由逻辑。
- 消息确认机制:确保消息不丢失(生产者确认、消费者确认)。
- 高可用与集群:支持镜像队列、集群部署,保障消息服务的可靠性。
2. 核心优势
- 解耦:生产者和消费者无需知道彼此的存在,只需与 RabbitMQ 交互,降低系统模块间的依赖。
- 异步通信:生产者发送消息后无需等待消费者立即处理,提高系统响应速度。
- 流量削峰:突发流量时,RabbitMQ 可以暂存消息,避免下游服务被压垮。
- 可靠性:支持消息持久化、重试、死信队列等机制,确保消息不丢失、不重复处理。
- 跨语言/平台:提供多种语言客户端,适配不同技术栈。
核心概念
- 生产者(Producer):发送消息的应用程序,将消息发布到 RabbitMQ 的交换机中。
- 消费者(Consumer):接收消息的应用程序,从 RabbitMQ 的队列中获取消息并处理。
- 队列(Queue):消息的存储容器,实际存放消息的地方。消费者从队列中拉取消息进行处理。
- 交换机(Exchange):接收生产者发送的消息,并根据路由规则将消息分发到不同的队列。
- Direct(直连):根据精确的路由键匹配队列。
- Topic(主题):支持通配符,按规则模糊匹配路由键。
- Fanout(广播):将消息广播到所有绑定的队列。
- Headers(头部):根据消息的头部属性匹配队列。
- 绑定(Binding):连接交换机和队列的规则,定义了哪些消息应该被路由到哪个队列。
工作流程(以 Direct 交换机为例)
- 生产者:将消息发送到指定的交换机,并指定一个路由键。
- 交换机:根据绑定规则,将消息路由到对应的队列。
- 队列:存储消息,等待消费者拉取。
- 消费者:从队列中获取消息并处理。
常见应用场景
- 异步任务处理:例如用户注册后,发送邮件/短信的通知任务可以异步丢给 RabbitMQ。
- 服务间解耦:订单服务和库存服务通过 RabbitMQ 解耦,订单服务只需发送消息。
- 流量削峰:电商秒杀活动中,大量下单请求先进入队列,后端服务逐步消费。
- 日志收集与监控:多个服务将日志消息发送到 RabbitMQ,由日志服务统一收集。
- 分布式事务补偿:通过 RabbitMQ 协调各服务的最终一致性。
RabbitMQ 与相关技术对比
| 对比项 | RabbitMQ | Kafka | Redis(Stream) |
|---|---|---|---|
| 设计目标 | 通用消息队列(可靠、灵活路由) | 高吞吐、分布式日志流处理 | 轻量级消息队列(简单场景) |
| 协议 | AMQP(及其他) | 自研协议 | Redis 协议 |
| 消息模型 | 交换机 + 队列(灵活路由) | 主题分区(Pub/Sub 或 Stream) | List/PubSub/Stream |
| 可靠性 | 高(支持持久化、确认机制) | 高(副本机制) | 较低(依赖配置) |
| 适用场景 | 业务解耦、异步任务、削峰 | 大数据流处理、实时日志 | 简单队列、缓存通知 |
总结
RabbitMQ 是一个轻量级、高可靠的消息队列中间件,通过灵活的路由机制和多种交换机类型,帮助分布式系统实现异步通信、解耦和流量控制,是构建高可用、可扩展服务的核心工具之一。
RabbitMQ 安装教程
RabbitMQ 安装
1. 安装 RabbitMQ
sudo apt install rabbitmq-server
2. 启动 & 检查状态
# 启动服务
sudo systemctl start rabbitmq-server
# 查看状态(确保 active (running))
sudo systemctl status rabbitmq-server
3. 创建管理员用户(默认 guest 权限低)
# 添加用户(示例:用户名=root,密码=123456)
sudo rabbitmqctl add_user root 123456
# 设为管理员
sudo rabbitmqctl set_user_tags root administrator
# 赋予权限(对所有虚拟主机)
sudo rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
4. 开启 Web 管理界面(默认端口 15672)
sudo rabbitmq-plugins enable rabbitmq_management
访问:http://你的服务器 IP:15672
登录:用户名=root,密码=123456
如果服务器端口未向外开放,需要修改配置文件设置监听的 IP 以及部署的端口。
编辑配置文件:
vim /etc/rabbitmq/rabbitmq.conf
添加对应字段后重启服务即可。
常见问题: 连接 Web UI 时若出现错误,通常不是账号密码问题,而是权限配置问题。重新设置允许登录的新用户并重启服务即可解决。
RabbitMQ 客户端及 C++ 库安装步骤
1. 安装 RabbitMQ 官方客户端库(C 语言)
作用:提供基础的 RabbitMQ C 接口,用于消息通信。
sudo apt-get install librabbitmq-dev
2. 安装 RabbitMQ 的 C++ 客户端库(AMQP-CPP)
作用:基于 C 接口的 C++ 封装库,更适合用 C++ 编写客户端程序。
(1) 安装依赖库 libev(网络库组件):
sudo apt install libev-dev
(2) 下载 AMQP-CPP 源码:
git clone https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git
cd AMQP-CPP/
(3) 编译并安装:
make && make install
注意:如果编译时出现 SSL 版本错误,需重新执行
make或修复系统依赖。
修复系统依赖:
sudo apt --fix-broken install
卸载冲突的 SSL 库(强制卸载):
sudo dpkg -P --force-all libevent-openssl-2.1-7
sudo dpkg -P --force-all openssl
sudo dpkg -P --force-all libssl-dev
默认端口介绍
对于 RabbitMQ 的 Web 控制台默认部署的是 15672 端口,服务端默认端口是 5672,集群模式使用的端口是 25672。
AMQP-CPP 库介绍
介绍
1. 是什么?
AMQP-CPP 是一个 C++ 库,专门用于与 RabbitMQ 通信。它的核心功能是解析/生成数据,能处理 RabbitMQ 发来的消息,也能构造要发送的消息。
- 高性能:完全异步(无阻塞调用,不用多线程),适合对性能要求高的场景。
- 依赖 C++17:代码需要用 C++17 或更高版本编译。
2. 怎么用?两种模式
模式 1:TCP 模式(自己管网络)
- 核心操作:用户需自己实现一个类,继承自
AMQP::TcpHandler(负责 TCP 连接)。 - 关键步骤:重写
monitor函数,手动管理 IO,把网络套接字交给自己的事件循环。 - 适用场景:想完全自定义网络层。
模式 2:扩展模式(用现成库省事)
- 以 libev 为例:直接使用 AMQP-CPP 提供的
AMQP::LibEvHandler类,库会自动处理 fd 监控和消息处理。 - 其他支持:除了 libev,还支持 libevent、libuv、asio 等异步通信组件。
- 适用场景:想快速集成,不想手动写网络 IO 代码(推荐大多数用户用)。
后面我们以扩展模式来实现对 RabbitMQ 的访问。
模式 2 常见接口介绍
1. AMQP::Channel: 信道类
Channel(Connection *connection);- 构造函数bool connected()- 检查连接状态Deferred &declareExchange()- 声明交换机DeferredQueue &declareQueue()- 声明队列Deferred &bindQueue()- 将交换机与队列进行绑定bool publish()- 发布消息DeferredConsumer &consume()- 订阅队列消息bool ack()- 消费者客户端对收到的消息进行确认应答
2. class Message: 消息类
const char *body()- 获取消息正文uint64_t bodySize()- 获取消息正文大小
3. libev 使用到的结构体与接口
struct ev_loop *ev_default_loop ();- 实例化并获取 I/O 事件监控结构句柄。int ev_run (struct ev_loop *loop);- 开始运行 I/O 事件监控,这是一个阻塞接口。void ev_break (struct ev_loop *loop, int32_t break_type);- 结束 I/O 监控。void ev_async_init(ev_async *w, callback cb);- 初始化异步事件结构,并设置回调函数。void ev_async_start(struct ev_loop *loop, ev_async *w);- 启动事件监控循环中的异步任务处理。void ev_async_send(struct ev_loop *loop, ev_async *w);- 发送当前异步事件到异步线程中执行。
AMQP-CPP 库简单使用
测试效果
分别实现对应的发布客户端与订阅客户端,然后分别进行底层事件监控,关联 amqp 框架,建立连接、信道、声明交换机、队列、绑定消息等,然后直接进行消息发布,消息订阅即可。
首先启动发布客户端,进行注册对应信息向 MQ 服务端。 可以看到我们 publish 客户端连对应的连接与信道。 对应的交换机,队列,以及发送进来的消息也都正确。 最后启动订阅客户端就能收到对应消息。
注意: 这个和之前实现的仿 RabbitMQ 实现的消息队列不同,本身的 RabbitMQ 消息队列无论是先启动订阅还是发布端,最后订阅端都能收到发进去的消息;但是之前实现的那个仿照版本的必须先启动订阅端然后再是发布端,因为发布端发布进去消息就去找对应订阅队列的消费者的回调函数给它推送过去,反之则不可能进行推送了。
测试代码
publish.cc 与 consume.cc 对应代码详见项目仓库。
RabbitMQ 客户端 API 二次封装
封装思想
- 将事件监控结构 loop、通信连接 connection 以及信道 channel 封装起来。
- 提供创建交换机、队列并进行直接绑定的接口。
- 提供向指定交换机发布消息的接口。
- 提供订阅指定队列消息的接口。
测试效果
客户端进行订阅队列,然后对于底层的事件监控交给对应异步线程,主线程阻塞住等待异步线程完成处理后进行异步事件通知来 break 它后进行回收。 这里发现打印的顺序和调用顺序不一样,因为这里声明完对应交换机、队列后才回去调用对应回调处理,也就是进行发布的过程速度比进行处理声明完回调速度快。 成功收到对应消息(这里 publish 与 consume 顺序谁先谁后也是无关的)。
代码汇总
详细代码见注释。
总结
学习本篇可以知道 RabbitMQ 作为强大消息队列中间件,通过灵活交换机和丰富路由规则实现异步通信、解耦等功能。Linux 下安装便捷,AMQP-CPP 库提供 C++ 交互方式,客户端 API 封装提升开发效率,适用于多种分布式场景。


