掌控消息全链路(3)——RabbitMQ/Spring-AMQP高级特性详解之TTL、死信和延迟

掌控消息全链路(3)——RabbitMQ/Spring-AMQP高级特性详解之TTL、死信和延迟
在这里插入图片描述

🔥我的主页:九转苍翎⭐️个人专栏:《Java SE 》《Java集合框架系统精讲》《MySQL高手之路:从基础到高阶 》《计算机网络 》《Java工程师核心能力体系构建》天行健,君子以自强不息。


Java JDK版本:Oracle OpenJDK 17.0.9
SpringBoot版本:3.5.9

  • Spring Web
  • Lombok
  • Spring for RabbitMQ

RabbitMQ version:3.12.1
RabbitMQ实现延迟队列的插件:rabbitmq_delayed_message_exchange-3.12.0(已免费上传至我的资源)

1.TTL

TTL(Time-To-Live)是RabbitMQ中控制消息或队列生命周期的机制,用于在指定时间后自动删除消息或队列,避免资源堆积消息TTL:为单条消息设置过期时间队列TTL:设置整个队列的过期时间

发送消息

importjakarta.annotation.Resource;importorg.example.springrabbitmqextensions.constant.Constants;importorg.springframework.amqp.core.MessagePostProcessor;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;@RestController@RequestMapping("/producer")publicclassProducerController{@Resource(name ="rabbitTemplate")privateRabbitTemplate rabbitTemplate;@RequestMapping("/ttl")publicStringttl(){MessagePostProcessor messagePostProcessor = message ->{ message.getMessageProperties().setExpiration("10000");// 设置消息TTL10秒return message;}; rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl","TTL为10秒", messagePostProcessor); rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl","未设置TTL");return"发送成功";}}

声明和配置交换器、队列和绑定关系

importorg.example.springrabbitmqextensions.constant.Constants;importorg.springframework.amqp.core.*;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitMQConfig{@Bean("ttlQueue")publicQueuettlQueue(){returnQueueBuilder.durable(Constants.TTL_QUEUE).ttl(20000).build();// 设置队列TTL为20秒}@Bean("ttlExchange")publicDirectExchangettlExchange(){returnExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).build();}@Bean("ttlBinding")publicBindingttlBinding(@Qualifier("ttlExchange")DirectExchange directExchange,@Qualifier("ttlQueue")Queue queue){returnBindingBuilder.bind(queue).to(directExchange).with("ttl");}}

注意:两种TTL过期行为的触发条件不同

  • 队列:超过TTL设置的时间后将该队列上的所有消息删除
  • 消息:在即将投递时检查是否过期。为了避免为每条消息维护独立的计时器或对队列进行不间断的全量扫描带来的巨大性能开销,RabbitMQ采用了这种惰性检查策略
    • 例如:一条已过期的消息位于队列中部,前面还有未过期的消息,它将不会被立即删除,而会滞留在队列中,直到它成为队列头部的下一条待投递消息时,才会被检查并丢弃

2.死信队列

死信(Dead Letter) 是指由于某些原因无法被正常消费的消息。RabbitMQ不会自动丢弃这些消息,而是可以将其路由到 死信交换器(Dead Letter Exchange,DLX) ,与DLX进行绑定的队列就是 死信队列(Dead Letter Queue,DLQ)
消息变成死信一般是由于以下几种情况:消息被拒绝(Basic.Reject / Basic.Nack),并且设置requeue为false消息过期队列达到最大长度

配置消费者

importcom.rabbitmq.client.Channel;importlombok.extern.slf4j.Slf4j;importorg.example.springrabbitmqextensions.constant.Constants;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;importjava.nio.charset.StandardCharsets;@Component@Slf4jpublicclassDeadLetterListener{@RabbitListener(queues =Constants.NORMAL_QUEUE)publicvoidlistenNormalQueue(Message message,Channel channel)throwsException{ log.info("normal_queue接收到消息:{},deliveryTag:{}",newString(message.getBody(),StandardCharsets.UTF_8), message.getMessageProperties().getDeliveryTag());try{int num =3/0; log.info("处理成功"); channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);}catch(Exception e){ log.info("处理失败"); channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);}}@RabbitListener(queues =Constants.DEAD_LETTER_QUEUE)publicvoidlistenDeadLetterQueue(Message message){ log.info("dead_letter_queue接收到消息:{},deliveryTag:{}",newString(message.getBody(),StandardCharsets.UTF_8), message.getMessageProperties().getDeliveryTag());}}

发送消息

importjakarta.annotation.Resource;importorg.example.springrabbitmqextensions.constant.Constants;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;@RestController@RequestMapping("/producer")publicclassProducerController{@Resource(name ="rabbitTemplate")privateRabbitTemplate rabbitTemplate;@RequestMapping("/deadLetter")publicStringdeadLetter(){for(int i =0; i <20; i++){ rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","deadLetter"+ i);}return"发送成功";}}

声明和配置交换器、队列和绑定关系

importorg.example.springrabbitmqextensions.constant.Constants;importorg.springframework.amqp.core.*;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitMQConfig{@Bean("deadLetterQueue")publicQueuedeadLetterQueue(){returnQueueBuilder.durable(Constants.DEAD_LETTER_QUEUE).build();}@Bean("deadLetterExchange")publicDirectExchangedeadLetterExchange(){returnExchangeBuilder.directExchange(Constants.DEAD_LETTER_EXCHANGE).build();}@Bean("deadLetterBinding")publicBindingdlBinding(@Qualifier("deadLetterExchange")DirectExchange directExchange,@Qualifier("deadLetterQueue")Queue queue){returnBindingBuilder.bind(queue).to(directExchange).with("dead_letter");}@Bean("normalQueue")publicQueuenormalQueue(){returnQueueBuilder.durable(Constants.NORMAL_QUEUE).deadLetterExchange(Constants.DEAD_LETTER_EXCHANGE).deadLetterRoutingKey("dead_letter")// 绑定死信交换机.ttl(10000)// 测试过期消息.maxLength(10L)// 测试溢出的消息.build();}@Bean("normalExchange")publicDirectExchangenormalExchange(){returnExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();}@Bean("normalBinding")publicBindingnormalBinding(@Qualifier("normalExchange")DirectExchange directExchange,@Qualifier("normalQueue")Queue queue){returnBindingBuilder.bind(queue).to(directExchange).with("normal");}}

3.延迟队列

延迟队列(Delayed Queue) 是一种特殊的消息队列,其中的消息不会立即被消费者获取,而是在指定的延迟时间过后才会变得可消费用户下单后,订单信息通过fanout交换机转发到正常队列和ttlQueue数据库消费正常队列并将订单信息写入数据库数据库消费支付业务的队列,并完善订单的支付状态30分钟后,消费者处理死信队列,判断支付状态。若已支付,不对数据库进行修改;若未支付,则取消或回滚该订单

3.1 ttl + DLX

RabbitMQ没有内置延迟队列功能,我们可以使用TTL+DLX组合的方式来实现延迟队列,如上述死信队列代码。但由于RabbitMQ对过期消息进行判断时,采用的是惰性检查策略,这会导致无法第一时间判断出消息是否过期

3.2 延迟插件

3.2.1 插件介绍

为了解决上述问题,RabbitMQ官方提供了⼀个延迟插件rabbitmq_delayed_message_exchange来实现延迟队列的功能

# 创建目录并将插件上传至该目录 root@VM-0-7-ubuntu:~# mkdir /usr/lib/rabbitmq/plugins# 查看插件列表 root@VM-0-7-ubuntu:/usr/lib/rabbitmq/plugins# rabbitmq-plugins list
在这里插入图片描述
# 启动插件 root@VM-0-7-ubuntu:/usr/lib/rabbitmq/plugins# rabbitmq-plugins enable rabbitmq_delayed_message_exchange# 重启RabbitMQ服务 root@VM-0-7-ubuntu:/usr/lib/rabbitmq/plugins# service rabbitmq-server restart
在这里插入图片描述

3.2.1 代码编写

配置消费者

importlombok.extern.slf4j.Slf4j;importorg.example.springrabbitmqextensions.constant.Constants;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;importjava.nio.charset.StandardCharsets;@Component@Slf4jpublicclassDelayListener{@RabbitListener(queues =Constants.DELAY_QUEUE)publicvoidlistenDelayQueue(Message message){ log.info("delay_queue接收到消息:{},deliveryTag:{}",newString(message.getBody(),StandardCharsets.UTF_8), message.getMessageProperties().getDeliveryTag());}}
运行结果

发送消息

importjakarta.annotation.Resource;importorg.example.springrabbitmqextensions.constant.Constants;importorg.springframework.amqp.core.MessagePostProcessor;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;@RestController@RequestMapping("/producer")publicclassProducerController{@Resource(name ="rabbitTemplate")privateRabbitTemplate rabbitTemplate;@RequestMapping("/delay")publicStringdelay(){MessagePostProcessor messagePostProcessor1 = message ->{ message.getMessageProperties().setDelayLong(20000L);return message;};MessagePostProcessor messagePostProcessor2 = message ->{ message.getMessageProperties().setDelayLong(10000L);return message;}; rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE,"delay","delay:20000", messagePostProcessor1); rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE,"delay","delay:10000", messagePostProcessor2);return"发送成功";}}

声明和配置交换器、队列和绑定关系

importorg.example.springrabbitmqextensions.constant.Constants;importorg.springframework.amqp.core.*;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitMQConfig{@Bean("delayQueue")publicQueuedelayQueue(){returnQueueBuilder.durable(Constants.DELAY_QUEUE).build();}@Bean("delayExchange")publicDirectExchangedelayExchange(){returnExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).delayed().build();}@Bean("delayBinding")publicBindingdelayBinding(@Qualifier("delayExchange")DirectExchange directExchange,@Qualifier("delayQueue")Queue queue){returnBindingBuilder.bind(queue).to(directExchange).with("delay");}}

Read more

【OpenClaw从入门到精通】第10篇:OpenClaw生产环境部署全攻略:性能优化+安全加固+监控运维(2026实测版)

【OpenClaw从入门到精通】第10篇:OpenClaw生产环境部署全攻略:性能优化+安全加固+监控运维(2026实测版)

摘要:本文聚焦OpenClaw从测试环境走向生产环境的核心痛点,围绕“性能优化、安全加固、监控运维”三大维度展开实操讲解。先明确生产环境硬件/系统选型标准,再通过硬件层资源管控、模型调度策略、缓存优化等手段提升响应速度(实测响应效率提升50%+);接着从网络、权限、数据三层构建安全防护体系,集成火山引擎安全方案拦截高危操作;最后落地TenacitOS可视化监控与Prometheus告警体系,配套完整故障排查清单和虚拟实战案例。全文所有配置、代码均经实测验证,兼顾新手入门实操性和进阶读者的生产级部署需求,帮助开发者真正实现OpenClaw从“能用”到“放心用”的跨越。 优质专栏欢迎订阅! 【DeepSeek深度应用】【Python高阶开发:AI自动化与数据工程实战】【YOLOv11工业级实战】 【机器视觉:C# + HALCON】【大模型微调实战:平民级微调技术全解】 【人工智能之深度学习】【AI 赋能:Python 人工智能应用实战】【数字孪生与仿真技术实战指南】 【AI工程化落地与YOLOv8/v9实战】【C#工业上位机高级应用:高并发通信+性能优化】 【Java生产级避坑指南:

By Ne0inhk
ARM Linux 驱动开发篇--- Linux 并发与竞争实验(互斥体实现 LED 设备互斥访问)--- Ubuntu20.04互斥体实验

ARM Linux 驱动开发篇--- Linux 并发与竞争实验(互斥体实现 LED 设备互斥访问)--- Ubuntu20.04互斥体实验

🎬 渡水无言:个人主页渡水无言 ❄专栏传送门: 《linux专栏》《嵌入式linux驱动开发》《linux系统移植专栏》 ❄专栏传送门: 《freertos专栏》《STM32 HAL库专栏》 ⭐️流水不争先,争的是滔滔不绝  📚博主简介:第二十届中国研究生电子设计竞赛全国二等奖 |国家奖学金 | 省级三好学生 | 省级优秀毕业生获得者 | ZEEKLOG新星杯TOP18 | 半导纵横专栏博主 | 211在读研究生 在这里主要分享自己学习的linux嵌入式领域知识;有分享错误或者不足的地方欢迎大佬指导,也欢迎各位大佬互相三连 目录 前言  一、实验基础说明 1.1、互斥体简介 1.2 本次实验设计思路 二、硬件原理分析(看过之前博客的可以忽略) 三、实验程序编写 3.1 互斥体 LED 驱动代码(mutex.c) 3.2.1、设备结构体定义(28-39

By Ne0inhk
Flutter for OpenHarmony:swagger_dart_code_generator 接口代码自动化生成的救星(OpenAPI/Swagger) 深度解析与鸿蒙适配指南

Flutter for OpenHarmony:swagger_dart_code_generator 接口代码自动化生成的救星(OpenAPI/Swagger) 深度解析与鸿蒙适配指南

欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.ZEEKLOG.net 前言 后端工程师扔给你一个 Swagger (OpenAPI) 文档地址,你会怎么做? 1. 对着文档,手写 Dart Model 类(容易写错字段类型)。 2. 手写 Retrofit/Dio 的 API 接口定义(容易拼错 URL)。 3. 当后端修改了字段名,你对着报错修半天。 这是重复劳动的地狱。 swagger_dart_code_generator 可以将 Swagger (JSON/YAML) 文件直接转换为高质量的 Dart 代码,包括: * Model 类:支持 json_serializable,带 fromJson/

By Ne0inhk
Linux 开发别再卡壳!makefile/git/gdb 全流程实操 + 作业解析,新手看完直接用----《Hello Linux!》(5)

Linux 开发别再卡壳!makefile/git/gdb 全流程实操 + 作业解析,新手看完直接用----《Hello Linux!》(5)

文章目录 * 前言 * make/makefile * 文件的三个时间 * Linux第一个小程序-进度条 * 回车和换行 * 缓冲区 * 程序的代码展示 * git指令 * 关于gitee * Linux调试器-gdb使用 * 作业部分 前言 做 Linux 开发时,你是不是也遇到过这些 “卡脖子” 时刻?写 makefile 时,明明语法没错却报错,最后发现是依赖方法行没加 Tab;想提交代码到 gitee,记不清 git add/commit/push 的 “三板斧”,还得反复搜教程;用 gdb 调试程序,输了命令没反应,才想起编译时没加-g生成 debug 版本;甚至连写个进度条,都搞不懂\r和\n的区别,导致进度条乱跳…… 其实这些问题,

By Ne0inhk