一、RabbitMQ 总体长什么样?
RabbitMQ 是一个消息代理(Message Broker),实现 AMQP 协议,核心角色是:
- 接收生产者发出的消息
- 根据规则路由到队列
- 再推(或拉)给消费者
官方 tutorials 里把常用模式分成:Hello World、Work Queues、Publish/Subscribe、Routing、Topics、RPC、Publisher Confirms 等几种。
核心概念
| 概念 | 作用 | 简要说明 |
|---|
系统介绍了 RabbitMQ 的核心概念(Broker、Exchange、Queue 等)及七种常用消息模式(简单、工作队列、发布订阅、路由、主题、RPC、发布确认)。详细阐述了消息持久化、ACK 确认机制、QoS 配置以及死信队列的处理逻辑。此外,还对比了基于 TTL+DLX 和官方插件两种延迟队列实现方案,并结合代码示例说明了如何在实际业务中选择合适的模式以确保消息可靠投递。
RabbitMQ 是一个消息代理(Message Broker),实现 AMQP 协议,核心角色是:
官方 tutorials 里把常用模式分成:Hello World、Work Queues、Publish/Subscribe、Routing、Topics、RPC、Publisher Confirms 等几种。
| 概念 | 作用 | 简要说明 |
|---|
| Broker | RabbitMQ 服务器实例 | 负责接收、路由、存储、投递消息 |
| Virtual Host (vhost) | 逻辑'租户/命名空间' | 每个 vhost 有独立的 Exchange/Queue/Binding,用于权限隔离 |
| Connection | TCP 连接 | 客户端与 Broker 之间的物理连接 |
| Channel | 连接里的'轻量级连接' | 复用一个 TCP,多线程时一般每个线程一个 Channel |
| Exchange | 交换机(消息路由器) | 接收生产者消息,根据类型和路由规则分发到队列 |
| Queue | 消息队列 | 真正存储消息,等待消费者消费 |
| Binding | 绑定 | 队列 → 交换机的关系,说明'这个队列想要哪些消息' |
| Routing Key | 路由键 | Direct/Topic Exchange 用来匹配绑定规则 |
| Message | 消息 | 包含 body(业务数据)+ 属性(deliveryMode、headers 等) |

