RabbitMQ高级特性-惰性队列
目录
一、消息堆积问题
当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。最早接收到的消息,可能就会成为死信,会被丢弃,这就是消息堆积的问题。
二、解决消息堆积的三种思路
- 增加更多消费者,提高消费速度
- 在消费者内开启线程池加快消息处理速度
- 扩大队列容积,提高堆积上限
三、惰性队列
从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列。
惰性队列的特征如下:
- 接收到消息后直接存入磁盘而非内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存
- 支持数百万条的消息存储
而要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。可以通过命令行将一个运行中的队列修改为惰性队列。
1、命令行修改惰性队列
rabbitmqctl set_policy Lazy "^myqueue$" '{"queue-mode":"lazy"}' --apply-to-queues
2、用SpringAMQP声明惰性队列
Bean的方式
@Bean
public Queue lazyQueue() {
return QueueBuilder.durable("lazy.queue")
// 开启x-queue-mode: lazy
.lazy()
.build();
}
注解方式
/**
* 接收到了 lazy.queue 队列的消息:{}",msg);
*/
@RabbitListener(queues = "lazy.queue")
public void listenLazyQueue(String msg) {
log.info("接收到了 lazy.queue 队列的消息:{}",msg);
}
测试发送消息
@Test
public void testLazyQueue() throws InterruptedException {
long b = System.nanoTime();
for (int i = 0; i < 1000000; i++) {
// 1.准备消息
Message message = MessageBuilder
.withBody("hello, Spring".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
.build();
// 2.发送消息
rabbitTemplate.convertAndSend("lazy.queue", message);
}
long e = System.nanoTime();
System.out.println(e - b);
}
@Test
public void testNormalQueue() throws InterruptedException {
long b = System.nanoTime();
for (int i = 0; i < 1000000; i++) {
// 1.准备消息
Message message = MessageBuilder
.withBody("hello, Spring".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
.build();
// 2.发送消息
rabbitTemplate.convertAndSend("normal.queue", message);
}
long e = System.nanoTime();
System.out.println(e - b);
}
我们可以发现惰性队列直接回写入磁盘,page-out,而普通队列隔一段时间写入磁盘。
3、惰性队列的优点
- 基于磁盘存储,消息上限高
- 没有间歇性的page-out,性能比较稳定
4、惰性队列的缺点
- 基于磁盘存储,消息时效性会降低
- 性能受限于磁盘的IO
代码
地址: