跳到主要内容RabbitMQ 核心工作模式详解与 Java 实战 | 极客日志Javajava
RabbitMQ 核心工作模式详解与 Java 实战
RabbitMQ 支持多种消息分发模式,包括工作队列、发布订阅、路由、通配符及 RPC。本文通过 Java 客户端示例,详解了各模式下的交换机配置、队列绑定及消息确认机制,重点分析了发布确认(Confirm)的三种策略及其性能差异,帮助开发者构建高可靠的消息中间件应用。
ServerBase1 浏览 RabbitMQ 常见工作模式
在消息中间件的实际应用中,RabbitMQ 提供了多种灵活的消息分发策略。本文将通过 Java 客户端示例,逐一解析工作队列、发布订阅、路由、通配符、RPC 通信以及发布确认这六大核心模式。
所有示例均基于 amqp-client 依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
一、Work Queues(工作队列模式)
工作队列模式主要用于负载均衡。多个消费者监听同一个队列,它们之间是竞争关系,每条消息只会被其中一个消费者处理。这种机制非常适合将耗时任务分摊到多个 worker 节点上。
1.1 生产者实现
生产者的逻辑相对简单:建立连接、创建信道、声明队列、发送消息并释放资源。注意这里我们使用的是默认交换机(空字符串),直接发送到指定队列。
package org.example.rabbitmq.workqueues;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ProducerDemo {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("101.43.47.137");
connectionFactory.setPort();
connectionFactory.setUsername();
connectionFactory.setPassword();
connectionFactory.setVirtualHost();
connectionFactory.newConnection();
connection.createChannel();
channel.queueDeclare(, , , , );
( ; i < ; i++) {
+ i;
channel.basicPublish(, , , msg.getBytes());
}
System.out.println();
channel.close();
connection.close();
}
}
5672
"study"
"study"
"study"
Connection
connection
=
Channel
channel
=
"workQueues"
true
false
true
null
for
int
i
=
0
10
String
msg
=
"hello workQueues"
""
"workQueues"
null
"消息发送成功"
1.2 消费者实现
为了模拟竞争消费,我们启动两个消费者实例。关键点在于 basicConsume 的第二个参数 autoAck。如果设为 true,消息一旦发送给消费者即被确认;若为 false,则需手动 ACK。本例中为了演示方便暂时使用自动确认,实际生产中建议根据业务可靠性需求调整。
package org.example.rabbitmq.workqueues;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("101.43.47.137");
connectionFactory.setPort(5672);
connectionFactory.setUsername("study");
connectionFactory.setPassword("study");
connectionFactory.setVirtualHost("study");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("workQueues", false, false, true, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
System.out.println("Consumer1 接收到消息:" + new String(body));
}
};
channel.basicConsume("workQueues", true, consumer);
Thread.sleep(100);
}
}
二、Publish/Subscribe(发布/订阅)
发布/订阅模式引入了 Exchange(交换机)的概念。生产者不再直接将消息发给队列,而是发给交换机,交换机根据类型将消息路由到绑定的队列。
- Fanout:广播模式,忽略 RoutingKey,将消息分发给所有绑定队列。
- Direct:定向模式,精确匹配 RoutingKey。
- Topic:通配符模式,支持模糊匹配。
2.1 生产者实现(Fanout)
在 Fanout 模式下,我们声明一个类型为 FANOUT 的交换机,并将多个队列绑定到它。发送消息时,RoutingKey 为空即可。
package org.example.rabbitmq.fanout;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("101.43.47.137");
connectionFactory.setPort(5672);
connectionFactory.setUsername("study");
connectionFactory.setPassword("study");
connectionFactory.setVirtualHost("study");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("fanout.exchange", BuiltinExchangeType.FANOUT, false);
channel.queueDeclare("fanout.queue1", true, false, false, null);
channel.queueDeclare("fanout.queue2", true, false, false, null);
channel.queueBind("fanout.queue1", "fanout.exchange", "");
channel.queueBind("fanout.queue2", "fanout.exchange", "");
for (int i = 0; i < 10; i++) {
String msg = "hello fanout" + i;
channel.basicPublish("fanout.exchange", "", null, msg.getBytes());
}
System.out.println("发送消息成功");
channel.close();
connection.close();
}
}
2.2 消费者实现
每个消费者监听不同的队列。由于交换机是广播模式,两个消费者都会收到相同的内容。
package org.example.rabbitmq.fanout;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("101.43.47.137");
connectionFactory.setPort(5672);
connectionFactory.setUsername("study");
connectionFactory.setPassword("study");
connectionFactory.setVirtualHost("study");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("fanout.queue1", true, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
System.out.println("Consumer1 接收到消息:" + new String(body));
}
};
channel.basicConsume("fanout.queue1", consumer);
}
}
三、Routing(路由模式)
Routing 模式使用 Direct 类型的交换机。与 Fanout 不同,它要求队列绑定交换机时必须指定 BindingKey,而生产者发送消息时需指定 RoutingKey。只有当两者完全一致时,消息才会被投递。
3.1 生产者实现
package org.example.rabbitmq.direct;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("101.43.47.137");
connectionFactory.setPort(5672);
connectionFactory.setUsername("study");
connectionFactory.setPassword("study");
connectionFactory.setVirtualHost("study");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("direct.exchange", BuiltinExchangeType.DIRECT, true);
channel.queueDeclare("direct.queue1", true, false, false, null);
channel.queueDeclare("direct.queue2", true, false, false, null);
channel.queueBind("direct.queue1", "direct.exchange", "a");
channel.queueBind("direct.queue2", "direct.exchange", "a");
channel.queueBind("direct.queue2", "direct.exchange", "b");
channel.queueBind("direct.queue2", "direct.exchange", "c");
String msgA = "hello direct routingKey is a";
channel.basicPublish("direct.exchange", "a", null, msgA.getBytes());
String msgB = "hello direct routingKey is b";
channel.basicPublish("direct.exchange", "b", null, msgB.getBytes());
String msgC = "hello direct routingKey is c";
channel.basicPublish("direct.exchange", "c", null, msgC.getBytes());
System.out.println("发送消息成功");
channel.close();
connection.close();
}
}
3.2 消费者实现
package org.example.rabbitmq.direct;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("101.43.47.137");
connectionFactory.setPort(5672);
connectionFactory.setUsername("study");
connectionFactory.setPassword("study");
connectionFactory.setVirtualHost("study");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("direct.queue1", true, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
System.out.println("Consumer1 接收到消息:" + new String(body));
}
};
channel.basicConsume("direct.queue1", consumer);
}
}
四、Topics(通配符模式)
Topics 模式是对 Routing 模式的扩展,允许使用通配符进行更灵活的路由匹配。
- RoutingKey 和 BindingKey 均由点号
. 分隔的单词组成。
* 代表匹配一个单词。
# 代表匹配零个或多个单词。
4.1 生产者实现
package org.example.rabbitmq.topics;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("101.43.47.137");
connectionFactory.setPort(5672);
connectionFactory.setUsername("study");
connectionFactory.setPassword("study");
connectionFactory.setVirtualHost("study");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topic.exchange", BuiltinExchangeType.TOPIC, true);
channel.queueDeclare("topic.queue1", true, false, false, null);
channel.queueDeclare("topic.queue2", true, false, false, null);
channel.queueBind("topic.queue1", "topic.exchange", "*.a.*");
channel.queueBind("topic.queue2", "topic.exchange", "*.*.b");
channel.queueBind("topic.queue2", "topic.exchange", "c.#");
String msgA = "hello topic routingKey is word.a.word";
channel.basicPublish("topic.exchange", "word.a.word", null, msgA.getBytes());
String msgB = "hello topic routingKey is word.word.b";
channel.basicPublish("topic.exchange", "word.word.b", null, msgB.getBytes());
String msgC = "hello topic routingKey is c.word.word.word.word.b";
channel.basicPublish("topic.exchange", "c.word.word.word.word.b", null, msgC.getBytes());
String msgD = "hello topic routingKey is c.a.b";
channel.basicPublish("topic.exchange", "c.a.b", null, msgD.getBytes());
System.out.println("发送消息成功");
channel.close();
connection.close();
}
}
4.2 消费者实现
package org.example.rabbitmq.topics;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("101.43.47.137");
connectionFactory.setPort(5672);
connectionFactory.setUsername("study");
connectionFactory.setPassword("study");
connectionFactory.setVirtualHost("study");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("topic.queue1", true, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
System.out.println("Consumer1 接收到消息:" + new String(body));
}
};
channel.basicConsume("topic.queue1", consumer);
}
}
五、RPC 通信
RPC(远程过程调用)在 RabbitMQ 中通常通过两个队列配合实现:一个请求队列和一个响应队列。客户端发送请求时附带回调队列名和唯一标识(CorrelationID),服务端处理后回写到该回调队列。
5.1 客户端实现
客户端需要阻塞等待响应。这里使用了 BlockingQueue 来存储结果,并通过 correlationId 确保收到的响应属于当前请求。
package org.example.rabbitmq.rpc;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
public class Client {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("101.43.47.137");
connectionFactory.setPort(5672);
connectionFactory.setUsername("study");
connectionFactory.setPassword("study");
connectionFactory.setVirtualHost("study");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("rpc.request.queue", true, false, false, null);
channel.queueDeclare("rpc.response.queue", true, false, false, null);
String correlationId = UUID.randomUUID().toString();
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.correlationId(correlationId)
.replyTo("rpc.response.queue")
.build();
String msg = "hello rpc";
channel.basicPublish("", "rpc.request.queue", properties, msg.getBytes());
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
if (correlationId.equals(properties.getCorrelationId())) {
response.offer(new String(body));
}
}
};
channel.basicConsume("rpc.response.queue", true, consumer);
String result = response.take();
System.out.println(" [RPC_Client] Result: " + result);
}
}
5.2 服务器实现
服务端接收请求,处理业务逻辑,并将结果发送到客户端指定的回调队列。
package org.example.rabbitmq.rpc;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Server {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("101.43.47.137");
connectionFactory.setPort(5672);
connectionFactory.setUsername("study");
connectionFactory.setPassword("study");
connectionFactory.setVirtualHost("study");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
System.out.println("Server 接收到请求:" + new String(body));
String response = "响应";
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.correlationId(properties.getCorrelationId())
.replyTo("rpc.request.queue")
.build();
channel.basicPublish("", "rpc.response.queue", props, response.getBytes());
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume("rpc.request.queue", false, consumer);
}
}
六、Publisher Confirms(发布确认)
消息丢失是分布式系统中常见的问题。RabbitMQ 提供了 Publisher Confirms 机制来解决生产者侧的消息丢失问题。主要分为三种策略:单独确认、批量确认和异步确认。
6.1 单独确认
每发送一条消息就调用 waitForConfirmsOrDie() 等待服务端确认。这种方式最安全但性能最差,因为它是同步阻塞的,且对于持久化消息需要等待磁盘写入完成。
private static void publishingMessagesIndividually() throws IOException, TimeoutException, InterruptedException {
try (Channel channel = createChannel()) {
channel.confirmSelect();
channel.queueDeclare("publish.confirm.queue1", true, false, false, null);
long start = System.currentTimeMillis();
for (int i = 0; i < 200; i++) {
String msg = "Publishing Messages Individually " + i;
channel.basicPublish("", "publish.confirm.queue1", null, msg.getBytes());
channel.waitForConfirmsOrDie(5000);
}
long end = System.currentTimeMillis();
System.out.println("单独确认 发送 200 条消息耗时 " + (end - start));
}
}
6.2 批量确认
为了提高性能,可以累积一定数量的消息后再等待确认。虽然减少了网络交互次数,但如果某一批次失败,重发成本较高,可能导致重复消息。
private static void publishingMessagesInBatches() throws IOException, TimeoutException, InterruptedException {
try (Channel channel = createChannel()) {
channel.confirmSelect();
channel.queueDeclare("publish.confirm.queue2", true, false, false, null);
long start = System.currentTimeMillis();
int batchSize = 100;
int flag = 0;
for (int i = 0; i < 200; i++) {
String msg = "Publishing Messages in Batches " + i;
channel.basicPublish("", "publish.confirm.queue2", null, msg.getBytes());
if (flag == batchSize) {
channel.waitForConfirmsOrDie(5000);
flag = 0;
}
flag++;
}
if (flag > 0) {
channel.waitForConfirmsOrDie(5000);
}
long end = System.currentTimeMillis();
System.out.println("批量确认 发送 200 条消息耗时 " + (end - start));
}
}
6.3 异步确认
这是性能最高的方式。通过 addConfirmListener 注册回调接口,服务端确认后触发 handleAck 或 handleNack。我们需要维护一个已发送消息序号的集合(如 TreeSet),根据返回的 deliveryTag 更新集合状态。
private static void handlingPublisherConfirmsAsynchronously() throws IOException, TimeoutException, InterruptedException {
try (Channel channel = createChannel()) {
channel.confirmSelect();
channel.queueDeclare("publish.confirm.queue3", true, false, false, null);
SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());
long start = System.currentTimeMillis();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
});
int flag = 0;
for (int i = 0; i < 200; i++) {
String msg = "Handling Publisher Confirms Asynchronously " + i;
long nextPublishSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish("", "publish.confirm.queue3", null, msg.getBytes());
confirmSet.add(nextPublishSeqNo);
}
while (!confirmSet.isEmpty()) {
Thread.sleep(20);
}
long end = System.currentTimeMillis();
System.out.println("异步确认 发送 200 条消息耗时 " + (end - start));
}
}
在实际开发中,建议优先使用异步确认模式,既能保证消息不丢失,又能获得较好的吞吐量。
相关免费在线工具
- Keycode 信息
查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online
- Escape 与 Native 编解码
JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online
- JavaScript / HTML 格式化
使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online
- JavaScript 压缩与混淆
Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
- Base64 文件转换器
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online