特点:
流程:
适用场景:
关键点:
代码:
生产者:
public class Producer { public static void main(String[] args) throws Exception { // 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 设置主机地址 connectionFactory.setHost("127.0.0.1"); // 设置连接端口号:默认为 5672 connectionFactory.setPort(5672); // 虚拟主机名称:默认为 / connectionFactory.setVirtualHost("/"); // 设置连接用户名;默认为 guest connectionFactory.setUsername("guest"); // 设置连接密码;默认为 guest connectionFactory.setPassword("guest"); // 创建连接 Connection connection = connectionFactory.newConnection(); // 创建频道 Channel channel = connection.createChannel(); // 声明(创建)队列 // queue 参数 1:队列名称 // durable 参数 2:是否定义持久化队列,当 MQ 重启之后还在 // exclusive 参数 3:是否独占本次连接。若独占,只能有一个消费者监听这个队列且 Connection 关闭时删除这个队列 // autoDelete 参数 4:是否在不使用的时候自动删除队列,也就是在没有 Consumer 时自动删除 // arguments 参数 5:队列其它参数 channel.queueDeclare("simple_queue", true, false, false, null); // 要发送的信息 String message = "你好;小兔子!"; // 参数 1:交换机名称,如果没有指定则使用默认 Default Exchange // 参数 2:路由 key,简单模式可以传递队列名称 // 参数 3:配置信息 // 参数 4:消息内容 channel.basicPublish("", "simple_queue", null, message.getBytes()); System.out.println("已发送消息:" + message); // 关闭资源 channel.close(); connection.close(); } }
消费者:
public class Consume { public static void main(String[] args) throws Exception { // 1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2. 设置参数 factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); // 3. 创建连接 Connection Connection connection = factory.newConnection(); // 4. 创建 Channel Channel channel = connection.createChannel(); // 5. 创建队列 // 如果没有一个名字叫 simple_queue 的队列,则会创建该队列,如果有则不会创建 // 参数 1. queue:队列名称 // 参数 2. durable:是否持久化。如果持久化,则当 MQ 重启之后还在 // 参数 3. exclusive:是否独占。 // 参数 4. autoDelete:是否自动删除。当没有 Consumer 时,自动删除掉 // 参数 5. arguments:其它参数。 channel.queueDeclare("simple_queue",true,false,false,null); // 接收消息 DefaultConsumer consumer = new DefaultConsumer(channel){ // 回调方法,当收到消息后,会自动执行该方法 // 参数 1. consumerTag:标识 // 参数 2. envelope:获取一些信息,交换机,路由 key... // 参数 3. properties:配置信息 // 参数 4. body:数据 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumerTag:"+consumerTag); System.out.println("Exchange:"+envelope.getExchange()); System.out.println("RoutingKey:"+envelope.getRoutingKey()); System.out.println("properties:"+properties); System.out.println("body:"+new String(body)); } }; // 参数 1. queue:队列名称 // 参数 2. autoAck:是否自动确认,类似咱们发短信,发送成功会收到一个确认消息 // 参数 3. callback:回调对象 // 消费者类似一个监听程序,主要是用来监听消息 channel.basicConsume("simple_queue",true,consumer); } }
特点:
流程:
适用场景:
关键点:
代码:
生产者:
public class Producer { public static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true,false,false,null); for (int i = 1; i <= 10; i++) { String body = i+"hello rabbitmq~~~"; channel.basicPublish("",QUEUE_NAME,null,body.getBytes()); } channel.close(); connection.close(); } }
消费者 1:
public class Consumer1 { static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("Consumer1 body:"+new String(body)); } }; channel.basicConsume(QUEUE_NAME,true,consumer); } }
消费者 2:
public class Consumer2 { static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("Consumer2 body:"+new String(body)); } }; channel.basicConsume(QUEUE_NAME,true,consumer); } }
特点:
流程:
exchangeDeclare("logs", "fanout")queueDeclare() → 得到一个临时队列(exclusive/autoDelete)queueBind(queueName, "logs", "")logs Exchange 发消息,所有队列都会收到一份适用场景:
关键点:
代码:
生产者:
public class Producer { public static void main(String[] args) throws Exception { // 1、获取连接 Connection connection = ConnectionUtil.getConnection(); // 2、创建频道 Channel channel = connection.createChannel(); // 参数 1. exchange:交换机名称 // 参数 2. type:交换机类型 // DIRECT("direct"):定向 // FANOUT("fanout"):扇形(广播),发送消息到每一个与之绑定队列。 // TOPIC("topic"):通配符的方式 // HEADERS("headers"):参数匹配 // 参数 3. durable:是否持久化 // 参数 4. autoDelete:自动删除 // 参数 5. internal:内部使用。一般 false // 参数 6. arguments:其它参数 String exchangeName = "test_fanout"; // 3、创建交换机 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null); // 4、创建队列 String queue1Name = "test_fanout_queue1"; String queue2Name = "test_fanout_queue2"; channel.queueDeclare(queue1Name,true,false,false,null); channel.queueDeclare(queue2Name,true,false,false,null); // 5、绑定队列和交换机 // 参数 1. queue:队列名称 // 参数 2. exchange:交换机名称 // 参数 3. routingKey:路由键,绑定规则 // 如果交换机的类型为 fanout,routingKey 设置为"" channel.queueBind(queue1Name,exchangeName,""); channel.queueBind(queue2Name,exchangeName,""); String body = "日志信息:张三调用了 findAll 方法...日志级别:info..."; // 6、发送消息 channel.basicPublish(exchangeName,"",null,body.getBytes()); // 7、释放资源 channel.close(); connection.close(); } }
消费者 1:
public class Consumer1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String queue1Name = "test_fanout_queue1"; channel.queueDeclare(queue1Name,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); System.out.println("队列 1 消费者 1 将日志信息打印到控制台....."); } }; channel.basicConsume(queue1Name,true,consumer); } }
消费者 2:
public class Consumer2 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String queue2Name = "test_fanout_queue2"; channel.queueDeclare(queue2Name,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); System.out.println("队列 2 消费者 2 将日志信息打印到控制台....."); } }; channel.basicConsume(queue2Name,true,consumer); } }
特点:
流程:
exchangeDeclare("direct_logs", "direct")queueBind(Q1, "direct_logs", "error")queueBind(Q2, "direct_logs", "info") 和 "warning""error" → 只有 Q1 收到"info" → 只有 Q2 收到适用场景:
关键点:
代码:
生产者:
public class Producer { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_direct"; // 创建交换机 channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null); // 创建队列 String queue1Name = "test_direct_queue1"; String queue2Name = "test_direct_queue2"; // 声明(创建)队列 channel.queueDeclare(queue1Name,true,false,false,null); channel.queueDeclare(queue2Name,true,false,false,null); // 队列绑定交换机 // 队列 1 绑定 error channel.queueBind(queue1Name,exchangeName,"error"); // 队列 2 绑定 info error warning channel.queueBind(queue2Name,exchangeName,"info"); channel.queueBind(queue2Name,exchangeName,"error"); channel.queueBind(queue2Name,exchangeName,"warning"); String message = "日志信息:张三调用了 delete 方法。错误了,日志级别 warning"; // 发送消息 channel.basicPublish(exchangeName,"warning",null,message.getBytes()); System.out.println(message); // 释放资源 channel.close(); connection.close(); } }
消费者 1:
public class Consumer1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String queue1Name = "test_direct_queue1"; channel.queueDeclare(queue1Name,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); System.out.println("Consumer1 将日志信息打印到控制台....."); } }; channel.basicConsume(queue1Name,true,consumer); } }
消费者 2:
public class Consumer2 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String queue2Name = "test_direct_queue2"; channel.queueDeclare(queue2Name,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); System.out.println("Consumer2 将日志信息存储到数据库....."); } }; channel.basicConsume(queue2Name,true,consumer); } }
特点:
order.payment.success*:匹配恰好一个单词#:匹配 0 或多个单词例子:
order.* → 匹配 order.payment、order.shippingorder.# → 匹配所有以 order. 开头的路由键order.payment.success → 被 order.# 匹配order.payment → 被 order.* 匹配适用场景:
error.app、error.db、warn.*asia.china、asia.#关键点:
代码:
生产者:
public class Producer { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_topic"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null); String queue1Name = "test_topic_queue1"; String queue2Name = "test_topic_queue2"; channel.queueDeclare(queue1Name,true,false,false,null); channel.queueDeclare(queue2Name,true,false,false,null); // 绑定队列和交换机 // 参数 1. queue:队列名称 // 参数 2. exchange:交换机名称 // 参数 3. routingKey:路由键,绑定规则 // 如果交换机的类型为 fanout ,routingKey 设置为"" // routing key 常用格式:系统的名称。日志的级别。 // 需求:所有 error 级别的日志存入数据库,所有 order 系统的日志存入数据库 channel.queueBind(queue1Name,exchangeName,"#.error"); channel.queueBind(queue1Name,exchangeName,"order.*"); channel.queueBind(queue2Name,exchangeName,"*.*"); // 分别发送消息到队列:order.info、goods.info、goods.error String body = "[所在系统:order][日志级别:info][日志内容:订单生成,保存成功]"; channel.basicPublish(exchangeName,"order.info",null,body.getBytes()); body = "[所在系统:goods][日志级别:info][日志内容:商品发布成功]"; channel.basicPublish(exchangeName,"goods.info",null,body.getBytes()); body = "[所在系统:goods][日志级别:error][日志内容:商品发布失败]"; channel.basicPublish(exchangeName,"goods.error",null,body.getBytes()); channel.close(); connection.close(); } }
消费者 1:
public class Consumer1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String QUEUE_NAME = "test_topic_queue1"; channel.queueDeclare(QUEUE_NAME,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); } }; channel.basicConsume(QUEUE_NAME,true,consumer); } }
消费者 2:
public class Consumer2 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String QUEUE_NAME = "test_topic_queue2"; channel.queueDeclare(QUEUE_NAME,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); } }; channel.basicConsume(QUEUE_NAME,true,consumer); } }
特点:
典型流程:
replyTo:回调队列名correlationId:唯一请求 IDreplyTo 指定的回调队列,并带上 correlationIdcorrelationId 找到对应的请求适用场景:
关键点:
correlationId 与请求的映射特点:
适用场景:
关键点:
| Exchange 类型 | 路由规则 | 对应的消息模式 |
|---|---|---|
| Default(无名 Direct) | routing key = 队列名 | 简单模式、工作队列(不显式声明 Exchange 时) |
| Direct | routing key 精确匹配 binding key | 路由模式、简单点对点 |
| Fanout | 忽略 routing key,广播给所有绑定队列 | 发布/订阅模式 |
| Topic | routing key 模式匹配(* / #) | 主题/通配符模式 |
| Headers | 根据消息 headers 匹配 | 很少用,可实现复杂多属性路由 |

durable = truequeueDeclare(..., durable=true, ...)deliveryMode = 2(PERSISTENT)注意:持久化不等于绝对不丢,只是落到磁盘;仍可能因磁盘故障等丢失,需要配合集群、镜像等高可用方案。
basic.ack 确认basic.nack 或 basic.reject,配合:
requeue = true:重新入队requeue = false:进入死信队列(DLX)basicQos(prefetchCount):

死信队列是 RabbitMQ 实现消息可靠性兜底、异常消息隔离、失败重试的核心机制,本质上它是一个普通队列,仅用于接收「触发了死信规则的异常消息」,配套的核心组件是死信交换机。
死信:触发了 RabbitMQ 预设规则,无法被正常消费的消息,会被标记为死信。
死信交换机:用于接收死信消息的普通交换机,类型支持 direct/topic/fanout/headers,和常规交换机用法完全一致。
死信队列:绑定了 DLX 的普通队列,专门用于存储死信消息,供兜底消费、排查问题、归档处理。
核心逻辑:正常队列必须提前绑定 DLX 和死信路由键,消息触发死信规则后,会被 RabbitMQ 自动转发到 DLX,最终路由到死信队列。
只有满足以下任一条件,消息才会成为死信,缺一不可:
| 触发条件 | 详细规则 | 关键坑点 |
|---|---|---|
| 消息被消费者拒绝 | 消费者执行basicNack/basicReject,且 requeue=false(不重新入队) | requeue=true时消息会回到原队列循环投递,永远不会进入死信 |
| 消息 TTL 过期 | 消息在队列中超过了设置的存活时间,且未被消费 | 分为队列级 TTL(队列所有消息统一过期时间)和消息级 TTL(单条消息独立过期时间) |
| 队列达到最大长度 | 队列的消息数量 / 字节数达到预设的max-length/max-length-bytes阈值 | 溢出策略默认drop-head,队首最早入队的消息会被挤出,转为死信 |
生产者 → 正常交换机 → 正常队列(绑定 DLX+ 死信路由键) ↓ 触发死信规则 死信消费者 ← 死信队列 ← 死信交换机(DLX)
延迟队列是指:消息发送后,不希望消费者立即消费,而是等待指定的延迟时间后,再被消费者消费的队列。RabbitMQ 本身没有直接提供延迟队列功能,而是通过「TTL + 死信队列」或「官方延迟插件」两种方式实现,是订单超时、定时提醒、延迟重试等场景的核心解决方案。
这是 RabbitMQ 原生支持的实现方式,无需额外安装插件,基于前文的死信队列能力实现。
生产者 → 延迟交换机 → 缓冲队列(带 TTL,无消费者,绑定 DLX) ↓ TTL 到期,触发死信 延迟消费者 ← 目标消费队列 ← 死信交换机(DLX)
基于前文的死信配置,仅需修改缓冲队列配置,无需额外依赖:
@Configuration public class DelayQueueConfig { // 延迟缓冲交换机、队列 public static final String DELAY_BUFFER_EXCHANGE = "delay.buffer.exchange"; public static final String DELAY_BUFFER_QUEUE = "delay.buffer.queue"; public static final String DELAY_BUFFER_ROUTING_KEY = "delay.buffer.key"; // 延迟目标交换机、队列(实际消费的队列) public static final String DELAY_TARGET_EXCHANGE = "delay.target.exchange"; public static final String DELAY_TARGET_QUEUE = "delay.target.queue"; public static final String DELAY_TARGET_ROUTING_KEY = "delay.target.key"; // 1. 声明缓冲交换机 @Bean public DirectExchange delayBufferExchange() { return ExchangeBuilder.directExchange(DELAY_BUFFER_EXCHANGE).durable(true).build(); } // 2. 声明目标交换机(即死信交换机) @Bean public DirectExchange delayTargetExchange() { return ExchangeBuilder.directExchange(DELAY_TARGET_EXCHANGE).durable(true).build(); } // 3. 声明延迟缓冲队列,设置 TTL+DLX,无消费者监听 @Bean public Queue delayBufferQueue() { Map<String, Object> args = new HashMap<>(); // 绑定死信交换机(目标交换机) args.put("x-dead-letter-exchange", DELAY_TARGET_EXCHANGE); // 死信路由键(目标路由键) args.put("x-dead-letter-routing-key", DELAY_TARGET_ROUTING_KEY); // 设置延迟时间:30 分钟,单位毫秒(订单超时场景) args.put("x-message-ttl", 30 * 60 * 1000); return QueueBuilder.durable(DELAY_BUFFER_QUEUE).withArguments(args).build(); } // 4. 声明目标消费队列 @Bean public Queue delayTargetQueue() { return QueueBuilder.durable(DELAY_TARGET_QUEUE).build(); } // 5. 绑定关系 @Bean public Binding delayBufferBinding() { return BindingBuilder.bind(delayBufferQueue()).to(delayBufferExchange()).with(DELAY_BUFFER_ROUTING_KEY); } @Bean public Binding delayTargetBinding() { return BindingBuilder.bind(delayTargetQueue()).to(delayTargetExchange()).with(DELAY_TARGET_ROUTING_KEY); } }
消息发送与消费
// 消息发送:发送到缓冲队列,无需消费 rabbitTemplate.convertAndSend(DelayQueueConfig.DELAY_BUFFER_EXCHANGE, DelayQueueConfig.DELAY_BUFFER_ROUTING_KEY, "订单 123456"); // 延迟消费:监听目标队列,30 分钟后收到消息 @RabbitListener(queues = "delay.target.queue") public void consumeDelay(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException { System.out.println("收到延迟消息,执行订单超时取消:" + message); channel.basicAck(deliveryTag, false); }
优缺点
| 优点 | 缺点 |
|---|---|
| 原生支持,无需安装插件,兼容性强 | 队列级 TTL 只能实现固定延迟时间,灵活性差 |
| 实现简单,基于死信队列逻辑,易理解 | 消息级 TTL 存在队首惰性检查坑,不同延迟时间的消息会被阻塞 |
| 适合固定延迟时间的场景 | 大量不同延迟时间的消息会创建大量队列,运维成本高 |
RabbitMQ 官方推出了rabbitmq_delayed_message_exchange插件,完美解决了 TTL 方案的惰性检查问题,支持任意时间的延迟消息,是生产环境的首选方案。
插件新增了一种x-delayed-message类型的交换机,核心逻辑:
x-delay参数(延迟时间),定时检查消息是否到期。步骤 1:插件安装
.ez插件文件,放入 RabbitMQ 的plugins目录rabbitmq-plugins enable rabbitmq_delayed_message_exchangex-delayed-message类型,即为安装成功注意:集群环境下,每个节点都必须安装并启用该插件,否则会出现路由失败。
步骤 2:代码实现
@Configuration public class PluginDelayConfig { public static final String DELAYED_EXCHANGE = "delayed.exchange"; public static final String DELAYED_QUEUE = "delayed.queue"; public static final String DELAYED_ROUTING_KEY = "delayed.key"; // 声明延迟交换机:类型为 x-delayed-message,指定 x-delayed-type 为 direct @Bean public CustomExchange delayedExchange() { Map<String, Object> args = new HashMap<>(); // 指定底层交换机类型,支持 direct/topic/fanout args.put("x-delayed-type", "direct"); return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, args); } // 声明目标消费队列,无特殊配置 @Bean public Queue delayedQueue() { return QueueBuilder.durable(DELAYED_QUEUE).build(); } // 绑定队列与延迟交换机 @Bean public Binding delayedBinding() { return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(DELAYED_ROUTING_KEY).noargs(); } }
消息发送与消费
// 消息发送:通过 header 设置 x-delay,单位毫秒,支持任意延迟时间 String orderId = "123456"; int delayTime = 10 * 60 * 1000; // 10 分钟延迟 rabbitTemplate.convertAndSend(PluginDelayConfig.DELAYED_EXCHANGE, PluginDelayConfig.DELAYED_ROUTING_KEY, orderId, message -> { // 设置延迟时间,核心参数 message.getMessageProperties().setHeader("x-delay", delayTime); return message; }); // 延迟消费:直接监听目标队列,到期自动收到消息 @RabbitListener(queues = "delayed.queue") public void consumeDelayed(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException { System.out.println("收到插件延迟消息:" + message); channel.basicAck(deliveryTag, false); }
优缺点
| 优点 | 缺点 |
|---|---|
| 完美解决 TTL 惰性检查问题,支持任意时间的延迟,无顺序阻塞 | 需要安装插件,有少量运维成本 |
| 灵活性极高,单交换机支持不同延迟时间的消息,无需创建多个队列 | 不适合超长延迟场景 |
| 官方维护,稳定性强,支持集群部署 | 插件消息持久化依赖 Mnesia,极端宕机场景有极低概率丢失消息 |
| 场景 | 推荐方案 |
|---|---|
| 固定延迟时间、无插件安装权限、兼容性要求高 | TTL + 死信队列方案 |
| 延迟时间不固定、多延迟时间并存、生产环境高可用要求 | 官方延迟插件方案 |
| 超长延迟 | 不建议用 RabbitMQ,推荐使用 XXL-Job 等定时任务框架 |

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online
JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online
使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online
Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online