RabbitMQ 消息中间件详解

RabbitMQ 消息中间件详解

RabbitMQ 消息中间件详解

文章目录

什么是MQ

MQ(Message Queue,消息队列) 是一种跨进程的异步通信机制,基于队列的先进先出(FIFO)原则来传递消息。它实现了应用程序之间的解耦,使得发送方和接收方不需要同时在线或直接通信。

核心特性

  • 异步通信:发送方发送消息后无需等待接收方处理
  • 应用解耦:系统之间通过消息通信,降低耦合度
  • 流量削峰:缓冲瞬时高峰流量,保护后端系统
  • 可靠性:支持消息持久化、确认机制保证消息不丢失

为什么要用MQ

1. 流量削峰

场景:电商秒杀、抢购活动等瞬时高并发场景

// 不使用MQ:系统直接承受压力,容易崩溃publicvoidcreateOrder(OrderRequest request){// 直接处理订单,高峰期可能导致系统崩溃 orderService.process(request);}// 使用MQ:缓冲请求,平滑处理publicvoidcreateOrder(OrderRequest request){// 消息入队,立即返回响应 rabbitTemplate.convertAndSend("order-exchange","order.create", request);returnResponse.success("订单提交成功,正在处理中");}

2. 应用解耦

场景:订单系统需要通知库存、物流、支付等多个系统

// 紧耦合架构:一个系统故障影响整体publicvoidcreateOrder(Order order){// 同步调用各个系统 inventoryService.reduceStock(order);// 库存系统故障 → 订单失败 logisticsService.createDelivery(order);// 物流系统故障 → 订单失败 paymentService.processPayment(order);// 支付系统故障 → 订单失败}// 松耦合架构:通过MQ异步通信publicvoidcreateOrder(Order order){ orderService.save(order);// 发送消息到MQ,各系统独立消费 rabbitTemplate.convertAndSend("order-event","order.created", order);// 订单系统不关心其他系统处理结果}

3. 异步处理

场景:耗时操作不影响主流程,例如A调用B,B需要花费时间去执行,但是A需要知道B什么时候可以执行完。

// 同步处理:用户需要等待所有操作完成publicResponseplaceOrder(Order order){// 1. 验证订单(快速)validateOrder(order);// 2. 保存订单(快速)saveOrder(order);// 3. 发送短信通知(慢,可能阻塞)sendSms(order.getPhone());// 4. 发送邮件(慢,可能阻塞)sendEmail(order.getEmail());// 用户需要等待所有操作完成returnResponse.success();}// 异步处理:快速返回,后台处理publicResponseplaceOrder(Order order){validateOrder(order);saveOrder(order);// 异步发送通知 rabbitTemplate.convertAndSend("notification-exchange","order.notify",newNotification(order));returnResponse.success("订单已提交");}

MQ的分类

1. ActiveMQ

  • 特点:完全符合JMS规范,支持多种协议
  • 适用场景:传统企业应用,需要JMS标准支持
  • 优点:功能全面,支持多种语言客户端
  • 缺点:性能一般,社区活跃度下降

2. Kafka

  • 特点:高吞吐量,分布式,分区化,可水平扩展
  • 适用场景:日志收集、大数据处理、实时数据管道
  • 架构特点
    • 基于Pull模式消费
    • 分区存储,支持并行处理
    • 消息持久化到磁盘
  • 性能指标:单机可支持10万+ TPS

3. RabbitMQ

  • 特点:基于AMQP协议,功能丰富,管理界面友好
  • 适用场景:企业级应用,需要复杂路由、可靠性高的场景
  • 核心优势
    • 消息确认机制完善
    • 支持多种交换机类型
    • 丰富的插件生态系统
    • 管理界面功能强大

4. RocketMQ

  • 特点:阿里开源,高可用,高可靠,金融级
  • 适用场景:电商、金融等对可靠性要求高的场景
  • 特性
    • 支持事务消息
    • 消息轨迹追踪
    • 丰富的消息过滤机制
    • 支持定时/延迟消息

MQ的选择

特性RabbitMQKafkaRocketMQActiveMQ
吞吐量中等(万级)极高(十万级)高(十万级)低(千级)
延迟微秒级毫秒级毫秒级毫秒级
可靠性高(ACK机制)高(副本机制)非常高(金融级)
事务支持支持不支持支持支持
消息顺序队列内有序分区内有序队列内有序队列内有序
管理界面优秀一般一般一般
社区活跃非常高
学习曲线中等中等

选择建议

  1. 快速上手RabbitMQ
    • 功能完善,管理方便
    • 社区活跃,文档丰富
    • 适合大多数业务场景
  2. 大数据、日志处理Kafka
    • 高吞吐量,适合海量数据
    • 支持流式处理
    • 生态系统丰富
  3. 金融、电商等高可靠性场景RocketMQ
    • 经过双11考验
    • 事务消息支持
    • 消息轨迹追踪
  4. 传统JMS应用迁移ActiveMQ
    • 完全兼容JMS
    • 迁移成本低

RabbitMQ安装部署

环境要求

  • CentOS 7.x 或 Ubuntu 18.04+
  • Erlang 21.3+
  • RabbitMQ 3.8+

单机安装(CentOS 7)

# 1. 安装依赖 yum install-y epel-release yum install-y socat logrotate# 2. 安装Erlang# 下载地址:https://github.com/rabbitmq/erlang-rpm/releasesrpm-ivh erlang-23.3.4.4-1.el7.x86_64.rpm # 3. 安装RabbitMQ# 下载地址:https://github.com/rabbitmq/rabbitmq-server/releasesrpm-ivh rabbitmq-server-3.8.19-1.el7.noarch.rpm # 4. 启动服务 systemctl start rabbitmq-server systemctl enable rabbitmq-server # 5. 开启Web管理插件 rabbitmq-plugins enable rabbitmq_management # 6. 创建管理员用户 rabbitmqctl add_user admin your_password rabbitmqctl set_user_tags admin administrator rabbitmqctl set_permissions -p / admin ".*"".*"".*"# 7. 删除默认guest用户(安全考虑) rabbitmqctl delete_user guest # 8. 开放防火墙端口 firewall-cmd --zone=public --add-port=5672/tcp --permanent# AMQP协议 firewall-cmd --zone=public --add-port=15672/tcp --permanent# 管理界面 firewall-cmd --reload# 9. 访问管理界面# http://your-server-ip:15672# 用户名:admin,密码:your_password

Docker安装(推荐)

# 拉取rabbitMQdocker pull rabbitmq:3.8-management 或者 docker pull rabbitmq:management # 1. 创建数据目录mkdir-p /docker/rabbitmq/{data,log,conf}chmod-R777 /docker/rabbitmq # 2. 创建配置文件cat> /docker/rabbitmq/conf/rabbitmq.conf <<EOF # 默认虚拟主机 default_vhost = / # 默认用户 default_user = admin default_pass = admin # 启用管理插件 management.tcp.port = 15672 management.tcp.ip = 0.0.0.0 # 允许远程访问 loopback_users = none EOF# 3. 启动容器docker run -d\--name rabbitmq \--restart always \-p5672:5672 \-p15672:15672 \-v /docker/rabbitmq/data:/var/lib/rabbitmq \-v /docker/rabbitmq/log:/var/log/rabbitmq \-v /docker/rabbitmq/conf:/etc/rabbitmq \-eRABBITMQ_DEFAULT_USER=admin \-eRABBITMQ_DEFAULT_PASS=admin \ rabbitmq:3.8-management # 4. 查看日志docker logs -f rabbitmq # 访问 你的IP地址:15672,如果无法访问,进入RabbitMQ容器开启控制台管理# 进入RabbitMQ容器dockerexec-it rabbitmq bash# 开启控制台管理 rabbitmq-plugins enable rabbitmq_management 

常用管理命令

# 服务管理 systemctl status rabbitmq-server # 查看状态 systemctl start rabbitmq-server # 启动 systemctl stop rabbitmq-server # 停止 systemctl restart rabbitmq-server # 重启# 用户管理 rabbitmqctl list_users # 查看用户 rabbitmqctl add_user username password # 添加用户 rabbitmqctl delete_user username # 删除用户 rabbitmqctl change_password username newpass # 修改密码# 虚拟主机管理 rabbitmqctl list_vhosts # 查看虚拟主机 rabbitmqctl add_vhost /myvhost # 添加虚拟主机 rabbitmqctl delete_vhost /myvhost # 删除虚拟主机# 权限管理 rabbitmqctl set_permissions -p / username ".*"".*"".*"# 设置权限 rabbitmqctl set_user_tags 用户名 administrator # 授权账号管理员权限 rabbitmqctl clear_permissions -p / username # 清除权限# 队列管理 rabbitmqctl list_queues name messages consumers # 查看队列 rabbitmqctl purge_queue queue_name # 清空队列# 插件管理 rabbitmq-plugins list # 查看插件 rabbitmq-plugins enable plugin_name # 启用插件 rabbitmq-plugins disable plugin_name # 禁用插件

RabbitMQ核心概念

1. Message(消息)

消息由消息头消息体组成:

// 消息头包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等// 消息头示例MessageProperties properties =newMessageProperties(); properties.setContentType("application/json"); properties.setContentEncoding("UTF-8"); properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);// 持久化 properties.setPriority(5);// 优先级 properties.setExpiration("10000");// TTL 10秒 properties.setHeader("custom-header","value");// 自定义头部// 消息体byte[] body ="{\"id\":1,\"name\":\"test\"}".getBytes();Message message =newMessage(body, properties);

2. Publisher(生产者)

向Exchange发送消息的客户端:

@ComponentpublicclassMessageProducer{@AutowiredprivateRabbitTemplate rabbitTemplate;publicvoidsendOrderMessage(Order order){// 发送消息到交换机 rabbitTemplate.convertAndSend("order-exchange",// 交换机名称"order.created",// 路由键 order,// 消息内容 message ->{// 设置消息属性 message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); message.getMessageProperties().setPriority(5);return message;});}}

3. Exchange(交换机)

接收生产者消息,根据路由键将消息路由到一个或多个队列:

Exchange属性

  • Name:交换机名称
  • Type:交换机类型(direct(point-to-point), fanout(multicast), topic(publish-subscibe), headers)
  • Durability:是否持久化(重启后是否保留)
  • Auto-delete:当所有队列都不再使用时是否自动删除
  • Arguments:扩展参数

4. Queue(队列)

存储消息的缓冲区:

@BeanpublicQueueorderQueue(){Map<String,Object> args =newHashMap<>(); args.put("x-max-length",10000);// 最大消息数 args.put("x-max-length-bytes",104857600);// 最大容量100MB args.put("x-message-ttl",60000);// 消息TTL 60秒 args.put("x-dead-letter-exchange","dlx-exchange");// 死信交换机 args.put("x-dead-letter-routing-key","dlx.routingkey");returnnewQueue("order.queue",true,// 持久化false,// 非排他false,// 不自动删除 args);// 参数}

5. Binding(绑定)

Exchange和Queue之间的关联规则:

@BeanpublicBindingorderBinding(){returnBindingBuilder.bind(orderQueue()).to(orderExchange()).with("order.*");// 路由键匹配模式}

6. Connection & Channel(连接和通道)

  • Connection:TCP连接,开销较大,应复用。(publisher/consumer 和broker之间的TCP链接)
  • Channel:虚拟连接,复用Connection,轻量级
// 最佳实践:一个应用一个Connection,每个线程一个Channel@BeanpublicConnectionFactoryconnectionFactory(){CachingConnectionFactory factory =newCachingConnectionFactory(); factory.setHost("localhost"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/"); factory.setPort(5672); factory.setConnectionTimeout(30000);// 30秒连接超时// 连接池配置 factory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL); factory.setChannelCacheSize(25);// 通道缓存数量 factory.setChannelCheckoutTimeout(2000);// 获取通道超时时间return factory;}

7. Consumer(消费者)

从队列获取并处理消息:

@ComponentpublicclassOrderConsumer{@RabbitListener(queues ="order.queue")@RabbitHandlerpublicvoidprocessOrder(Order order,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag){try{// 处理订单逻辑 orderService.process(order);// 手动确认消息 channel.basicAck(deliveryTag,false);}catch(Exception e){// 处理失败,根据业务决定重试或进入死信队列 channel.basicNack(deliveryTag,false,false);}}}

8. Virtual Host(虚拟主机)

虚拟主机表示一批交换机,消息队列和相关对象。vhost是AMQP概念的基础,必须在连接时指定;RabbitMQ默认指定的vhost为/

AMQP的基本组件划分到一个虚拟分组中,类似于网络中namespace概念,当多个不同用户使用同一个RabbitMQ Server提供服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等

逻辑隔离,类似命名空间:

# 创建虚拟主机 rabbitmqctl add_vhost /prod rabbitmqctl add_vhost /test # 为用户分配虚拟主机权限 rabbitmqctl set_permissions -p /prod user1 ".*"".*"".*" rabbitmqctl set_permissions -p /test user2 ".*"".*"".*"

9. Broker(代理服务器)

RabbitMQ服务器实例,负责接收、存储和转发消息。

RabbitMQ工作原理

整体架构

生产者 (Publisher) ↓ (发布消息) 交换机 (Exchange) ↓ (根据路由规则) 队列 (Queue) ↓ (消费消息) 消费者 (Consumer) 

详细工作流程

  1. 生产者连接Broker,建立Connection和Channel
  2. 生产者声明Exchange(如果不存在则创建)
  3. 生产者发送消息到Exchange,指定Routing Key
  4. Exchange根据类型和Binding规则路由消息到Queue
  5. Queue存储消息,等待消费者拉取
  6. 消费者连接Broker,订阅Queue
  7. Broker推送消息给消费者(或消费者主动拉取)
  8. 消费者处理消息并发送确认(ACK)
  9. Broker收到ACK后删除消息

消息流转示例

// 1. 生产者发送消息 rabbitTemplate.convertAndSend("direct.exchange","user.register", user);// 2. Exchange根据路由键找到匹配的Queue// Binding: direct.exchange → user.register → user.queue// 3. 消息存入user.queue// 4. 消费者消费消息@RabbitListener(queues ="user.queue")publicvoidhandleUserRegister(User user){ userService.register(user);}

交换机类型详解

1. Direct Exchange(直连交换机)

发给交换机的消息,里面的RoutingKey必须和binding时候指定的RoutingKey完全一致才能发到对应队列。Direct模式就是指定队列模式,消息来了,只能给指定的Queue,其它Queue都收不到。

  • 将消息转给和消息中的RoutingKey一致绑定关系的队列

特点:精确匹配Routing Key

@ConfigurationpublicclassDirectExchangeConfig{@BeanpublicDirectExchangedirectExchange(){returnnewDirectExchange("direct.exchange",true,false);}@BeanpublicQueueemailQueue(){returnnewQueue("email.queue",true);}@BeanpublicQueuesmsQueue(){returnnewQueue("sms.queue",true);}@BeanpublicBindingemailBinding(){returnBindingBuilder.bind(emailQueue()).to(directExchange()).with("email.send");// 精确匹配}@BeanpublicBindingsmsBinding(){returnBindingBuilder.bind(smsQueue()).to(directExchange()).with("sms.send");// 精确匹配}}// 使用:发送到指定队列 rabbitTemplate.convertAndSend("direct.exchange","email.send", emailMessage); rabbitTemplate.convertAndSend("direct.exchange","sms.send", smsMessage);

2. Fanout Exchange(扇出交换机)

发给交换机的数据,会转给该交换机的所有的对列。即使RoutingKey不满足也会转发。这是广播的模式,fanout模式就是广播的模式,消息来了,会发给所有的队列。

  • 广播,转给该交换机绑定的所有队列

特点:广播模式,忽略Routing Key

@ConfigurationpublicclassFanoutExchangeConfig{@BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange("fanout.exchange",true,false);}@BeanpublicQueuelogQueue(){returnnewQueue("log.queue",true);}@BeanpublicQueueauditQueue(){returnnewQueue("audit.queue",true);}@BeanpublicQueuebackupQueue(){returnnewQueue("backup.queue",true);}@BeanpublicBindinglogBinding(){returnBindingBuilder.bind(logQueue()).to(fanoutExchange());}@BeanpublicBindingauditBinding(){returnBindingBuilder.bind(auditQueue()).to(fanoutExchange());}@BeanpublicBindingbackupBinding(){returnBindingBuilder.bind(backupQueue()).to(fanoutExchange());}}// 使用:所有绑定的队列都会收到消息 rabbitTemplate.convertAndSend("fanout.exchange","", auditEvent);// routing key被忽略

3. Topic Exchange(主题交换机)

发给交换机的数据会根据里面的RoutingKey转给该交换机下满足关系binding的模式匹配的队列

  • 将消息转给能匹配上绑定关系的RoutingKey对应的队列

特点:模式匹配Routing Key

@ConfigurationpublicclassTopicExchangeConfig{@BeanpublicTopicExchangetopicExchange(){returnnewTopicExchange("topic.exchange",true,false);}@BeanpublicQueuechinaQueue(){returnnewQueue("china.queue",true);}@BeanpublicQueueusaQueue(){returnnewQueue("usa.queue",true);}@BeanpublicQueueallQueue(){returnnewQueue("all.queue",true);}@BeanpublicBindingchinaBinding(){// 匹配 china.开头的路由键returnBindingBuilder.bind(chinaQueue()).to(topicExchange()).with("china.*");}@BeanpublicBindingusaBinding(){// 匹配 usa.开头的路由键returnBindingBuilder.bind(usaQueue()).to(topicExchange()).with("usa.*");}@BeanpublicBindingallBinding(){// 匹配所有订单相关的路由键returnBindingBuilder.bind(allQueue()).to(topicExchange()).with("*.order.#");}}// 使用示例 rabbitTemplate.convertAndSend("topic.exchange","china.order.create", order);// → china.queue, all.queue rabbitTemplate.convertAndSend("topic.exchange","usa.order.pay", payment);// → usa.queue, all.queue rabbitTemplate.convertAndSend("topic.exchange","eu.order.cancel", cancel);// → all.queue
通配符说明
  • *:匹配一个单词
  • #:匹配零个或多个单词
  • 单词以.分隔
Routing KeyBinding Key是否匹配
china.order.createchina.*
china.order.create.order.
china.order.create#
china.orderchina.*
china.order.pay.successchina.*✗(三个单词)
在这里插入图片描述

4. Headers Exchange(头部交换机)

特点:根据消息Header匹配,忽略Routing Key

@ConfigurationpublicclassHeadersExchangeConfig{@BeanpublicHeadersExchangeheadersExchange(){returnnewHeadersExchange("headers.exchange",true,false);}@BeanpublicQueuevipQueue(){returnnewQueue("vip.queue",true);}@BeanpublicQueuenormalQueue(){returnnewQueue("normal.queue",true);}@BeanpublicBindingvipBinding(){Map<String,Object> headerMap =newHashMap<>(); headerMap.put("user-type","vip"); headerMap.put("x-match","any");// any: 匹配任意一个;all: 匹配所有returnBindingBuilder.bind(vipQueue()).to(headersExchange()).whereAny(headerMap).match();}@BeanpublicBindingnormalBinding(){Map<String,Object> headerMap =newHashMap<>(); headerMap.put("user-type","normal"); headerMap.put("x-match","any");returnBindingBuilder.bind(normalQueue()).to(headersExchange()).whereAny(headerMap).match();}}// 使用:设置消息头部 rabbitTemplate.convertAndSend("headers.exchange","", message, m ->{ m.getMessageProperties().setHeader("user-type","vip"); m.getMessageProperties().setHeader("priority","high");return m;});

交换机选择指南

场景推荐交换机理由
一对一精确路由Direct简单直接,性能好
广播通知所有系统Fanout无需路由键,所有绑定队列都收到
分类处理(如按地域、类型)Topic灵活的模式匹配
根据消息属性路由Headers基于Header内容路由
延迟消息Delayed插件提供,支持定时消息

SpringBoot与RabbitMQ整合

1. 项目配置

依赖引入
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><!-- version可以不写,spring-boot-starter-amqp 是一个由 Spring Boot 官方管理的启动器,它会根据你当前使用的 Spring Boot 版本,自动引入一个兼容且稳定的 Spring AMQP 版本 --><!-- <version>2.7.0</version> --></dependency><!-- 可选:用于JSON序列化 --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>
配置文件
spring:application:name: order-service rabbitmq:# 连接配置host: 192.168.1.100 port:5672username: admin password: your_password virtual-host: / ############# 看自己需要写下面的 ##################### 连接池配置connection-timeout:30000# 连接超时30秒cache:channel:size:25# 通道缓存数量checkout-timeout:2000# 获取通道超时时间# 生产者配置publisher-confirm-type: correlated # 发布确认publisher-returns:true# 返回回调template:mandatory:true# 消息不可达时返回retry:enabled:true# 发送重试initial-interval: 1000ms # 初始间隔max-interval: 10000ms # 最大间隔multiplier:2.0# 倍数max-attempts:3# 最大尝试次数# 消费者配置listener:simple:acknowledge-mode: manual # 手动确认prefetch:10# 每次拉取数量concurrency:5# 最小消费者数max-concurrency:10# 最大消费者数retry:enabled:true# 消费失败重试max-attempts:3# 最大重试次数initial-interval: 1000ms # 重试间隔default-requeue-rejected:false# 不重新入队# SSL配置(可选)ssl:enabled:falsekey-store: classpath:keystore.jks key-store-password: secret key-store-type: JKS 

2. 配置类

通用配置
importjakarta.annotation.Resource;importorg.springframework.amqp.core.AcknowledgeMode;importorg.springframework.amqp.rabbit.config.RetryInterceptorBuilder;importorg.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;importorg.springframework.amqp.rabbit.connection.ConnectionFactory;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;importorg.springframework.amqp.support.converter.Jackson2JsonMessageConverter;importorg.springframework.boot.autoconfigure.amqp.RabbitProperties;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.retry.RetryCallback;importorg.springframework.retry.RetryContext;importorg.springframework.retry.RetryListener;importorg.springframework.retry.backoff.ExponentialBackOffPolicy;importorg.springframework.retry.policy.SimpleRetryPolicy;importorg.springframework.retry.support.RetryTemplate;// 常用的三个配置如下// 1---设置手动应答(acknowledge-mode: manual)// 2---设置生产者消息发送的确认回调机制 ( # 这个配置是保证提供者确保消息推送到交换机中,不管成不成功,都会回调// publisher-confirm-type: correlated// # 保证交换机能把消息推送到队列中// publisher-returns: true// template:// # 以下是rabbitmqTemplate配置// mandatory: true)// 3---设置重试@ConfigurationpublicclassRabbitMQConfig{@ResourceprivateConnectionFactory rabbitConnectionFactory;//@Bean 缓存连接池//public CachingConnectionFactory rabbitConnectionFactory@ResourceprivateRabbitProperties properties;/** * 连接工厂配置 * 这里因为使用自动配置的connectionFactory,所以把自定义的connectionFactory注解掉 * 存在此名字的bean 自带的连接工厂会不加载(也就是说yml中rabbitmq下一级不生效),如果想自定义来区分开 需要改变bean 的名称 */// @Bean// public ConnectionFactory connectionFactory() throws Exception {// // 创建工厂类// CachingConnectionFactory cachingConnectionFactory=new CachingConnectionFactory();// // 用户名// cachingConnectionFactory.setUsername("gust");// // 密码// cachingConnectionFactory.setPassword("gust");// // rabbitMQ地址// cachingConnectionFactory.setHost("127.0.0.1");// // rabbitMQ端口// cachingConnectionFactory.setPort(Integer.parseInt("5672"));//// // 设置发布消息后回调// cachingConnectionFactory.setPublisherReturns(true);// // 设置发布后确认类型,此处确认类型为交互// cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);//// cachingConnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);// return cachingConnectionFactory;// }// 存在此名字的bean 自带的容器工厂会不加载(yml下rabbitmq下的listener下的simple配置),如果想自定义来区分开 需要改变bean 的名称@BeanpublicSimpleRabbitListenerContainerFactoryrabbitListenerContainerFactory(){SimpleRabbitListenerContainerFactory containerFactory =newSimpleRabbitListenerContainerFactory(); containerFactory.setConnectionFactory(rabbitConnectionFactory);// 并发消费者数量 containerFactory.setConcurrentConsumers(1); containerFactory.setMaxConcurrentConsumers(20);// 预加载消息数量 -- QOS containerFactory.setPrefetchCount(1);// 应答模式(此处设置为手动) containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 消息序列化方式 containerFactory.setMessageConverter(newJackson2JsonMessageConverter());// 设置通知调用链 (这里设置的是重试机制的调用链) containerFactory.setAdviceChain(RetryInterceptorBuilder.stateless().recoverer(newRejectAndDontRequeueRecoverer()).retryOperations(rabbitRetryTemplate()).build());return containerFactory;}/** * RabbitTemplate配置 * 存在此名字的bean 自带的容器工厂会不加载(yml下rabbitmq下的template的配置),如果想自定义来区分开 需要改变bean 的名称 */@BeanpublicRabbitTemplaterabbitTemplate(){RabbitTemplate rabbitTemplate =newRabbitTemplate(rabbitConnectionFactory);// 默认是用jdk序列化// 数据转换为json存入消息队列,方便可视化界面查看消息数据 rabbitTemplate.setMessageConverter(newJackson2JsonMessageConverter());// 设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数 rabbitTemplate.setMandatory(true);// 此处设置重试template后,会再生产者发送消息的时候,调用该template中的调用链 rabbitTemplate.setRetryTemplate(rabbitRetryTemplate());// CorrelationData correlationData, boolean b, String s rabbitTemplate.setConfirmCallback((correlationData, b, s)->{System.out.println("ConfirmCallback "+"相关数据:"+ correlationData);System.out.println("ConfirmCallback "+"确认情况:"+ b);System.out.println("ConfirmCallback "+"原因:"+ s);});// RabbitTemplate 在较新版本的 Spring AMQP 中已废弃 setReturnCallback 方法,改用 setReturnsCallback 方法。 rabbitTemplate.setReturnsCallback((returnMessage)->{System.out.println("ReturnCallback: "+"消息:"+ returnMessage.getMessage());System.out.println("ReturnCallback: "+"回应码:"+ returnMessage.getReplyCode());System.out.println("ReturnCallback: "+"回应消息:"+ returnMessage.getReplyText());System.out.println("ReturnCallback: "+"交换机:"+ returnMessage.getExchange());System.out.println("ReturnCallback: "+"路由键:"+ returnMessage.getRoutingKey());});return rabbitTemplate;}// 重试的Template@BeanpublicRetryTemplaterabbitRetryTemplate(){RetryTemplate retryTemplate =newRetryTemplate();// 设置监听 调用重试处理过程 retryTemplate.registerListener(newRetryListener(){@Overridepublic<T,EextendsThrowable>booleanopen(RetryContext retryContext,RetryCallback<T,E> retryCallback){// 执行之前调用 (返回false时会终止执行)returntrue;}@Overridepublic<T,EextendsThrowable>voidclose(RetryContext retryContext,RetryCallback<T,E> retryCallback,Throwable throwable){// 重试结束的时候调用 (最后一次重试 )System.out.println("---------------最后一次调用");}@Overridepublic<T,EextendsThrowable>voidonError(RetryContext retryContext,RetryCallback<T,E> retryCallback,Throwable throwable){// 异常 都会调用System.err.println("-----第{}次调用"+ retryContext.getRetryCount());}}); retryTemplate.setBackOffPolicy(backOffPolicyByProperties()); retryTemplate.setRetryPolicy(retryPolicyByProperties());return retryTemplate;}@BeanpublicExponentialBackOffPolicybackOffPolicyByProperties(){ExponentialBackOffPolicy backOffPolicy =newExponentialBackOffPolicy();long maxInterval = properties.getListener().getSimple().getRetry().getMaxInterval().getSeconds();long initialInterval = properties.getListener().getSimple().getRetry().getInitialInterval().getSeconds();double multiplier = properties.getListener().getSimple().getRetry().getMultiplier();// 重试间隔 backOffPolicy.setInitialInterval(initialInterval *1000);// 重试最大间隔 backOffPolicy.setMaxInterval(maxInterval *1000);// 重试间隔乘法策略 backOffPolicy.setMultiplier(multiplier);return backOffPolicy;}@BeanpublicSimpleRetryPolicyretryPolicyByProperties(){SimpleRetryPolicy retryPolicy =newSimpleRetryPolicy();int maxAttempts = properties.getListener().getSimple().getRetry().getMaxAttempts(); retryPolicy.setMaxAttempts(maxAttempts);return retryPolicy;}}
交换机与队列声明
importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjava.util.HashMap;importjava.util.Map;@ConfigurationpublicclassOrderRabbitConfig{// ==================== 订单相关 ====================@BeanpublicDirectExchangeorderExchange(){returnnewDirectExchange("order.exchange",true,false);}@BeanpublicQueueorderQueue(){Map<String,Object> args =newHashMap<>();// 死信队列配置 args.put("x-dead-letter-exchange","dlx.exchange"); args.put("x-dead-letter-routing-key","dlx.order");// 队列最大长度 args.put("x-max-length",10000);// 消息TTL args.put("x-message-ttl",300000);// 5分钟returnnewQueue("order.queue",true,false,false, args);}@BeanpublicBindingorderBinding(){returnBindingBuilder.bind(orderQueue()).to(orderExchange()).with("order.create");}// ==================== 支付相关 ====================@BeanpublicTopicExchangepaymentExchange(){returnnewTopicExchange("payment.exchange",true,false);}@BeanpublicQueuepaymentSuccessQueue(){returnnewQueue("payment.success.queue",true);}@BeanpublicQueuepaymentFailQueue(){returnnewQueue("payment.fail.queue",true);}@BeanpublicBindingpaymentSuccessBinding(){returnBindingBuilder.bind(paymentSuccessQueue()).to(paymentExchange()).with("payment.success.#");}@BeanpublicBindingpaymentFailBinding(){returnBindingBuilder.bind(paymentFailQueue()).to(paymentExchange()).with("payment.fail.#");}// ==================== 死信队列 ====================@BeanpublicDirectExchangedlxExchange(){returnnewDirectExchange("dlx.exchange",true,false);}@BeanpublicQueuedlxQueue(){returnnewQueue("dlx.queue",true);}@BeanpublicBindingdlxBinding(){returnBindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx.#");}}

3. 生产者示例

importcom.example.test.pojo.entity.Order;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.MessageDeliveryMode;importorg.springframework.amqp.core.MessageProperties;importorg.springframework.amqp.rabbit.connection.CorrelationData;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;importjava.util.Date;importjava.util.UUID;/** * 生产者示例 */@Component@Slf4jpublicclassOrderMessageProducer{@AutowiredprivateRabbitTemplate rabbitTemplate;/** * 发送订单创建消息 */publicvoidsendOrderCreate(Order order){try{CorrelationData correlationData =newCorrelationData(order.getOrderNo()); rabbitTemplate.convertAndSend("order.exchange","order.create", order, message ->{// 设置消息属性MessageProperties properties = message.getMessageProperties(); properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); properties.setPriority(5); properties.setMessageId(UUID.randomUUID().toString()); properties.setCorrelationId(order.getOrderNo()); properties.setTimestamp(newDate());// 设置过期时间(5分钟) properties.setExpiration("300000");return message;}, correlationData ); log.info("订单消息发送成功: {}", order.getOrderNo());}catch(Exception e){ log.error("订单消息发送失败: {}", order.getOrderNo(), e);// 可以保存到本地数据库,定时重试saveFailedMessage(order);}}/** * 发送延迟消息(使用延迟插件) */publicvoidsendDelayMessage(Object message,int delaySeconds){ rabbitTemplate.convertAndSend("delayed.exchange","delayed.routingkey", message, m ->{// MessageProperties 类中没有 setDelay 方法,该方法属于 RabbitMQ 延迟插件的扩展功能,// 使用 MessagePostProcessor 设置延迟消息时,应通过 MessageProperties.setHeader() 方法添加 x-delay 头部来实现延迟功能。 m.getMessageProperties().setHeader("x-delay", delaySeconds *1000);return m;});}privatevoidsaveFailedMessage(Order order){// 实现保存失败消息的逻辑}}

实体类Order创建:

packagecom.example.test.pojo.entity;importlombok.*;@Data@NoArgsConstructor@AllArgsConstructor@Setter@GetterpublicclassOrder{privateInteger id;privateString orderNo;}

OrderService接口创建:

packagecom.example.test.service;importcom.baomidou.mybatisplus.extension.service.IService;importcom.example.test.pojo.entity.Order;publicinterfaceOrderServiceextendsIService<Order>{voidprocessOrder(Order order);}

OrderServiceImpl创建:

packagecom.example.test.service.impl;importcom.baomidou.mybatisplus.extension.service.impl.ServiceImpl;importcom.example.test.mapper.OrderMapper;importcom.example.test.pojo.entity.Order;importcom.example.test.service.OrderService;importlombok.extern.slf4j.Slf4j;importorg.springframework.stereotype.Service;@Service@Slf4jpublicclassOrderServiceImplextendsServiceImpl<OrderMapper,Order>implementsOrderService{@OverridepublicvoidprocessOrder(Order order){ log.info("执行processOrder方法。。。");}}

OrderMapper接口创建:

packagecom.example.test.mapper;importcom.baomidou.mybatisplus.core.mapper.BaseMapper;importcom.example.test.pojo.entity.Order;publicinterfaceOrderMapperextendsBaseMapper<Order>{}

4. 消费者示例

packagecom.example.test.consumer;importcom.example.test.exception.BusinessException;importcom.example.test.pojo.entity.Order;importcom.example.test.service.OrderService;importcom.rabbitmq.client.Channel;importjakarta.annotation.Resource;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitHandler;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.amqp.support.AmqpHeaders;importorg.springframework.messaging.handler.annotation.Header;importorg.springframework.stereotype.Component;importjava.io.IOException;importjava.util.List;// 添加正确的JSON解析库导入importcom.fasterxml.jackson.databind.ObjectMapper;/** * 消费者示例 */@Component@Slf4jpublicclassOrderMessageConsumer{@ResourceprivateOrderService orderService;// 添加ObjectMapper实例privatefinalObjectMapper objectMapper =newObjectMapper();/** * 消费订单创建消息 */@RabbitListener( queues ="order.queue", concurrency ="3-5",// 最小3个,最大5个消费者 containerFactory ="rabbitListenerContainerFactory")@RabbitHandlerpublicvoidhandleOrderCreate(Order order,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag)throwsIOException{ log.info("收到订单消息: {}", order.getOrderNo());try{// 业务处理 orderService.processOrder(order);// 手动确认消息 channel.basicAck(deliveryTag,false); log.info("订单处理成功: {}", order.getOrderNo());}catch(BusinessException e){// 业务异常,记录日志,进入死信队列 log.error("订单处理业务异常: {}", order.getOrderNo(), e); channel.basicNack(deliveryTag,false,false);}catch(Exception e){// 系统异常,重试 log.error("订单处理系统异常: {}", order.getOrderNo(), e);// 获取重试次数Integer retryCount =getRetryCount(channel, deliveryTag);if(retryCount <3){// 重新入队,等待重试 channel.basicNack(deliveryTag,false,true);}else{// 超过重试次数,进入死信队列 channel.basicNack(deliveryTag,false,false); log.error("订单处理超过重试次数: {}", order.getOrderNo());}}}/** * 批量消费 */@RabbitListener( queues ="batch.queue", containerFactory ="batchContainerFactory")publicvoidhandleBatchMessages(List<Message> messages,Channel channel){for(Message message : messages){try{// 修复JSON解析错误Order order = objectMapper.readValue(message.getBody(),Order.class); orderService.processOrder(order);// 批量确认 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch(Exception e){ log.error("批量处理消息失败", e);// 可以记录失败的消息ID,后续处理}}}privateIntegergetRetryCount(Channel channel,long deliveryTag){// 从消息头获取重试次数// 实际实现需要从消息属性中读取return0;}}

创建BusinessException这个异常类

packagecom.example.test.exception;publicclassBusinessExceptionextendsRuntimeException{privateString errorCode;privateObject[] args;// 默认构造函数publicBusinessException(){super();}// 带错误信息的构造函数publicBusinessException(String message){super(message);}// 带错误码和错误信息的构造函数publicBusinessException(String errorCode,String message){super(message);this.errorCode = errorCode;}// 带错误信息和原因的构造函数publicBusinessException(String message,Throwable cause){super(message, cause);}// 带错误码、错误信息和原因的构造函数publicBusinessException(String errorCode,String message,Throwable cause){super(message, cause);this.errorCode = errorCode;}// 带参数占位符的构造函数(用于国际化)publicBusinessException(String errorCode,String message,Object[] args){super(message);this.errorCode = errorCode;this.args = args;}publicStringgetErrorCode(){return errorCode;}publicvoidsetErrorCode(String errorCode){this.errorCode = errorCode;}publicObject[]getArgs(){return args;}publicvoidsetArgs(Object[] args){this.args = args;}}

5. 批量消费配置

packagecom.example.test.config;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;importorg.springframework.amqp.rabbit.connection.ConnectionFactory;importorg.springframework.amqp.support.converter.Jackson2JsonMessageConverter;importorg.springframework.amqp.support.converter.MessageConverter;@ConfigurationpublicclassBatchConsumerConfig{/** * 配置JSON消息转换器 */@BeanpublicMessageConvertermessageConverter(){returnnewJackson2JsonMessageConverter();}@BeanpublicSimpleRabbitListenerContainerFactorybatchContainerFactory(ConnectionFactory connectionFactory,MessageConverter messageConverter){SimpleRabbitListenerContainerFactory factory =newSimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(messageConverter); factory.setBatchListener(true);// 开启批量监听// 批量消费配置 factory.setBatchSize(100);// 每批最大消息数 factory.setConsumerBatchEnabled(true); factory.setReceiveTimeout(5000L);// 接收超时时间 factory.setDeBatchingEnabled(true);// 开启解批处理// 消费者配置 factory.setConcurrentConsumers(5); factory.setMaxConcurrentConsumers(10); factory.setPrefetchCount(500);// 预取数量return factory;}}

6. 启动类配置

Spring Boot 认识到健康检查的重要性,提供了一种通过其 Actuator 模块实现健康检查的无缝方法。如果您已有 Spring Boot 项目,可以手动添加 Actuator 依赖。该模块提供了生产就绪的功能,包括我们关注的健康检查机制。

在您的 中包含以下依赖项pom.xml:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency>
@SpringBootApplication@EnableRabbitpublicclassOrderServiceApplication{publicstaticvoidmain(String[] args){SpringApplication.run(OrderServiceApplication.class, args);}/** * 监控RabbitMQ健康状态 * (可选) */@ComponentpublicclassRabbitMQHealthIndicatorimplementsHealthIndicator{@AutowiredprivateRabbitTemplate rabbitTemplate;@OverridepublicHealthhealth(){try{// 尝试获取连接Connection connection = rabbitTemplate.getConnectionFactory().createConnection(); connection.close();returnHealth.up().withDetail("message","RabbitMQ连接正常").build();}catch(Exception e){returnHealth.down().withDetail("error", e.getMessage()).build();}}}}

消息可靠性保障

消息可靠性保障架构

生产者 → [确认机制] → Broker → [持久化] → 磁盘 → [ACK机制] → 消费者 

1. 生产者消息确认

ConfirmCallback(发布确认)
packagecom.example.test.callback;importcom.example.test.service.FailedMessageService;importjakarta.annotation.PostConstruct;importlombok.extern.slf4j.Slf4j;importorg.springframework.stereotype.Component;importorg.springframework.amqp.rabbit.connection.CorrelationData;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;@Component@Slf4jpublicclassRabbitConfirmCallbackimplementsRabbitTemplate.ConfirmCallback{@AutowiredprivateRabbitTemplate rabbitTemplate;@AutowiredprivateFailedMessageService failedMessageService;@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){if(ack){ log.info("消息发送成功: {}", correlationData.getId());// 可以更新数据库状态 failedMessageService.updateStatus(correlationData.getId(),"SENT");}else{ log.error("消息发送失败: {}, cause: {}", correlationData.getId(), cause);// 记录失败消息,定时重试 failedMessageService.saveFailedMessage(correlationData, cause);}}@PostConstructpublicvoidinit(){ rabbitTemplate.setConfirmCallback(this);}}

简单实现FailedMessageService接口:

packagecom.example.test.service;importorg.springframework.amqp.rabbit.connection.CorrelationData;publicinterfaceFailedMessageService{/** * 更新消息状态 * @param messageId 消息ID * @param status 状态 */voidupdateStatus(String messageId,String status);/** * 保存失败消息 * @param correlationData 关联数据 * @param cause 失败原因 */voidsaveFailedMessage(CorrelationData correlationData,String cause);}

简单实现FailedMessageServiceImpl:

packagecom.example.test.service.impl;importcom.example.test.service.FailedMessageService;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.connection.CorrelationData;importorg.springframework.stereotype.Service;@Service@Slf4jpublicclassFailedMessageServiceImplimplementsFailedMessageService{@OverridepublicvoidupdateStatus(String messageId,String status){ log.info("更新消息状态: messageId={}, status={}", messageId, status);// 这里可以实现具体的数据库更新逻辑// 例如: messageRepository.updateStatus(messageId, status);}@OverridepublicvoidsaveFailedMessage(CorrelationData correlationData,String cause){ log.error("保存失败消息: messageId={}, cause={}", correlationData.getId(), cause);// 这里可以实现具体的失败消息保存逻辑// 例如: failedMessageRepository.save(new FailedMessage(correlationData, cause));}}
ReturnsCallback(返回回调)
packagecom.example.test.callback;importjakarta.annotation.PostConstruct;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.ReturnedMessage;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;@Component@Slf4jpublicclassRabbitReturnsCallbackimplementsRabbitTemplate.ReturnsCallback{@AutowiredprivateRabbitTemplate rabbitTemplate;@OverridepublicvoidreturnedMessage(ReturnedMessage returned){ log.error("消息路由失败: {}", returned.getMessage()); log.error("Exchange: {}, RoutingKey: {}, ReplyCode: {}, ReplyText: {}", returned.getExchange(), returned.getRoutingKey(), returned.getReplyCode(), returned.getReplyText());// 处理无法路由的消息handleUnroutedMessage(returned);}privatevoidhandleUnroutedMessage(ReturnedMessage returned){// 1. 记录到数据库// 2. 发送到备用队列// 3. 发送告警通知}@PostConstructpublicvoidinit(){ rabbitTemplate.setReturnsCallback(this);}}
事务消息(不推荐,性能差)
@TransactionalpublicvoidsendWithTransaction(Order order){// 开启事务 rabbitTemplate.setChannelTransacted(true);try{// 发送消息 rabbitTemplate.convertAndSend("order.exchange","order.create", order);// 业务操作 orderService.save(order);// 提交事务(Spring会自动管理)}catch(Exception e){// 回滚事务TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();throw e;}}

2. Broker消息持久化

队列和消息持久化
@BeanpublicQueuedurableQueue(){// 队列持久化returnnewQueue("durable.queue",true);// 第二个参数:durable=true}// 发送持久化消息publicvoidsendPersistentMessage(Object message){ rabbitTemplate.convertAndSend("exchange","routingKey", message, m ->{// 设置消息持久化 m.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return m;});}
镜像队列(高可用)
# 配置镜像队列策略 rabbitmqctl set_policy ha-all "^ha\."'{"ha-mode":"all","ha-sync-mode":"automatic"}'# 查看队列镜像状态 rabbitmqctl list_queues name policy slave_nodes synchronised_slave_nodes 

3. 消费者消息确认

自动确认(不推荐)
spring:rabbitmq:listener:simple:acknowledge-mode: auto # 自动确认
手动确认(推荐)
@ComponentpublicclassManualAckConsumer{@RabbitListener(queues ="order.queue")publicvoidhandleMessage(Order order,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag){try{// 处理消息processOrder(order);// 成功确认 channel.basicAck(deliveryTag,false);}catch(BusinessException e){// 业务异常,不重试,直接确认 log.error("业务异常,消息丢弃", e); channel.basicAck(deliveryTag,false);}catch(Exception e){// 系统异常,重新入队 log.error("系统异常,消息重试", e); channel.basicNack(deliveryTag,false,true);}}}
批量确认
@ComponentpublicclassBatchAckConsumer{privateMap<Long,Order> pendingMessages =newConcurrentHashMap<>();@RabbitListener(queues ="order.queue")publicvoidhandleMessage(Order order,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag){// 保存到待处理Map pendingMessages.put(deliveryTag, order);// 每处理100条确认一次if(pendingMessages.size()>=100){batchAck(channel);}}privatevoidbatchAck(Channel channel){long lastDeliveryTag =Collections.max(pendingMessages.keySet());try{// 批量确认 channel.basicAck(lastDeliveryTag,true);// 清理已确认的消息 pendingMessages.keySet().removeIf(tag -> tag <= lastDeliveryTag);}catch(IOException e){ log.error("批量确认失败", e);}}}

4. 消息重试机制

Spring重试机制
spring:rabbitmq:listener:simple:retry:enabled:truemax-attempts:3# 最大重试次数initial-interval:1000# 初始间隔1秒multiplier:2.0# 倍数递增max-interval:10000# 最大间隔10秒stateless:true# 有状态重试(会重新入队)
自定义重试策略
packagecom.example.test.config;importorg.springframework.amqp.rabbit.config.RetryInterceptorBuilder;importorg.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;importorg.springframework.amqp.rabbit.connection.ConnectionFactory;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.amqp.rabbit.retry.MessageRecoverer;importorg.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.retry.backoff.ExponentialBackOffPolicy;importorg.springframework.retry.policy.SimpleRetryPolicy;importorg.springframework.retry.support.RetryTemplate;/** * 自定义重试策略 */@ConfigurationpublicclassRetryConfig{@BeanpublicSimpleRabbitListenerContainerFactoryrabbitListenerContainerFactory(ConnectionFactory connectionFactory,MessageRecoverer messageRecoverer){SimpleRabbitListenerContainerFactory factory =newSimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory);// 配置重试RetryTemplate retryTemplate =newRetryTemplate();// 重试策略ExponentialBackOffPolicy backOffPolicy =newExponentialBackOffPolicy(); backOffPolicy.setInitialInterval(1000); backOffPolicy.setMultiplier(2.0); backOffPolicy.setMaxInterval(10000);// 重试次数SimpleRetryPolicy retryPolicy =newSimpleRetryPolicy(); retryPolicy.setMaxAttempts(3); retryTemplate.setBackOffPolicy(backOffPolicy); retryTemplate.setRetryPolicy(retryPolicy); factory.setRetryTemplate(retryTemplate);// 使用拦截器链配置消息恢复器 factory.setAdviceChain(RetryInterceptorBuilder.stateless().recoverer(messageRecoverer).retryOperations(retryTemplate).build());return factory;}@BeanpublicMessageRecoverermessageRecoverer(RabbitTemplate rabbitTemplate){// 重试失败后的处理returnnewRepublishMessageRecoverer( rabbitTemplate,"error.exchange",// 错误消息交换机"error.routingkey"// 错误消息路由键);}}

5. 消息幂等性处理

<!-- Redis依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>
# Redis配置data:redis:host: 192.168.0.1 # Redis服务器地址port:6379# Redis端口password:# Redis密码(如果没有密码则留空)database:0# 使用的数据库索引timeout: 2000ms # 连接超时时间lettuce:pool:max-active:8# 连接池最大连接数max-wait:-1ms # 连接池最大阻塞等待时间max-idle:8# 连接池最大空闲连接min-idle:0# 连接池最小空闲连接
packagecom.example.test.consumer;importcom.example.test.pojo.entity.Order;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.amqp.support.AmqpHeaders;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.data.redis.core.RedisTemplate;importorg.springframework.messaging.handler.annotation.Header;importorg.springframework.stereotype.Component;importcom.rabbitmq.client.Channel;importjava.io.IOException;importjava.util.concurrent.TimeUnit;@Component@Slf4jpublicclassIdempotentConsumer{@AutowiredprivateRedisTemplate<String,String> redisTemplate;privatestaticfinalStringMESSAGE_CACHE_PREFIX="msg:idempotent:";privatestaticfinallongEXPIRE_TIME=24*60*60;// 24小时@RabbitListener(queues ="order.queue")publicvoidhandleMessage(Order order,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag,@Header(AmqpHeaders.MESSAGE_ID)String messageId)throwsIOException{// 1. 检查幂等性if(!checkIdempotent(messageId)){ log.info("重复消息,直接确认: {}", messageId); channel.basicAck(deliveryTag,false);return;}try{// 2. 处理业务processOrder(order);// 3. 标记已处理markAsProcessed(messageId);// 4. 确认消息 channel.basicAck(deliveryTag,false);}catch(Exception e){ log.error("消息处理失败", e); channel.basicNack(deliveryTag,false,true);}}/** * 处理订单业务逻辑 */privatevoidprocessOrder(Order order){// 这里实现具体的业务处理逻辑 log.info("处理订单: {}", order.getOrderNo());}/** * 检查消息是否已处理 */privatebooleancheckIdempotent(String messageId){String key =MESSAGE_CACHE_PREFIX+ messageId;return!Boolean.TRUE.equals(redisTemplate.hasKey(key));}/** * 标记消息已处理 */privatevoidmarkAsProcessed(String messageId){String key =MESSAGE_CACHE_PREFIX+ messageId; redisTemplate.opsForValue().set(key,"1",EXPIRE_TIME,TimeUnit.SECONDS);}}

6. 可靠性保障最佳实践

发送端可靠性
packagecom.example.test.pojo.entity;importlombok.*;@Data@NoArgsConstructor@AllArgsConstructor@Getter@SetterpublicclassFailedMessage{privateString messageId;privateString exchange;privateString routingKey;privateObject content;privateString status;privateString cause;privateLong createTime;privateLong updateTime;}
packagecom.example.test.service;importcom.example.test.pojo.entity.FailedMessage;importjava.util.List;publicinterfaceMessageStoreService{/** * 保存消息到数据库 * @param messageId 消息ID * @param exchange 交换机 * @param routingKey 路由键 * @param message 消息内容 * @param status 状态 */voidsaveMessage(String messageId,String exchange,String routingKey,Object message,String status);/** * 更新消息状态 * @param messageId 消息ID * @param status 新状态 */voidupdateStatus(String messageId,String status);/** * 获取失败消息列表 * @return 失败消息列表 */List<FailedMessage>getFailedMessages();/** * 根据消息ID获取消息 * @param messageId 消息ID * @return 失败消息 */FailedMessagegetMessageById(String messageId);}
packagecom.example.test.service.impl;importcom.example.test.pojo.entity.FailedMessage;importcom.example.test.service.MessageStoreService;importorg.springframework.stereotype.Service;importjava.util.ArrayList;importjava.util.List;importjava.util.concurrent.ConcurrentHashMap;importjava.util.concurrent.atomic.AtomicLong;@ServicepublicclassMessageStoreServiceImplimplementsMessageStoreService{// 使用内存存储模拟数据库(实际项目中应该使用真实数据库)privatefinalConcurrentHashMap<String,FailedMessage> messageStore =newConcurrentHashMap<>();privatefinalAtomicLong idGenerator =newAtomicLong(1);@OverridepublicvoidsaveMessage(String messageId,String exchange,String routingKey,Object message,String status){FailedMessage failedMessage =newFailedMessage(); failedMessage.setMessageId(messageId); failedMessage.setExchange(exchange); failedMessage.setRoutingKey(routingKey); failedMessage.setContent(message); failedMessage.setStatus(status); failedMessage.setCreateTime(System.currentTimeMillis()); failedMessage.setUpdateTime(System.currentTimeMillis()); messageStore.put(messageId, failedMessage);}@OverridepublicvoidupdateStatus(String messageId,String status){FailedMessage message = messageStore.get(messageId);if(message !=null){ message.setStatus(status); message.setUpdateTime(System.currentTimeMillis()); messageStore.put(messageId, message);}}@OverridepublicList<FailedMessage>getFailedMessages(){List<FailedMessage> failedMessages =newArrayList<>();for(FailedMessage message : messageStore.values()){if("FAILED".equals(message.getStatus())||"SENDING".equals(message.getStatus())){ failedMessages.add(message);}}return failedMessages;}@OverridepublicFailedMessagegetMessageById(String messageId){return messageStore.get(messageId);}}
packagecom.example.test.producer;importcom.example.test.pojo.entity.FailedMessage;importcom.example.test.service.MessageStoreService;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.amqp.rabbit.connection.CorrelationData;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.scheduling.annotation.Scheduled;importorg.springframework.stereotype.Component;importlombok.extern.slf4j.Slf4j;importjava.util.UUID;importjava.util.List;@Component@Slf4jpublicclassReliableProducer{@AutowiredprivateRabbitTemplate rabbitTemplate;@AutowiredprivateMessageStoreService messageStoreService;/** * 可靠发送消息 */publicvoidsendReliably(String exchange,String routingKey,Object message){String messageId =UUID.randomUUID().toString();try{// 1. 保存到数据库(状态:发送中) messageStoreService.saveMessage(messageId, exchange, routingKey, message,"SENDING");// 2. 发送消息CorrelationData correlationData =newCorrelationData(messageId); rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);// 3. 定时检查确认状态(由ConfirmCallback更新状态)}catch(Exception e){ log.error("消息发送异常", e); messageStoreService.updateStatus(messageId,"FAILED");throw e;}}/** * 定时重试失败消息 */@Scheduled(fixedDelay =60000)// 每60秒执行一次publicvoidretryFailedMessages(){List<FailedMessage> messages = messageStoreService.getFailedMessages();for(FailedMessage msg : messages){try{// 重新发送 rabbitTemplate.convertAndSend( msg.getExchange(), msg.getRoutingKey(), msg.getContent());// 更新状态 messageStoreService.updateStatus(msg.getMessageId(),"RETRY_SENT");}catch(Exception e){ log.error("重试发送失败: {}", msg.getMessageId(), e);}}}}

死信队列

在 RabbitMQ 中,死信队列(Dead Letter Queue, DLQ) 是一种用于存储无法被正常消费的消息的队列。当消息满足特定条件成为“死信”后,它会被自动转发到预先指定的死信交换机(Dead Letter Exchange, DLX),再由该交换机路由到绑定的死信队列中。死信队列本质上是一个普通的队列,只是它的用途是收集、分析和重新处理那些“失败”的消息。

死信队列形成原因

  1. 消息被拒绝:消费者使用 basic.rejectbasic.nack 否定确认消息,且未设置重新入队requeue=false)。
  2. 消息过期:消息设置了存活时间(TTL),超时后尚未被消费。(TTL到期未被消费)
  3. 队列长度超限:队列达到最大长度限制,最早的消息可能被丢弃(取决于配置)并转为死信。

死信队列的工作原理

  • 在声明业务队列时,通过设置参数 x-dead-letter-exchange 指定一个死信交换机(可选地通过 x-dead-letter-routing-key 指定路由键)。
  • 当业务队列中的消息变为死信后,RabbitMQ 会自动将消息重新发布到指定的死信交换机。
  • 死信交换机根据路由键将消息路由到绑定的死信队列(如果未指定路由键,则保留原队列的路由键)。

死信队列配置

packagecom.example.test.config;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.DirectExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjava.util.HashMap;importjava.util.Map;@ConfigurationpublicclassDeadLetterConfig{// ==================== 业务队列(带死信配置) ====================@BeanpublicQueueorderQueueWithDLX(){Map<String,Object> args =newHashMap<>();// 死信交换机 args.put("x-dead-letter-exchange",DLX_EXCHANGE);// 死信路由键 args.put("x-dead-letter-routing-key",DLX_ROUTING_KEY);// 队列最大长度 args.put("x-max-length",10000);// 消息TTL(毫秒) args.put("x-message-ttl",300000);// 5分钟returnnewQueue(ORDER_QUEUE,true,false,false, args);}// ==================== 死信交换机 ====================@BeanpublicDirectExchangedlxExchange(){returnnewDirectExchange(DLX_EXCHANGE,true,false);}// ==================== 死信队列 ====================@BeanpublicQueuedlxQueue(){returnnewQueue(DLX_QUEUE,true);}@BeanpublicBindingdlxBinding(){returnBindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY);}// ==================== 重试队列(处理死信) ====================@BeanpublicQueueretryQueue(){returnnewQueue(RETRY_QUEUE,true);}@BeanpublicDirectExchangeretryExchange(){returnnewDirectExchange(RETRY_EXCHANGE,true,false);}@BeanpublicBindingretryBinding(){returnBindingBuilder.bind(retryQueue()).to(retryExchange()).with(RETRY_ROUTING_KEY);}// 常量定义privatestaticfinalStringORDER_QUEUE="order.queue";privatestaticfinalStringDLX_EXCHANGE="dlx.exchange";privatestaticfinalStringDLX_QUEUE="dlx.queue";privatestaticfinalStringDLX_ROUTING_KEY="dlx.order";privatestaticfinalStringRETRY_EXCHANGE="retry.exchange";privatestaticfinalStringRETRY_QUEUE="retry.queue";privatestaticfinalStringRETRY_ROUTING_KEY="retry.order";}

死信消息处理

在Order,OrderService和OrderServiceImpl加入以下代码:

packagecom.example.test.pojo.entity;importlombok.*;@Data@NoArgsConstructor@AllArgsConstructor@Setter@GetterpublicclassOrder{privateInteger id;privateString orderNo;privateString skuId;// 添加SKU ID字段privateInteger quantity;// 添加数量字段}
packagecom.example.test.service;importcom.baomidou.mybatisplus.extension.service.IService;importcom.example.test.pojo.entity.Order;publicinterfaceOrderServiceextendsIService<Order>{voidprocessOrder(Order order);/** * 取消订单 * @param orderNo 订单号 * @param reason 取消原因 */voidcancelOrder(String orderNo,String reason);}
packagecom.example.test.service.impl;importcom.baomidou.mybatisplus.extension.service.impl.ServiceImpl;importcom.example.test.mapper.OrderMapper;importcom.example.test.pojo.entity.Order;importcom.example.test.service.OrderService;importlombok.extern.slf4j.Slf4j;importorg.springframework.stereotype.Service;@Service@Slf4jpublicclassOrderServiceImplextendsServiceImpl<OrderMapper,Order>implementsOrderService{@OverridepublicvoidprocessOrder(Order order){ log.info("执行processOrder方法。。。");}@OverridepublicvoidcancelOrder(String orderNo,String reason){ log.info("取消订单: {}, 原因: {}", orderNo, reason);// 这里实现实际的订单取消逻辑// 示例:update().eq("order_no", orderNo).set("status", "CANCELLED").update();}}

创建NotificationService和NotificationServiceImpl

packagecom.example.test.service;importcom.example.test.pojo.entity.Order;publicinterfaceNotificationService{/** * 发送订单取消通知 * @param order 订单信息 */voidsendOrderCancelNotification(Order order);/** * 发送通用通知 * @param message 通知内容 */voidsendNotification(String message);}
packagecom.example.test.service.impl;importcom.example.test.pojo.entity.Order;importcom.example.test.service.NotificationService;importorg.springframework.stereotype.Service;importlombok.extern.slf4j.Slf4j;@Service@Slf4jpublicclassNotificationServiceImplimplementsNotificationService{@OverridepublicvoidsendOrderCancelNotification(Order order){ log.info("发送订单取消通知 - 订单号: {}, 原因: 超时未支付", order.getOrderNo());// 这里可以集成短信、邮件、微信等通知服务// 示例:smsService.sendSms(phone, "您的订单已取消...");}@OverridepublicvoidsendNotification(String message){ log.info("发送通知: {}", message);// 通用通知逻辑}}

创建InventoryService和InventoryServiceImpl:

packagecom.example.test.service;publicinterfaceInventoryService{/** * 释放库存 * @param skuId 商品SKU ID * @param quantity 数量 */voidreleaseStock(String skuId,Integer quantity);/** * 扣减库存 * @param skuId 商品SKU ID * @param quantity 数量 */voiddeductStock(String skuId,Integer quantity);}
packagecom.example.test.service.impl;importcom.example.test.service.InventoryService;importorg.springframework.stereotype.Service;importlombok.extern.slf4j.Slf4j;@Service@Slf4jpublicclassInventoryServiceImplimplementsInventoryService{@OverridepublicvoidreleaseStock(String skuId,Integer quantity){ log.info("释放库存 - SKU: {}, 数量: {}", skuId, quantity);// 这里实现实际的库存释放逻辑// 示例:inventoryMapper.updateStock(skuId, quantity);}@OverridepublicvoiddeductStock(String skuId,Integer quantity){ log.info("扣减库存 - SKU: {}, 数量: {}", skuId, quantity);// 这里实现实际的库存扣减逻辑}}
packagecom.example.test.handler;importorg.springfrexampleamework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.amqp.support.AmqpHeaders;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.messaging.handler.annotation.Header;importorg.springframework.stereotype.Component;importlombok.extern.slf4j.Slf4j;importcom.example.test.pojo.entity.Order;importcom.example.test.service.OrderService;importcom.example.test.service.InventoryService;importcom.example.test.service.NotificationService;importcom.rabbitmq.client.Channel;importjava.io.IOException;importjava.util.Map;@Component@Slf4jpublicclassDeadLetterHandler{@AutowiredprivateRabbitTemplate rabbitTemplate;@AutowiredprivateOrderService orderService;@AutowiredprivateInventoryService inventoryService;@AutowiredprivateNotificationService notificationService;/** * 处理死信消息 */@RabbitListener(queues ="dlx.queue")publicvoidhandleDeadLetter(Message message,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag)throwsIOException{try{// 解析原始消息String originalQueue =getOriginalQueue(message);Object originalMessage =extractOriginalMessage(message); log.warn("收到死信消息,原始队列: {}, 消息: {}", originalQueue, originalMessage);// 根据不同队列采取不同策略if(originalQueue.contains("order")){handleOrderDeadLetter(originalMessage);}elseif(originalQueue.contains("payment")){handlePaymentDeadLetter(originalMessage);}// 确认死信消息 channel.basicAck(deliveryTag,false);}catch(Exception e){ log.error("处理死信消息失败", e);// 死信消息处理失败,可以记录日志或发送告警 channel.basicNack(deliveryTag,false,false);}}/** * 订单死信处理(如:订单超时未支付) */privatevoidhandleOrderDeadLetter(Object message){// 1. 解析订单信息Order order =parseOrder(message);// 2. 取消订单 orderService.cancelOrder(order.getOrderNo(),"超时未支付");// 3. 释放库存 inventoryService.releaseStock(order.getSkuId(), order.getQuantity());// 4. 发送通知 notificationService.sendOrderCancelNotification(order); log.info("订单超时取消: {}", order.getOrderNo());}/** * 支付死信处理 */privatevoidhandlePaymentDeadLetter(Object message){// 支付相关的死信处理逻辑 log.info("处理支付死信消息: {}", message);}/** * 重试消费失败的消息 */@RabbitListener(queues ="retry.queue")publicvoidhandleRetryMessage(Message message,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag)throwsIOException{try{Object originalMessage =extractOriginalMessage(message);Integer retryCount =getRetryCount(message);if(retryCount <3){// 再次尝试处理boolean success =processWithRetry(originalMessage);if(success){ channel.basicAck(deliveryTag,false); log.info("重试处理成功");}else{// 重试失败,重新放入死信队列 channel.basicNack(deliveryTag,false,false);}}else{// 超过重试次数,人工处理 channel.basicAck(deliveryTag,false);alertManualIntervention(message);}}catch(Exception e){ log.error("重试处理异常", e); channel.basicNack(deliveryTag,false,false);}}privateStringgetOriginalQueue(Message message){// 从消息头获取原始队列return(String) message.getMessageProperties().getHeaders().get("x-original-queue");}privateObjectextractOriginalMessage(Message message){// 提取原始消息内容return message.getBody();}privateIntegergetRetryCount(Message message){Map<String,Object> headers = message.getMessageProperties().getHeaders();Integer count =(Integer) headers.get("x-retry-count");return count !=null? count :0;}privateOrderparseOrder(Object message){// 解析订单对象的逻辑returnnewOrder();// 这里需要根据实际情况实现}privatebooleanprocessWithRetry(Object message){// 带重试的处理逻辑returntrue;// 这里需要根据实际情况实现}privatevoidalertManualIntervention(Message message){// 发送人工干预告警 log.warn("需要人工干预的消息: {}", message);}}

延迟重试实现

packagecom.example.test.service;importorg.springframework.amqp.core.Message;publicinterfaceDelayRetryService{/** * 发送延迟消息 * @param message 消息内容 * @param delaySeconds 延迟时间(秒) * @param retryCount 重试次数 */voidsendDelayRetry(Object message,int delaySeconds,int retryCount);/** * 处理延迟消息 * @param message 延迟消息 */voidhandleDelayRetry(Message message);/** * 计算下一次延迟时间 * @param retryCount 重试次数 * @return 下一次延迟时间(秒) */intcalculateNextDelay(int retryCount);}
packagecom.example.test.service.impl;importcom.example.test.service.DelayRetryService;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;importjava.util.HashMap;importjava.util.Map;@Service@Slf4jpublicclassDelayRetryServiceImplimplementsDelayRetryService{@AutowiredprivateRabbitTemplate rabbitTemplate;/** * 发送延迟重试消息 */publicvoidsendDelayRetry(Object message,int delaySeconds,int retryCount){Map<String,Object> headers =newHashMap<>(); headers.put("x-retry-count", retryCount); headers.put("x-delay", delaySeconds *1000); rabbitTemplate.convertAndSend("retry.delay.exchange","retry.key", message, m ->{ m.getMessageProperties().setHeaders(headers);return m;});}/** * 处理延迟重试消息 */@RabbitListener(queues ="retry.delay.queue")publicvoidhandleDelayRetry(Message message){Integer retryCount =(Integer) message.getMessageProperties().getHeaders().get("x-retry-count");if(retryCount ==null) retryCount =0;try{// 处理消息processMessage(message);}catch(Exception e){if(retryCount <3){// 增加重试次数,延迟更长时间int nextDelay =calculateNextDelay(retryCount);sendDelayRetry(message, nextDelay, retryCount +1);}else{// 超过重试次数,进入死信队列 log.error("超过最大重试次数", e);}}}publicintcalculateNextDelay(int retryCount){// 指数退避策略return(int)Math.pow(2, retryCount)*5;// 5, 10, 20秒}/** * 处理消息的具体逻辑 * @param message 消息对象 */privatevoidprocessMessage(Message message){// 这里实现具体的消息处理逻辑 log.info("处理延迟重试消息: {}",newString(message.getBody()));// 模拟业务处理// 实际项目中这里应该是具体的业务逻辑String messageContent =newString(message.getBody());if(messageContent.contains("error")){thrownewRuntimeException("模拟处理失败");} log.info("消息处理成功");}}

延迟队列

在 RabbitMQ 中,延迟队列(Delayed Queue) 指的是消息在发送到队列后,不会立即被消费者消费,而是等待指定的延迟时间后才变为可消费状态。RabbitMQ 本身并未内置延迟队列功能,但可以通过以下两种主流方式实现延迟消息:

  1. 基于死信队列(DLX) + 消息 TTL
  2. 基于 RabbitMQ 延迟消息插件(rabbitmq_delayed_message_exchange)

延迟队列实现方案对比

方案优点缺点适用场景
TTL+DLX无需插件,简单消息排序问题,不灵活固定延迟时间
延迟插件灵活,支持任意延迟需要安装插件复杂延迟需求
Redis ZSet高精度,分布式需要额外维护大量延迟任务
时间轮算法高性能,内存友好实现复杂海量定时任务

方案一:TTL+DLX实现延迟队列

实现原理
  • 给普通队列设置 消息存活时间(TTL)死信交换机(DLX)
  • 生产者发送消息时,为每条消息设置 expiration 属性,或为整个队列设置 x-message-ttl 参数。
  • 消息到达队列后,如果超过 TTL 未被消费,会自动转为死信,被投递到指定的死信交换机,再路由到绑定的死信队列。
  • 消费者监听死信队列,即可在延迟时间后消费到原消息。
packagecom.example.test.config;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.DirectExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.stereotype.Component;importjava.util.HashMap;importjava.util.Map;@ConfigurationpublicclassTTLDelayQueueConfig{// ==================== 延迟队列配置 ====================// 创建多个不同TTL的队列@BeanpublicQueuedelayQueue5s(){Map<String,Object> args =newHashMap<>(); args.put("x-dead-letter-exchange",DELAY_EXCHANGE); args.put("x-dead-letter-routing-key",DELAY_ROUTING_KEY); args.put("x-message-ttl",5000);// 5秒returnnewQueue("delay.queue.5s",true,false,false, args);}@BeanpublicQueuedelayQueue30s(){Map<String,Object> args =newHashMap<>(); args.put("x-dead-letter-exchange",DELAY_EXCHANGE); args.put("x-dead-letter-routing-key",DELAY_ROUTING_KEY); args.put("x-message-ttl",30000);// 30秒returnnewQueue("delay.queue.30s",true,false,false, args);}@BeanpublicQueuedelayQueue5m(){Map<String,Object> args =newHashMap<>(); args.put("x-dead-letter-exchange",DELAY_EXCHANGE); args.put("x-dead-letter-routing-key",DELAY_ROUTING_KEY); args.put("x-message-ttl",300000);// 5分钟returnnewQueue("delay.queue.5m",true,false,false, args);}// ==================== 延迟交换机 ====================@BeanpublicDirectExchangedelayExchange(){returnnewDirectExchange(DELAY_EXCHANGE,true,false);}@BeanpublicQueuedelayProcessQueue(){returnnewQueue(DELAY_PROCESS_QUEUE,true);}@BeanpublicBindingdelayBinding(){returnBindingBuilder.bind(delayProcessQueue()).to(delayExchange()).with(DELAY_ROUTING_KEY);}// ==================== 发送延迟消息 ====================@ComponentpublicclassTTLDelaySender{@AutowiredprivateRabbitTemplate rabbitTemplate;/** * 发送延迟消息 * @param delaySeconds 延迟秒数 */publicvoidsendDelayMessage(Object message,int delaySeconds){String queueName =getQueueByDelay(delaySeconds); rabbitTemplate.convertAndSend("", queueName, message);}privateStringgetQueueByDelay(int delaySeconds){if(delaySeconds <=5){return"delay.queue.5s";}elseif(delaySeconds <=30){return"delay.queue.30s";}elseif(delaySeconds <=300){return"delay.queue.5m";}else{// 超过5分钟,使用最大延迟队列return"delay.queue.5m";}}}// 常量privatestaticfinalStringDELAY_EXCHANGE="delay.exchange";privatestaticfinalStringDELAY_PROCESS_QUEUE="delay.process.queue";privatestaticfinalStringDELAY_ROUTING_KEY="delay.process";}
优缺点
  • 优点:无需安装额外插件,利用 RabbitMQ 原生机制即可实现。
  • 缺点
    • 消息顺序问题:队列中的消息按 FIFO 顺序处理,只有队列头部的消息过期后才会处理后续消息。如果先发送的一条消息 TTL 很长,后发送的一条消息 TTL 很短,后一条消息必须等到前一条过期才能被投递到死信队列,导致延迟不精确。
    • 灵活性受限:如果需要为不同消息设置不同延迟时间,必须使用消息级别的 TTL(expiration 属性),但上述顺序问题依然存在。

方案二:延迟插件实现(推荐)

实现原理
  • 安装官方插件 rabbitmq_delayed_message_exchange
  • 声明一个 x-delayed-message 类型的交换机,生产者将消息发送到该交换机,并通过 x-delay 头指定延迟时间。
  • 插件内部将消息暂存到 Mnesia 表中,通过定时器检查到期时间,到期后将消息投递到绑定的队列中。
1. 安装延迟插件
# 1. 下载插件(版本需与RabbitMQ匹配)wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez # 2. 复制到插件目录cp rabbitmq_delayed_message_exchange-3.8.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.19/plugins/ # 3. 启用插件 rabbitmq-plugins enable rabbitmq_delayed_message_exchange # 4. 重启RabbitMQ systemctl restart rabbitmq-server # 5. 验证插件 rabbitmq-plugins list |grep delay 
2. SpringBoot配置

在OrderService中添加getOrderStatus方法,并实现该方法:

packagecom.example.test.service;importcom.baomidou.mybatisplus.extension.service.IService;importcom.example.test.pojo.entity.Order;publicinterfaceOrderServiceextendsIService<Order>{voidprocessOrder(Order order);/** * 取消订单 * @param orderNo 订单号 * @param reason 取消原因 */voidcancelOrder(String orderNo,String reason);/** * 获取订单状态 * @param orderNo 订单号 * @return 订单状态 */OrderStatusgetOrderStatus(String orderNo);// 订单状态枚举enumOrderStatus{UNPAID,// 未支付PAID,// 已支付CANCELLED,// 已取消COMPLETED// 已完成}}
packagecom.example.test.service.impl;importcom.baomidou.mybatisplus.extension.service.impl.ServiceImpl;importcom.example.test.mapper.OrderMapper;importcom.example.test.pojo.entity.Order;importcom.example.test.service.OrderService;importlombok.extern.slf4j.Slf4j;importorg.springframework.stereotype.Service;@Service@Slf4jpublicclassOrderServiceImplextendsServiceImpl<OrderMapper,Order>implementsOrderService{@OverridepublicvoidprocessOrder(Order order){ log.info("执行processOrder方法。。。");}@OverridepublicvoidcancelOrder(String orderNo,String reason){ log.info("取消订单: {}, 原因: {}", orderNo, reason);// 这里实现实际的订单取消逻辑// 示例:update().eq("order_no", orderNo).set("status", "CANCELLED").update();}@OverridepublicOrderStatusgetOrderStatus(String orderNo){ log.info("查询订单状态: {}", orderNo);// 这里实现实际的订单状态查询逻辑// 示例:从数据库查询订单状态// Order order = getOne(new QueryWrapper<Order>().eq("order_no", orderNo));// return order != null ? OrderStatus.valueOf(order.getStatus()) : OrderStatus.CANCELLED;// 临时返回示例状态returnOrderStatus.UNPAID;}}
packagecom.example.test.config;importcom.example.test.pojo.entity.Order;importcom.example.test.service.OrderService;importcom.example.test.service.OrderService.OrderStatus;importlombok.Data;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.CustomExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.stereotype.Component;importjava.util.HashMap;importjava.util.Map;@ConfigurationpublicclassDelayedQueueConfig{// 延迟交换机@BeanpublicCustomExchangedelayedExchange(){Map<String,Object> args =newHashMap<>(); args.put("x-delayed-type","direct");// 延迟类型returnnewCustomExchange("delayed.exchange",// 交换机名称"x-delayed-message",// 交换机类型(固定)true,// 持久化false,// 不自动删除 args // 参数);}// 延迟队列@BeanpublicQueuedelayedQueue(){returnnewQueue("delayed.queue",true);}// 绑定@BeanpublicBindingdelayedBinding(){returnBindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delayed.routingkey").noargs();// 必须调用noargs()}// 延迟消息发送器@Component@Slf4jpublicclassDelayedMessageSender{@AutowiredprivateRabbitTemplate rabbitTemplate;/** * 发送延迟消息 * @param message 消息内容 * @param delayMillis 延迟毫秒数 */publicvoidsendDelayedMessage(Object message,int delayMillis){ rabbitTemplate.convertAndSend("delayed.exchange","delayed.routingkey", message, m ->{// 使用消息头设置延迟时间(RabbitMQ延迟插件的方式) m.getMessageProperties().setHeader("x-delay", delayMillis);return m;}); log.info("发送延迟消息,延迟{}ms: {}", delayMillis, message);}/** * 发送订单超时延迟消息 */publicvoidsendOrderTimeoutMessage(Order order,int timeoutMinutes){OrderTimeoutMessage timeoutMessage =newOrderTimeoutMessage(); timeoutMessage.setOrderNo(order.getOrderNo()); timeoutMessage.setTimeoutAt(System.currentTimeMillis()+ timeoutMinutes *60*1000);sendDelayedMessage(timeoutMessage, timeoutMinutes *60*1000);}}// 延迟消息消费者@Component@Slf4jpublicclassDelayedMessageConsumer{@AutowiredprivateOrderService orderService;@RabbitListener(queues ="delayed.queue")publicvoidhandleDelayedMessage(OrderTimeoutMessage message){ log.info("收到延迟消息: {}", message);// 检查订单状态OrderStatus status = orderService.getOrderStatus(message.getOrderNo());if(status ==OrderStatus.UNPAID){// 订单仍未支付,执行取消逻辑 orderService.cancelOrder(message.getOrderNo(),"超时未支付"); log.info("订单超时取消: {}", message.getOrderNo());}else{ log.info("订单已处理,忽略超时消息: {}", message.getOrderNo());}}}// 延迟消息实体@DatapublicstaticclassOrderTimeoutMessage{privateString orderNo;privateLong timeoutAt;// 超时时间戳}}
3. 延迟消息应用场景

创建CreateOrderRequest类

packagecom.example.test.pojo.request;importlombok.Data;importlombok.NoArgsConstructor;importlombok.AllArgsConstructor;@Data@NoArgsConstructor@AllArgsConstructorpublicclassCreateOrderRequest{privateLong userId;privateLong productId;privateInteger quantity;privateString remark;}

在NotificationService和NotificationServiceImpl添加以下方法:

packagecom.example.test.service;importcom.example.test.pojo.entity.Order;publicinterfaceNotificationService{/** * 发送订单取消通知 * @param order 订单信息 */voidsendOrderCancelNotification(Order order);/** * 发送通用通知 * @param message 通知内容 */voidsendNotification(String message);/** * 发送支付提醒 * @param orderNo 订单号 */voidsendPaymentReminder(String orderNo);}
packagecom.example.test.service.impl;importcom.example.test.pojo.entity.Order;importcom.example.test.service.NotificationService;importorg.springframework.stereotype.Service;importlombok.extern.slf4j.Slf4j;@Service@Slf4jpublicclassNotificationServiceImplimplementsNotificationService{@OverridepublicvoidsendOrderCancelNotification(Order order){ log.info("发送订单取消通知 - 订单号: {}, 原因: 超时未支付", order.getOrderNo());// 这里可以集成短信、邮件、微信等通知服务// 示例:smsService.sendSms(phone, "您的订单已取消...");}@OverridepublicvoidsendNotification(String message){ log.info("发送通知: {}", message);// 通用通知逻辑}@OverridepublicvoidsendPaymentReminder(String orderNo){ log.info("发送支付提醒 - 订单号: {}", orderNo);// 这里可以集成短信、邮件、微信等通知服务// 示例:smsService.sendSms(phone, "您的订单即将超时,请尽快支付...");}}

创建OrderTimeoutService和OrderTimeoutServiceImpl:

packagecom.example.test.service;importcom.example.test.pojo.entity.Order;importcom.example.test.pojo.request.CreateOrderRequest;publicinterfaceOrderTimeoutService{/** * 创建订单并启动超时检查 * @param request 创建订单请求 * @return 订单对象 */OrdercreateOrder(CreateOrderRequest request);/** * 取消订单超时检查 * @param orderNo 订单号 */voidcancelTimeoutCheck(String orderNo);}
packagecom.example.test.service.impl;importcom.example.test.config.DelayedQueueConfig;importcom.example.test.pojo.entity.Order;importcom.example.test.pojo.request.CreateOrderRequest;importcom.example.test.service.NotificationService;importcom.example.test.service.OrderService;importcom.example.test.service.OrderTimeoutService;importlombok.Data;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;importorg.springframework.stereotype.Service;importorg.springframework.transaction.annotation.Transactional;importjava.util.UUID;@Service@Slf4jpublicclassOrderTimeoutServiceImplimplementsOrderTimeoutService{@AutowiredprivateDelayedQueueConfig.DelayedMessageSender delayedMessageSender;@AutowiredprivateOrderService orderService;@AutowiredprivateNotificationService notificationService;/** * 创建订单并设置支付超时 */@TransactionalpublicOrdercreateOrder(CreateOrderRequest request){// 1. 创建订单对象Order order =newOrder(); order.setOrderNo(generateOrderNo()); order.setSkuId(String.valueOf(request.getProductId())); order.setQuantity(request.getQuantity());// 2. 保存订单到数据库 orderService.save(order);// 3. 发送15分钟支付超时延迟消息 delayedMessageSender.sendOrderTimeoutMessage(order,15);// 4. 发送5分钟未支付提醒 delayedMessageSender.sendDelayedMessage(newPaymentReminder(order.getOrderNo()),5*60*1000// 5分钟); log.info("订单创建成功,已设置超时检测: {}", order.getOrderNo());return order;}/** * 支付成功,取消超时检测 */publicvoidcancelTimeoutCheck(String orderNo){// 实际中可能需要更复杂的逻辑来处理已发送的延迟消息 log.info("订单支付成功,取消超时检测: {}", orderNo);}/** * 生成订单号 */privateStringgenerateOrderNo(){return"ORD"+System.currentTimeMillis()+UUID.randomUUID().toString().substring(0,8).toUpperCase();}@ComponentpublicstaticclassPaymentReminderConsumer{@AutowiredprivateOrderService orderService;@AutowiredprivateNotificationService notificationService;@RabbitListener(queues ="reminder.queue")publicvoidhandlePaymentReminder(PaymentReminder reminder){// 检查订单是否已支付OrderService.OrderStatus status = orderService.getOrderStatus(reminder.getOrderNo());if(status ==OrderService.OrderStatus.UNPAID){// 发送支付提醒 notificationService.sendPaymentReminder(reminder.getOrderNo()); log.info("发送支付提醒: {}", reminder.getOrderNo());}}}/** * 支付提醒消息类 */@DatapublicstaticclassPaymentReminder{privateString orderNo;publicPaymentReminder(){}publicPaymentReminder(String orderNo){this.orderNo = orderNo;}}}
优缺点
  • 优点
    • 延迟精确,支持任意精度的延迟(实际受插件实现限制,通常毫秒级)。
    • 消息顺序由交换机控制,不存在队列头部阻塞问题。
    • 支持动态设置不同消息的延迟时间。
  • 缺点
    • 需要安装并启用插件,依赖外部组件。
    • 消息存储在 Mnesia 中,大量延迟消息可能占用内存,需评估性能。

方案三:Redis实现延迟队列(备用方案)

添加fastjson依赖:

<!-- FastJSON依赖 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency>
packagecom.example.test.queue;importcom.alibaba.fastjson.JSON;importlombok.Data;importlombok.extern.slf4j.Slf4j;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.data.redis.core.RedisTemplate;importorg.springframework.data.redis.core.script.DefaultRedisScript;importorg.springframework.scheduling.annotation.Scheduled;importorg.springframework.stereotype.Component;importjava.sql.Date;importjava.util.Arrays;importjava.util.Set;@Component@Slf4jpublicclassRedisDelayQueue{@AutowiredprivateRedisTemplate<String,String> redisTemplate;privatestaticfinalStringDELAY_QUEUE_KEY="delay:queue";privatestaticfinalStringPROCESSING_QUEUE_KEY="delay:processing";/** * 添加延迟任务 */publicvoidaddDelayTask(String taskId,Object data,long delaySeconds){long score =System.currentTimeMillis()+ delaySeconds *1000;DelayTask task =newDelayTask(); task.setTaskId(taskId); task.setData(data); task.setExecuteTime(score); redisTemplate.opsForZSet().add(DELAY_QUEUE_KEY,JSON.toJSONString(task), score); log.info("添加延迟任务: {}, 执行时间: {}", taskId,newDate(score));}/** * 处理延迟任务 */@Scheduled(fixedDelay =1000)// 每秒执行一次publicvoidprocessDelayTasks(){long now =System.currentTimeMillis();// 获取到期的任务Set<String> tasks = redisTemplate.opsForZSet().rangeByScore(DELAY_QUEUE_KEY,0, now,0,100);if(tasks !=null&&!tasks.isEmpty()){for(String taskJson : tasks){try{DelayTask task =JSON.parseObject(taskJson,DelayTask.class);// 移动到处理中队列(防止重复处理)if(moveToProcessing(task)){// 处理任务processTask(task);// 从处理中队列删除 redisTemplate.opsForHash().delete(PROCESSING_QUEUE_KEY, task.getTaskId());}// 从延迟队列删除 redisTemplate.opsForZSet().remove(DELAY_QUEUE_KEY, taskJson);}catch(Exception e){ log.error("处理延迟任务失败: {}", taskJson, e);}}}}/** * 将任务移动到处理中队列(原子操作) */privatebooleanmoveToProcessing(DelayTask task){// 使用Lua脚本保证原子性String script ="if redis.call('hexists', KEYS[2], ARGV[1]) == 0 then "+" redis.call('hset', KEYS[2], ARGV[1], ARGV[2]) "+" return 1 "+"else "+" return 0 "+"end";Long result = redisTemplate.execute(newDefaultRedisScript<>(script,Long.class),Arrays.asList(DELAY_QUEUE_KEY,PROCESSING_QUEUE_KEY), task.getTaskId(),JSON.toJSONString(task));return result !=null&& result ==1;}/** * 处理具体任务 */privatevoidprocessTask(DelayTask task){ log.info("执行延迟任务: {}", task.getTaskId());// 根据任务类型执行不同的处理逻辑// ...}@DatapublicstaticclassDelayTask{privateString taskId;privateObject data;privateLong executeTime;privateString type;// 任务类型}}

RabbitMQ高级特性

1. 优先级队列

@ConfigurationpublicclassPriorityQueueConfig{@BeanpublicQueuepriorityQueue(){Map<String,Object> args =newHashMap<>(); args.put("x-max-priority",10);// 最大优先级10returnnewQueue("priority.queue",true,false,false, args);}@ComponentpublicclassPriorityMessageSender{@AutowiredprivateRabbitTemplate rabbitTemplate;publicvoidsendHighPriority(Object message){ rabbitTemplate.convertAndSend("exchange","routingkey", message, m ->{ m.getMessageProperties().setPriority(9);// 高优先级return m;});}publicvoidsendLowPriority(Object message){ rabbitTemplate.convertAndSend("exchange","routingkey", message, m ->{ m.getMessageProperties().setPriority(1);// 低优先级return m;});}}}

2. 备用交换器(Alternate Exchange)

@ConfigurationpublicclassAlternateExchangeConfig{@BeanpublicDirectExchangemainExchange(){Map<String,Object> args =newHashMap<>(); args.put("alternate-exchange","alternate.exchange");// 备用交换器returnnewDirectExchange("main.exchange",true,false, args);}@BeanpublicFanoutExchangealternateExchange(){returnnewFanoutExchange("alternate.exchange",true,false);}@BeanpublicQueuealternateQueue(){returnnewQueue("alternate.queue",true);}@BeanpublicBindingalternateBinding(){returnBindingBuilder.bind(alternateQueue()).to(alternateExchange());}// 使用:无法路由到队列的消息会自动转到备用交换器}

3. 消息追踪(Firehose Tracer)

# 启用消息追踪插件 rabbitmq-plugins enable rabbitmq_firehose # 创建追踪队列 rabbitmqctl trace_on rabbitmqctl trace_off # 创建追踪队列(接收所有消息) rabbitmqctl set_policy firehose \".*"\'{"firehose-window-size":1000, "firehose-payloads":true}'\ --apply-to queues 

4. 流控(Flow Control)

@ConfigurationpublicclassFlowControlConfig{@BeanpublicConnectionFactoryconnectionFactory(){CachingConnectionFactory factory =newCachingConnectionFactory();// 设置流控参数 factory.getRabbitConnectionFactory().setRequestedChannelMax(100); factory.getRabbitConnectionFactory().setRequestedFrameMax(131072); factory.getRabbitConnectionFactory().setRequestedHeartbeat(60);return factory;}}

性能调优建议

1. 连接和通道管理

spring:rabbitmq:# 连接配置connection-timeout:30000# 连接池配置cache:connection:mode: CONNECTION # 或CHANNELsize:10# 连接池大小channel:size:25# 通道缓存数量checkout-timeout:2000# 心跳检测requested-heartbeat:60

2. 消费者调优

@ConfigurationpublicclassConsumerOptimizationConfig{@BeanpublicSimpleRabbitListenerContainerFactoryoptimizedContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory =newSimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory);// 消费者数量 factory.setConcurrentConsumers(5);// 初始消费者数 factory.setMaxConcurrentConsumers(20);// 最大消费者数// 预取数量(根据消息处理时间调整) factory.setPrefetchCount(100);// 每个消费者预取数量// 批处理 factory.setBatchSize(50);// 批处理大小 factory.setConsumerBatchEnabled(true);// 空闲消费者回收 factory.setIdleEventInterval(30000L);// 30秒空闲检测 factory.setFailedDeclarationRetryInterval(5000L);// 重试间隔return factory;}}

3. 生产者调优

@ConfigurationpublicclassProducerOptimizationConfig{@BeanpublicRabbitTemplateoptimizedRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate template =newRabbitTemplate(connectionFactory);// 使用异步发送(提高吞吐量) template.setUsePublisherConnection(true);// 批量发送 template.setBatchingStrategy(newSimpleBatchingStrategy(100,// 批处理大小10000L,// 缓冲区大小5000L// 超时时间(毫秒)));// 确认模式(平衡可靠性和性能) template.setConfirmCallback((correlationData, ack, cause)->{// 异步处理确认,不阻塞发送线程});// 通道缓存 template.setChannelTransacted(false);// 非事务模式,性能更好return template;}}

4. 队列优化

# 设置队列参数 rabbitmqctl set_policy queue-optimization \"^optimized\."\'{"max-length":100000,"max-length-bytes":1073741824,"overflow":"reject-publish"}'\ --apply-to queues # 启用Lazy Queues(磁盘存储,内存优化) rabbitmqctl set_policy lazy-queues \"^lazy\."\'{"queue-mode":"lazy"}'\ --apply-to queues 

5. 监控和告警

management:endpoints:web:exposure:include: health,metrics,rabbit metrics:export:rabbitmq:enabled:truespring:rabbitmq:metrics:enabled:trueexport:enabled:true

常见问题与解决方案

1. 消息丢失问题

场景:生产者发送成功,但消费者没收到

解决方案

@ComponentpublicclassMessageReliabilityService{// 方案1:生产者确认publicvoidsendWithConfirm(Object message){CorrelationData correlationData =newCorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("exchange","routingKey", message, correlationData);// 异步检查确认结果CompletableFuture.runAsync(()->{try{// 等待确认(可设置超时)boolean confirmed = correlationData.getFuture().get(10,TimeUnit.SECONDS);if(!confirmed){// 重试或记录retryOrLog(message);}}catch(Exception e){// 处理异常}});}// 方案2:消息落库@TransactionalpublicvoidsendWithPersistence(Object message){// 1. 保存到数据库(状态:发送中) messageLogService.save(message,"SENDING");// 2. 发送消息 rabbitTemplate.convertAndSend("exchange","routingKey", message);// 3. 异步更新状态(通过ConfirmCallback)}// 方案3:定时补偿@Scheduled(fixedDelay =60000)publicvoidcompensateMessages(){List<MessageLog> pendingMessages = messageLogService.getPendingMessages();for(MessageLog msg : pendingMessages){if(msg.getRetryCount()<3){// 重新发送sendWithConfirm(msg.getContent()); msg.incrementRetryCount(); messageLogService.update(msg);}else{// 告警,人工介入 alertService.sendAlert(msg);}}}}

2. 消息重复消费

场景:网络问题导致ACK失败,消息重新投递

解决方案

@ServicepublicclassIdempotentConsumerService{@AutowiredprivateRedisTemplate<String,String> redisTemplate;privatestaticfinalStringCONSUMED_PREFIX="msg:consumed:";privatestaticfinallongEXPIRE_HOURS=24;/** * 幂等性检查 */publicbooleanisMessageConsumed(String messageId){String key =CONSUMED_PREFIX+ messageId;returnBoolean.TRUE.equals(redisTemplate.hasKey(key));}/** * 标记消息已消费 */publicvoidmarkMessageConsumed(String messageId){String key =CONSUMED_PREFIX+ messageId; redisTemplate.opsForValue().set( key,"1",EXPIRE_HOURS,TimeUnit.HOURS);}/** * 安全的消费消息 */@RabbitListener(queues ="order.queue")publicvoidsafeConsume(Order order,@Header(AmqpHeaders.MESSAGE_ID)String messageId,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag){// 1. 幂等性检查if(isMessageConsumed(messageId)){ log.info("重复消息,直接确认: {}", messageId); channel.basicAck(deliveryTag,false);return;}try{// 2. 业务处理 orderService.process(order);// 3. 标记已消费markMessageConsumed(messageId);// 4. 确认消息 channel.basicAck(deliveryTag,false);}catch(Exception e){ log.error("消息处理失败", e); channel.basicNack(deliveryTag,false,true);// 重新入队}}}

3. 消息积压处理

场景:消费者处理速度跟不上生产者

解决方案

@ComponentpublicclassMessageBacklogHandler{@AutowiredprivateRabbitTemplate rabbitTemplate;@AutowiredprivateRabbitAdmin rabbitAdmin;/** * 监控队列积压 */@Scheduled(fixedRate =60000)// 每分钟检查一次publicvoidmonitorQueueBacklog(){Properties queueProperties = rabbitAdmin.getQueueProperties("order.queue");if(queueProperties !=null){int messageCount =(int) queueProperties.get("QUEUE_MESSAGE_COUNT");int consumerCount =(int) queueProperties.get("QUEUE_CONSUMER_COUNT"); log.info("队列状态 - 消息数: {}, 消费者数: {}", messageCount, consumerCount);// 动态调整消费者数量if(messageCount >1000&& consumerCount <10){scaleConsumers("order.queue",10);}elseif(messageCount <100&& consumerCount >3){scaleConsumers("order.queue",3);}// 告警if(messageCount >10000){ alertService.sendBacklogAlert("order.queue", messageCount);}}}/** * 动态扩展消费者 */privatevoidscaleConsumers(String queueName,int targetCount){// 通过调整@RabbitListener的concurrency参数// 或者重启应用时修改配置 log.info("调整队列{}的消费者数量到: {}", queueName, targetCount);}/** * 紧急处理:消息转移 */publicvoidtransferMessages(String sourceQueue,String targetQueue){while(true){Message message = rabbitTemplate.receive(sourceQueue,5000);if(message ==null)break;// 转移到备用队列 rabbitTemplate.send(targetQueue, message);}}/** * 紧急处理:增加临时消费者 */publicvoidaddTemporaryConsumer(String queueName){// 启动新的消费者实例// 可以使用线程池或临时微服务实例ExecutorService executor =Executors.newFixedThreadPool(5);for(int i =0; i <5; i++){ executor.submit(()->{while(true){Message message = rabbitTemplate.receive(queueName,5000);if(message ==null)break;// 处理消息processMessage(message);}});}}}

4. 顺序消息保障

场景:同一订单的消息需要按顺序处理

解决方案

@ComponentpublicclassOrderSequenceHandler{/** * 方案1:单队列单消费者 */@RabbitListener( queues ="order.sequence.queue", concurrency ="1"// 单消费者保证顺序)publicvoidhandleOrderSequence(OrderEvent event){// 按顺序处理订单事件 orderService.processEvent(event);}/** * 方案2:使用消息分组 */@RabbitListener(queues ="order.shard.queue")publicvoidhandleShardedOrder(@PayloadOrderEvent event,@Header("shardKey")String shardKey){// 相同shardKey的消息路由到同一个消费者// shardKey可以是订单ID的hash}/** * 方案3:使用Redis维护顺序 */@RabbitListener(queues ="order.queue")publicvoidhandleWithRedisSequence(OrderEvent event){String orderId = event.getOrderId();long eventSequence = event.getSequence();// 获取当前处理到的序号Long currentSequence = redisTemplate.opsForValue().get("order:sequence:"+ orderId);if(currentSequence ==null|| eventSequence == currentSequence +1){// 处理消息 orderService.processEvent(event);// 更新序号 redisTemplate.opsForValue().set("order:sequence:"+ orderId, eventSequence);}else{// 顺序不对,重新入队// 可以延迟重试或放入重试队列 redisTemplate.opsForList().leftPush("order:waiting:"+ orderId,JSON.toJSONString(event));}}}

5. 集群和高可用

spring:rabbitmq:# 集群配置addresses: rabbit1:5672,rabbit2:5672,rabbit3:5672username: admin password: password # 集群故障转移connection-timeout:30000requested-heartbeat:60# 镜像队列(高可用)# 需要在管理界面配置策略

6. 安全配置

# 1. 启用SSL# 生成证书 openssl req -x509-newkey rsa:4096 -keyout key.pem -out cert.pem -days365# RabbitMQ配置 listeners.ssl.default =5671 ssl_options.cacertfile = /path/to/ca_certificate.pem ssl_options.certfile = /path/to/server_certificate.pem ssl_options.keyfile = /path/to/server_key.pem ssl_options.verify = verify_peer ssl_options.fail_if_no_peer_cert =false# 2. 网络隔离# 使用虚拟主机隔离不同环境 rabbitmqctl add_vhost /prod rabbitmqctl add_vhost /test # 3. 访问控制# 创建只读用户 rabbitmqctl add_user monitor monitor_pass rabbitmqctl set_user_tags monitor monitoring rabbitmqctl set_permissions -p / monitor """"".*"# 4. 审计日志# 启用审计插件 rabbitmq-plugins enable rabbitmq_auth_backend_http rabbitmq-plugins enable rabbitmq_event_exchange 

总结

RabbitMQ是一个功能强大、灵活可靠的消息中间件,适用于大多数企业级应用场景。通过合理配置和使用,可以构建出高性能、高可用的分布式系统。

关键要点总结:

  1. 选择合适的交换机类型:根据业务需求选择direct、fanout、topic或headers
  2. 保障消息可靠性:使用ConfirmCallback、ReturnsCallback和手动ACK
  3. 处理异常情况:合理使用死信队列和延迟队列
  4. 性能优化:合理配置连接池、预取数量和消费者并发
  5. 监控告警:建立完善的监控体系,及时发现和处理问题
  6. 安全防护:做好网络隔离、访问控制和数据加密

最佳实践建议:

  1. 生产环境:使用集群+镜像队列保证高可用
  2. 消息设计:消息体尽量小,使用JSON格式
  3. 错误处理:所有消费逻辑都要有try-catch和重试机制
  4. 资源隔离:不同业务使用不同的虚拟主机
  5. 容量规划:预估业务量,合理设置队列长度和TTL
  6. 文档维护:维护交换机、队列和路由键的文档

Read more

【论文阅读】DSRL: Steering Your Diffusion Policy with Latent Space Reinforcement Learning

【论文阅读】Steering Your Diffusion Policy with Latent Space Reinforcement Learning * 1 团队与发表时间 * 2. 问题背景与核心思路 * 3. 具体做法 * 3.1 模型设计 * 3.2 Loss 设计 * 3.3 数据设计 * 4 实验效果 * 5 结论 * 6 扩散模型进行RL的方案 * 6.1 纯离线设置 (Purely Offline Setting) * 6.2 在线设置 (Online Setting) * 6.3 残差策略 (Residual Policy) 1 团队与发表时间

By Ne0inhk
PI发布的Human to Robot数采工作——头戴iPhone且手戴两相机采集数据:混合数据中像“用机器人数据一样”用人类数据,而无需显式对齐

PI发布的Human to Robot数采工作——头戴iPhone且手戴两相机采集数据:混合数据中像“用机器人数据一样”用人类数据,而无需显式对齐

前言 为了推动中国具身的更快落地,今26年第一季度,我司将帮更多工厂落地umi这种数采模式,在工人们干活的同时 把数据给采集了,以训练机器人自主干活 当然,VR或动捕采集,也能落地工厂的,适合单一任务 重点攻克;而大批量且多任务,umi模式 有优势 如果说,25年的人形运控的元年,26年,我愿称之为具身落地的元年 本文来解读下PI公司发布的Human to Robot数采工作 第一部分 VLA中「人类数据映射到机器人能力」的涌现 1.1 引言、相关工作、预备知识 1.1.1 引言 如原论文所述,通过观看他人执行任务的视频来学习,仍然是一个活跃的研究领域 [9,2,31,5,22,27]。利用这种类型数据的技术有望为通用机器人策略解锁大规模的人类数据 1. 受语言模型的启发,最近的研究发现,能否利用某些数据来源在本质上与模型规模密切相关[47,

By Ne0inhk

EpicDesigner低代码设计器完全配置指南

EpicDesigner低代码设计器完全配置指南 【免费下载链接】epic-designer 项目地址: https://gitcode.com/gh_mirrors/ep/epic-designer 你是否曾经为重复编写表单页面而感到厌倦?EpicDesigner正是为解决这一痛点而生的可视化低代码设计工具。作为基于Vue3开发的多UI组件库兼容设计器,它能够通过拖拽方式快速生成页面配置,显著提升开发效率。 为什么选择EpicDesigner 在当今快速迭代的开发环境中,EpicDesigner提供了三个核心价值:首先,它支持Element Plus、Ant Design Vue和Naive UI三套主流UI组件库,让团队可以根据现有技术栈灵活选择;其次,通过JSON配置生成页面的方式,实现了配置的可视化管理和版本控制;最后,强大的扩展机制允许开发者根据业务需求自定义组件和功能。 环境准备与基础配置 系统要求检查 开始之前,请确保你的开发环境满足以下要求: * Node.js版本14.x或更高 * 包管理器(npm、yarn或pnpm) * 现代浏览器支持(

By Ne0inhk
深度拆解 RPA 机器人:定义、应用、价值与未来方向

深度拆解 RPA 机器人:定义、应用、价值与未来方向

在数字化转型的浪潮中,RPA机器人正悄然改变着企业的运营模式和我们的工作场景。从银行自动处理的对账单据,到电商平台的订单同步,再到政务大厅的审批流程,这个看不见的“虚拟员工”正在默默承担大量重复繁琐的工作,成为提升效率、降低成本的关键力量。可能很多人对RPA机器人还感到陌生,它究竟是什么?能做哪些事?又为何能成为企业数字化转型的“标配”?今天我们就来深入聊聊RPA机器人的世界。 一、解密RPA机器人:不止是“自动点鼠标” 先给大家一个最直白的定义:RPA机器人,全称是机器人流程自动化(Robotic Process Automation),简单说就是部署在电脑里的“软件机器人”,它能模仿人类在计算机上的操作行为,比如点击鼠标、输入文字、打开文件、跨系统录入数据等,按照预设的规则自动完成一系列重复性工作。很多人会把它和工业机器人混淆,其实两者差别很大——工业机器人是有实体的,负责车间里的物理操作;而RPA机器人是纯软件形态,专注于电脑上的数字化流程操作。 从工作原理来看,RPA机器人的核心逻辑并不复杂,主要靠“三件套”协同工作:设计平台负责可视化编辑自动化流程,就像给机器人制定

By Ne0inhk