一、Work Queues(工作队列模式)
工作队列模式支持多个消费者接收消息,消费者之间是竞争关系,每个消息只能被一个消费者接收。
每个工作模式的实现,都先需要引入 RabbitMQ 的依赖:
RabbitMQ 支持多种消息传递模式,包括工作队列、发布订阅、路由、通配符、RPC 通信及发布确认机制。内容详细阐述了各模式的原理与 Java 客户端实现代码,涵盖生产者消费者竞争消费、交换机类型绑定规则、RPC 回调队列设计以及防止消息丢失的确认策略。通过对比单独确认、批量确认和异步确认的性能差异,帮助开发者根据业务场景选择合适的可靠性方案。

工作队列模式支持多个消费者接收消息,消费者之间是竞争关系,每个消息只能被一个消费者接收。
每个工作模式的实现,都先需要引入 RabbitMQ 的依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
生产者步骤:
package org.example.rabbitmq.workqueues;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ProducerDemo {
public static void main(String[] args) throws IOException, TimeoutException {
// 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
// 开启信道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("workQueues", true, false, true, null);
// 发送消息
for (int i = 0; i < 10; i++) {
String msg = "hello workQueues" + i;
channel.basicPublish("", "workQueues", null, msg.getBytes());
}
System.out.println("消息发送成功");
// 资源释放
channel.close();
connection.close();
}
}
我们创建两个消费者模拟。消费者步骤:
package org.example.rabbitmq.workqueues;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
// 创建信道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("workQueues", false, false, true, null);
// 消费消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer1 接收到消息:" + new String(body));
}
};
channel.basicConsume("workQueues", true, consumer);
Thread.sleep(100);
// 释放资源
// channel.close();
// connection.close();
}
}
在发布/订阅模型中,多了一个 Exchange 角色。
Exchange 常见有三种类型,分别代表不同的路由规则,也就分别对应不同的工作模式:
生产者步骤:
声明交换机的方法是 Channel 类下的 exchangeDeclare 方法。
package org.example.rabbitmq.fanout;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
// 2. 创建信道
Channel channel = connection.createChannel();
// 3. 声明交换机
channel.exchangeDeclare("fanout.exchange", BuiltinExchangeType.FANOUT, false);
// 4. 声明队列
channel.queueDeclare("fanout.queue1", true, false, false, null);
channel.queueDeclare("fanout.queue2", true, false, false, null);
// 5. 交换机与队列绑定
channel.queueBind("fanout.queue1", "fanout.exchange", "");
channel.queueBind("fanout.queue2", "fanout.exchange", "");
// 6. 生产消息
for (int i = 0; i < 10; i++) {
String msg = "hello fanout" + i;
channel.basicPublish("fanout.exchange", "", null, msg.getBytes());
}
System.out.println("发送消息成功");
// 7. 释放资源
channel.close();
connection.close();
}
}
两个消费者分别消费两个队列的消息。
消费者步骤:
package org.example.rabbitmq.fanout;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
// 创建信道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("fanout.queue1", true, false, false, null);
// 消费消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer1 接收到消息:" + new String(body));
}
};
channel.basicConsume("fanout.queue1", consumer);
/* //释放资源
channel.close();
connection.close();*/
}
}
Routing(路由模式):队列和交换机的绑定,不能是任意的绑定了,而是要指定一个 BindingKey(RoutingKey 的一种),消息的发送方在向 Exchange 发送消息时,也需要指定消息的 RoutingKey,Exchange 也不再把消息交给每一个绑定的 key,而是根据消息的 RoutingKey 进行判断,只有队列绑定时的 BindingKey 和发送消息的 RoutingKey 完全一致,才会接收到消息。
生产者步骤:
package org.example.rabbitmq.direct;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
// 2. 创建信道
Channel channel = connection.createChannel();
// 3. 声明交换机
channel.exchangeDeclare("direct.exchange", BuiltinExchangeType.DIRECT, true);
// 4. 声明队列
channel.queueDeclare("direct.queue1", true, false, false, null);
channel.queueDeclare("direct.queue2", true, false, false, null);
// 5. 队列与交换机绑定
channel.queueBind("direct.queue1", "direct.exchange", "a");
channel.queueBind("direct.queue2", "direct.exchange", "a");
channel.queueBind("direct.queue2", "direct.exchange", "b");
channel.queueBind("direct.queue2", "direct.exchange", "c");
// 6. 生产消息
String msgA = "hello direct routingKey is a";
channel.basicPublish("direct.exchange", "a", null, msgA.getBytes());
String msgB = "hello direct routingKey is b";
channel.basicPublish("direct.exchange", "b", null, msgB.getBytes());
String msgC = "hello direct routingKey is c";
channel.basicPublish("direct.exchange", "c", null, msgC.getBytes());
System.out.println("发送消息成功");
// 7. 释放资源
channel.close();
connection.close();
}
}
两个消费者分别消费两个队列的消息。
消费者步骤:
package org.example.rabbitmq.direct;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
// 创建信道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("direct.queue1", true, false, false, null);
// 消费消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer1 接收到消息:" + new String(body));
}
};
channel.basicConsume("direct.queue1", consumer);
/* //释放资源
channel.close();
connection.close();*/
}
}
Topics 和 Routing 模式的区别是:
在 topic 类型的交换机在匹配规则上,有些要求:
* 表示一个单词# 表示多个单词 (0-N 个)生产者步骤:
package org.example.rabbitmq.topics;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
// 2. 创建信道
Channel channel = connection.createChannel();
// 3. 声明交换机
channel.exchangeDeclare("topic.exchange", BuiltinExchangeType.TOPIC, true);
// 4. 声明队列
channel.queueDeclare("topic.queue1", true, false, false, null);
channel.queueDeclare("topic.queue2", true, false, false, null);
// 5. 队列与交换机绑定
channel.queueBind("topic.queue1", "topic.exchange", "*.a.*");
channel.queueBind("topic.queue2", "topic.exchange", "*.*.b");
channel.queueBind("topic.queue2", "topic.exchange", "c.#");
// 6. 生产消息
String msgA = "hello topic routingKey is word.a.word";
channel.basicPublish("topic.exchange", "word.a.word", null, msgA.getBytes());
String msgB = "hello topic routingKey is word.word.b";
channel.basicPublish("topic.exchange", "word.word.b", null, msgB.getBytes());
String msgC = "hello topic routingKey is c.word.word.word.word.b";
channel.basicPublish("topic.exchange", "c.word.word.word.word.b", null, msgC.getBytes());
String msgD = "hello topic routingKey is c.a.b";
channel.basicPublish("topic.exchange", "c.a.b", null, msgD.getBytes());
System.out.println("发送消息成功");
// 7. 释放资源
channel.close();
connection.close();
}
}
两个消费者分别消费两个队列的消息。
消费者步骤:
package org.example.rabbitmq.topics;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
// 创建信道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("topic.queue1", true, false, false, null);
// 消费消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer1 接收到消息:" + new String(body));
}
};
channel.basicConsume("topic.queue1", consumer);
// 释放资源
// channel.close();
// connection.close();
}
}
RPC(Remote Procedure Call),即远程过程调用。它是一种通过网络从远程计算机上请求服务,而不需要了解底层网络的技术。类似于 Http 远程调用。
RabbitMQ 实现 RPC 通信的过程,大概是通过两个队列实现一个可回调的过程。
大概流程如下:
客户端步骤:
package org.example.rabbitmq.rpc;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
public class Client {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 1. 创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
// 2. 创建信道
Channel channel = connection.createChannel();
// 3. 声明队列
channel.queueDeclare("rpc.request.queue", true, false, false, null);
channel.queueDeclare("rpc.response.queue", true, false, false, null);
// 4. 发送消息
// 设置请求标识
String correlationId = UUID.randomUUID().toString();
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.correlationId(correlationId)
.replyTo("rpc.response.queue")
.build();
String msg = "hello rpc";
channel.basicPublish("", "rpc.request.queue", properties, msg.getBytes());
// 5. 接收响应
// 使用阻塞队列存储响应
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Client 接收到消息:" + new String(body));
// 判断唯一标识正确,放到阻塞队列中
if (correlationId.equals(properties.getCorrelationId())) {
response.offer(new String(body));
}
}
};
channel.basicConsume("rpc.response.queue", true, consumer);
// 获取响应的结果
String result = response.take();
System.out.println(" [RPC_Client] Result:" + result);
}
}
服务器步骤:
package org.example.rabbitmq.rpc;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Server {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
// 2. 创建信道
Channel channel = connection.createChannel();
// 接收请求
// 每次接受一条
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Server 接收到请求:" + new String(body));
// 响应请求
String response = "响应";
AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
.correlationId(properties.getCorrelationId())
.replyTo("rpc.request.queue")
.build();
channel.basicPublish("", "rpc.response.queue", props, response.getBytes());
// 手动确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume("rpc.request.queue", false, consumer);
}
}
消息中间件都会有消息丢失的问题发生,大概分为以下三种丢失情况:
RabbitMQ 针对上面三种情况给出的解决方案:
生产者将信道设置成 confirm(确认)模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID(从 1 开始); 一旦消息被投递到所有匹配的队列之后,RabbitMQ 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了; 如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出。 broker 回传给生产者的确认消息中 deliveryTag 包含了确认消息的序号,此外 broker 也可以设置 channel.basicAck() 方法中的 multiple 参数,表示到这个序号之前的所有消息都已经得到了处理。
跟生产者发送消息,只有调用 Channel 类 confirmSelect() 设置信道为 confirm 模式,和 Channel 类 waitForConfirmsOrDie() 方法等待手动确认。
private static void publishingMessagesIndividually() throws IOException, TimeoutException, InterruptedException {
try (Channel channel = createChannel()) {
// 设置信道为 confirm 模式
channel.confirmSelect();
// 声明队列
channel.queueDeclare("publish.confirm.queue1", true, false, false, null);
long start = System.currentTimeMillis();
// 发送消息
for (int i = 0; i < 200; i++) {
String msg = "Publishing Messages Individually " + i;
channel.basicPublish("", "publish.confirm.queue1", null, msg.getBytes());
// 等待确认
channel.waitForConfirmsOrDie(5000);
}
long end = System.currentTimeMillis();
System.out.println("Publishing Messages Individually(单独确认) 发送 200 条消息耗时 " + (end - start));
}
}
可以发现,发送 200 条消息,耗时很长。
观察上面代码,会发现这种策略是每发送一条消息后就调用 channel.waitForConfirmsOrDie() 方法,之后等待服务端的确认,这实际上是一种串行同步等待的方式。 尤其对于持久化的消息来说,需要等待消息确认存储在磁盘之后才会返回(调用 Linux 内核的 fsync 方法)。 但是发布确认机制是支持异步的。可以一边发送消息,一边等待消息确认。
每发送一批消息后,调用 channel.waitForConfirms 方法,等待服务器的确认返回。
跟单独确认区别就是,发送到一定消息再进行等待确认。
private static void publishingMessagesInBatches() throws IOException, TimeoutException, InterruptedException {
try (Channel channel = createChannel()) {
// 设置信道为 confirm 模式
channel.confirmSelect();
// 声明队列
channel.queueDeclare("publish.confirm.queue2", true, false, false, null);
long start = System.currentTimeMillis();
// 发送消息
int batchSize = 100;
int flag = 0;
for (int i = 0; i < 200; i++) {
String msg = "Publishing Messages in Batches " + i;
channel.basicPublish("", "publish.confirm.queue2", null, msg.getBytes());
// 批量 等待确认
if (flag == batchSize) {
channel.waitForConfirmsOrDie(5000);
flag = 0;
}
flag++;
}
if (flag > 0) {
channel.waitForConfirmsOrDie(5000);
}
long end = System.currentTimeMillis();
System.out.println("Publishing Messages in Batches(批量确认) 发送 200 条消息耗时 " + (end - start));
}
}
相比于单独确认策略,批量确认极大地提升了 confirm 的效率, 缺点是出现 Basic.Nack 或者超时时,我们不清楚具体哪条消息出了问题。客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量。 当消息经常丢失时,批量确认的性能应该是不升反降的。
提供一个回调方法,服务端确认了一条或者多条消息后客户端会回这个方法进行处理。
异步 confirm 方法的编程实现最为复杂。Channel 接口提供了一个方法 addConfirmListener,这个方法可以添加 ConfirmListener 回调接口。
ConfirmListener 接口中包含两个方法:handleAck(long deliveryTag, boolean multiple) 和 handleNack(long deliveryTag, boolean multiple),分别对应处理 RabbitMQ 发送给生产者的 ack 和 nack。
deliveryTag 表示发送消息的序号,multiple 表示是否批量确认。我们需要为每一个 Channel 维护一个已发送消息的序号集合。 当收到 RabbitMQ 的 confirm 回调时,从集合中删除对应的消息。当 Channel 开启 confirm 模式后,channel 上发送消息都会附带一个从 1 开始递增的 deliveryTag 序号。
我们可以使用 SortedSet 的有序性来维护这个已发消息的集合。
private static void handlingPublisherConfirmsAsynchronously() throws IOException, TimeoutException, InterruptedException {
try (Channel channel = createChannel()) {
// 设置信道为 confirm 模式
channel.confirmSelect();
// 声明队列
channel.queueDeclare("publish.confirm.queue3", true, false, false, null);
// 有序集合,元素按照自然顺序进行排序,存储未 confirm 消息序号
SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());
long start = System.currentTimeMillis();
// 监听
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
// 批量确认:将集合中小于等于当前序号 deliveryTag 元素的集合清除,表示这批序号的消息都已经被 ack 了
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
// 单独确认
confirmSet.remove(deliveryTag);
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
// 批量确认:将集合中小于等于当前序号 deliveryTag 元素的集合清除,表示这批序号的消息都已经被 ack 了
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
// 单独确认
confirmSet.remove(deliveryTag);
}
// 根据业务处理
}
});
// 发送消息
int flag = 0;
for (int i = 0; i < 200; i++) {
String msg = "Handling Publisher Confirms Asynchronously " + i;
// 得到下次发送消息的序号,从 1 开始
long nextPublishSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish("", "publish.confirm.queue3", null, msg.getBytes());
// 存入集合
confirmSet.add(nextPublishSeqNo);
}
while (confirmSet.isEmpty()) {
Thread.sleep(20);
}
long end = System.currentTimeMillis();
System.out.println("Handling Publisher Confirms Asynchronously(异步确认) 发送 200 条消息耗时 " + (end - start));
}
}

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online
JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online
使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online
Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online