跳到主要内容RabbitMQ 与原生客户端介绍 | 极客日志Javajava
RabbitMQ 与原生客户端介绍
消息队列(MQ)的四大应用场景,包括异步解耦、流量削峰、消息订阅分发及延迟通知。详细解析了 RabbitMQ 的核心架构组件,如 AMQP 协议、连接、通道、虚拟主机、交换机和队列及其工作流程。重点讲解了四种交换机类型(Fanout、Direct、Topic、Headers)以及七种工作模式(Simple、Work Queue、Publish/Subscribe、Routing、Topics、RPC、Publisher Confirms),并提供了基于 Java 原生客户端的代码示例和性能对比分析。
静心4 浏览 1. 异步解耦
业务中一些操作非常耗时,但是又不需要立即得到响应时,可以借助中间件 MQ,在不影响主线程的情况下,利用消息队列进行通信。典型场景就是在注册场景中,就是将用户信息插入到数据库中后,通常会给用户发送一个邮件来表达欢迎,这时发送邮件消息就可以交给 MQ 来在后台进行,不会影响主业务的运行。
2. 流量削峰
在访问量突增的场景中,服务器仍想要维持正常运行,这时可以将请求存入到消息队列,按照系统能处理的力度来逐渐将这些请求消化。
3. 消息订阅分发
当多个系统都依赖同一数据变化来进行响应时,可以使用 MQ 来进行消息数据的分发,即一个系统将该数据改变时,会将消息发给 MQ,其他需要作出响应的系统通过订阅该消息,从而拉取数据,这就不需要多次轮询数据库了。
4. 延迟通知
需要在特定时间后才发送通知的场景中,可以使用 MQ 的延迟消息功能,典型场景就是下单后 30 分钟内未支付,会将未支付的消息分发给订单系统来自动取消订单。
二、RabbitMQ 的结构分析与工作流程
1. AMQP 协议
RabbitMQ 基于 AMQP 协议进行通信的,AMQP 是一种高级消息队列协议,将交换机和队列等组件组织了起来,进行接收和发送消息。
2. 客户端与服务器
Broker 作为 RabbitMQ 的服务器,负责接收和发送消息,分别有生产者 (Producer) 和消费者 (Consumer) 两种客户端,生产者负责生产消息给消息队列,消费者通过消息队列来进行消费。
3. Connection
网络连接,它允许客户端与 RabbitMQ 通信,是客户端和 RabbitMQ 服务器的一个 TCP 连接,这个连接是通信的基础,负责传输所有的数据。
4. Channel
通道,基于 Connection 的一个抽象层,一个 Connection(TCP 连接) 可以有多个通道,主要是将消息的读写操作复用到一个 TCP 连接上。Channel 既可以接收处理消息也可以同时发送消息 (通常的用法一些 Channel 专门负责接收,一些 Channel 专门负责发送),减少了建立与销毁 TCP 的开销。每个 Channel 都是独立的虚拟连接,消息的接受与发送都要通过 Channel。
5. Virtual host
虚拟主机,为消息队列提供了逻辑上的虚拟隔离,一个 BrokerServer(RabbitMQ 服务器) 上可以有多个 Virtual host,不同的用户使用同一个 RabbitMQ 服务时,可以划分多个虚拟机来作区分,每个虚拟机又有自己的交换机和队列。看到这里你是不是感觉很熟悉,对的,和我们学过的 MySQL 上的 database(数据库) 的概念类似,一个服务可以有多个数据库,每个数据库又有单独的表,库与库之间相互独立。
6. Exchange
交换机,存在于虚拟机当中。负责接收生产者发送的信息,并根据自身的路由算法和规则,将消息路由到对应的一或多个 Queue(队列) 中,如果没有相对应的队列会根据消息的 mandatory 标志来决定销毁 (false) 还是返还 (true)。
7. Queue
队列,是 RabbitMQ 的内部对象,消息的实际存储者,多个消费者可以订阅同一个队列,当然一个消费者也可以订阅多个队列。
8. 工作流程
(1) 创建连接
生产者 (Producer) 与 RabbitMQ 服务器 (Broker) 建立 TCP 连接 (Connection),开辟通道 (Channel)。
(2) 声明交换机和队列
Producer 声明一个交换机 (Exchange),并将队列 (Queue) 绑定到交换机上。
(3) 发送消息
(4) 存储消息
Broker 接收到消息后,通过交换机存入到对应的 Queue 中,如果没有找到队列,会根据 Headers 标志 (上面有生产者预留的配置),来确定消息的丢弃或者返还。
(5) 消费消息
消费者订阅了队列之后,当有新消息加入时,会从 Queue 中获取消息来处理,并向 RabbitMQ 发送消息确认收到。
(6) 删除消息
消息被确认后,RabbitMQ 会将该消息从队列中删除。
三、RabbitMQ 的四种交换机类型
1. 前面我们了解到,交换机 (Exchange) 会根据自身的路由策略来将消息转发到相应的队列 (Queue) 中,这里我们来讲解 RabbitMQ 实现的四种类型的交换机,不同类型意味着路由策略不同。
2. 而在 AMQP 中共有 6 种不同类型的交换机,但因为 System 和自定义类型的交换机,在 RabbitMQ 没有实现,这里不会解释。
3. 在生产者发送消息时,会与交换机发送一个 RoutingKey,用来告诉交换机该如何处理,在 RabbitMQ 中,交换机与队列之间 BindingKey 联系起来,这里 BindingKey 其实属于是 RoutingKey 的另一种类型。
1. Fanout 类型
广播类型的交换机,会将收到的消息转发到所有与之绑定的队列上。
2. Direct
定向类型的交换机,这里交换机会将消息发送给 RoutingKey 与 BindingKey 相同的队列。
3. Topic
通配符匹配类型的队列,会把消息发给符合 Routing Pattern(匹配规则) 的队列。
不依赖路由键匹配规则,通过消息中的 Headers 属性进行匹配队列,但性能很差,基本很少见到。
四、RabbitMQ 的七种工作模式
① RabbitMQ 的工作模式与交换机类型息息相关,可以说有的交换机类型直接就是一种工作模式。
② 这里在前两种模式中,交换机只起到转发作用,没有起到路由作用,为了和其他工作模式作区分就没有画出,图片摘自官方。
1. Simple 模式
(1) 解释
简单模式,一个生产者,一个消息队列,一个消费者,一个消息也只能被消费一次,这也被称为点对点模式 (Point-to-Point)。
(2) 生产者代码
① 通过上面 RabbitMQ 的工作流程,来一步步写出代码,要注意,我们这里使用的是 RabbitMQ 内置的交换机,并没有声明新的交换机。
② 内置交换机的名称为空字符,特性就是会将消息路由到 RoutingKey 和 Queue 名称一致的队列上。
③ 关闭资源时需要注意,因为 Channel 是基于 Connection 的,所以要先关闭 Channel 再关闭 Connection 资源,如果顺序颠倒会报连接不存在的错误。
package com.ran.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.ran.constant.Constants;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constants.SIMPLE_QUEUE, true, false, false, null);
for (int i = 0; i < 10; i++) {
String msg = "hello, 我是景画 - " + i;
channel.basicPublish("", Constants.SIMPLE_QUEUE, null, msg.getBytes(StandardCharsets.UTF_8));
}
System.out.println("发送消息成功");
channel.close();
connection.close();
}
}
(3) 消费者代码
package com.ran.simple;
import com.rabbitmq.client.*;
import com.ran.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constants.SIMPLE_QUEUE, true, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者标签:" + consumerTag);
System.out.println("封包信息:" + envelope);
System.out.println("收到的消息:" + new String(body));
}
};
channel.basicConsume(Constants.SIMPLE_QUEUE, true, consumer);
channel.close();
connection.close();
}
}
(4) 公共代码
package com.ran.constant;
public class Constants {
public static final String HOST = "localhost";
public static final int PORT = 5672;
public static final String USER_NAME = "admin";
public static final String PASSWORD = "admin";
public static final String VIRTUAL_HOST = "ran";
public static final String SIMPLE_QUEUE = "hello";
}
(5) 效果图
2. Work Queue(工作队列模式/竞争消费模式)
(1) 解释
**1. 一个生产者,多个消费者,队列中的消息会被多个消费者竞争消费,这就意味着同一消息不会分配给不同队列,每个队列中的消息都不同。
- 这里需要先启动消费者代码再启动生产者代码,因为有多个消费者,有消息时先启动任意一个消费者都会导致消息分配不均。**
适用场景:集群中做异步处理,典型例子是订票时,订单系统会将多个订票消息发送给 RabbitMQ 服务器,短信服务就通过从 RabbitMQ 获取信息来进行处理,这里会有多个短信服务,每个短信服务负责处理一批信息 (订票短信肯定也不需要重复发送吧)。
(2) 生产者代码
package com.ran.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.ran.constant.Constants;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
for (int i = 0; i < 10; i++) {
String msg = "hello, 我是景画 - " + i;
channel.basicPublish("", Constants.WORK_QUEUE, null, msg.getBytes(StandardCharsets.UTF_8));
}
System.out.println("发送消息成功");
channel.close();
connection.close();
}
}
(3) 消费者 1 代码
package com.ran.work;
import com.rabbitmq.client.*;
import com.ran.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者标签:" + consumerTag);
System.out.println("封包信息:" + envelope);
System.out.println("收到的消息:" + new String(body));
}
};
channel.basicConsume(Constants.WORK_QUEUE, true, consumer);
}
}
(4) 消费者 2 代码
package com.ran.work;
import com.rabbitmq.client.*;
import com.ran.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者标签:" + consumerTag);
System.out.println("封包信息:" + envelope);
System.out.println("收到的消息:" + new String(body));
}
};
channel.basicConsume(Constants.WORK_QUEUE, true, consumer);
}
}
(5) 公共代码
package com.ran.constant;
public class Constants {
public static final String HOST = "localhost";
public static final int PORT = 5672;
public static final String USER_NAME = "admin";
public static final String PASSWORD = "admin";
public static final String VIRTUAL_HOST = "ran";
public static final String SIMPLE_QUEUE = "hello";
public static final String WORK_QUEUE = "work.queue";
}
(6) 效果图
3. Publish/Subscribe(发布/订阅)
(1) 解释
① 发布订阅模式,交换机会将消息复制多份,无条件来转发给所有与之绑定的队列,来确保每一个消费者 (订阅者) 都能收到同一份消息,是不是和 Fanout 类型交换机的描述很类似!
② 典型应用场景是气象局发布天气预报消息到交换机,各大门户网站通过将自建的队列与该交换机绑定后,来自动获取天气信息。
(2) 生产者代码
这里我们不再使用内置交换机,而是自己声明了一个 Fanout 类型的交换机,并声明了两个队列,两个队列与交换机绑定时的 routingkey 参数,就是 BindingKey,这里为空字符串,作用是交换机把所有消息都转发到队列上。
package com.ran.fanout;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.ran.constant.Constants;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);
channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);
channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);
channel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, "");
channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, "");
String msg = "hello, 发布订阅模式测试~";
channel.basicPublish(Constants.FANOUT_EXCHANGE, "", null, msg.getBytes(StandardCharsets.UTF_8));
System.out.println("消息发送成功");
channel.close();
connection.close();
}
}
(3) 消费者 1 代码
package com.ran.fanout;
import com.rabbitmq.client.*;
import com.ran.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者标签:" + consumerTag);
System.out.println("封包信息:" + envelope);
System.out.println("收到的消息:" + new String(body));
}
};
channel.basicConsume(Constants.FANOUT_QUEUE1, true, consumer);
}
}
(4) 消费者 2 代码
package com.ran.fanout;
import com.rabbitmq.client.*;
import com.ran.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者标签:" + consumerTag);
System.out.println("封包信息:" + envelope);
System.out.println("收到的消息:" + new String(body));
}
};
channel.basicConsume(Constants.FANOUT_QUEUE2, true, consumer);
}
}
(5) 公共代码
package com.ran.constant;
public class Constants {
public static final String HOST = "localhost";
public static final int PORT = 5672;
public static final String USER_NAME = "admin";
public static final String PASSWORD = "admin";
public static final String VIRTUAL_HOST = "ran";
public static final String SIMPLE_QUEUE = "hello";
public static final String WORK_QUEUE = "work.queue";
public static final String FANOUT_EXCHANGE = "fanout.exchange";
public static final String FANOUT_QUEUE1 = "fanout.queue1";
public static final String FANOUT_QUEUE2 = "fanout.queue2";
}
(6) 效果图
可以看出,我们只发送了一次消息,但是交换机路由到了两个队列中
4. Routing(路由模式)
(1) 解释
① 发布订阅模式的一个变种,交换机根据 RoutingKey 的规则,来发送给与 BindingKey 匹配的队列,消费者再获取信息。
② 应用场景:系统日志的打印,日志等级分为 error, warning, info, debug,通过路由到不同队列,来输出到不同的日志文件。
(2) 生产者代码
这里交换机类型变为了 Direct,只有当 BindingKey 和 RoutingKey 完全匹配时,交换机才会转发消息到队列上。
package com.ran.direct;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.ran.constant.Constants;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);
channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);
channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);
channel.queueBind(Constants.DIRECT_QUEUE1, Constants.DIRECT_EXCHANGE, "a");
channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "a");
channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "b");
channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "c");
String msg = "hello, 我的 routingkey 为 a";
channel.basicPublish(Constants.DIRECT_EXCHANGE, "a", null, msg.getBytes(StandardCharsets.UTF_8));
String msg_b = "hello, 我的 routingkey 为 b";
channel.basicPublish(Constants.DIRECT_EXCHANGE, "b", null, msg_b.getBytes(StandardCharsets.UTF_8));
String msg_c = "hello, 我的 routingkey 为 c";
channel.basicPublish(Constants.DIRECT_EXCHANGE, "c", null, msg_c.getBytes(StandardCharsets.UTF_8));
System.out.println("消息发送成功");
channel.close();
connection.close();
}
}
(3) 消费者 1 代码
package com.ran.direct;
import com.rabbitmq.client.*;
import com.ran.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者标签:" + consumerTag);
System.out.println("封包信息:" + envelope);
System.out.println("收到的消息:" + new String(body));
}
};
channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);
}
}
(4) 消费者 2 代码
package com.ran.direct;
import com.rabbitmq.client.*;
import com.ran.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("收到的消息:" + new String(body));
}
};
channel.basicConsume(Constants.DIRECT_QUEUE2, true, consumer);
}
}
(5) 公共代码
package com.ran.constant;
public class Constants {
public static final String HOST = "localhost";
public static final int PORT = 5672;
public static final String USER_NAME = "admin";
public static final String PASSWORD = "admin";
public static final String VIRTUAL_HOST = "ran";
public static final String SIMPLE_QUEUE = "hello";
public static final String WORK_QUEUE = "work.queue";
public static final String FANOUT_EXCHANGE = "fanout.exchange";
public static final String FANOUT_QUEUE1 = "fanout.queue1";
public static final String FANOUT_QUEUE2 = "fanout.queue2";
public static final String DIRECT_EXCHANGE = "direct.exchange";
public static final String DIRECT_QUEUE1 = "direct.queue1";
public static final String DIRECT_QUEUE2 = "direct.queue2";
}
(6) 效果图
5. Topics(通配符模式)
(1) 解释
**路由模式的进步版本,在 BindingKey 基础上,又增加了通配符匹配的功能,RoutingKey 与 BindingKey 匹配更加灵活,在 RoutingKey/BindingKey 中,是通过 . 来区分一个个单词的。
- (star) can substitute for exactly one word -> * 号可以匹配单个任意单词
(hash) can substitute for zero or more words -> # 号可以匹配任意多个单词**
(2) 生产者代码
package com.ran.topic;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.ran.constant.Constants;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);
channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);
channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);
channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.a.*");
channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.b");
channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "c.#");
String msg = "hello, 我的 routingkey 为 jh.a.ran";
channel.basicPublish(Constants.TOPIC_EXCHANGE, "jh.a.ran", null, msg.getBytes(StandardCharsets.UTF_8));
String msg_b = "hello, 我的 routingkey 为 jh.aa.b";
channel.basicPublish(Constants.TOPIC_EXCHANGE, "jh.aa.b", null, msg_b.getBytes(StandardCharsets.UTF_8));
String msg_c = "hello, 我的 routingkey 为 c.jh";
channel.basicPublish(Constants.TOPIC_EXCHANGE, "c.jh", null, msg_c.getBytes(StandardCharsets.UTF_8));
System.out.println("消息发送成功");
channel.close();
connection.close();
}
}
(3) 消费者 1 代码
package com.ran.topic;
import com.rabbitmq.client.*;
import com.ran.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("队列 1 收到的消息:" + new String(body));
}
};
channel.basicConsume(Constants.TOPIC_QUEUE1, true, consumer);
}
}
(4) 消费者 2 代码
package com.ran.topic;
import com.rabbitmq.client.*;
import com.ran.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("队列 2 收到的消息:" + new String(body));
}
};
channel.basicConsume(Constants.TOPIC_QUEUE2, true, consumer);
}
}
(5) 公共代码
package com.ran.constant;
public class Constants {
public static final String HOST = "localhost";
public static final int PORT = 5672;
public static final String USER_NAME = "admin";
public static final String PASSWORD = "admin";
public static final String VIRTUAL_HOST = "ran";
public static final String SIMPLE_QUEUE = "hello";
public static final String WORK_QUEUE = "work.queue";
public static final String FANOUT_EXCHANGE = "fanout.exchange";
public static final String FANOUT_QUEUE1 = "fanout.queue1";
public static final String FANOUT_QUEUE2 = "fanout.queue2";
public static final String DIRECT_EXCHANGE = "direct.exchange";
public static final String DIRECT_QUEUE1 = "direct.queue1";
public static final String DIRECT_QUEUE2 = "direct.queue2";
public static final String TOPIC_EXCHANGE = "topic.exchange";
public static final String TOPIC_QUEUE1 = "topic.queue1";
public static final String TOPIC_QUEUE2 = "topic.queue2";
}
(6) 效果图
6. RPC(RPC 通信)
(1) 解释
1. RPC 模式没有生产者和消费者的概念,而是变为了客户端和服务器,这里请求消息有两个字段,replyTo 用来指定队列,correlationId 作用是用来标识是哪个请求消息。
2. 客户端 (Client) 将消息发送到一个指定的存储请求消息的队列,并且消息的属性中设置了 replyTo 字段,指定了专属的响应队列,用来接收服务器响应的消息。
3. 服务器 (Server) 处理请求后,将响应的消息发送到 replyTo 指定的队列,并在响应消息中携带与请求相同的 correlationId,确保客户端能正确关联。
4. 客户端检查响应队列的状态,当加入新响应时,会先检查 correlationId 是否和之前请求相等。
(2) 客户端代码
package com.ran.rpc;
import com.rabbitmq.client.*;
import com.ran.constant.Constants;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeoutException;
public class RpcClient {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);
String msg = "hello, rpc......";
String correlationId = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(correlationId).replyTo(Constants.RPC_RESPONSE_QUEUE).build();
channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes(StandardCharsets.UTF_8));
System.out.println("请求发送成功");
BlockingDeque<String> q = new LinkedBlockingDeque<>(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String res = new String(body);
System.out.println("接收到消息:" + res);
if (properties.getCorrelationId().equals(correlationId)) {
try {
q.put(res);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
};
channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);
String res = q.take();
System.out.println("RPC 响应结果:" + res);
}
}
(3) 服务器代码
package com.ran.rpc;
import com.rabbitmq.client.*;
import com.ran.constant.Constants;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
public class RpcServer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String request = new String(body);
String res = "对 [" + request + "] 做出响应";
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();
channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, props, res.getBytes(StandardCharsets.UTF_8));
System.out.println("响应发送成功");
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);
}
}
(4) 公共代码
package com.ran.constant;
public class Constants {
public static final String HOST = "localhost";
public static final int PORT = 5672;
public static final String USER_NAME = "admin";
public static final String PASSWORD = "admin";
public static final String VIRTUAL_HOST = "ran";
public static final String SIMPLE_QUEUE = "hello";
public static final String WORK_QUEUE = "work.queue";
public static final String FANOUT_EXCHANGE = "fanout.exchange";
public static final String FANOUT_QUEUE1 = "fanout.queue1";
public static final String FANOUT_QUEUE2 = "fanout.queue2";
public static final String DIRECT_EXCHANGE = "direct.exchange";
public static final String DIRECT_QUEUE1 = "direct.queue1";
public static final String DIRECT_QUEUE2 = "direct.queue2";
public static final String TOPIC_EXCHANGE = "topic.exchange";
public static final String TOPIC_QUEUE1 = "topic.queue1";
public static final String TOPIC_QUEUE2 = "topic.queue2";
public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";
public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";
}
(5) 效果图
7. Publisher Confirms(发布确认)
(1) 解释
**1. 是 RabbitMQ 提供的一种确保消息可靠发送到 RabbitMQ 服务器的机制,可以确保生产者的消息被 RabbitMQ 服务器成功接收,避免丢失,安全性较高。
- 生产者将通道 (Channel) 设置为 Confirm 模式,每一条消息都会获得一个独特的 ID。
- 当消息被 RabbitMQ 服务器接收储存后,会异步向生产者发送一个包含独特 ID 的确认信息 (ACK),来告诉生产者,消息已经到达。**
消息队列作为中间件,肯定会面临消息丢失的问题,大概有三大类:
1. 生产者问题:生产者程序因为网络或其他原因,导致向 RabbitMQ 服务器 Broker 没有发送成功。解决方式:我们下面叙述的 Publisher Confirms(发布确认) 机制。
2. 中间件问题:Broker 因为自身原因,没有将消息保存完整或者丢失。解决方式:Broker 收到消息后将其持久化到硬盘上。
3. 消费者问题:Broker 发送消息后,消费者在处理消息时,因为特殊原因处理失败,但是 Broker 也将消息从队列中删除了。解决方式:上面 RPC 通信中我们用到的手动确认方式,消费者正确处理消息后,再发送应答,Broker 才会将消息从队列中删除。
在 Publisher Confirms(发布确认) 机制中,又分为三个策略,分别是单独确认,批量确认,异步确认。
(2) Publishing Messages Individually(单独确认)
每发送一个信息后就进行确认,缺点是费时费力,性能较低。
private static void PublishingMessagesIndividually() throws IOException, TimeoutException, InterruptedException {
try (Connection connection = create()) {
Channel channel = connection.createChannel();
channel.confirmSelect();
channel.queueDeclare(Constants.PUBLISH_CONFIRM_QUEUE1, true, false, false, null);
long start = System.currentTimeMillis();
for (int i = 0; i < COUNT; i++) {
String msg = "hello publisher confirm" + i;
channel.basicPublish("", Constants.PUBLISH_CONFIRM_QUEUE1, null, msg.getBytes(StandardCharsets.UTF_8));
channel.waitForConfirmsOrDie(500);
}
long end = System.currentTimeMillis();
System.out.printf("单独确认策略,消息数:%d, 耗时:%d ms \n", COUNT, end - start);
}
}
(3) Publishing Messages in Batches(批量确认)
每发送一批消息后,调用 channel.waitForConfirms 方法,等待服务器的确认返回。
private static void PublishingMessagesInBatches() throws Exception {
try (Connection connection = create()) {
Channel channel = connection.createChannel();
channel.confirmSelect();
channel.queueDeclare(Constants.PUBLISH_CONFIRM_QUEUE2, true, false, false, null);
long batchCount = 500;
long outstandingMessageCount = 0;
long start = System.currentTimeMillis();
for (int i = 0; i < COUNT; i++) {
String msg = "hello publisher confirm" + i;
channel.basicPublish("", Constants.PUBLISH_CONFIRM_QUEUE2, null, msg.getBytes(StandardCharsets.UTF_8));
outstandingMessageCount++;
if (outstandingMessageCount == batchCount) {
channel.waitForConfirmsOrDie(500);
outstandingMessageCount = 0;
}
}
if (outstandingMessageCount > 0) {
channel.waitForConfirmsOrDie(500);
}
long end = System.currentTimeMillis();
System.out.printf("批量确认策略,消息数:%d, 耗时:%d ms \n", COUNT, end - start);
}
}
(4) Handling Publisher Confirms Asynchronously(异步确认)
该策略需要监听消息确认的状态,创建一个有序结合 SortedSet,来存储未确认消息的 deliveryTag,当 Channel 开启 confirm 模式后,channel 上发送消息都会附带一个从 1 开始递增的 deliveryTag 序号,当收到 ack 时,从 SortedSet 中删除该消息的序号。如果为批量认消息,表示小于等于当前序号 deliveryTag 的消息都收到了,则清除对应标签。

