跳到主要内容RabbitMQ 核心概念与消息模式详解 | 极客日志Javajava
RabbitMQ 核心概念与消息模式详解
综述由AI生成系统介绍了 RabbitMQ 的核心概念(Broker、Exchange、Queue 等)及七种常用消息模式(简单、工作队列、发布订阅、路由、主题、RPC、发布确认)。详细阐述了消息持久化、ACK 确认机制、QoS 配置以及死信队列的处理逻辑。此外,还对比了基于 TTL+DLX 和官方插件两种延迟队列实现方案,并结合代码示例说明了如何在实际业务中选择合适的模式以确保消息可靠投递。
人间失格41 浏览 一、RabbitMQ 总体长什么样?
RabbitMQ 是一个消息代理(Message Broker),实现 AMQP 协议,核心角色是:
- 接收生产者发出的消息
- 根据规则路由到队列
- 再推(或拉)给消费者
官方 tutorials 里把常用模式分成:Hello World、Work Queues、Publish/Subscribe、Routing、Topics、RPC、Publisher Confirms 等几种。
核心概念
| 概念 | 作用 | 简要说明 |
|---|
| Broker | RabbitMQ 服务器实例 | 负责接收、路由、存储、投递消息 |
| Virtual Host (vhost) | 逻辑'租户/命名空间' | 每个 vhost 有独立的 Exchange/Queue/Binding,用于权限隔离 |
| Connection | TCP 连接 | 客户端与 Broker 之间的物理连接 |
| Channel | 连接里的'轻量级连接' | 复用一个 TCP,多线程时一般每个线程一个 Channel |
| Exchange | 交换机(消息路由器) | 接收生产者消息,根据类型和路由规则分发到队列 |
| Queue | 消息队列 | 真正存储消息,等待消费者消费 |
| Binding | 绑定 | 队列 → 交换机的关系,说明'这个队列想要哪些消息' |
| Routing Key | 路由键 | Direct/Topic Exchange 用来匹配绑定规则 |
| Message | 消息 | 包含 body(业务数据)+ 属性(deliveryMode、headers 等) |
二、消息模式
- 简单模式 / Hello World(Simple Queue)
- 工作队列模式(Work Queues)
- 发布/订阅模式(Publish/Subscribe)
- 路由模式(Routing)
- 主题模式 / 通配符模式(Topics)
- RPC 模式
- Publisher Confirms(发布确认)——更偏可靠性机制,不是'业务模式'

