跳到主要内容RabbitMQ 核心概念与六种常用模式解析 | 极客日志Javajava
RabbitMQ 核心概念与六种常用模式解析
消息队列(MQ)是实现异步通信的中间件技术,核心在于解耦生产者和消费者。RabbitMQ 基于 AMQP 协议,通过 Broker、Exchange、Queue 等组件实现消息路由。 RabbitMQ 的六种工作模式:Simple 模式用于基础收发;Work Queues 实现多消费者竞争消费;Publish/Subscribe 支持广播;Routing 和 Topics 模式根据路由键精确或通配符匹配分发;RPC 模式实现请求响应机制。文中提供 Java 客户端连接、声明资源及发送接收消息的代码示例,涵盖安装配置、管理界面访问及核心 API 使用,帮助开发者掌握分布式系统中的消息传递方案。
GopherDev8K 浏览 1. Message Queue 概述
计算机之间的通信方式主要有两种:同步通信和异步通信。
同步通信 (Synchronous Communication):通信双方在严格的时间约束下进行交互。发送方发送请求或数据后,会主动等待并阻塞自身,直到收到接收方的明确响应 (成功、失败或超时) 才会继续执行后续操作。整个过程像是在进行一场'实时对话'。
异步通信 (Asynchronous Communication):发送方发出请求或消息后,不等待接收方的即时响应,而是立即返回并继续执行后续任务。接收方在准备好结果后,通过某种机制将响应或结果'推送'或'通知'给发送方。整个过程更像是'发送邮件'。
MQ (Message Queue,消息队列) 是一种用于实现异步通信的中间件技术。它允许不同组件 (例如应用程序、服务或微服务) 通过发送和接收消息来进行通信,而无需实时等待响应。核心思想是解耦生产者 (发送消息) 和消费者 (接收消息) 之间的关系。其核心功能如下:
削峰填谷 (缓冲):当消费者处理能力有限或暂时不可用时,队列可以作为缓冲区,暂存生产者发送的大量消息,避免系统过载或消息丢失。待消费者恢复或扩展后,再逐步消费积压的消息。

异步解耦:生产者和消费者不需要知道对方的存在或状态。生产者只需将消息发送到队列,消费者在准备好时从队列中获取消息进行处理。

2. 初识 RabbitMQ
AMQP 是一个开放标准的应用层协议,设计用于异步、可靠、跨平台的消息传递。它定义了消息的格式、传递机制和路由规则,是构建分布式系统中消息中间件的核心协议。
RabbitMQ是采用 Erlang 语言实现 AMQP(Advanced Message Queuing Protocol,高级消息队列协议) 的消息中间件。
2.1 安装
启动 RabbitMQ
root@VM-0-7-ubuntu:~# service rabbitmq-server start
安装 RabbitMQ 管理界面
root@VM-0-7-ubuntu:~# rabbitmq-plugins enable rabbitmq_management
安装 RabbitMQ
root@VM-0-7-ubuntu:~# apt-get install rabbitmq-server
root@VM-0-7-ubuntu:~# systemctl status rabbitmq-server