private static void HandlingPublisherConfirmsAsynchronously() throws Exception {
try (Connection connection = create()) {
Channel channel = connection.createChannel();
channel.confirmSelect();
channel.queueDeclare(Constants.PUBLISH_CONFIRM_QUEUE3, true, false, false, null);
SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());
long start = System.currentTimeMillis();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
confirmSeqNo.headSet(deliveryTag + 1).clear();
} else {
confirmSeqNo.remove(deliveryTag);
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
confirmSeqNo.headSet(deliveryTag + 1).clear();
} else {
confirmSeqNo.remove(deliveryTag);
}
}
});
for (int i = 0; i < COUNT; i++) {
String msg = "hello publisher confirm" + i;
long seqNoDeliveryTag = channel.getNextPublishSeqNo();
channel.basicPublish("", Constants.PUBLISH_CONFIRM_QUEUE3, null, msg.getBytes(StandardCharsets.UTF_8));
confirmSeqNo.add(seqNoDeliveryTag);
}
while (!confirmSeqNo.isEmpty()) {
Thread.sleep(1);
}
long end = System.currentTimeMillis();
System.out.printf("异步确认策略,消息数:%d, 耗时:%d ms \n", COUNT, end - start);
}
}
(5) 耗时图
微信扫一扫,关注极客日志
微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
相关免费在线工具
- Keycode 信息
查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online
- Escape 与 Native 编解码
JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online
- JavaScript / HTML 格式化
使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online
- JavaScript 压缩与混淆
Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
- Base64 文件转换器
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online