跳到主要内容Javajava
RabbitMQ 核心工作模式实战解析
综述由AI生成详细解析了 RabbitMQ 的六种核心工作模式,包括工作队列、发布订阅、路由、通配符、RPC 通信及发布确认机制。通过 Java 代码示例,展示了如何配置交换机、队列及绑定规则,重点讲解了 Fanout、Direct、Topic 三种交换机的区别与应用场景。此外,针对消息可靠性问题,对比了单独确认、批量确认和异步确认三种 Publisher Confirms 实现方式,分析了各自的性能特点与适用情况,帮助开发者在实际工程中构建稳定可靠的消息系统。
beaabea3 浏览 RabbitMQ 核心工作模式实战解析
RabbitMQ 作为广泛使用的消息中间件,其强大的路由能力支撑了多种业务场景。本文将深入探讨六种常见的工作模式,通过 Java 代码示例,带你从原理到实践,掌握如何在实际开发中灵活运用。
一、Work Queues(工作队列模式)
简单模式下,消息通常由单个消费者处理。而在工作队列模式中,我们引入了多个消费者,它们之间形成竞争关系。每个消息只会被其中一个消费者接收并处理,从而实现负载均衡。
在开始之前,我们需要引入 RabbitMQ 的客户端依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
1.1 生产者实现
生产者的逻辑相对固定:建立连接、创建信道、声明队列、发送消息并释放资源。为了演示方便,这里使用本地环境配置。
package org.example.rabbitmq.workqueues;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ProducerDemo {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort();
connectionFactory.setUsername();
connectionFactory.setPassword();
connectionFactory.setVirtualHost();
connectionFactory.newConnection();
connection.createChannel();
channel.queueDeclare(, , , , );
( ; i < ; i++) {
+ i;
channel.basicPublish(, , , msg.getBytes());
}
System.out.println();
channel.close();
connection.close();
}
}
5672
"guest"
"guest"
"/"
Connection
connection
=
Channel
channel
=
"workQueues"
true
false
false
null
for
int
i
=
0
10
String
msg
=
"hello workQueues"
""
"workQueues"
null
"消息发送成功"
1.2 消费者实现
我们模拟两个消费者同时运行。关键点在于 basicConsume 的第二个参数设为 true,表示自动确认。如果设为 false,则需要手动 ACK,这在后续章节会详细讨论。
package org.example.rabbitmq.workqueues;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("workQueues", true, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body);
System.out.println("Consumer1 接收到消息:" + message);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume("workQueues", true, consumer);
Thread.sleep(10000);
}
}
二、Publish/Subscribe(发布/订阅模式)
发布/订阅模式引入了一个核心概念:Exchange(交换机)。消息不再直接发送给队列,而是先发给交换机,再由交换机根据规则分发给绑定的队列。
- Fanout:广播模式,将消息发送给所有绑定到该交换机的队列。
- Direct:定向模式,根据 RoutingKey 精确匹配。
- Topic:通配符模式,支持模糊匹配。
2.1 生产者实现
在 Fanout 模式下,我们不需要关心 RoutingKey,因为交换机只会做广播。
package org.example.rabbitmq.fanout;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("fanout.exchange", BuiltinExchangeType.FANOUT, false);
channel.queueDeclare("fanout.queue1", true, false, false, null);
channel.queueDeclare("fanout.queue2", true, false, false, null);
channel.queueBind("fanout.queue1", "fanout.exchange", "");
channel.queueBind("fanout.queue2", "fanout.exchange", "");
for (int i = 0; i < 10; i++) {
String msg = "hello fanout" + i;
channel.basicPublish("fanout.exchange", "", null, msg.getBytes());
}
System.out.println("发送消息成功");
channel.close();
connection.close();
}
}
2.2 消费者实现
package org.example.rabbitmq.fanout;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("fanout.queue1", true, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
System.out.println("Consumer1 接收到消息:" + new String(body));
}
};
channel.basicConsume("fanout.queue1", true, consumer);
}
}
三、Routing(路由模式)
与 Fanout 不同,Direct 模式的交换机会根据 RoutingKey 进行精确匹配。只有当消息的 RoutingKey 与队列绑定的 BindingKey 完全一致时,消息才会被投递。
3.1 生产者实现
注意观察代码中的 queueBind 方法,它需要指定 BindingKey。发送消息时,basicPublish 也需要指定 RoutingKey。
package org.example.rabbitmq.direct;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("direct.exchange", BuiltinExchangeType.DIRECT, true);
channel.queueDeclare("direct.queue1", true, false, false, null);
channel.queueDeclare("direct.queue2", true, false, false, null);
channel.queueBind("direct.queue1", "direct.exchange", "a");
channel.queueBind("direct.queue2", "direct.exchange", "a");
channel.queueBind("direct.queue2", "direct.exchange", "b");
channel.queueBind("direct.queue2", "direct.exchange", "c");
channel.basicPublish("direct.exchange", "a", null, "msg a".getBytes());
channel.basicPublish("direct.exchange", "b", null, "msg b".getBytes());
channel.basicPublish("direct.exchange", "c", null, "msg c".getBytes());
System.out.println("发送消息成功");
channel.close();
connection.close();
}
}
3.2 消费者实现
消费者监听各自的队列,Queue1 只会收到 Key 为 "a" 的消息,Queue2 会收到 "a", "b", "c" 的消息。
package org.example.rabbitmq.direct;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("direct.queue1", true, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
System.out.println("Consumer1 接收到消息:" + new String(body));
}
};
channel.basicConsume("direct.queue1", true, consumer);
}
}
四、Topics(通配符模式)
Topics 模式是 Direct 模式的扩展,允许更灵活的路由规则。BindingKey 和 RoutingKey 都可以包含通配符。
*:匹配一个单词(例如 stock.usd.nyse 中的 usd)。
#:匹配零个或多个单词。
4.1 生产者实现
package org.example.rabbitmq.topics;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topic.exchange", BuiltinExchangeType.TOPIC, true);
channel.queueDeclare("topic.queue1", true, false, false, null);
channel.queueDeclare("topic.queue2", true, false, false, null);
channel.queueBind("topic.queue1", "topic.exchange", "*.a.*");
channel.queueBind("topic.queue2", "topic.exchange", "*.*.b");
channel.queueBind("topic.queue2", "topic.exchange", "c.#");
channel.basicPublish("topic.exchange", "word.a.word", null, "msg A".getBytes());
channel.basicPublish("topic.exchange", "word.word.b", null, "msg B".getBytes());
channel.basicPublish("topic.exchange", "c.word.word.word.word.b", null, "msg C".getBytes());
channel.basicPublish("topic.exchange", "c.a.b", null, "msg D".getBytes());
System.out.println("发送消息成功");
channel.close();
connection.close();
}
}
4.2 消费者实现
package org.example.rabbitmq.topics;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("topic.queue1", true, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
System.out.println("Consumer1 接收到消息:" + new String(body));
}
};
channel.basicConsume("topic.queue1", true, consumer);
}
}
五、RPC 通信
RPC(远程过程调用)在消息队列中实现起来比较特殊。RabbitMQ 本身不支持原生的 RPC,但可以通过队列和回调机制模拟。
- 客户端发送请求消息,并在属性中设置
replyTo(回调队列)和 correlationId(请求标识)。
- 服务端消费请求,处理后发送响应到
replyTo 队列,携带相同的 correlationId。
- 客户端监听回调队列,根据
correlationId 匹配响应。
5.1 客户端实现
package org.example.rabbitmq.rpc;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
public class Client {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("rpc.request.queue", true, false, false, null);
channel.queueDeclare("rpc.response.queue", true, false, false, null);
String correlationId = UUID.randomUUID().toString();
AMQP.BasicProperties properties = new AMQP.BasicProperties()
.builder()
.correlationId(correlationId)
.replyTo("rpc.response.queue")
.build();
channel.basicPublish("", "rpc.request.queue", properties, "hello rpc".getBytes());
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
if (properties.getCorrelationId() != null &&
properties.getCorrelationId().equals(correlationId)) {
response.offer(new String(body));
}
}
};
channel.basicConsume("rpc.response.queue", true, consumer);
String result = response.take();
System.out.println(" [RPC_Client] Result: " + result);
channel.close();
connection.close();
}
}
5.2 服务器实现
package org.example.rabbitmq.rpc;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Server {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
System.out.println("Server 接收到请求:" + new String(body));
String response = "响应结果";
AMQP.BasicProperties props = new AMQP.BasicProperties()
.builder()
.correlationId(properties.getCorrelationId())
.replyTo("rpc.response.queue")
.build();
channel.basicPublish("", "rpc.response.queue", props, response.getBytes());
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume("rpc.request.queue", false, consumer);
}
}
六、Publisher Confirms(发布确认)
消息丢失是分布式系统中常见的问题。RabbitMQ 提供了 Publisher Confirms 机制来确保消息可靠投递。
- 生产者问题:网络中断导致发送失败。
- Broker 问题:消息未持久化导致丢失。
- 消费者问题:消费失败导致消息被丢弃。
针对前两类,我们可以开启 Confirm 模式。一旦开启,每条消息都会被分配一个唯一的 Sequence Number。Broker 处理成功后会返回 Ack,失败则返回 Nack。
6.1 单独确认
每发送一条消息就等待一次确认。这种方式最安全,但性能较差,属于串行同步等待。
private static void publishingMessagesIndividually() throws Exception {
try (Channel channel = createChannel()) {
channel.confirmSelect();
channel.queueDeclare("publish.confirm.queue1", true, false, false, null);
long start = System.currentTimeMillis();
for (int i = 0; i < 200; i++) {
String msg = "Publishing Messages Individually " + i;
channel.basicPublish("", "publish.confirm.queue1", null, msg.getBytes());
channel.waitForConfirmsOrDie(5000);
}
long end = System.currentTimeMillis();
System.out.println("耗时:" + (end - start));
}
}
6.2 批量确认
发送一批消息后再统一等待确认。这显著提升了吞吐量,但如果某条消息失败,可能需要重发整批数据,存在重复风险。
private static void publishingMessagesInBatches() throws Exception {
try (Channel channel = createChannel()) {
channel.confirmSelect();
channel.queueDeclare("publish.confirm.queue2", true, false, false, null);
long start = System.currentTimeMillis();
int batchSize = 100;
int flag = 0;
for (int i = 0; i < 200; i++) {
String msg = "Publishing Messages in Batches " + i;
channel.basicPublish("", "publish.confirm.queue2", null, msg.getBytes());
if (flag == batchSize) {
channel.waitForConfirmsOrDie(5000);
flag = 0;
}
flag++;
}
if (flag > 0) {
channel.waitForConfirmsOrDie(5000);
}
long end = System.currentTimeMillis();
System.out.println("耗时:" + (end - start));
}
}
6.3 异步确认
这是最高效的方式。通过注册 ConfirmListener 回调,在后台处理 Ack/Nack。我们需要维护一个已发送消息的集合(如 SortedSet),收到确认后移除对应序号。
private static void handlingPublisherConfirmsAsynchronously() throws Exception {
try (Channel channel = createChannel()) {
channel.confirmSelect();
channel.queueDeclare("publish.confirm.queue3", true, false, false, null);
SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
});
long start = System.currentTimeMillis();
for (int i = 0; i < 200; i++) {
String msg = "Handling Publisher Confirms Asynchronously " + i;
long nextPublishSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish("", "publish.confirm.queue3", null, msg.getBytes());
confirmSet.add(nextPublishSeqNo);
}
while (!confirmSet.isEmpty()) {
Thread.sleep(20);
}
long end = System.currentTimeMillis();
System.out.println("耗时:" + (end - start));
}
}
通过上述六种模式的组合与优化,我们可以构建出高可用、高性能的消息驱动架构。在实际项目中,建议优先使用异步确认模式,并根据业务需求选择合适的路由策略。
相关免费在线工具
- Keycode 信息
查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online
- Escape 与 Native 编解码
JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online
- JavaScript / HTML 格式化
使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online
- JavaScript 压缩与混淆
Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
- Base64 文件转换器
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online