三、各种消息模式
1. 简单模式 / Hello World
特点:
- 一个生产者 P → 一个队列 Queue → 一个消费者 C
- 典型的「点对点」模式
- 一般不显式声明 Exchange,使用默认 Exchange(空字符串 Direct Exchange),路由键 = 队列名
- 生产者声明队列(queueDeclare)
- 生产者发送消息,routingKey = 队列名,发送到默认 Exchange
- 消费者从该队列消费消息
- 消息只能被一个消费者消费一次
- 不需要复杂路由,只是'把任务丢给队列,让后台慢慢处理'
public class Producer { public static void main(String[] args) throws Exception {
public class Consume { public static void main(String[] args) throws Exception {
2. 工作队列模式
- 一个生产者 → 一个队列 → 多个消费者 C1…Cn
- 多个消费者竞争消费同一个队列,一条消息只会被一个消费者消费
- 默认使用轮询(Round-Robin)分发给多个消费者
- 生产者往队列里持续扔任务
- 多个 worker(消费者)同时从队列取任务
- RabbitMQ 按轮询或公平分发策略,把任务分给不同 worker
- 耗时任务异步处理,支持负载均衡:
- 图片/视频处理、压缩
- 订单后续处理(发短信、积分等)
- 消息不会重复消费:每条消息只给一个 worker
- 可以通过 QoS + 手动 ACK 实现'能者多劳'(公平分发):
- 设置 prefetchCount = 1,同时只给一个未确认消息
- 消费者处理慢就少拿点,处理快就多拿点
public class Producer { public static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true,false,false,null); for (int i = 1; i <= 10; i++) { String body = i+"hello rabbitmq~~~"; channel.basicPublish("",QUEUE_NAME,null,body.getBytes()); } channel.close(); connection.close(); } }
public class Consumer1 { static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("Consumer1 body:"+new String(body)); } }; channel.basicConsume(QUEUE_NAME,true,consumer); } }
public class Consumer2 { static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("Consumer2 body:"+new String(body)); } }; channel.basicConsume(QUEUE_NAME,true,consumer); } }
3. 发布/订阅模式
- 一个生产者 → 一个 Fanout Exchange → 多个队列 → 多个消费者
- 广播:消息会被所有绑定到该 Exchange 的队列收到
- Fanout Exchange 会忽略 routing key,直接发给所有绑定队列
- 声明 Fanout Exchange:
exchangeDeclare("logs", "fanout")
- 每个消费者声明自己的队列,并绑定到这个 Exchange:
queueDeclare() → 得到一个临时队列(exclusive/autoDelete)
queueBind(queueName, "logs", "")
- 生产者往
logs Exchange 发消息,所有队列都会收到一份
- 广播型事件:
- 日志系统:所有日志服务都收到同一份日志
- 实时新闻、通知广播
- 每个消费者有自己的独立队列,互不影响
- 如果没有队列绑定到 Exchange,消息会被丢弃(Fanout 不存储消息)
public class Producer { public static void main(String[] args) throws Exception {
public class Consumer1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String queue1Name = "test_fanout_queue1"; channel.queueDeclare(queue1Name,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); System.out.println("队列 1 消费者 1 将日志信息打印到控制台....."); } }; channel.basicConsume(queue1Name,true,consumer); } }
public class Consumer2 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String queue2Name = "test_fanout_queue2"; channel.queueDeclare(queue2Name,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); System.out.println("队列 2 消费者 2 将日志信息打印到控制台....."); } }; channel.basicConsume(queue2Name,true,consumer); } }
4. 路由模式
- 使用 Direct Exchange
- 消息带 routing key,队列绑定到 Exchange 时指定 binding key
- routing key == binding key 时,消息才会路由到该队列
- 声明 Direct Exchange:
exchangeDeclare("direct_logs", "direct")
- 队列 Q1 绑定:
queueBind(Q1, "direct_logs", "error")
- 队列 Q2 绑定:
queueBind(Q2, "direct_logs", "info") 和 "warning"
- 生产者发送:
- routingKey =
"error" → 只有 Q1 收到
- routingKey =
"info" → 只有 Q2 收到
- 按业务类型/级别分发:
- 日志级别:error / warning / info
- 业务类型:订单支付 / 订单物流 / 订单退款
- 一个队列可以绑定多个 routing key
- 同一个 routing key 可以绑定多个队列(实现'多播')
public class Producer { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_direct";
public class Consumer1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String queue1Name = "test_direct_queue1"; channel.queueDeclare(queue1Name,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); System.out.println("Consumer1 将日志信息打印到控制台....."); } }; channel.basicConsume(queue1Name,true,consumer); } }
public class Consumer2 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String queue2Name = "test_direct_queue2"; channel.queueDeclare(queue2Name,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); System.out.println("Consumer2 将日志信息存储到数据库....."); } }; channel.basicConsume(queue2Name,true,consumer); } }
5. 主题模式 / 通配符模式
- 使用 Topic Exchange
- routing key 是带分隔符的字符串,例如
order.payment.success
- 队列绑定的是模式,支持通配符:
- 绑定模式:
order.* → 匹配 order.payment、order.shipping
order.# → 匹配所有以 order. 开头的路由键
- routing key:
order.payment.success → 被 order.# 匹配
order.payment → 被 order.* 匹配
- 多维度分类路由:
- 日志:
error.app、error.db、warn.*
- 地域业务:
asia.china、asia.#
- 比 Direct 更灵活,可以实现'分类 + 层级'路由
- 注意不要滥用太复杂的模式,会影响性能和可读性
public class Producer { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_topic"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null); String queue1Name = "test_topic_queue1"; String queue2Name = "test_topic_queue2"; channel.queueDeclare(queue1Name,true,false,false,null); channel.queueDeclare(queue2Name,true,false,false,null);
public class Consumer1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String QUEUE_NAME = "test_topic_queue1"; channel.queueDeclare(QUEUE_NAME,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); } }; channel.basicConsume(QUEUE_NAME,true,consumer); } }
public class Consumer2 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String QUEUE_NAME = "test_topic_queue2"; channel.queueDeclare(QUEUE_NAME,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); } }; channel.basicConsume(QUEUE_NAME,true,consumer); } }
6. RPC 模式
- 利用 RabbitMQ 实现'请求 - 响应'式的远程调用
- 并不是'标准的 RPC 框架'(如 Dubbo、gRPC),而是一种基于消息的 RPC 模式
- 客户端(调用方):
- 声明一个临时回调队列(exclusive、autoDelete)
- 发送请求消息,设置属性:
replyTo:回调队列名
correlationId:唯一请求 ID
- 监听回调队列
- 服务端:
- 从请求队列取出消息
- 处理请求
- 将结果发送到
replyTo 指定的回调队列,并带上 correlationId
- 客户端:
- 从回调队列收到响应,根据
correlationId 找到对应的请求
- 需要自己处理
correlationId 与请求的映射
- 注意超时、重试、幂等等问题
7. Publisher Confirms(发布确认)
- 不是业务上的'消息模式',而是可靠性机制
- Channel 开启 confirms 模式后:
- 每条消息发布后,Broker 会回传一个确认(ack/nack)
- 生产者可以据此知道消息是否成功到达队列
- 对数据一致性要求高的场景:
- 订单、支付、重要业务事件
- 不允许消息在发送阶段丢失
- 可以批量发布 + 批量确认,提升性能
- 一般配合 事务(transaction) 或 idempotent 发布一起设计
四、四种 Exchange 类型与消息模式的对应
| Exchange 类型 | 路由规则 | 对应的消息模式 |
|---|
| Default(无名 Direct) | routing key = 队列名 | 简单模式、工作队列(不显式声明 Exchange 时) |
| Direct | routing key 精确匹配 binding key | 路由模式、简单点对点 |
| Fanout | 忽略 routing key,广播给所有绑定队列 | 发布/订阅模式 |
| Topic | routing key 模式匹配(* / #) | 主题/通配符模式 |
| Headers | 根据消息 headers 匹配 | 很少用,可实现复杂多属性路由 |
- '竞争消费'? → 用队列 + 多消费者(Work Queue)
- '广播'? → 用 Fanout Exchange
- '按业务类型分发'? → 用 Direct Exchange
- '多维度分类'? → 用 Topic Exchange
五、消息可靠性
1. 消息持久化
- Exchange 持久化:
durable = true
- Queue 持久化:
queueDeclare(..., durable=true, ...)
- Message 持久化:
deliveryMode = 2(PERSISTENT)
注意:持久化不等于绝对不丢,只是落到磁盘;仍可能因磁盘故障等丢失,需要配合集群、镜像等高可用方案。
2. ACK 与 NACK
- 消费者处理完消息后,发送
basic.ack 确认
- 如果处理失败,可以
basic.nack 或 basic.reject,配合:
requeue = true:重新入队
requeue = false:进入死信队列(DLX)
3. QoS(prefetch)
- 设置
basicQos(prefetchCount):
- 限制'未确认消息的最大数量'
- 避免某个消费者被压垮,实现更公平的任务分配
六、根据业务选择消息模式
- 只是异步处理一个任务,不需要广播:
→ 简单模式 / 工作队列
- 一条消息要被多个系统同时处理:
→ 发布/订阅 / 路由 / 主题
- 需要按业务维度/日志级别/地域等分类路由:
→ Direct 或 Topic
- 需要同步返回结果:
→ RPC 模式
七、死信队列
死信队列是 RabbitMQ 实现消息可靠性兜底、异常消息隔离、失败重试的核心机制,本质上它是一个普通队列,仅用于接收「触发了死信规则的异常消息」,配套的核心组件是死信交换机。
**1、**核心概念
死信:触发了 RabbitMQ 预设规则,无法被正常消费的消息,会被标记为死信。
死信交换机:用于接收死信消息的普通交换机,类型支持 direct/topic/fanout/headers,和常规交换机用法完全一致。
死信队列:绑定了 DLX 的普通队列,专门用于存储死信消息,供兜底消费、排查问题、归档处理。
核心逻辑:正常队列必须提前绑定 DLX 和死信路由键,消息触发死信规则后,会被 RabbitMQ 自动转发到 DLX,最终路由到死信队列。
2、死信消息的 3 大核心触发条件
只有满足以下任一条件,消息才会成为死信,缺一不可:
| 触发条件 | 详细规则 | 关键坑点 |
|---|
| 消息被消费者拒绝 | 消费者执行basicNack/basicReject,且 requeue=false(不重新入队) | requeue=true时消息会回到原队列循环投递,永远不会进入死信 |
| 消息 TTL 过期 | 消息在队列中超过了设置的存活时间,且未被消费 | 分为队列级 TTL(队列所有消息统一过期时间)和消息级 TTL(单条消息独立过期时间) |
| 队列达到最大长度 | 队列的消息数量 / 字节数达到预设的max-length/max-length-bytes阈值 | 溢出策略默认drop-head,队首最早入队的消息会被挤出,转为死信 |
3、死信队列核心架构
生产者 → 正常交换机 → 正常队列(绑定 DLX+ 死信路由键) ↓ 触发死信规则 死信消费者 ← 死信队列 ← 死信交换机(DLX)
4、核心使用场景
- 异常消息兜底:业务消费失败的消息,避免无限循环重试,转入死信队列做人工排查 / 兜底处理,防止消息丢失。
- 超时消息处理:订单未支付超时、会话超时等场景,通过 TTL 触发死信,实现超时逻辑。
- 队列溢出保护:设置队列最大长度,流量峰值时溢出的消息转入死信队列,避免 RabbitMQ 内存溢出崩溃。
- 消息重试机制:结合延迟队列,实现消费失败后的指数退避重试,多次重试失败后转入死信归档。
八、延迟队列
延迟队列是指:消息发送后,不希望消费者立即消费,而是等待指定的延迟时间后,再被消费者消费的队列。RabbitMQ 本身没有直接提供延迟队列功能,而是通过「TTL + 死信队列」或「官方延迟插件」两种方式实现,是订单超时、定时提醒、延迟重试等场景的核心解决方案。
1. 实现方式一:TTL + 死信队列
这是 RabbitMQ 原生支持的实现方式,无需额外安装插件,基于前文的死信队列能力实现。
核心原理
- 创建一个无消费者的缓冲队列,给队列 / 消息设置 TTL(延迟时间)。
- 缓冲队列绑定 DLX 和死信路由键,消息在缓冲队列中到期后,会触发 TTL 死信规则,被转发到 DLX。
- 死信队列绑定 DLX,消费者监听死信队列,此时收到的消息就是已经过了延迟时间的消息,实现延迟消费。
生产者 → 延迟交换机 → 缓冲队列(带 TTL,无消费者,绑定 DLX) ↓ TTL 到期,触发死信 延迟消费者 ← 目标消费队列 ← 死信交换机(DLX)
实现代码
基于前文的死信配置,仅需修改缓冲队列配置,无需额外依赖:
@Configuration public class DelayQueueConfig {
| 优点 | 缺点 |
|---|
| 原生支持,无需安装插件,兼容性强 | 队列级 TTL 只能实现固定延迟时间,灵活性差 |
| 实现简单,基于死信队列逻辑,易理解 | 消息级 TTL 存在队首惰性检查坑,不同延迟时间的消息会被阻塞 |
| 适合固定延迟时间的场景 | 大量不同延迟时间的消息会创建大量队列,运维成本高 |
2、实现方式二:官方延迟消息插件
RabbitMQ 官方推出了rabbitmq_delayed_message_exchange插件,完美解决了 TTL 方案的惰性检查问题,支持任意时间的延迟消息,是生产环境的首选方案。
核心原理
插件新增了一种x-delayed-message类型的交换机,核心逻辑:
- 消息发送到该交换机后,不会立即投递到队列,而是先存储到 Mnesia 分布式表中。
- 交换机根据消息头部的
x-delay参数(延迟时间),定时检查消息是否到期。
- 消息到期后,交换机将消息投递到绑定的目标队列,消费者直接监听目标队列即可。
- 下载与你的 RabbitMQ 版本完全匹配的插件
- 将下载的
.ez插件文件,放入 RabbitMQ 的plugins目录
- 执行启用命令:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- 重启 RabbitMQ 服务,管理端 Exchange 页面可看到
x-delayed-message类型,即为安装成功
注意:集群环境下,每个节点都必须安装并启用该插件,否则会出现路由失败。
@Configuration public class PluginDelayConfig { public static final String DELAYED_EXCHANGE = "delayed.exchange"; public static final String DELAYED_QUEUE = "delayed.queue"; public static final String DELAYED_ROUTING_KEY = "delayed.key";
| 优点 | 缺点 |
|---|
| 完美解决 TTL 惰性检查问题,支持任意时间的延迟,无顺序阻塞 | 需要安装插件,有少量运维成本 |
| 灵活性极高,单交换机支持不同延迟时间的消息,无需创建多个队列 | 不适合超长延迟场景 |
| 官方维护,稳定性强,支持集群部署 | 插件消息持久化依赖 Mnesia,极端宕机场景有极低概率丢失消息 |
3、两种方案选型对比
| 场景 | 推荐方案 |
|---|
| 固定延迟时间、无插件安装权限、兼容性要求高 | TTL + 死信队列方案 |
| 延迟时间不固定、多延迟时间并存、生产环境高可用要求 | 官方延迟插件方案 |
| 超长延迟 | 不建议用 RabbitMQ,推荐使用 XXL-Job 等定时任务框架 |
4、核心使用场景
- 订单超时自动取消(30 分钟未支付关闭订单、恢复库存)
- 预约提醒(会议前 15 分钟、出行前 1 小时提醒)
- 延迟重试(消费失败后,10s/30s/1min/5min 指数退避重试)
- 缓存预热(指定时间预热热点数据)
- 超时关闭会话(用户长时间无操作自动登出)
相关免费在线工具
- Keycode 信息
查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online
- Escape 与 Native 编解码
JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online
- JavaScript / HTML 格式化
使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online
- JavaScript 压缩与混淆
Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
- Base64 文件转换器
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online