root@VM-0-7-ubuntu:~# apt-get update
root@VM-0-7-ubuntu:~# apt-get install erlang
root@VM-0-7-ubuntu:~# erl
1> halt().
2.2 访问
- 5672 端口是 RabbitMQ 的默认 AMQP 协议端口。生产者和消费者通过此端口与 RabbitMQ 交互。
- 15672 端口是 RabbitMQ 管理插件的默认 HTTP 端口。通过此端口可以访问 RabbitMQ 的 Web 管理界面,提供图形化的管理功能。
rabbitmqctl add_user admin admin
rabbitmqctl set_user_tags admin administrator
2.3 工作流程
| RabbitMQ 组件 | 邮局系统类比 | 作用 |
|---|
| Broker(代理) | 整个邮局 | 整个消息队列系统,包含所有组件 |
| Virtual Host(虚拟主机) | 不同地域的邮局分局 | 逻辑隔离,每个 vhost 有独立的用户/权限/队列 |
| Connection(连接) | 电话专线 | 生产者和消费者之间的 TCP 连接 |
| Channel(信道) | 分机电话 | 虚拟连接,复用同一个 TCP 连接 |
| Exchange(交换机) | 邮件分拣中心 | 接收消息并决定路由到哪个队列 |
| Queue(队列) | 收件人的邮箱 | 存储消息等待消费者取走 |
| Binding(绑定) | 分拣规则 | 交换机和队列之间的路由规则 |
第一阶段:连接建立
建立 TCP 连接:生产者应用程序首先与 Broker 建立 TCP 连接,默认端口为 5672。连接需要指定虚拟主机、用户名和密码。
创建信道:在已建立的 TCP 连接上创建一个或多个信道。信道是虚拟连接,共享同一个 TCP 连接。每个信道有独立的通信流,相互隔离。
第二阶段:资源声明
声明交换机:指定名称、类型和属性。
声明队列:指定名称和属性。
创建绑定:将队列绑定到交换机,并指定绑定键 (Binding Key)。
第三阶段:消息发送
发送消息到交换机:将消息发布到指定的交换机。消息包含:交换机名称、路由键、消息体、消息属性。
交换机路由消息:交换机接收消息后,根据其类型和绑定规则进行路由。
第四阶段:路由处理
成功路由:队列接收并存储消息,等待消费者拉取或推送消息给消费者。
3. RabbitMQ 六种工作模式
3.1 Simple
- 最简单的消息队列模式
- 一个生产者,一个消费者,一个队列

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
for (int i = 0; i < 10; i++) {
channel.basicPublish("", Constants.WORK_QUEUE, null, ("Hello RabbitMQ!" + i).getBytes());
}
System.out.println("消息发送成功");
channel.close();
connection.close();
}
}
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("first", true, false, false, null);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println("接收到消息:" + new String(body));
}
};
channel.basicConsume(Constants.WORK_QUEUE, true, defaultConsumer);
Thread.sleep(2000);
channel.close();
connection.close();
}
}
3.2 Work Queues
- 多个消费者竞争消费同一个队列
- 轮询分发 (默认):每个消费者依次接收消息

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
for (int i = 0; i < 10; i++) {
channel.basicPublish("", Constants.WORK_QUEUE, null, ("Hello RabbitMQ!:" + i).getBytes());
}
System.out.println("消息发送成功");
channel.close();
connection.close();
}
}
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerA {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println("ConsumerA 接收到消息:" + new String(body));
}
};
channel.basicConsume(Constants.WORK_QUEUE, true, defaultConsumer);
Thread.sleep(2000);
}
}
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerB {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println("ConsumerB 接收到消息:" + new String(body));
}
};
channel.basicConsume(Constants.WORK_QUEUE, true, defaultConsumer);
Thread.sleep(2000);
}
}
3.3 Publish/Subscribe

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);
channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);
channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);
channel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, "");
channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, "");
for (int i = 0; i < 10; i++) {
channel.basicPublish(Constants.FANOUT_EXCHANGE, "", null, ("Hello RabbitMQ!" + i).getBytes());
}
System.out.println("消息发送成功");
channel.close();
connection.close();
}
}
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerA {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println("接收到消息:" + new String(body));
}
};
channel.basicConsume(Constants.FANOUT_QUEUE1, true, defaultConsumer);
Thread.sleep(2000);
}
}
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerB {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println("接收到消息:" + new String(body));
}
};
channel.basicConsume(Constants.FANOUT_QUEUE2, true, defaultConsumer);
Thread.sleep(2000);
}
}
3.4 Routing

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);
channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);
channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);
channel.queueBind(Constants.DIRECT_QUEUE1, Constants.DIRECT_EXCHANGE, "a");
channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "a");
channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "b");
channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "c");
for (int i = 0; i < 3; i++) {
channel.basicPublish(Constants.DIRECT_EXCHANGE, "a", null, ("Hello RabbitMQ!" + i).getBytes());
}
for (int i = 0; i < 3; i++) {
channel.basicPublish(Constants.DIRECT_EXCHANGE, "b", null, ("Hello RabbitMQ!" + i).getBytes());
}
for (int i = 0; i < 3; i++) {
channel.basicPublish(Constants.DIRECT_EXCHANGE, "c", null, ("Hello RabbitMQ!" + i).getBytes());
}
System.out.println("消息发送成功");
channel.close();
connection.close();
}
}
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerA {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println("接收到消息:" + new String(body));
}
};
channel.basicConsume(Constants.DIRECT_QUEUE1, true, defaultConsumer);
Thread.sleep(2000);
}
}
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerB {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println("接收到消息:" + new String(body));
}
};
channel.basicConsume(Constants.DIRECT_QUEUE2, true, defaultConsumer);
Thread.sleep(2000);
}
}
3.5 Topics
- 根据路由键选择性接收消息
- 支持通配符匹配
- 通配符规则
*(星号):匹配一个单词
#(井号):匹配零个或多个单词
- 单词用点号分隔,如 stock.usd.nyse

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);
channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);
channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);
channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.a.*");
channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.b");
channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "c.#");
for (int i = 0; i < 3; i++) {
channel.basicPublish(Constants.TOPIC_EXCHANGE, "aa.a.f", null, ("Hello RabbitMQ!" + i).getBytes());
}
for (int i = 0; i < 3; i++) {
channel.basicPublish(Constants.TOPIC_EXCHANGE, "aa.a.b", null, ("Hello RabbitMQ!" + i).getBytes());
}
for (int i = 0; i < 3; i++) {
channel.basicPublish(Constants.TOPIC_EXCHANGE, "c.a.a.a", null, ("Hello RabbitMQ!" + i).getBytes());
}
System.out.println("消息发送成功");
channel.close();
connection.close();
}
}
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerA {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println("接收到消息:" + new String(body));
}
};
channel.basicConsume(Constants.TOPIC_QUEUE1, true, defaultConsumer);
Thread.sleep(2000);
}
}
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerB {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println("接收到消息:" + new String(body));
}
};
channel.basicConsume(Constants.TOPIC_QUEUE2, true, defaultConsumer);
Thread.sleep(2000);
}
}
3.6 RPC
RPC(Remote Procedure Call,远程过程调用):通过网络从远程计算机上请求服务并同步等待服务端的响应。
- 请求/响应模式
- 实现远程过程调用,使用回调队列和关联 ID
- 当客户端启动时,它创建一个独占回调队列。
- 对于 RPC 请求,客户端发送一个具有两个属性的消息:reply_to,它被设置为回调队列,correlation_id,它被设置为每个请求的唯一值。
- 请求被发送到 rpc_queue 队列。
- RPC 工作者 (又名:服务器) 正在等待队列上的请求。当出现请求时,它完成任务,并使用 replyTo 字段中的队列将带有结果的消息发送回客户机。
- 客户端在应答队列上等待数据。当消息出现时,它会检查 correlationId 属性。如果与请求的值匹配,则将响应返回给应用程序。
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
public class RpcClient {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);
String correlationID = UUID.randomUUID().toString();
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().correlationId(correlationID).replyTo(Constants.RPC_RESPONSE_QUEUE).build();
channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, properties, ("Hello RabbitMQ!").getBytes());
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println("接收到消息:" + new String(body));
if (correlationID.equals(properties.getCorrelationId())) {
if (!response.offer(new String(body))) {
System.out.println("队列满");
}
}
}
};
channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, defaultConsumer);
System.out.println("[RPC Client 响应结果]:" + response.take());
}
}
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class RpcServer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);
channel.basicQos(1);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String request = new String(body, StandardCharsets.UTF_8);
System.out.println("接收到消息:" + request);
System.out.println("针对请求:" + request + ",响应成功");
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();
channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, request.getBytes());
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, defaultConsumer);
}
}
相关免费在线工具
- Keycode 信息
查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online
- Escape 与 Native 编解码
JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online
- JavaScript / HTML 格式化
使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online
- JavaScript 压缩与混淆
Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
- Base64 文件转换器
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online