
延迟队列插件
RabbitMQ 官方提供了一个延迟插件来实现延迟功能。
延迟队列插件的下载
插件下载地址:GitHub Releases
根据自己的 RabbitMQ 版本选择相应版本的延迟插件,下载后上传到服务器。
查看 RabbitMQ 版本的命令:
rabbitmqctl status | grep "RabbitMQ"

/usr/lib/rabbitmq/plugins 是一个附加目录,RabbitMQ 包本身不会在此安装任何内容。如果没有这个路径,可以自己进行创建。

延迟队列插件的安装
把下载好的 .ez 文件上传到 /usr/lib/rabbitmq/plugins 这个路径下。
查看 RabbitMQ 插件的命令:
rabbitmq-plugins list

延迟队列插件的启用
启用延迟队列插件的命令:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

添加配置
spring:
application:
name: rabbit-extensions-demo
rabbitmq:
addresses: amqp://guest:guest@localhost:5672/
常量类
public class Constants {
// 延迟队列
public static final String DELAY_QUEUE = "delay.queue";
public static final String DELAY_EXCHANGE = "delay.exchange";
}
声明队列和交换机并绑定二者关系
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import rabbitextensionsdemo.constant.Constants;
@Configuration
public class DelayConfig {
@Bean("delayQueue")
public Queue delayQueue() {
return QueueBuilder.durable(Constants.DELAY_QUEUE).build();
}
@Bean("delayExchange")
public Exchange delayExchange() {
return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).delayed().build();
}
@Bean("delayBinding")
public Binding delayBinding(@Qualifier("delayQueue") Queue queue, @Qualifier("delayExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("delay").noargs();
}
}
编写生产消息代码
@RequestMapping("/delay2")
public String delay2() {
System.out.println("delay2...");
rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay", "delay test 30s...", message -> {
message.getMessageProperties().setDelayLong(30000L); // 单位:毫秒,过期时间为 30s
return message;
});
rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay", "delay test 10s...", message -> {
message.getMessageProperties().setDelayLong(10000L); // 单位:毫秒,延迟时间为 10s
return message;
});
System.out.printf("%tc 消息发送成功 \n", new Date());
return "消息发送成功";
}
编写消费消息代码
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import rabbitextensionsdemo.constant.Constants;
import java.util.Date;
@Component
public class DelayListener {
@RabbitListener(queues = Constants.DELAY_QUEUE)
public void delayHandMessage(Message message, Channel channel) throws Exception {
// 消费者逻辑
System.out.printf("[delay.queue] %tc 接收到消息:%s \n", new Date(), new String(message.getBody(), "UTF-8"));
}
}
观察效果



此时我们可以看到,哪怕把 TTL 为 30 秒的消息先于 TTL 为 10 秒的消息进行发送,也不会影响根据延迟时间进行消息的发送。
面试题
1. 介绍下 RabbitMQ 的延迟队列
延迟队列是一个特殊的队列,消息发送之后,并不立即给消费者,而是等待特定的时间,才发送给消费者。
延迟队列的应用场景有很多,比如:
- 订单在十分钟内未支付自动取消
- 用户注册成功后,3 天后发调查问卷
- 用户发起退款,24 小时后商家未处理,则默认同意,自动退款
但 RabbitMQ 本身并没直接实现延迟队列,通常有两种方法:
- TTL + 死信队列组合的方式
- 使用官方提供的延迟插件实现延迟功能
2. 上面两种实现方法二者对比
- 基于死信实现的延迟队列
- 优点:灵活不需要额外的插件支持
- 缺点:存在消息顺序问题;需要额外的逻辑来处理死信队列的消息,增加了系统的复杂性
- 基于插件实现的延迟队列
- 优点:通过插件可以直接创建延迟队列,简化延迟消息的实现;避免了 DLX 的时序问题
- 缺点:需要依赖特定的插件,有运维工作;只适用特定版本


