为什么需要 basicQos
RabbitMQ 默认推模式会尽可能快地把消息塞给消费者。如果消费者处理能力跟不上,内存和线程直接被撑爆。更糟糕的是,一个服务崩溃可能连累整个微服务系统。
在手动确认模式下,RabbitMQ 允许我们通过 basicQos 控制未确认消息的数量。设置好这个值,服务端就会等消费者确认一条再发下一条,防止过载。
这是我的基础配置示例,也是线上常用的写法:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class BasicQosExample {
private static final String QUEUE_NAME = "limited_queue";
private static final String EXCHANGE_NAME = "limited_exchange";
private static final String ROUTING_KEY = "limited.routing.key";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.newConnection();
connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, );
channel.queueDeclare(QUEUE_NAME, , , , );
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
channel.basicQos(, );
(consumerTag, delivery) -> {
(delivery.getBody(), );
System.out.println( + message + );
{
processMessage(message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), );
System.out.println( + message);
} (Exception e) {
System.err.println( + message + + e.getMessage());
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), , );
}
};
consumerTag ->
System.out.println( + consumerTag);
channel.basicConsume(QUEUE_NAME, , deliverCallback, cancelCallback);
System.out.println();
Thread.sleep(Long.MAX_VALUE);
}
InterruptedException {
Thread.sleep();
}
}


