RabbitMQ/Spring-AMQP 高级特性:TTL、死信队列与延迟队列详解
RabbitMQ 中 TTL 机制控制消息或队列生命周期,死信队列处理无法消费的消息,延迟队列通过插件实现定时投递。本文详解 Spring-AMQP 配置交换器、队列及绑定关系,演示 TTL 设置、死信触发条件(拒绝、过期、长度溢出)及延迟插件安装与代码集成方案。

RabbitMQ 中 TTL 机制控制消息或队列生命周期,死信队列处理无法消费的消息,延迟队列通过插件实现定时投递。本文详解 Spring-AMQP 配置交换器、队列及绑定关系,演示 TTL 设置、死信触发条件(拒绝、过期、长度溢出)及延迟插件安装与代码集成方案。

Java JDK 版本: Oracle OpenJDK 17.0.9
SpringBoot 版本: 3.5.9
RabbitMQ version: 3.12.1
RabbitMQ 实现延迟队列的插件: rabbitmq_delayed_message_exchange-3.12.0
TTL(Time-To-Live)是 RabbitMQ 中控制消息或队列生命周期的机制,用于在指定时间后自动删除消息或队列,避免资源堆积。
import jakarta.annotation.Resource;
import org.example.springrabbitmqextensions.constant.Constants;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/producer")
public class ProducerController {
@Resource(name = "rabbitTemplate")
private RabbitTemplate rabbitTemplate;
@RequestMapping("/ttl")
public String ttl() {
MessagePostProcessor messagePostProcessor = message -> {
message.getMessageProperties().setExpiration("10000"); // 设置消息 TTL 10 秒
return message;
};
rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "TTL 为 10 秒", messagePostProcessor);
rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "未设置 TTL");
return "发送成功";
}
}
import org.example.springrabbitmqextensions.constant.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
@Bean("ttlQueue")
public Queue ttlQueue() {
return QueueBuilder.durable(Constants.TTL_QUEUE).ttl(20000).build(); // 设置队列 TTL 为 20 秒
}
@Bean("ttlExchange")
public DirectExchange ttlExchange() {
return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).build();
}
@Bean("ttlBinding")
public Binding ttlBinding(@Qualifier("ttlExchange") DirectExchange directExchange, @Qualifier("ttlQueue") Queue queue) {
return BindingBuilder.bind(queue).to(directExchange).with("ttl");
}
}
注意: 两种 TTL 过期行为的触发条件不同
死信 (Dead Letter) 是指由于某些原因无法被正常消费的消息。RabbitMQ 不会自动丢弃这些消息,而是可以将其路由到 死信交换器 (Dead Letter Exchange,DLX) ,与 DLX 进行绑定的队列就是 死信队列 (Dead Letter Queue,DLQ)
消息变成死信一般是由于以下几种情况:
- 消息被拒绝 (Basic.Reject / Basic.Nack),并且设置 requeue 为 false
- 消息过期
- 队列达到最大长度
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.example.springrabbitmqextensions.constant.Constants;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
@Component
@Slf4j
public class DeadLetterListener {
@RabbitListener(queues = Constants.NORMAL_QUEUE)
public void listenNormalQueue(Message message, Channel channel) throws Exception {
log.info("normal_queue 接收到消息:{},deliveryTag:{}", new String(message.getBody(), StandardCharsets.UTF_8), message.getMessageProperties().getDeliveryTag());
try {
int num = 3 / 0;
log.info("处理成功");
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
} catch (Exception e) {
log.info("处理失败");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, false);
}
}
@RabbitListener(queues = Constants.DEAD_LETTER_QUEUE)
public void listenDeadLetterQueue(Message message) {
log.info("dead_letter_queue 接收到消息:{},deliveryTag:{}", new String(message.getBody(), StandardCharsets.UTF_8), message.getMessageProperties().getDeliveryTag());
}
}
import jakarta.annotation.Resource;
import org.example.springrabbitmqextensions.constant.Constants;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/producer")
public class ProducerController {
@Resource(name = "rabbitTemplate")
private RabbitTemplate rabbitTemplate;
@RequestMapping("/deadLetter")
public String deadLetter() {
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "deadLetter" + i);
}
return "发送成功";
}
}
import org.example.springrabbitmqextensions.constant.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
@Bean("deadLetterQueue")
public Queue deadLetterQueue() {
return QueueBuilder.durable(Constants.DEAD_LETTER_QUEUE).build();
}
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange() {
return ExchangeBuilder.directExchange(Constants.DEAD_LETTER_EXCHANGE).build();
}
@Bean("deadLetterBinding")
public Binding dlBinding(@Qualifier("deadLetterExchange") DirectExchange directExchange, @Qualifier("deadLetterQueue") Queue queue) {
return BindingBuilder.bind(queue).to(directExchange).with("dead_letter");
}
@Bean("normalQueue")
public Queue normalQueue() {
return QueueBuilder.durable(Constants.NORMAL_QUEUE)
.deadLetterExchange(Constants.DEAD_LETTER_EXCHANGE)
.deadLetterRoutingKey("dead_letter") // 绑定死信交换机
.ttl(10000) // 测试过期消息
.maxLength(10L) // 测试溢出的消息
.build();
}
@Bean("normalExchange")
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();
}
@Bean("normalBinding")
public Binding normalBinding(@Qualifier("normalExchange") DirectExchange directExchange, @Qualifier("normalQueue") Queue queue) {
return BindingBuilder.bind(queue).to(directExchange).with("normal");
}
}
延迟队列 (Delayed Queue) 是一种特殊的消息队列,其中的消息不会立即被消费者获取,而是在指定的延迟时间过后才会变得可消费。 用户下单后,订单信息通过 fanout 交换机转发到正常队列和 ttlQueue;数据库消费正常队列并将订单信息写入数据库;数据库消费支付业务的队列,并完善订单的支付状态;30 分钟后,消费者处理死信队列,判断支付状态。若已支付,不对数据库进行修改;若未支付,则取消或回滚该订单。
RabbitMQ 没有内置延迟队列功能,我们可以使用 TTL+DLX 组合的方式来实现延迟队列,如上述死信队列代码。但由于 RabbitMQ 对过期消息进行判断时,采用的是惰性检查策略,这会导致无法第一时间判断出消息是否过期。
为了解决上述问题,RabbitMQ 官方提供了一个延迟插件 rabbitmq_delayed_message_exchange 来实现延迟队列的功能。
# 创建目录并将插件上传至该目录
root@VM-0-7-ubuntu:~# mkdir /usr/lib/rabbitmq/plugins
# 查看插件列表
root@VM-0-7-ubuntu:/usr/lib/rabbitmq/plugins# rabbitmq-plugins list
# 启动插件
root@VM-0-7-ubuntu:/usr/lib/rabbitmq/plugins# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 重启 RabbitMQ 服务
root@VM-0-7-ubuntu:/usr/lib/rabbitmq/plugins# service rabbitmq-server restart
import lombok.extern.slf4j.Slf4j;
import org.example.springrabbitmqextensions.constant.Constants;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
@Component
@Slf4j
public class DelayListener {
@RabbitListener(queues = Constants.DELAY_QUEUE)
public void listenDelayQueue(Message message) {
log.info("delay_queue 接收到消息:{},deliveryTag:{}", new String(message.getBody(), StandardCharsets.UTF_8), message.getMessageProperties().getDeliveryTag());
}
}
import jakarta.annotation.Resource;
import org.example.springrabbitmqextensions.constant.Constants;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/producer")
public class ProducerController {
@Resource(name = "rabbitTemplate")
private RabbitTemplate rabbitTemplate;
@RequestMapping("/delay")
public String delay() {
MessagePostProcessor messagePostProcessor1 = message -> {
message.getMessageProperties().setDelayLong(20000L);
return message;
};
MessagePostProcessor messagePostProcessor2 = message -> {
message.getMessageProperties().setDelayLong(10000L);
return message;
};
rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay", "delay:20000", messagePostProcessor1);
rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay", "delay:10000", messagePostProcessor2);
return "发送成功";
}
}
import org.example.springrabbitmqextensions.constant.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
@Bean("delayQueue")
public Queue delayQueue() {
return QueueBuilder.durable(Constants.DELAY_QUEUE).build();
}
@Bean("delayExchange")
public DirectExchange delayExchange() {
return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).delayed().build();
}
@Bean("delayBinding")
public Binding delayBinding(@Qualifier("delayExchange") DirectExchange directExchange, @Qualifier("delayQueue") Queue queue) {
return BindingBuilder.bind(queue).to(directExchange).with("delay");
}
}

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online
JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online
使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online
Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online