掌控消息全链路(3)——RabbitMQ/Spring-AMQP高级特性详解之TTL、死信和延迟
🔥我的主页:九转苍翎⭐️个人专栏:《Java SE 》《Java集合框架系统精讲》《MySQL高手之路:从基础到高阶 》《计算机网络 》《Java工程师核心能力体系构建》天行健,君子以自强不息。
Java JDK版本:Oracle OpenJDK 17.0.9
SpringBoot版本:3.5.9
- Spring Web
- Lombok
- Spring for RabbitMQ
RabbitMQ version:3.12.1
RabbitMQ实现延迟队列的插件:rabbitmq_delayed_message_exchange-3.12.0(已免费上传至我的资源)
1.TTL
TTL(Time-To-Live)是RabbitMQ中控制消息或队列生命周期的机制,用于在指定时间后自动删除消息或队列,避免资源堆积消息TTL:为单条消息设置过期时间队列TTL:设置整个队列的过期时间发送消息
importjakarta.annotation.Resource;importorg.example.springrabbitmqextensions.constant.Constants;importorg.springframework.amqp.core.MessagePostProcessor;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;@RestController@RequestMapping("/producer")publicclassProducerController{@Resource(name ="rabbitTemplate")privateRabbitTemplate rabbitTemplate;@RequestMapping("/ttl")publicStringttl(){MessagePostProcessor messagePostProcessor = message ->{ message.getMessageProperties().setExpiration("10000");// 设置消息TTL10秒return message;}; rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl","TTL为10秒", messagePostProcessor); rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl","未设置TTL");return"发送成功";}}声明和配置交换器、队列和绑定关系
importorg.example.springrabbitmqextensions.constant.Constants;importorg.springframework.amqp.core.*;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitMQConfig{@Bean("ttlQueue")publicQueuettlQueue(){returnQueueBuilder.durable(Constants.TTL_QUEUE).ttl(20000).build();// 设置队列TTL为20秒}@Bean("ttlExchange")publicDirectExchangettlExchange(){returnExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).build();}@Bean("ttlBinding")publicBindingttlBinding(@Qualifier("ttlExchange")DirectExchange directExchange,@Qualifier("ttlQueue")Queue queue){returnBindingBuilder.bind(queue).to(directExchange).with("ttl");}}注意:两种TTL过期行为的触发条件不同
- 队列:超过TTL设置的时间后将该队列上的所有消息删除
- 消息:在即将投递时检查是否过期。为了避免为每条消息维护独立的计时器或对队列进行不间断的全量扫描带来的巨大性能开销,RabbitMQ采用了这种惰性检查策略
- 例如:一条已过期的消息位于队列中部,前面还有未过期的消息,它将不会被立即删除,而会滞留在队列中,直到它成为队列头部的下一条待投递消息时,才会被检查并丢弃
2.死信队列
死信(Dead Letter) 是指由于某些原因无法被正常消费的消息。RabbitMQ不会自动丢弃这些消息,而是可以将其路由到 死信交换器(Dead Letter Exchange,DLX) ,与DLX进行绑定的队列就是 死信队列(Dead Letter Queue,DLQ)
消息变成死信一般是由于以下几种情况:消息被拒绝(Basic.Reject / Basic.Nack),并且设置requeue为false消息过期队列达到最大长度
配置消费者
importcom.rabbitmq.client.Channel;importlombok.extern.slf4j.Slf4j;importorg.example.springrabbitmqextensions.constant.Constants;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;importjava.nio.charset.StandardCharsets;@Component@Slf4jpublicclassDeadLetterListener{@RabbitListener(queues =Constants.NORMAL_QUEUE)publicvoidlistenNormalQueue(Message message,Channel channel)throwsException{ log.info("normal_queue接收到消息:{},deliveryTag:{}",newString(message.getBody(),StandardCharsets.UTF_8), message.getMessageProperties().getDeliveryTag());try{int num =3/0; log.info("处理成功"); channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);}catch(Exception e){ log.info("处理失败"); channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);}}@RabbitListener(queues =Constants.DEAD_LETTER_QUEUE)publicvoidlistenDeadLetterQueue(Message message){ log.info("dead_letter_queue接收到消息:{},deliveryTag:{}",newString(message.getBody(),StandardCharsets.UTF_8), message.getMessageProperties().getDeliveryTag());}}发送消息
importjakarta.annotation.Resource;importorg.example.springrabbitmqextensions.constant.Constants;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;@RestController@RequestMapping("/producer")publicclassProducerController{@Resource(name ="rabbitTemplate")privateRabbitTemplate rabbitTemplate;@RequestMapping("/deadLetter")publicStringdeadLetter(){for(int i =0; i <20; i++){ rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","deadLetter"+ i);}return"发送成功";}}声明和配置交换器、队列和绑定关系
importorg.example.springrabbitmqextensions.constant.Constants;importorg.springframework.amqp.core.*;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitMQConfig{@Bean("deadLetterQueue")publicQueuedeadLetterQueue(){returnQueueBuilder.durable(Constants.DEAD_LETTER_QUEUE).build();}@Bean("deadLetterExchange")publicDirectExchangedeadLetterExchange(){returnExchangeBuilder.directExchange(Constants.DEAD_LETTER_EXCHANGE).build();}@Bean("deadLetterBinding")publicBindingdlBinding(@Qualifier("deadLetterExchange")DirectExchange directExchange,@Qualifier("deadLetterQueue")Queue queue){returnBindingBuilder.bind(queue).to(directExchange).with("dead_letter");}@Bean("normalQueue")publicQueuenormalQueue(){returnQueueBuilder.durable(Constants.NORMAL_QUEUE).deadLetterExchange(Constants.DEAD_LETTER_EXCHANGE).deadLetterRoutingKey("dead_letter")// 绑定死信交换机.ttl(10000)// 测试过期消息.maxLength(10L)// 测试溢出的消息.build();}@Bean("normalExchange")publicDirectExchangenormalExchange(){returnExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();}@Bean("normalBinding")publicBindingnormalBinding(@Qualifier("normalExchange")DirectExchange directExchange,@Qualifier("normalQueue")Queue queue){returnBindingBuilder.bind(queue).to(directExchange).with("normal");}}3.延迟队列
延迟队列(Delayed Queue) 是一种特殊的消息队列,其中的消息不会立即被消费者获取,而是在指定的延迟时间过后才会变得可消费用户下单后,订单信息通过fanout交换机转发到正常队列和ttlQueue数据库消费正常队列并将订单信息写入数据库数据库消费支付业务的队列,并完善订单的支付状态30分钟后,消费者处理死信队列,判断支付状态。若已支付,不对数据库进行修改;若未支付,则取消或回滚该订单
3.1 ttl + DLX
RabbitMQ没有内置延迟队列功能,我们可以使用TTL+DLX组合的方式来实现延迟队列,如上述死信队列代码。但由于RabbitMQ对过期消息进行判断时,采用的是惰性检查策略,这会导致无法第一时间判断出消息是否过期
3.2 延迟插件
3.2.1 插件介绍
为了解决上述问题,RabbitMQ官方提供了⼀个延迟插件rabbitmq_delayed_message_exchange来实现延迟队列的功能
# 创建目录并将插件上传至该目录 root@VM-0-7-ubuntu:~# mkdir /usr/lib/rabbitmq/plugins# 查看插件列表 root@VM-0-7-ubuntu:/usr/lib/rabbitmq/plugins# rabbitmq-plugins list# 启动插件 root@VM-0-7-ubuntu:/usr/lib/rabbitmq/plugins# rabbitmq-plugins enable rabbitmq_delayed_message_exchange# 重启RabbitMQ服务 root@VM-0-7-ubuntu:/usr/lib/rabbitmq/plugins# service rabbitmq-server restart3.2.1 代码编写
配置消费者
importlombok.extern.slf4j.Slf4j;importorg.example.springrabbitmqextensions.constant.Constants;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;importjava.nio.charset.StandardCharsets;@Component@Slf4jpublicclassDelayListener{@RabbitListener(queues =Constants.DELAY_QUEUE)publicvoidlistenDelayQueue(Message message){ log.info("delay_queue接收到消息:{},deliveryTag:{}",newString(message.getBody(),StandardCharsets.UTF_8), message.getMessageProperties().getDeliveryTag());}}运行结果
发送消息
importjakarta.annotation.Resource;importorg.example.springrabbitmqextensions.constant.Constants;importorg.springframework.amqp.core.MessagePostProcessor;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;@RestController@RequestMapping("/producer")publicclassProducerController{@Resource(name ="rabbitTemplate")privateRabbitTemplate rabbitTemplate;@RequestMapping("/delay")publicStringdelay(){MessagePostProcessor messagePostProcessor1 = message ->{ message.getMessageProperties().setDelayLong(20000L);return message;};MessagePostProcessor messagePostProcessor2 = message ->{ message.getMessageProperties().setDelayLong(10000L);return message;}; rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE,"delay","delay:20000", messagePostProcessor1); rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE,"delay","delay:10000", messagePostProcessor2);return"发送成功";}}声明和配置交换器、队列和绑定关系
importorg.example.springrabbitmqextensions.constant.Constants;importorg.springframework.amqp.core.*;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitMQConfig{@Bean("delayQueue")publicQueuedelayQueue(){returnQueueBuilder.durable(Constants.DELAY_QUEUE).build();}@Bean("delayExchange")publicDirectExchangedelayExchange(){returnExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).delayed().build();}@Bean("delayBinding")publicBindingdelayBinding(@Qualifier("delayExchange")DirectExchange directExchange,@Qualifier("delayQueue")Queue queue){returnBindingBuilder.bind(queue).to(directExchange).with("delay");}}