跳到主要内容 RabbitMQ 后端消息队列技术详解 | 极客日志
Java java
RabbitMQ 后端消息队列技术详解 本文详细介绍了 RabbitMQ 消息队列的核心概念、安装部署及在 Spring Boot 中的应用。内容包括 AMQP 协议基础、常见 MQ 产品对比、消息收发模式(Hello World、Work Queues、Pub/Sub)、RPC 实现、消息有效期(TTL)与死信队列、延迟队列实现方案(插件与 DLX)、消息可靠性保障(发送与消费确认机制)以及集群部署模式。文章提供了完整的代码示例和配置说明,适合后端开发人员学习掌握 RabbitMQ 技术栈。
RabbitMQ
初识 MQ
MQ(MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是异步调用中的 Broker。
协议
AMQP
AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个应用层的开放式标准协议,用于在分布式系统中实现消息的可靠传递。它定义了消息的结构、交换方式、路由规则等规范,使不通过厂商的消息中间件能够互联互通。
在 AMQP 协议中,消息收发涉及到如下一些概念:
Broker :接收和分发消息的应用
Virtual host :出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 中创建 等
exchange/queue
Connection :publisher/consumer 和 broker 之间的 TCP 连接,断开连接的操作只会在 client 端进行,Broker 不会断开连接,除非出现网络故障或 broker 服务出现问题
Channel :如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 Connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 Thread 创建单独的 Channel 进行通讯,AMQP method 包含了 Channel id 帮助客户端和 Message Broker 识别 Channel,所以 Channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP Connection 的开销
Exchange :Message 到达 Broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (点对点), topic(发布订阅) 以及 fanout (广播)
Queue :消息最终被送到这里等待 Consumer 取走,一个 Message 可以被同时拷贝到多个 queue 中
Binding :Exchange 和 Queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 Exchange 中的查询表中,作为 Message 的分发依据
XMPP XMPP(可扩展消息处理现场协议,Extensible Messaging and Presence Protocol)是一个基于 XML 的协议,多用于即时消息(IM)以及在线现场探测,适用于服务器之间的准即时操作。核心是基于 XML 流传输,这个协议可能最终允许因特网用户向因特网上的其他任何人发送即时消息,即使其操作系统和浏览器不同。它的优点是通用公开、兼容性强、可扩展、安全性高,缺点是 XML 编码格式占用带宽大。
MQTT MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是 IBM 开发的一个即时通讯协议,目前看来算是物联网开发中比较重要的协议之一了,该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和 Actuator(比如通过 Twitter 让房屋联网)的通信协议,它的优点是格式简洁、占用带宽小、支持移动端通信、支持 PUSH、适用于嵌入式系统。
产品
ActiveMQ ActiveMQ 是 Apache 下的一个子项目,使用完全支持 JMS1.1 和 J2EE1.4 规范的 JMS Provider 实现,少量代码就可以高效地实现高级应用场景,并且支持可插拔的传输协议,如:in-VM, TCP, SSL, NIO, UDP, multicast, JGroups and JXTA transports。
ActiveMQ 支持常用的多种语言客户端如 C++、Java、.Net,、Python、Php、Ruby 等。
ActiveMQ Classic
ActiveMQ Artemis
这里的 ActiveMQ Classic 就是原来的 ActiveMQ,而 ActiveMQ Artemis 是在 RedHat 捐赠的 HornetQ 服务器代码的基础上开发的,两者代码完全不同,后者支持 JMS2.0,使用基于 Netty 的异步 IO,大大提升了性能,更为神奇的是,后者不仅支持 JMS 协议,还支持 AMQP 协议、STOMP 以及 MQTT,可以说后者的玩法相当丰富。
因此大家在使用时,建议直接选择 ActiveMQ Artemis。
RocketMQ RocketMQ 是阿里开源的一款分布式消息中间件,原名 Metaq,从 3.0 版本开始改名为 RocketMQ,是阿里参照 Kafka 设计思想使用 Java 语言实现的一套 MQ。RocketMQ 将阿里内部多款 MQ 产品(Notify、Metaq)进行整合,只维护核心功能,去除了所有其他运行时依赖,保证核心功能最简化,在此基础上配合阿里上述其他开源产品实现不同场景下 MQ 的架构,目前主要用于订单交易系统。
保证严格的消息顺序
提供针对消息的过滤功能
提供丰富的消息拉取模式
高效的订阅者水平扩展能力
实时的消息订阅机制
亿级消息堆积能力
Kafka Kafka 是 Apache 下的一个开源流处理平台,由 Scala 和 Java 编写。Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作(网页浏览,搜索和其他用户的行动)流数据。Kafka 的目的是通过 Hadoop 的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
快速持久化 :通过磁盘顺序读写与零拷贝机制,可以在 O(1) 的系统开销下进行消息持久化
高吞吐 :在一台普通的服务器上既可以达到 10W/s 的吞吐速率
高堆积 :支持 topic 下消费者较长时间离线,消息堆积量大
完全的分布式系统 :Broker、Producer、Consumer 都原生自动支持分布式,通过 Zookeeper 可以自动实现更加复杂的负载均衡
支持 Hadoop 数据并行加载
RabbitMQ RabbitMQ 支持 AMQP、XMPP、SMTP、STOMP 等多种协议,功能强大,适用于企业级开发。
比较 RabbitMQ ActiveMQ RocketMQ Kafka 社区 Rabbit Apache 阿里 Apache 开发语言 Erlang Java Java Scala&Java 协议支持 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定义协议 自定义协议 可用性 高 一般 高 高 单机吞吐量 一般 差 高 非常高 消息延迟 微秒级 毫秒级 毫秒级 毫秒以内 消息可靠性 高 一般 高 一般
安装 mkdir -p /opt/rabbitmq/data
docker run -d -v /opt/rabbitmq/data:/var/lib/rabbitmq -p5672:5672 -p15672:15672 --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq
docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_management
消息收发方式
架构简介
生产者(Producer):发布消息到 RabbitMQ 中的交换机(Exchange)上
交换机(Exchange):和生产者建立连接并接收生产者的消息
消费者(Consumer):监听 RabbitMQ 中的 Queue 中的消息
队列(Queue):Exchange 将消息分发到指定的 Queue,Queue 和消费者进行交互
路由(Routes):交换机转发消息到队列的规则
准备工作 RabbitMQ 是 AMQP 的产品,Spring Boot 为 AMQP 提供了自动化配置依赖 spring-boot-starter-amqp,因此首先创建 Spring Boot 项目并添加该依赖,如下:
<dependency >
<groupId > org.springframework.boot</groupId >
<artifactId > spring-boot-starter-amqp</artifactId >
</dependency >
在 application.yaml 中配置 RabbitMQ 的基本连接信息,如下:
spring:
rabbitmq:
host: 192.168 .159 .128
port: 5672
username: freedom
password: freedom
virtual-host: /freedom
消费者和生产者的启动类都需要添加 @EnableRabbit 注解。
在 RabbitMQ 中,所有的消息生产者提交的消息都会交由 Exchange 进行再分配,Exchange 会根据不同的策略将消息分发到不同的 Queue 中。
RabbitMQ 官网介绍了如下几种消息分发的形式:
消息收发
Hello World 一个生产者,一个队列,一个消费者。消息传播图如下:
public class Constants {
public static final String HELLO_WORLD_QUEUE = "hello_world.queue" ;
}
@Configuration
public class RabbitmqConfig {
@Bean
public Queue queueOne () {
return new Queue (Constants.HELLO_WORLD_QUEUE);
}
}
@Component
public class ConsumerTest {
@RabbitListener(queues = Constants.HELLO_WORLD_QUEUE)
public void receive (String msg) {
System.out.println("receive msg: " + msg);
}
}
@SpringBootTest
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void testSendMessageQueue () {
String message = "Hello World" ;
rabbitTemplate.convertAndSend(Constants.HELLO_WORLD_QUEUE, message);
}
}
使用的其实是默认的直连交换机(DirectExchange),DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上,例如消息队列名为 hello_world.queue,则 routing key 为 hello_world.queue 的消息会被该消息队列接收。
Work Queues 一个生产者,一个默认的交换机(DirectExchange),一个队列,两个消费者。
一个队列对应了多个消费者,默认情况下,由队列对消息进行平均分配,消息会被分到不同的消费者手中。消费者可以配置各自的并发能力,进而提高消息的消费能力,也可以配置手动 ack,来决定是否要消费某一条消息。
@Component
public class ConsumerTest {
@RabbitListener(queues = Constants.HELLO_WORLD_QUEUE)
public void receive (String msg) {
System.out.println("receive msg: " + msg);
}
@RabbitListener(queues = Constants.HELLO_WORLD_QUEUE, concurrency = "10")
public void receive2 (String msg) {
System.out.println("receive2 msg: " + msg + "--------" + Thread.currentThread().getName());
}
}
由此可见,第二个消费者配置了 concurrency 为 10,此时,对于第二个消费者,将会同时存在 10 个子线程去消费消息。
如果生产者发送 10 条消息,就会一下都被消费掉。
@SpringBootTest
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void testSendMessageQueue () {
String message = "Hello World" ;
for (int i = 0 ; i < 10 ; i++) {
rabbitTemplate.convertAndSend(Constants.HELLO_WORLD_QUEUE, message);
}
}
}
可以看到,消息不是都被第二个消费者都消费了,也有可能被第一个消费者消费(由于第二个消费者有十个线程一起开动,所以第二个消费者消费的消息占比更大)。
消费者可以开启手动 ack,这样可以自行决定是否消费 RabbitMQ 发来的消息,配置手动 ack 的方式:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
@Component
public class ConsumerTest {
@RabbitListener(queues = Constants.HELLO_WORLD_QUEUE)
public void receive (Message message, Channel channel) throws IOException {
System.out.println("receive msg: " + message.getPayload());
channel.basicAck((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG), true );
System.out.println("receive" );
}
@RabbitListener(queues = Constants.HELLO_WORLD_QUEUE, concurrency = "10")
public void receive2 (Message message, Channel channel) throws IOException {
System.out.println("receive2 msg: " + message.getPayload() + "--------" + Thread.currentThread().getName());
channel.basicReject((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG), true );
}
}
默认情况下,RabbitMQ 的会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。
因此设置 preFetch 值为 1,确保同一时刻最多投递给消费者 1 条消息。
spring:
rabbitmq:
listener:
simple:
prefetch: 1
Publish/Subscribe 每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的。需要注意的是,如果将消息发送到一个没有队列绑定的 Exchange 上面,那么该消息将会丢失,这是因为在 RabbitMQ 中 Exchange 不具备存储消息的能力,只有队列具备存储消息的能力。
[图示:Publish/Subscribe 架构]
真正生产环境会经过 exchange 来发送消息,而不是直接发送到队列。
Direct:定向
Fanout:广播
Topic:话题
Fanout 交换机 Fanout Exchange 会将接受到的消息广播到每一个跟其绑定的 queue,所以也叫广播模式 。
public class Constants {
public static final String FREEDOM_FANOUT_EXCHANGE = "freedom.fanout" ;
public static final String FANOUT_ONE_QUEUE = "fanout_one.queue" ;
public static final String FANOUT_TWO_QUEUE = "fanout_two.queue" ;
}
@Configuration
public class RabbitmqConfig {
@Bean
public Exchange fanoutExchange () {
return new FanoutExchange (Constants.FREEDOM_FANOUT_EXCHANGE, true , false );
}
@Bean
public Queue queueOne () {
return new Queue (Constants.FANOUT_ONE_QUEUE);
}
@Bean
public Queue queueTwo () {
return new Queue (Constants.FANOUT_TWO_QUEUE);
}
@Bean
public Binding bindingOne () {
return BindingBuilder.bind(queueOne()).to(fanoutExchange()).with("" ).noargs();
}
@Bean
public Binding bindingTwo () {
return BindingBuilder.bind(queueTwo()).to(fanoutExchange()).with("" ).noargs();
}
}
@Component
public class ConsumerTest {
@RabbitListener(queues = Constants.FANOUT_ONE_QUEUE)
public void receiveOne (String message) throws IOException {
System.out.println("receiveOne msg: " + message);
}
@RabbitListener(queues = Constants.FANOUT_TWO_QUEUE)
public void receiveTwo (String message) throws IOException {
System.out.println("receiveTwo msg: " + message);
}
}
@SpringBootTest
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void testSendMessageQueue () {
rabbitTemplate.convertAndSend(Constants.FREEDOM_FANOUT_EXCHANGE, null , "hello fanout" );
}
}
两个消费者分别消费两个消费队列中的消息,发送消息时不需要 routingkey,指定 exchange 即可,routingkey 可以直接传一个 null,生产者发送一条消息,被两个消费者同时消费。
Direct 交换机 Direct Exchange 会将接收到的消息根据规则路由到指定的 Queue,因此成为定向 路由。
每一个 Queue 都与 Exchange 设置一个 BindingKey
发布者发送消息时,指定消息的 RoutingKey
Exchange 将消息路由到 BindingKey 与消息 RoutingKey 一致的队列
public class Constants {
public static final String FREEDOM_DIRECT_EXCHANGE = "freedom.direct" ;
public static final String DIRECT_QUEUE = "direct.queue" ;
}
@Configuration
public class RabbitmqConfig {
@Bean
public Exchange directExchange () {
return new DirectExchange (Constants.FREEDOM_DIRECT_EXCHANGE, true , false );
}
@Bean
public Queue queueOne () {
return new Queue (Constants.DIRECT_QUEUE);
}
@Bean
public Binding bindingOne () {
return BindingBuilder.bind(queueOne()).to(directExchange()).with("hello-direct" ).noargs();
}
}
@Component
public class ConsumerTest {
@RabbitListener(queues = Constants.DIRECT_QUEUE)
public void receiveOne (String message) throws IOException {
System.out.println("direct msg: " + message);
}
}
@SpringBootTest
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void testSendMessageQueue () {
rabbitTemplate.convertAndSend(Constants.FREEDOM_DIRECT_EXCHANGE, "hello-direct" , "hello direct" );
}
}
Topic 交换机 TopicExchange 与 DirectExchange 类似,区别在于 routingKey 可以是多个单词的列表,并且以.分割。
Queue 与 Exchange 指定 BindingKey 时可以使用通配符:
public class Constants {
public static final String FREEDOM_TOPIC_EXCHANGE = "freedom.topic" ;
public static final String TOPIC_XIAOMI_QUEUE = "topic_xiaomi.queue" ;
public static final String TOPIC_IPHONE_QUEUE = "topic_iphone.queue" ;
}
@Configuration
public class RabbitmqConfig {
@Bean
public Exchange topicExchange () {
return new TopicExchange (Constants.FREEDOM_TOPIC_EXCHANGE, true , false );
}
@Bean
public Queue queueOne () {
return new Queue (Constants.TOPIC_XIAOMI_QUEUE);
}
@Bean
public Queue queueTwo () {
return new Queue (Constants.TOPIC_IPHONE_QUEUE);
}
@Bean
public Binding bindingOne () {
return BindingBuilder.bind(queueOne()).to(topicExchange()).with("xiaomi.#" ).noargs();
}
@Bean
public Binding bindingTwo () {
return BindingBuilder.bind(queueTwo()).to(topicExchange()).with("iphone.#" ).noargs();
}
}
@Component
public class ConsumerTest {
@RabbitListener(queues = Constants.TOPIC_XIAOMI_QUEUE)
public void receiveOne (String message) throws IOException {
System.out.println("xiaomi msg: " + message);
}
@RabbitListener(queues = Constants.TOPIC_IPHONE_QUEUE)
public void receiveTwo (String message) throws IOException {
System.out.println("iphone msg: " + message);
}
}
@SpringBootTest
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void testSendMessageQueue () {
rabbitTemplate.convertAndSend(Constants.FREEDOM_TOPIC_EXCHANGE, "xiaomi.queue" , "小米手机" );
rabbitTemplate.convertAndSend(Constants.FREEDOM_TOPIC_EXCHANGE, "iphone.queue" , "苹果手机" );
}
}
消息转换器 Spring 的对消息对相关的处理是由 org.springframework.amqp.support.converter.MessageConverter 来处理的。而默认是 SimpleMessageConverter,基于 JDK 的 ObjectOutputStream 完成序列化。
JDK 的序列化有安全风险
JDK 序列化的消息太大
JDK 序列化的消息可读性
建议采用 JSON 序列化代替默认的 JDK 序列化,要做两件事情:
在 publisher 和 consumer 中都要引入 jackson 依赖:
<dependency >
<groupId > com.fasterxml.jackson.core</groupId >
<artifactId > jackson-databind</artifactId >
</dependency >
在 publisher 和 consumer 中都要配置 MessaggeConverter:
@Bean
public MessageConverter messageConverter () {
return new Jackson2JsonMessageConverter ();
}
RPC RPC (Remote Procedure Call Protocol 远程过程调用协议),除了 RESTful API、Dubbo、WebService、Java RMI、CORBA 等等,RabbitMQ 也提供了 RPC 功能。
架构简介
首先 Client 发送一条消息,和普通的消息相比,这条消息多了两个关键内容:一个是 correlation_id,这个表示这条消息的唯一 id,还有一个内容是 reply_to,这个表示消息回复队列的名字
Server 从消息发送队列获取消息并处理相应的业务逻辑,处理完成后,将处理结果发送到 reply_to 指定的回调队列中
Client 从回调队列中读取消息,就可以知道消息的执行情况是什么样子了
实践
客户端 spring:
rabbitmq:
publisher-confirm-type: CORRELATED
publisher-returns: true
配置消息确认方式,通过 correlated 来确认,只有开启了这个配置,消息中才会带 correlation_id,只有通过 correlation_id 才能将发送的消息和返回值之间关联,最后一行配置则是开启发送失败退回。
public class Constants {
public static final String RPC_QUEUE_ONE = "queue_one" ;
public static final String RPC_QUEUE_TWO = "queue_two" ;
public static final String RPC_EXCHANGE = "rpc_exchange" ;
}
@Configuration
public class RabbitmqConfig {
@Bean
public Queue messageQueue () {
return new Queue (Constants.RPC_QUEUE_ONE);
}
@Bean
public Queue replyQueue () {
return new Queue (Constants.RPC_QUEUE_TWO);
}
@Bean
public TopicExchange exchange () {
return new TopicExchange (Constants.RPC_EXCHANGE);
}
@Bean
public Binding messageBinding () {
return BindingBuilder.bind(messageQueue()).to(exchange()).with(Constants.RPC_QUEUE_ONE);
}
@Bean
public Binding replyBinding () {
return BindingBuilder.bind(replyQueue()).to(exchange()).with(Constants.RPC_QUEUE_TWO);
}
@Bean
public RabbitTemplate rabbitTemplate (ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate (connectionFactory);
rabbitTemplate.setReplyAddress(Constants.RPC_QUEUE_TWO);
rabbitTemplate.setReplyTimeout(6000 );
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer replyContainer (ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer ();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(Constants.RPC_QUEUE_TWO);
container.setMessageListener(rabbitTemplate(connectionFactory));
return container;
}
}
配置类中分别配置了消息请求队列和消息回调队列,将这两个队列和消息交换机进行绑定。对消息发送工具 RabbitTemplate 重新进行定制,主要是添加消息发送的回调队列 ,给回调队列设置一个监听器 。
@RestController
public class RpcClientController {
private static final Logger logger = LoggerFactory.getLogger(RpcClientController.class);
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public String send (String message) {
Message newMessage = MessageBuilder.withBody(message.getBytes()).build();
logger.info("client send: {}" , newMessage);
Message result = rabbitTemplate.sendAndReceive(Constants.RPC_EXCHANGE, Constants.RPC_QUEUE_ONE, newMessage);
String response = "" ;
if (result != null ) {
String correlationId = newMessage.getMessageProperties().getCorrelationId();
logger.info("correlationId: {}" , correlationId);
Map<String, Object> headers = result.getMessageProperties().getHeaders();
String messageId = (String) headers.get("spring_returned_message_correlation" );
if (messageId.equals(correlationId)) {
response = new String (result.getBody());
logger.info("client receive: {}" , response);
}
}
return response;
}
}
消息发送给调用 sendAndReceive 方法,该方法自带返回值,返回值就是服务端返回的消息
服务端返回的消息中,头消息中包含了 spring_returned_messageg_correlation 字段,这个就是消息发送时候的 correlation_id,通过消息发送时候的 correlation_id 以及返回消息头中的 spring_returned_message_correlation 字段值,就可以将返回的消息内容和发送的消息绑定到一起,确认出这个返回的内容够就是针对这个发送的消息
服务端 spring:
rabbitmq:
publisher-confirm-type: CORRELATED
publisher-returns: true
@Configuration
public class RabbitmqConfig {
@Bean
public Queue messageQueue () {
return new Queue (Constants.RPC_QUEUE_ONE);
}
@Bean
public Queue replyQueue () {
return new Queue (Constants.RPC_QUEUE_TWO);
}
@Bean
public TopicExchange exchange () {
return new TopicExchange (Constants.RPC_EXCHANGE);
}
@Bean
public Binding messageBinding () {
return BindingBuilder.bind(messageQueue()).to(exchange()).with(Constants.RPC_QUEUE_ONE);
}
@Bean
public Binding replyBinding () {
return BindingBuilder.bind(replyQueue()).to(exchange()).with(Constants.RPC_QUEUE_TWO);
}
}
@Component
public class RpcServerController {
private static final Logger logger = LoggerFactory.getLogger(RpcServerController.class);
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = Constants.RPC_QUEUE_ONE)
public void process (Message message) {
logger.info("server receive: {}" , message.toString());
MessageBuilder.withBody(("I'm receive: " + new String (message.getBody())).getBytes()).build();
CorrelationData correlationData = new CorrelationData (message.getMessageProperties().getCorrelationId());
rabbitTemplate.sendAndReceive(Constants.RPC_EXCHANGE, Constants.RPC_QUEUE_TWO, message, correlationData);
}
}
服务端首先收到消息并打印出来
服务端提取出原消息中的 correlation_id
服务端调用 sendAndReceive 方法,将消息发送给 RPC_QUEUE2 队列,同时带上 correlation_id 参数
测试 启动客户端和服务端,在 Postman 中调用客户端的接口进行测试。
消息有效期 默认情况下,消息是不会过期的。如果不设置任何消息过期的相关参数,那么消息是不会过期的,即使消息没被消费掉,也会一直存储在队列中。
TTL TTL (Time-To-Live),消息存活的时间,即消息的有效期。如果消息的存活时间超过了 TTL 并且环没有被消息,此时消息就会变成死信 。
在声明队列的时候,我们可以在队列属性中设置消息的有效期,这样所有进入该队列的消息都会有一个相同的有效期
在发送消息的时候设置消息的有效期,这样不同的消息就具有不同的有效期
设置了消息有效期后,消息过期了就会被从队列中删除了,但是两种方式对应的删除时机有一些差异:
对于第一种方式,当消息队列设置过期时间的时候,那么消息过期了就会被删除,因为消息进入 RabbitMQ 后是存在一个消息队列中,队列的头部是最早要过期的消息,所以 RabbitMQ 只需要一个定时任务,从头部开始扫描是否有过期消息,有的话就直接删除
对于第二种方式,当消息过期后并不会立马被删除,而是当消息要投递给消费者的时候才会去删除,因为第二种方式,每条消息的过期时间都不一样,想要知道哪条消息过期,必须要遍历队列中的所有消息才能实现,当消息比较多时这样就比较耗费性能,因此对于第二种方式,当消息要投递给消费者的时候才去删除
单条消息过期 配置消息队列,new 一个 Queue:第一个参数是消息队列的名字;第二个参数表示消息是否持久化;第三个参数表示消息队列是否排他 ,一般都是设置为 false,即不排他;第四个参数表示如果该队列没有任何订阅的消费者的话,该队列会被自动删除,一般使用于临时队列。
关于排他性,如果设置为 true,则该消息队列只有创建它的 Connection 才能访问,其他的 Connection 都不能访问该消息队列,如果试图在不同的连接中重新声明或者访问排他性队列,那么系统会报一个资源被锁定的错误。另一方面,对于排他性队列而言,当连接断掉的时候,该消息队列也会自动删除(无论该队列是否被声明为持久性队列都会被删除)
单条消息设置过期时间,就是在消息发送的时候设置一下消息有效期即可。
Message message = MessageBuilder.withBody("hello world" .getBytes()).setExpiration("10000" ).build();
队列消息过期 @Bean
public Queue queue () {
Map<String, Object> args = new HashMap <>();
args.put("x-message-ttl" , 10000 );
return new Queue (Constants.HELLO_WORLD_QUEUE, true , false , false , args);
}
给队列设置消息过期时间,消息正常发送即可,不用设置消息过期时间。
消息队列的 Features 属性为 D 和 TTL,D 表示消息队列中消息持久化,TTL 则表示消息会过期。
死信队列
死信交换机 死信交换机 ,Dead-Letter-Exchange 即 DLX,用来接收死信消息的。一般消息变成死信消息有如下几种情况:
消息被拒绝(Basic.Reject/Basic.Nack),并且设置 requeue 参数为 false
消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
队列达到最大长度
实践 public class Constants {
public static final String DLX_EXCHANGE = "dlx.exchange" ;
public static final String DLX_QUEUE = "dlx.queue" ;
public static final String DIRECT_QUEUE = "direct.queue" ;
public static final String DIRECT_EXCHANGE = "direct.exchange" ;
}
@Configuration
public class RabbitmqConfig {
@Bean
public Queue dlxQueue () {
return new Queue (Constants.DLX_QUEUE);
}
@Bean
public DirectExchange dlxExchange () {
return new DirectExchange (Constants.DLX_EXCHANGE, true , false );
}
@Bean
public Binding dlxBinding () {
return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(Constants.DLX_QUEUE);
}
}
死信交换机和死信队列跟普通的交换机,普通的消息队列没啥两样。
@Bean
public Queue queue () {
Map<String, Object> args = new HashMap <>();
args.put("x-message-ttl" , 0 );
args.put("x-dead-letter-exchange" , Constants.DLX_EXCHANGE);
args.put("x-dead-letter-routing-key" , Constants.DLX_QUEUE);
return new Queue (Constants.DIRECT_QUEUE, true , false , false , args);
}
@Bean
public DirectExchange exchange () {
return new DirectExchange (Constants.DIRECT_EXCHANGE, true , false );
}
@Bean
public Binding binding () {
return BindingBuilder.bind(queue()).to(exchange()).with(Constants.DIRECT_QUEUE);
}
x-dead-letter-exchange:配置死信交换机。
x-dead-letter-routing-key:配置死信 routing_key。
@Component
public class ConsumerTest {
@RabbitListener(queues = Constants.DLX_QUEUE)
public void dlxHandle (String message) {
System.out.println("dlx msg: " + message);
}
}
@SpringBootTest
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testDLXMessage () {
rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE, Constants.DIRECT_QUEUE, "hello world" );
}
}
延迟队列 常见的定时任务,例如日志备份,每天凌晨 3 点去备份,这种固定时间的定时任务一般采用 cron 表达式实现,还有一些比较特殊的定时任务,电影汇总的定时炸弹,3 分钟后爆炸,这种定时任务就无法用过 cron 去描述,因为开始时间不确定,开发中有的时候也会遇到类似的需求,例如:
在电商项目中,当我们下单之后,一般需要 20 分钟之内或者 30 分钟之内付款,否则订单就会进入异常处理逻辑中,被取消,那么进入到异常处理逻辑中,就可以当成是一个延迟队列。
我买了一个智能砂锅,可以用来煮粥,上班前把素材都放到锅里,然后设置几点几分开始煮粥,这样下班后就可以喝到香喷喷的粥了,那么这个煮粥的指令也可以看成是一个延迟任务,放到一个延迟队列中,时间到了再执行。
公司的会议预定系统,在会议预定成功后,会在会议开始前半小时通知所有预定该会议的用户。
安全工单超过 24 小时未处理,则自动拉企业微信群提醒相关责任人。
用户下单外卖以后,距离超时时间还有 10 分钟时提醒外卖小哥即将超时。
…
很多场景下都需要延迟队列。RabbitMQ 上实现定时任务有两种方式:
利用 RabbitMQ 自带的消息过期和死信队列机制,实现定时任务
使用 RabbitMQ 的 rabbitmq_delayed_message_exchange 插件来实现定时任务,这种方案比较简单
插件实现
安装
在 GitHub 上下载 rabbitmq_delayed_message_exchange 插件
https: //github.com/rabbitmq /rabbitmq-delayed-message-exchange/releases
docker cp ./rabbitmq_delayed_message_exchange-4.2.0.ez rabbitmq:/plugins
docker exec -it rabbitmq /bin/bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
配置完成后,执行 exit 命令退出 RabbitMQ 容器
消息收发 public class Constants {
public static final String DELAY_QUEUE = "delay.queue" ;
public static final String DELAY_EXCHANGE = "delay.exchange" ;
public static final String EXCHANGE_TYPE = "x-delayed-message" ;
}
@Configuration
public class RabbitmqConfig {
@Bean
public Queue queue () {
return new Queue (Constants.DELAY_QUEUE);
}
@Bean
public CustomExchange customExchange () {
Map<String, Object> args = new HashMap <>();
args.put("x-delayed-type" , "direct" );
return new CustomExchange (Constants.DELAY_EXCHANGE, Constants.EXCHANGE_TYPE, true , false , args);
}
@Bean
public Binding binding () {
return BindingBuilder.bind(queue()).to(customExchange()).with(Constants.DELAY_QUEUE).noargs();
}
}
这里使用的交换机是 CustomExchange,这是一个 Spring 中提供的交换机,创建 CustomExchange 时有五个参数:
名称
类型,固定的
是否持久化
如果没有队列绑定到交换机,交换机是否删除
其它参数
最后一个 args 参数,指定了交换机消息分发的类型,这个类型就是熟知的 direct、fanout、topic 以及 header 几种,用了哪种类型,将来交换机分发消息就按哪种方式来。
@Component
public class ConsumerTest {
@RabbitListener(queues = Constants.DELAY_QUEUE)
public void delayHandle (String message) throws IOException {
System.out.println("delay msg: " + message);
}
}
@SpringBootTest
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testDelayMessage () {
Message message = MessageBuilder.withBody("hello delay" .getBytes()).setHeader("x-delay" , 3000 ).build();
rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, Constants.DELAY_QUEUE, message);
}
}
DLX 实现
思路 延迟队列实现的思路:DLX(死信交换机)+TTL(消息超时时间)。
假如一条消息需要延迟 30 分钟执行,我们就设置这条消息的有效期为 30 分钟,同时为这条消息配置死信交换机和死信 routing_key,并且不为这个消息队列设置消费者,那么 30 分钟后,这条消息由于没有被消费者消费而进入死信队列,此时我们有一个消费者就在'蹲点'这个死信队列,消息一进入死信队列,就立马被消费了。
消息可靠性
发送可靠性 RabbitMQ 中的消息发送引入了 Exchange(交换机)的概念,消息的发送首先到达交换机上,然后再根据既定的路由规则,由交换机将消息路由到不同的 Queue(队列)中,再由不同的消费者去消费。所以要确保消息发送的可靠性,主要从两方面去确认:
消息成功到达 Exchange
消息成功到达 Queue
如果能确认这两步,那么我们就可以认为消息发送成功。如果这两步中任一步骤出现问题,那么消息就没有成功送达,此时可能要通过重试等方式去重新发送消息,多次重试之后,如果消息还是不能到达,则可能就需要人工介入。
经过上面的分析,可以确认,要确保消息成功发送,需要做好三件事就可以:
确认消息到达 Exchange
确认消息到达 Queue
开启定时任务,定时投递那些发送失败的消息
这是两种不同的方案,不可以同时开启,只能选择其中之一,如果两者同时开启,则会报错。
失败重试 有的时候由于网络波动,可能会出现客户端连接 MQ 失败的情况。这个重试机制就和 MQ 本身没有关系,这是利用 Spring 中的 retry 机制来完成,通过配置可以开启连接失败后的重试机制:
spring:
rabbitmq:
connection-timeout: 10000
template:
retry:
enabled: true
initial-interval: 1000ms
multiplier: 2
max-attempts: 3
当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过 SpringAMQP 提供的重试机制是阻塞式 的重试,也就是多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能。
如果对于业务性能有要求,建议禁用 重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步 线程来执行发送消息的代码。
事务机制 Spring Boot 中开启 RabbitMQ 事务机制:
@Bean
public RabbitTransactionManager transactionManager (ConnectionFactory connectionFactory) {
return new RabbitTransactionManager (connectionFactory);
}
接下来,在消息生产者上面做两件事:添加事务注解并设置通信信道为事务模式:
@Transactional
public void testTransaction () {
rabbitTemplate.setChannelTransacted(true );
rabbitTemplate.convertAndSend(Constants.FREEDOM_DIRECT_EXCHANGE, Constants.DIRECT_QUEUE, "hello world" );
int i = 1 / 0 ;
}
发送消息的方法上添加 @Transactional 注解标记事务
调用 setChannelTransacted 方法设置为 true 开启事务模式
当开启事务模式之后,RabbitMQ 生产者发送消息会多出四个步骤:
客户端发出请求,将信道设置为事务模式
服务端给出回复,同意将信道设置为事务模式
客户端发送消息
客户端提交事务
服务端给出响应,确认事务提交
上面的步骤,除了第三步是本来就有,其他几个步骤都是平白无故多出来的。事务模式其实效率有点低,这并非一个最佳解决方案。一般来说都是一些高并发的项目才会用到消息中间件。
RabbitMQ 还提供了发送方确认机制来确保消息发送,这种方式,性能要远远高于事务模式。
确认机制 RabbitMQ 提供了Publisher Confirm 和Publisher Return 两种确认机制。开启确认机制后,在 MQ 成功收到消息后会返回确认消息给生产者。返回的结果有一下几种情况:
消息投递到了 MQ,但是路由失败。此时会通过 PublisherReturn 返回路由异常原因,然后返回ACK ,告知投递成功
临时消息投递到了 MQ,并且入队成功,返回ACK ,告知投递成功
持久消息投递到了 MQ,并且入队完成持久化,返回ACK ,告知投递成功
其它情况都会返回NACK ,告知投递失败
在 application.yaml 中配置开启消息发送方确认机制
spring:
rabbitmq:
publisher-confirm-type: CORRELATED
publisher-returns: true
none:表示禁用发布确认模式,默认即此
correlated:表示成功发布消息到交换器后触发的回调方法,MQ 异步回调方式返回回执消息
simple:同步阻塞等待 MQ 的回执消息,类似 correlated,并且支持 waitForConfirms() 和 waitForCofirmsOrDie() 方法的调用
@Configuration
public class RabbitmqConfig implements RabbitTemplate .ConfirmCallback, RabbitTemplate.ReturnCallback {
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitmqConfig.class);
@Autowired
private RabbitTemplate rabbitTemplate;
@Bean
public Queue queue () {
return new Queue (Constants.DIRECT_QUEUE);
}
@Bean
public DirectExchange exchange () {
return new DirectExchange (Constants.DIRECT_EXCHANGE);
}
@Bean
public Binding binding () {
return BindingBuilder.bind(queue()).to(exchange()).with(Constants.DIRECT_QUEUE);
}
@PostConstruct
public void initRabbitTemplate () {
rabbitTemplate.setConfirmCallback(this );
rabbitTemplate.setReturnCallback(this );
}
@Override
public void confirm (CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
LOGGER.info("{}:消息成功到达交换器" , correlationData.getId());
} else {
LOGGER.error("{}:消息发送失败" , correlationData.getId());
}
}
@Override
public void returnedMessage (Message message, int i, String s, String exchange, String routingKey) {
LOGGER.error("{}: 消息未成功路由到队列,exchange: {}, routing key: {}" , message.getMessageProperties().getMessageId(), exchange, routingKey);
}
}
定义配置类,实现 RabbitTemplate.ConfirmCallback 和 RabbitTemplate.ReturnsCallback 两个接口,这两个接口,前者的回调用来确定消息到达交换器,后者则会在消息路由到队列失败时被调用
定义 initRabbitTemplate 方法并添加 @PostConstruct 注解,在该方法中为 rabbitTemplate 分别配置这两个 Callback
CorrelationData correlationData = new CorrelationData (UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("freedom.direct1" , Constants.DIRECT_QUEUE, "hello world" , correlationData);
CorrelationData correlationData = new CorrelationData (UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(Constants.FREEDOM_DIRECT_EXCHANGE, "direct.queue1" , "hello world" , correlationData);
消费可靠性
消费思路 RabbitMQ 的消息消费,整体上来说有两种不同的思路:
推(Push) :MQ 主动将消息推送 给消费者,这种方式需要消费者设置一个缓冲区去缓存消息,对于消费者而言,内存中总是有一堆需要处理的消息,所以这种方式的效率比较高,这也是目前大多数应用采用的消费方式
拉(Pull) :消费者主动从 MQ拉取 消息,这种方式效率并不是很高,不过有的时候如果服务端需要批量拉取消息,倒是可以采用这种方式
@Component
public class ConsumerTest {
@RabbitListener(queues = Constants.DIRECT_QUEUE)
public void handle (String message) {
System.out.println("msg: " + message);
}
}
通过 @RabbitListener 注解去标记消费者,当监听的队列中有消息时,就会触发该方法。
public void handle () throws UnsupportedEncodingException {
Object obj = rabbitTemplate.receiveAndConvert(Constants.DIRECT_QUEUE);
System.out.println("obj = " + new String (((byte []) obj), "UTF-8" ));
}
调用 receiveAndConvert 方法,方法参数为队列名称,方法执行完成后,会从 MQ 上拉取一条消息下来,如果该方法返回值为 null,表示该队列上没有消息了。receiveAndConvert 方法有一个重载方法,可以在重载方法中传入一个等待超时时间,例如 3 秒。此时,假设队列中没有消息了,则 receiveAndConvert 方法会阻塞 3 秒,3 秒内如果队列中有了新消息就返回,3 秒后如果队列中还是没有新消息,就返回 null,这个等待超时时间要是不设置的话,默认为 0。
如果需要从消息队列中持续获得消息,就可以使用推模式;如果只是单纯的消费一条消息,则使用拉模式即可。切忌将拉模式放到一个死循环中,变相的订阅消息,这会严重影响 RabbitMQ 的性能 。
确认机制 为了保证消息能够可靠的到达消息消费者,RabbitMQ 中提供了消息消费确认机制。当消费者去消费消息的时候,可以通过指定 autoAck 参数来表示消息消费的确认方式。
当 autoAck 为 false 的时候,此时即使消费者已经收到消息了,RabbitMQ 也不会立马将消息移除,而是等待消费者显式的回复确认信号后,才会将消息打上删除标记,然后再删除
当 autoAck 为 true 的时候,此时消息消费者就会自动把发送出去的消息设置为确认,然后将消息移除(从内存或者磁盘中),即使这些消息并没有到达消费者
将 autoAck 设置为 false 的时候,对于 RabbitMQ 而言,消费分成了两个部分:
待消费的消息
已经投递给消费者,但是还没有被消费者确认的消息
当设置 autoAck 为 false 的时候,消费者就变得非常从容了,它将有足够的时间去处理这条消息,当消息正常处理完成后,再手动 ack,此时 RabbitMQ 才会认为这条消息消费成功了。如果 RabbitMQ 一直没有收到客户端的反馈,并且此时客户端也已经断开连接了,那么 RabbitMQ 就会将刚刚的消息重新放回队列中,等待下一次被消费。
综上所述,无论是手动 Ack 或者自动 Ack,最终都有可能导致消息被重复消费,所以一般来说我们还需要再处理消息时,解决幂等性问题。
自动确认 在 Spring Boot 中,默认情况下,消息消费就是自动确认的。
@Component
public class ConsumerTest {
@RabbitListener(queues = Constants.HELLO_WORLD_QUEUE)
public void receive (String msg) {
System.out.println("receive msg: " + msg);
int i = 1 / 0 ;
}
}
默认情况下,消息消费方法自带事务,即如果该方法在执行过程中抛出异常,那么被消费的消息会重新回到队列中等待下一次被消费,如果该方法正常执行完没有抛出异常,则这条消息就算是被消费了。
手动确认
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: MANUAL
@RabbitListener(queues = Constants.DIRECT_QUEUE)
public void handle (Message message, Channel channel) {
try {
String s = new String (message.getBody());
System.out.println("msg = " + s);
channel.basicAck(deliveryTag, false );
int i = 1 / 0 ;
} catch (Exception e) {
channel.basicNack(deliveryTag, false , false );
throw new RuntimeException (e);
}
}
将消费者要做的事情放到一个 try..catch 代码块中,如果消息正常消费成功,则执行 basicAck 完成确认,如果消息消费失败,则执行 basicNack 方法,告诉 RabbitMQ 消息消费失败。
basicAck :这个是手动确认消息已经成功消费,该方法有两个参数:第一个参数表示消息的 id;第二个参数 multiple 如果为 false,表示仅确认当前消息消费成功,如果为 true,则表示当前消息之前所有未被当前消费者确认的消息都消费成功
basicNack :这个是告诉 RabbitMQ 当前消息未被成功消费,该方法有三个参数:第一个参数表示消息的 id;第二个参数 multiple 如果为 false,表示仅拒绝当前消息的消费,如果为 true,则表示拒绝当前消息之前所有未被当前消费者确认的消息;第三个参数 requeue 含义和前面所说的一样,被拒绝的消息是否重新入队
当 basicNack 中最后一个参数设置为 false 的时候,还涉及到一个死信队列的问题 。
public void receive () {
Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(false );
long deliveryTag = 0L ;
try {
GetResponse getResponse = channel.basicGet(Constants.DIRECT_QUEUE, false );
deliveryTag = getResponse.getEnvelope().getDeliveryTag();
System.out.println("o = " + new String ((getResponse.getBody()), "UTF-8" ));
channel.basicAck(deliveryTag, false );
} catch (IOException e) {
try {
channel.basicNack(deliveryTag, false , true );
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
消息拒绝 当客户端收到消息时,可以选择消费这条消息,也可以选择拒绝这条消息。
@RabbitListener(queues = Constants.DIRECT_QUEUE)
public void receive (Channel channel, Message message) {
long deliverTag = message.getMessageProperties().getDeliveryTag();
try {
channel.basicReject(deliverTag, true )
} catch (IOException e) {
e.printStackTrace();
}
}
消费者收到消息之后,可以选择拒绝消费该条消息,拒绝的步骤分两步:
获取消息编号 deliveryTag
调用 basicReject 方法拒绝消息
调用 basicReject 方法时,第二个参数是 requeue,即是否重新入队。如果第二个参数为 true,则这条被拒绝的消息会重新进入到消息队列中,等待下一次被消费;如果第二个参数为 false,则这条被拒绝的消息就会被丢掉,不会有新的消费者去消费它了。
basicReject 方法一次只能拒绝一条消息 。
命令行 docker exec -it rabbitmq /bin/bash
用户操作 添加一个用户名为 freedom,密码为 freedom 的用户
rabbitmqctl add_user freedom freedom
rabbitmqctl change_password freedom freedom
rabbitmqctl authenticate_user freedom freedom
给用户 freedom 设置 administrator 角色
rabbitmqctl set_user_tags freedom administrator
rabbitmqctl delete_user freedom
Vhost 操作 rabbitmqctl add_vhost freedom
rabbitmqctl delete_vhost freedom
rabbitmqctl set_permissions -p freedom freedom ".*" ".*" ".*"
rabbitmqctl -p freedom list_permissions
rabbitmqctl clean_permissions -p freedom freedom
rabbitmqctl list_user_permissions lisi
集群
普通集群 普通集群模式,就是将 RabbitMQ 部署到多台服务器上,每个服务器启动一个 RabbitMQ 实例,多个实例之间进行消息通信。
此时我们创建的队列 Queue,它的元数据(主要就是 Queue 的一些配置信息)会在所有的 RabbitMQ 实例中进行同步,但是队列中的消息只会存在于一个 RabbitMQ 实例上,而不会同步到其他队列。
当我们消费消息的时候,如果连接到了另外一个实例,那么那个实例会通过元数据定位到 Queue 所在的位置,然后访问 Queue 所在的实例,拉取数据过来发送给消费者。
这种集群可以提高 RabbitMQ 的消息吞吐能力,但是无法保证高可用,因为一旦一个 RabbitMQ 实例挂了,消息就没法访问了,如果消息队列做了持久化,那么等 RabbitMQ 实例恢复后,就可以继续访问了;如果消息队列没做持久化,那么消息就丢了。
镜像集群 它和普通集群最大的区别在于 Queue 数据和原数据不再是单独存储在一台机器上,而是同时存储在多台机器上。也就是说每个 RabbitMQ 实例都有一份镜像数据(副本数据)。每次写入消息的时候都会自动把数据同步到多台实例上去,这样一旦其中一台机器发生故障,其他机器还有一份副本数据可以继续提供服务,也就实现了高可用。
节点类型
RAM node :内存节点将所有的队列、交换机、绑定、用户、权限和 vhost 的元数据定义存储在内存中,好处是可以使得交换机和队列声明等操作速度更快
Disk node :将元数据存储在磁盘中,单节点系统只允许磁盘类型的节点,防止重启 RabbitMQ 的时候,丢失系统的配置信息
RabbitMQ 要求在集群中至少有一个磁盘节点,所有其他节点可以是内存节点,当节点加入或者离开集群时,必须要将该变更通知到至少一个磁盘节点。如果集群中唯一的一个磁盘节点崩溃的话,集群仍然可以保持运行,但是无法进行其他操作(增删改查),直到节点恢复。为了确保集群信息的可靠性,或者在不确定使用磁盘节点还是内存节点的时候,建议直接用磁盘节点。
微信扫一扫,关注极客日志 微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
相关免费在线工具 Keycode 信息 查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online
Escape 与 Native 编解码 JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online
JavaScript / HTML 格式化 使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online
JavaScript 压缩与混淆 Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online
Base64 字符串编码/解码 将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
Base64 文件转换器 将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online