掌控消息全链路(3)——RabbitMQ/Spring-AMQP高级特性详解之TTL、死信和延迟

掌控消息全链路(3)——RabbitMQ/Spring-AMQP高级特性详解之TTL、死信和延迟
在这里插入图片描述

🔥我的主页:九转苍翎⭐️个人专栏:《Java SE 》《Java集合框架系统精讲》《MySQL高手之路:从基础到高阶 》《计算机网络 》《Java工程师核心能力体系构建》天行健,君子以自强不息。


Java JDK版本:Oracle OpenJDK 17.0.9
SpringBoot版本:3.5.9

  • Spring Web
  • Lombok
  • Spring for RabbitMQ

RabbitMQ version:3.12.1
RabbitMQ实现延迟队列的插件:rabbitmq_delayed_message_exchange-3.12.0(已免费上传至我的资源)

1.TTL

TTL(Time-To-Live)是RabbitMQ中控制消息或队列生命周期的机制,用于在指定时间后自动删除消息或队列,避免资源堆积消息TTL:为单条消息设置过期时间队列TTL:设置整个队列的过期时间

发送消息

importjakarta.annotation.Resource;importorg.example.springrabbitmqextensions.constant.Constants;importorg.springframework.amqp.core.MessagePostProcessor;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;@RestController@RequestMapping("/producer")publicclassProducerController{@Resource(name ="rabbitTemplate")privateRabbitTemplate rabbitTemplate;@RequestMapping("/ttl")publicStringttl(){MessagePostProcessor messagePostProcessor = message ->{ message.getMessageProperties().setExpiration("10000");// 设置消息TTL10秒return message;}; rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl","TTL为10秒", messagePostProcessor); rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl","未设置TTL");return"发送成功";}}

声明和配置交换器、队列和绑定关系

importorg.example.springrabbitmqextensions.constant.Constants;importorg.springframework.amqp.core.*;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitMQConfig{@Bean("ttlQueue")publicQueuettlQueue(){returnQueueBuilder.durable(Constants.TTL_QUEUE).ttl(20000).build();// 设置队列TTL为20秒}@Bean("ttlExchange")publicDirectExchangettlExchange(){returnExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).build();}@Bean("ttlBinding")publicBindingttlBinding(@Qualifier("ttlExchange")DirectExchange directExchange,@Qualifier("ttlQueue")Queue queue){returnBindingBuilder.bind(queue).to(directExchange).with("ttl");}}

注意:两种TTL过期行为的触发条件不同

  • 队列:超过TTL设置的时间后将该队列上的所有消息删除
  • 消息:在即将投递时检查是否过期。为了避免为每条消息维护独立的计时器或对队列进行不间断的全量扫描带来的巨大性能开销,RabbitMQ采用了这种惰性检查策略
    • 例如:一条已过期的消息位于队列中部,前面还有未过期的消息,它将不会被立即删除,而会滞留在队列中,直到它成为队列头部的下一条待投递消息时,才会被检查并丢弃

2.死信队列

死信(Dead Letter) 是指由于某些原因无法被正常消费的消息。RabbitMQ不会自动丢弃这些消息,而是可以将其路由到 死信交换器(Dead Letter Exchange,DLX) ,与DLX进行绑定的队列就是 死信队列(Dead Letter Queue,DLQ)
消息变成死信一般是由于以下几种情况:消息被拒绝(Basic.Reject / Basic.Nack),并且设置requeue为false消息过期队列达到最大长度

配置消费者

importcom.rabbitmq.client.Channel;importlombok.extern.slf4j.Slf4j;importorg.example.springrabbitmqextensions.constant.Constants;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;importjava.nio.charset.StandardCharsets;@Component@Slf4jpublicclassDeadLetterListener{@RabbitListener(queues =Constants.NORMAL_QUEUE)publicvoidlistenNormalQueue(Message message,Channel channel)throwsException{ log.info("normal_queue接收到消息:{},deliveryTag:{}",newString(message.getBody(),StandardCharsets.UTF_8), message.getMessageProperties().getDeliveryTag());try{int num =3/0; log.info("处理成功"); channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);}catch(Exception e){ log.info("处理失败"); channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);}}@RabbitListener(queues =Constants.DEAD_LETTER_QUEUE)publicvoidlistenDeadLetterQueue(Message message){ log.info("dead_letter_queue接收到消息:{},deliveryTag:{}",newString(message.getBody(),StandardCharsets.UTF_8), message.getMessageProperties().getDeliveryTag());}}

发送消息

importjakarta.annotation.Resource;importorg.example.springrabbitmqextensions.constant.Constants;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;@RestController@RequestMapping("/producer")publicclassProducerController{@Resource(name ="rabbitTemplate")privateRabbitTemplate rabbitTemplate;@RequestMapping("/deadLetter")publicStringdeadLetter(){for(int i =0; i <20; i++){ rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","deadLetter"+ i);}return"发送成功";}}

声明和配置交换器、队列和绑定关系

importorg.example.springrabbitmqextensions.constant.Constants;importorg.springframework.amqp.core.*;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitMQConfig{@Bean("deadLetterQueue")publicQueuedeadLetterQueue(){returnQueueBuilder.durable(Constants.DEAD_LETTER_QUEUE).build();}@Bean("deadLetterExchange")publicDirectExchangedeadLetterExchange(){returnExchangeBuilder.directExchange(Constants.DEAD_LETTER_EXCHANGE).build();}@Bean("deadLetterBinding")publicBindingdlBinding(@Qualifier("deadLetterExchange")DirectExchange directExchange,@Qualifier("deadLetterQueue")Queue queue){returnBindingBuilder.bind(queue).to(directExchange).with("dead_letter");}@Bean("normalQueue")publicQueuenormalQueue(){returnQueueBuilder.durable(Constants.NORMAL_QUEUE).deadLetterExchange(Constants.DEAD_LETTER_EXCHANGE).deadLetterRoutingKey("dead_letter")// 绑定死信交换机.ttl(10000)// 测试过期消息.maxLength(10L)// 测试溢出的消息.build();}@Bean("normalExchange")publicDirectExchangenormalExchange(){returnExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();}@Bean("normalBinding")publicBindingnormalBinding(@Qualifier("normalExchange")DirectExchange directExchange,@Qualifier("normalQueue")Queue queue){returnBindingBuilder.bind(queue).to(directExchange).with("normal");}}

3.延迟队列

延迟队列(Delayed Queue) 是一种特殊的消息队列,其中的消息不会立即被消费者获取,而是在指定的延迟时间过后才会变得可消费用户下单后,订单信息通过fanout交换机转发到正常队列和ttlQueue数据库消费正常队列并将订单信息写入数据库数据库消费支付业务的队列,并完善订单的支付状态30分钟后,消费者处理死信队列,判断支付状态。若已支付,不对数据库进行修改;若未支付,则取消或回滚该订单

3.1 ttl + DLX

RabbitMQ没有内置延迟队列功能,我们可以使用TTL+DLX组合的方式来实现延迟队列,如上述死信队列代码。但由于RabbitMQ对过期消息进行判断时,采用的是惰性检查策略,这会导致无法第一时间判断出消息是否过期

3.2 延迟插件

3.2.1 插件介绍

为了解决上述问题,RabbitMQ官方提供了⼀个延迟插件rabbitmq_delayed_message_exchange来实现延迟队列的功能

# 创建目录并将插件上传至该目录 root@VM-0-7-ubuntu:~# mkdir /usr/lib/rabbitmq/plugins# 查看插件列表 root@VM-0-7-ubuntu:/usr/lib/rabbitmq/plugins# rabbitmq-plugins list
在这里插入图片描述
# 启动插件 root@VM-0-7-ubuntu:/usr/lib/rabbitmq/plugins# rabbitmq-plugins enable rabbitmq_delayed_message_exchange# 重启RabbitMQ服务 root@VM-0-7-ubuntu:/usr/lib/rabbitmq/plugins# service rabbitmq-server restart
在这里插入图片描述

3.2.1 代码编写

配置消费者

importlombok.extern.slf4j.Slf4j;importorg.example.springrabbitmqextensions.constant.Constants;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;importjava.nio.charset.StandardCharsets;@Component@Slf4jpublicclassDelayListener{@RabbitListener(queues =Constants.DELAY_QUEUE)publicvoidlistenDelayQueue(Message message){ log.info("delay_queue接收到消息:{},deliveryTag:{}",newString(message.getBody(),StandardCharsets.UTF_8), message.getMessageProperties().getDeliveryTag());}}
运行结果

发送消息

importjakarta.annotation.Resource;importorg.example.springrabbitmqextensions.constant.Constants;importorg.springframework.amqp.core.MessagePostProcessor;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;@RestController@RequestMapping("/producer")publicclassProducerController{@Resource(name ="rabbitTemplate")privateRabbitTemplate rabbitTemplate;@RequestMapping("/delay")publicStringdelay(){MessagePostProcessor messagePostProcessor1 = message ->{ message.getMessageProperties().setDelayLong(20000L);return message;};MessagePostProcessor messagePostProcessor2 = message ->{ message.getMessageProperties().setDelayLong(10000L);return message;}; rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE,"delay","delay:20000", messagePostProcessor1); rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE,"delay","delay:10000", messagePostProcessor2);return"发送成功";}}

声明和配置交换器、队列和绑定关系

importorg.example.springrabbitmqextensions.constant.Constants;importorg.springframework.amqp.core.*;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitMQConfig{@Bean("delayQueue")publicQueuedelayQueue(){returnQueueBuilder.durable(Constants.DELAY_QUEUE).build();}@Bean("delayExchange")publicDirectExchangedelayExchange(){returnExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).delayed().build();}@Bean("delayBinding")publicBindingdelayBinding(@Qualifier("delayExchange")DirectExchange directExchange,@Qualifier("delayQueue")Queue queue){returnBindingBuilder.bind(queue).to(directExchange).with("delay");}}

Read more

C++手撕红黑树:从0到200行,拿下STL map底层核心

C++手撕红黑树:从0到200行,拿下STL map底层核心

文章目录 * C++手撕红黑树:从0到200行,拿下STL map底层核心 * 1. 红黑树的概念 * 1.1 红黑树的规则 * 1.2 红黑树如何确保最长路径不超过最短路径的2倍? * 1.3 红黑树的效率 * 2. 红黑树的实现 * 2.1 红黑树的结构 * 2.2 红黑树的插入 * 2.2.1 插入的大概过程 * 2.2.2 情况1:变色 * 2.2.3 情况2:单旋 + 变色 * 2.2.4 情况3:双旋 + 变色 * 2.3 红黑树的插入代码实现 * 2.

By Ne0inhk
C++ 拷贝构造函数与赋值运算符:深拷贝与浅拷贝的核心辨析

C++ 拷贝构造函数与赋值运算符:深拷贝与浅拷贝的核心辨析

C++ 拷贝构造函数与赋值运算符:深拷贝与浅拷贝的核心辨析 💡 学习目标:掌握拷贝构造函数与赋值运算符的定义及调用场景,理解深拷贝与浅拷贝的本质区别,能够在实际开发中避免内存泄漏与野指针问题。 💡 学习重点:拷贝构造函数的触发条件、浅拷贝的缺陷、深拷贝的实现方法、赋值运算符的重载原则。 一、拷贝构造函数的概念与触发场景 ✅ 结论:拷贝构造函数是一种特殊的构造函数,用于通过一个已存在的对象创建一个新对象,其参数必须是本类对象的常量引用(const 类名&)。 1.1 拷贝构造函数的语法格式 class 类名 {public:// 普通构造函数 类名(参数列表);// 拷贝构造函数 类名(const 类名& other);}; ⚠️ 注意事项: 1. 拷贝构造函数的参数必须是常量引用,使用 const 防止实参被修改,使用引用避免无限递归调用拷贝构造函数。 2. 如果没有手动定义拷贝构造函数,编译器会自动生成一个默认拷贝构造函数,实现简单的成员变量值拷贝。 1.2 拷贝构造函数的触发条件

By Ne0inhk
华为OD机试双机位C卷:补种未成活胡杨 (C/C++/Py/Java/Js/Go)

华为OD机试双机位C卷:补种未成活胡杨 (C/C++/Py/Java/Js/Go)

补种未成活胡杨 华为OD机试双机位C卷 - 华为OD上机考试双机位C卷 100分题型 华为OD机试双机位C卷真题目录点击查看: 华为OD机试双机位C卷真题题库目录|机考题库 + 算法考点详解 题目描述 近些年来,我国防沙治沙取得显著成果。某沙漠新种植N棵胡杨(编号1-N),排成一排。 一个月后,有M棵胡杨未能成活。 现可补种胡杨K棵,请问如何补种(只能补种,不能新种),可以得到最多的连续胡杨树? 输入描述 * N 总种植数量,1 <= N <= 100000 * M 未成活胡杨数量,M 个空格分隔的数,按编号从小到大排列,1 <= M <= N * K 最多可以补种的数量,0 <= K <= M 输出描述 * 最多的连续胡杨棵树 示例1

By Ne0inhk
【Java-MySQL】主键、外键有什么区别?

【Java-MySQL】主键、外键有什么区别?

以下是主键(Primary Key)与外键(Foreign Key)的核心区别及适用场景的总结,结合最新数据库实践整理: 🔑 一、本质区别 特性主键外键核心作用唯一标识本表记录(如身份证号)关联其他表的主键(如订单关联用户ID)唯一性✅ 值唯一且不可重复❌ 值可重复(可关联同个主键)空值❌ 不允许为空✅ 允许为空(如“未绑定用户”订单)数量限制每张表仅能有一个主键每张表可有多个外键数据约束强制非空 + 唯一性强制引用其他表的主键值(保证数据一致性) 💡 通俗比喻:主键👉🏻 学生的学号(唯一标识一个人)外键👉🏻 成绩单上的学号(关联学生表,证明成绩属于谁) ⚙️ 二、实际应用场景 ✅ 主键适用场景 1. 标识核心实体: * 用户表 users 中的 user_id(不可重复) 2. 加速查询: * 自动创建聚集索引,大幅提升检索效率

By Ne0inhk