详解RabbitMQ高级特性之死信队列

详解RabbitMQ高级特性之死信队列

目录

死信队列

添加配置

常量类

声明队列和交换机并绑定二者关系

死信--消息过期

给队列设置TTL

编写生产消息代码

编写消费消息代码

观察现象

死信--消息超过队列最大长度

设置队列的最大长度

编写生产消息代码

编写消费消息代码

观察现象

死信--消息被拒绝

编写生产消息代码

编写消费消息代码

观察现象

面试题


死信队列

死信(dead message) 简单理解就是因为种种原因, ⽆法被消费的信息, 就是死信.
有死信, ⾃然就有死信队列. 当消息在⼀个队列中变成死信之后,它能被重新被发送到另⼀个交换器
中,这个交换器就是DLX( Dead Letter Exchange ), 绑定DLX的队列, 就称为死信队列(Dead
Letter Queue,简称DLQ).

消息变成死信⼀般是由于以下⼏种情况:

1. 消息被拒绝( Basic.Reject/Basic.Nack ),并且设置 requeue 参数为 false.
2. 消息过期.
3. 队列达到最⼤⻓度.
添加配置
spring: application: name: rabbit-extensions-demo rabbitmq: addresses: amqp://study:[email protected]:5672/extension
常量类
public class Constants { //死信 public static final String NORMAL_QUEUE = "normal.queue"; public static final String NORMAL_EXCHANGE = "normal.exchange"; public static final String DL_QUEUE = "dl.queue"; public static final String DL_EXCHANGE= "dl.exchange"; }
声明队列和交换机并绑定二者关系
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import rabbitextensionsdemo.constant.Constants; @Configuration public class DLConfig { //正常的交换机和队列 @Bean("normalQueue") public Queue normalQueue(){ return QueueBuilder.durable(Constants.NORMAL_QUEUE) .deadLetterExchange(Constants.DL_EXCHANGE) .deadLetterRoutingKey("dlx") .build(); } @Bean("normalExchange") public DirectExchange normalExchange(){ return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build(); } @Bean("normalBinding") public Binding normalBinding(@Qualifier("normalQueue") Queue queue, @Qualifier("normalExchange") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("normal").noargs(); } //死信交换机和队列 @Bean("dlQueue") public Queue dlQueue(){ return QueueBuilder.durable(Constants.DL_QUEUE).build(); } @Bean("dlExchange") public DirectExchange dlExchange(){ return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build(); } @Bean("dlBinding") public Binding dlBinding(@Qualifier("dlQueue") Queue queue, @Qualifier("dlExchange") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("dlx").noargs(); } }
死信--消息过期
给队列设置TTL
编写生产消息代码
 @RequestMapping("/dl") public String dl() { System.out.println("dl..."); //发送普通消息 rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "dl test..."); System.out.printf("%tc 消息发送成功 \n", new Date()); return "消息发送成功"; }
编写消费消息代码
import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import rabbitextensionsdemo.constant.Constants; import java.util.Date; @Component public class DLListener { @RabbitListener(queues = Constants.DL_QUEUE) public void dlHandMessage(Message message, Channel channel) throws Exception { //消费者逻辑 System.out.printf("[dl.queue] %tc 接收到消息: %s, deliveryTag: %d \n", new Date(), new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag()); } }
观察现象

我们可以看到,消息在10秒后过期,从normal队列进入到了死信队列,消息进入到死信队列后被消费。

死信--消息超过队列最大长度
设置队列的最大长度
编写生产消息代码
 @RequestMapping("/dl") public String dl() { //测试队列长度 for (int i = 0; i < 20; i++) { rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "dl test..."+i); } return "消息发送成功"; }
编写消费消息代码
import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import rabbitextensionsdemo.constant.Constants; import java.util.Date; @Component public class DLListener { @RabbitListener(queues = Constants.DL_QUEUE) public void dlHandMessage(Message message, Channel channel) throws Exception { //消费者逻辑 System.out.printf("[dl.queue] %tc 接收到消息: %s, deliveryTag: %d \n", new Date(), new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag()); } }
观察现象

此时我们可以看到,给队列设置了最大长度为10,但是队列接收到了20条消息,就会导致前10条消息变成死信。

死信--消息被拒绝
编写生产消息代码
 @RequestMapping("/dl") public String dl() { System.out.println("dl..."); //发送普通消息 rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "dl test..."); System.out.printf("%tc 消息发送成功 \n", new Date()); return "消息发送成功"; }
编写消费消息代码
import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import rabbitextensionsdemo.constant.Constants; @Component public class DLListener { @RabbitListener(queues = Constants.NORMAL_QUEUE) public void handMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { //消费者逻辑 System.out.printf("[normal.queue]接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag()); //进行业务逻辑处理 System.out.println("业务逻辑处理"); int num = 3/0; System.out.println("业务处理完成"); //肯定确认 channel.basicAck(deliveryTag,false); } catch (Exception e) { //否定确认 channel.basicNack(deliveryTag, false, false); //requeue为false, 该消息成为死信 } } }
观察现象

可以看到,normal队列中的消息在被消费时因为发生了异常而执行到了拒绝消息的代码,而且设置了消息不重新入队,导致消息变成了死信,进而进入到了死信队列。

面试题

1.死信队列的概念

死信(Dead Letter)是消息队列中的⼀种特殊消息, 它指的是那些⽆法被正常消费或处理的消息. 在消息队列系统中, 如RabbitMQ, 死信队列⽤于存储这些死信消息。

2.死信的来源

1) 消息过期: 消息在队列中存活的时间超过了设定的TTL
2) 消息被拒绝: 消费者在处理消息时, 可能因为消息内容错误, 处理逻辑异常等原因拒绝处理该消息. 如果拒绝时指定不重新⼊队(requeue=false), 消息也会成为死信.
3) 队列满了: 当队列达到最⼤⻓度, ⽆法再容纳新的消息时, 新来的消息会被处理为死信.

3.死信的应用场景 

对于RabbitMQ来说, 死信队列是⼀个⾮常有⽤的特性. 它可以处理异常情况下,消息不能够被消费者正确消费⽽被置⼊死信队列中的情况, 应⽤程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况, 进⽽可以改善和优化系统.

⽐如: ⽤⼾⽀付订单之后, ⽀付系统会给订单系统返回当前订单的⽀付状态
为了保证⽀付信息不丢失, 需要使⽤到死信队列机制. 当消息消费异常时, 将消息投⼊到死信队列中, 由订单系统的其他消费者来监听这个队列, 并对数据进⾏处理(⽐如发送⼯单等,进⾏⼈⼯确认).

场景的应⽤场景还有:

• 消息重试:将死信消息重新发送到原队列或另⼀个队列进⾏重试处理.
• 消息丢弃:直接丢弃这些⽆法处理的消息,以避免它们占⽤系统资源.
• ⽇志收集:将死信消息作为⽇志收集起来,⽤于后续分析和问题定位.

Read more

安装 启动 使用 Neo4j的超详细教程

安装 启动 使用 Neo4j的超详细教程

最近在做一个基于知识图谱的智能生成项目。需要用到Neo4j图数据库。写这篇文章记录一下Neo4j的安装及其使用。 一.Neo4j的安装 1.首先安装JDK,配环境变量。(参照网上教程,很多) Neo4j是基于Java的图形数据库,运行Neo4j需要启动JVM进程,因此必须安装JAVA SE的JDK。从Oracle官方网站下载 Java SE JDK。我使用的版本是JDK1.8 2.官网上安装neo4j。 官方网址:https://neo4j.com/deployment-center/  在官网上下载对应版本。Neo4j应用程序有如下主要的目录结构: bin目录:用于存储Neo4j的可执行程序; conf目录:用于控制Neo4j启动的配置文件; data目录:用于存储核心数据库文件; plugins目录:用于存储Neo4j的插件; 3.配置环境变量 创建主目录环境变量NEO4J_HOME,并把主目录设置为变量值。复制具体的neo4j文件地址作为变量值。 配置文档存储在conf目录下,Neo4j通过配置文件neo4j.conf控制服务器的工作。默认情况下,不需

企业微信群机器人Webhook配置全攻略:从创建到发送消息的完整流程

企业微信群机器人Webhook配置全攻略:从创建到发送消息的完整流程 在数字化办公日益普及的今天,企业微信作为国内领先的企业级通讯工具,其群机器人功能为团队协作带来了极大的便利。本文将手把手教你如何从零开始配置企业微信群机器人Webhook,实现自动化消息推送,提升团队沟通效率。 1. 准备工作与环境配置 在开始创建机器人之前,需要确保满足以下基本条件: * 企业微信账号:拥有有效的企业微信管理员或成员账号 * 群聊条件:至少包含3名成员的群聊(这是创建机器人的最低人数要求) * 网络环境:能够正常访问企业微信服务器 提示:如果是企业管理员,建议先在"企业微信管理后台"确认机器人功能是否已对企业开放。某些企业可能出于安全考虑会限制此功能。 2. 创建群机器人 2.1 添加机器人到群聊 1. 打开企业微信客户端,进入目标群聊 2. 点击右上角的群菜单按钮(通常显示为"..."或"⋮") 3. 选择"添加群机器人"选项 4.

Flowise物联网融合:与智能家居设备联动的应用设想

Flowise物联网融合:与智能家居设备联动的应用设想 1. Flowise:让AI工作流变得像搭积木一样简单 Flowise 是一个真正把“AI平民化”落地的工具。它不像传统开发那样需要写几十行 LangChain 代码、配置向量库、调试提示词模板,而是把所有这些能力打包成一个个可拖拽的节点——就像小时候玩乐高,你不需要懂塑料怎么合成,只要知道哪块该拼在哪,就能搭出一座城堡。 它诞生于2023年,短短一年就收获了45.6k GitHub Stars,MIT协议开源,意味着你可以放心把它用在公司内部系统里,甚至嵌入到客户交付的产品中,完全不用担心授权问题。最打动人的不是它的技术多炫酷,而是它真的“不挑人”:产品经理能搭出知识库问答机器人,运营同学能配出自动抓取竞品文案的Agent,连刚学Python两周的实习生,也能在5分钟内跑通一个本地大模型的RAG流程。 它的核心逻辑很朴素:把LangChain里那些抽象概念——比如LLM调用、文档切分、向量检索、工具调用——变成画布上看得见、摸得着的方块。你拖一个“Ollama LLM”节点,再拖一个“Chroma Vector

OpenClaw配置Bot接入飞书机器人+Kimi2.5

OpenClaw配置Bot接入飞书机器人+Kimi2.5

上一篇文章写了Ubuntu_24.04下安装OpenClaw的过程,这篇文档记录一下接入飞书机器+Kimi2.5。 准备工作 飞书 创建飞书机器人 访问飞书开放平台:https://open.feishu.cn/app,点击创建应用: 填写应用名称和描述后就直接创建: 复制App ID 和 App Secret 创建成功后,在“凭证与基础信息”中找到 App ID 和 App Secret,把这2个信息复制记录下来,后面需要配置到openclaw中 配置权限 点击【权限管理】→【开通权限】 或使用【批量导入/导出权限】,选择导入,输入以下内容,如下图 点击【下一步,确认新增权限】即可开通所需要的权限。 配置事件与回调 说明:这一步的配置需要先讲AppId和AppSecret配置到openclaw成功之后再设置订阅方式,