详解RabbitMQ高级特性之死信队列

目录
死信队列
死信(dead message) 简单理解就是因为种种原因, ⽆法被消费的信息, 就是死信.
有死信, ⾃然就有死信队列. 当消息在⼀个队列中变成死信之后,它能被重新被发送到另⼀个交换器
中,这个交换器就是DLX( Dead Letter Exchange ), 绑定DLX的队列, 就称为死信队列(Dead
Letter Queue,简称DLQ).

消息变成死信⼀般是由于以下⼏种情况:
1. 消息被拒绝( Basic.Reject/Basic.Nack ),并且设置 requeue 参数为 false.
2. 消息过期.
3. 队列达到最⼤⻓度.
添加配置
spring: application: name: rabbit-extensions-demo rabbitmq: addresses: amqp://study:[email protected]:5672/extension
常量类
public class Constants { //死信 public static final String NORMAL_QUEUE = "normal.queue"; public static final String NORMAL_EXCHANGE = "normal.exchange"; public static final String DL_QUEUE = "dl.queue"; public static final String DL_EXCHANGE= "dl.exchange"; }声明队列和交换机并绑定二者关系
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import rabbitextensionsdemo.constant.Constants; @Configuration public class DLConfig { //正常的交换机和队列 @Bean("normalQueue") public Queue normalQueue(){ return QueueBuilder.durable(Constants.NORMAL_QUEUE) .deadLetterExchange(Constants.DL_EXCHANGE) .deadLetterRoutingKey("dlx") .build(); } @Bean("normalExchange") public DirectExchange normalExchange(){ return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build(); } @Bean("normalBinding") public Binding normalBinding(@Qualifier("normalQueue") Queue queue, @Qualifier("normalExchange") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("normal").noargs(); } //死信交换机和队列 @Bean("dlQueue") public Queue dlQueue(){ return QueueBuilder.durable(Constants.DL_QUEUE).build(); } @Bean("dlExchange") public DirectExchange dlExchange(){ return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build(); } @Bean("dlBinding") public Binding dlBinding(@Qualifier("dlQueue") Queue queue, @Qualifier("dlExchange") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("dlx").noargs(); } }死信--消息过期
给队列设置TTL

编写生产消息代码
@RequestMapping("/dl") public String dl() { System.out.println("dl..."); //发送普通消息 rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "dl test..."); System.out.printf("%tc 消息发送成功 \n", new Date()); return "消息发送成功"; }编写消费消息代码
import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import rabbitextensionsdemo.constant.Constants; import java.util.Date; @Component public class DLListener { @RabbitListener(queues = Constants.DL_QUEUE) public void dlHandMessage(Message message, Channel channel) throws Exception { //消费者逻辑 System.out.printf("[dl.queue] %tc 接收到消息: %s, deliveryTag: %d \n", new Date(), new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag()); } }观察现象




我们可以看到,消息在10秒后过期,从normal队列进入到了死信队列,消息进入到死信队列后被消费。
死信--消息超过队列最大长度
设置队列的最大长度

编写生产消息代码
@RequestMapping("/dl") public String dl() { //测试队列长度 for (int i = 0; i < 20; i++) { rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "dl test..."+i); } return "消息发送成功"; }编写消费消息代码
import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import rabbitextensionsdemo.constant.Constants; import java.util.Date; @Component public class DLListener { @RabbitListener(queues = Constants.DL_QUEUE) public void dlHandMessage(Message message, Channel channel) throws Exception { //消费者逻辑 System.out.printf("[dl.queue] %tc 接收到消息: %s, deliveryTag: %d \n", new Date(), new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag()); } }观察现象



此时我们可以看到,给队列设置了最大长度为10,但是队列接收到了20条消息,就会导致前10条消息变成死信。
死信--消息被拒绝
编写生产消息代码
@RequestMapping("/dl") public String dl() { System.out.println("dl..."); //发送普通消息 rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "dl test..."); System.out.printf("%tc 消息发送成功 \n", new Date()); return "消息发送成功"; }编写消费消息代码
import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import rabbitextensionsdemo.constant.Constants; @Component public class DLListener { @RabbitListener(queues = Constants.NORMAL_QUEUE) public void handMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { //消费者逻辑 System.out.printf("[normal.queue]接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag()); //进行业务逻辑处理 System.out.println("业务逻辑处理"); int num = 3/0; System.out.println("业务处理完成"); //肯定确认 channel.basicAck(deliveryTag,false); } catch (Exception e) { //否定确认 channel.basicNack(deliveryTag, false, false); //requeue为false, 该消息成为死信 } } }观察现象



可以看到,normal队列中的消息在被消费时因为发生了异常而执行到了拒绝消息的代码,而且设置了消息不重新入队,导致消息变成了死信,进而进入到了死信队列。
面试题
1.死信队列的概念
死信(Dead Letter)是消息队列中的⼀种特殊消息, 它指的是那些⽆法被正常消费或处理的消息. 在消息队列系统中, 如RabbitMQ, 死信队列⽤于存储这些死信消息。
2.死信的来源
1) 消息过期: 消息在队列中存活的时间超过了设定的TTL
2) 消息被拒绝: 消费者在处理消息时, 可能因为消息内容错误, 处理逻辑异常等原因拒绝处理该消息. 如果拒绝时指定不重新⼊队(requeue=false), 消息也会成为死信.
3) 队列满了: 当队列达到最⼤⻓度, ⽆法再容纳新的消息时, 新来的消息会被处理为死信.
3.死信的应用场景
对于RabbitMQ来说, 死信队列是⼀个⾮常有⽤的特性. 它可以处理异常情况下,消息不能够被消费者正确消费⽽被置⼊死信队列中的情况, 应⽤程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况, 进⽽可以改善和优化系统.
⽐如: ⽤⼾⽀付订单之后, ⽀付系统会给订单系统返回当前订单的⽀付状态
为了保证⽀付信息不丢失, 需要使⽤到死信队列机制. 当消息消费异常时, 将消息投⼊到死信队列中, 由订单系统的其他消费者来监听这个队列, 并对数据进⾏处理(⽐如发送⼯单等,进⾏⼈⼯确认).
场景的应⽤场景还有:
• 消息重试:将死信消息重新发送到原队列或另⼀个队列进⾏重试处理.
• 消息丢弃:直接丢弃这些⽆法处理的消息,以避免它们占⽤系统资源.
• ⽇志收集:将死信消息作为⽇志收集起来,⽤于后续分析和问题定位.