掌控消息全链路(1)——初识RabbitMQ:从核心概念到六种常用模式全景解析
🔥我的主页:九转苍翎⭐️个人专栏:《Java SE 》《Java集合框架系统精讲》《MySQL高手之路:从基础到高阶 》《计算机网络 》《Java工程师核心能力体系构建》天行健,君子以自强不息。
Linux操作系统版本:Ubuntu 24.04 LTS
1.Message Queue概述
计算机之间的通信方式主要有两种:同步通信和异步通信同步通信(Synchronous Communication):通信双方在严格的时间约束下进行交互。发送方发送请求或数据后,会主动等待并阻塞自身,直到收到接收方的明确响应(成功、失败或超时)才会继续执行后续操作。整个过程像是在进行一场“实时对话”异步通信(Asynchronous Communication):发送方发出请求或消息后,不等待接收方的即时响应,而是立即返回并继续执行后续任务。接收方在准备好结果后,通过某种机制将响应或结果“推送”或“通知”给发送方。整个过程更像是“发送邮件”。
MQ(Message Queue,消息队列)是一种用于实现异步通信的中间件技术。它允许不同组件(例如应用程序、服务或微服务)通过发送和接收消息来进行通信,而无需实时等待响应。核心思想是解耦生产者(发送消息)和消费者(接收消息)之间的关系。其核心功能如下:
削峰填谷(缓冲):当消费者处理能力有限或暂时不可用时,队列可以作为缓冲区,暂存生产者发送的大量消息,避免系统过载或消息丢失。待消费者恢复或扩展后,再逐步消费积压的消息
异步解耦:生产者和消费者不需要知道对方的存在或状态。生产者只需将消息发送到队列,消费者在准备好时从队列中获取消息进行处理
2.初识RabbitMQ
AMQP是一个开放标准的应用层协议,设计用于异步、可靠、跨平台的消息传递。它定义了消息的格式、传递机制和路由规则,是构建分布式系统中消息中间件的核心协议
RabbitMQ是采用Erlang语言实现 AMQP(Advanced Message Queuing Protocol,高级消息队列协议) 的消息中间件
2.1 安装
启动RabbitMQ
root@VM-0-7-ubuntu:~# service rabbitmq-server start安装RabbitMQ管理界面
root@VM-0-7-ubuntu:~# rabbitmq-plugins enable rabbitmq_management安装RabbitMQ
# 安装RabbitMQ root@VM-0-7-ubuntu:~# apt-get install rabbitmq-server# 确认安装结果 root@VM-0-7-ubuntu:~# systemctl status rabbitmq-server安装erlang
# 更新软件包 root@VM-0-7-ubuntu:~# apt-get update# 安装erlang root@VM-0-7-ubuntu:~# apt-get install erlang# 查看erlang版本,输入 "erl" 会启动Erlang 运行时系统 root@VM-0-7-ubuntu:~# erl# 输入 "halt()." 立即停止Erlang运行时系统1> halt(). 2.2 访问
- 5672 端口是 RabbitMQ 的默认 AMQP协议端口。生产者和消费者通过此端口与 RabbitMQ 交互
- 15672 端口是 RabbitMQ 管理插件的默认 HTTP 端口。通过此端口可以访问 RabbitMQ 的 Web 管理界面,提供图形化的管理功能
添加管理员用户
# 添加用户:add_user ${账号} ${密码} root@VM-0-7-ubuntu:~# rabbitmqctl add_user admin admin# 用户授权:set_user_tags ${账号} ${权限} root@VM-0-7-ubuntu:~# rabbitmqctl set_user_tags admin administrator登录页面
首页
2.3 工作流程
| RabbitMQ 组件 | 邮局系统类比 | 作用 |
|---|---|---|
| Broker(代理) | 整个邮局 | 整个消息队列系统,包含所有组件 |
| Virtual Host(虚拟主机) | 不同地域的邮局分局 | 逻辑隔离,每个 vhost 有独立的用户/权限/队列 |
| Connection(连接) | 电话专线 | 生产者和消费者之间的 TCP 连接 |
| Channel(信道) | 分机电话 | 虚拟连接,复用同一个 TCP 连接 |
| Exchange(交换机) | 邮件分拣中心 | 接收消息并决定路由到哪个队列 |
| Queue(队列) | 收件人的邮箱 | 存储消息等待消费者取走 |
| Binding(绑定) | 分拣规则 | 交换机和队列之间的路由规则 |
第一阶段:连接建立建立 TCP 连接生产者应用程序首先与 Broker 建立 TCP 连接,默认端口为 5672连接需要指定虚拟主机、用户名和密码创建信道在已建立的 TCP 连接上创建一个或多个信道信道是虚拟连接,共享同一个 TCP 连接。每个信道有独立的通信流,相互隔离第二阶段:资源声明声明交换机:指定名称、类型和属性声明队列:指定名称和属性创建绑定:将队列绑定到交换机,并指定绑定键(Binding Key)第三阶段:消息发送发送消息到交换机:将消息发布到指定的交换机。消息包含:交换机名称、路由键、消息体、消息属性交换机路由消息:交换机接收消息后,根据其类型和绑定规则进行路由第四阶段:路由处理成功路由:队列接收并存储消息,等待消费者拉取或推送消息给消费者
3.RabbitMQ六种工作模式
3.1 Simple
核心特征:
- 最简单的消息队列模式
- 一个生产者,一个消费者,一个队列
工作流程:
代码实现:
/** * 生产者 */importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importrabbitmq.constant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassProducer{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//1.建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明交换机,使用内置交换机//4.声明队列/* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) queue:队列名称 durable:可持久化 exclusive:是否独占 autoDelete:是否自动删除 arguments:参数,暂不介绍 */ channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);//5.发送消息for(int i =0; i <10; i++){ channel.basicPublish("",Constants.WORK_QUEUE,null,("Hello RabbitMQ!"+ i).getBytes());}System.out.println("消息发送成功");//6.资源释放 channel.close(); connection.close();}}/** * 消费者 */importcom.rabbitmq.client.*;importrabbitmq.constant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumer{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException,InterruptedException{//1.建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明交换机,使用内置交换机//4.声明队列/* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) queue:队列名称 durable:可持久化 exclusive:是否独占 autoDelete:是否自动删除 arguments:参数,暂不介绍 */ channel.queueDeclare("first",true,false,false,null);//5. 消费消息DefaultConsumer defaultConsumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body){//TODOSystem.out.println("接收到消息:"+newString(body));}}; channel.basicConsume(Constants.WORK_QUEUE,true,defaultConsumer);Thread.sleep(2000);//6.资源释放 channel.close(); connection.close();}}3.2 Work Queues
核心特征:
- 多个消费者竞争消费同一个队列
- 轮询分发(默认):每个消费者依次接收消息
工作流程:
代码实现:
/** * 生产者 */importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importrabbitmq.constant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassProducer{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//1.建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明交换机,使用内置交换机//4.声明队列 channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);//5.发送消息for(int i =0; i <10; i++){ channel.basicPublish("",Constants.WORK_QUEUE,null,("Hello RabbitMQ!:"+ i).getBytes());}System.out.println("消息发送成功");//6.资源释放 channel.close(); connection.close();}}/** * 消费者A */importcom.rabbitmq.client.*;importrabbitmq.constant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumerA{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException,InterruptedException{//1.建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明交换机,使用内置交换机//4.声明队列 channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);//5. 消费消息DefaultConsumer defaultConsumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body){//TODOSystem.out.println("ConsumerA 接收到消息:"+newString(body));}}; channel.basicConsume(Constants.WORK_QUEUE,true,defaultConsumer);Thread.sleep(2000);//6.资源释放// channel.close();// connection.close();}}/** * 消费者B */importcom.rabbitmq.client.*;importrabbitmq.constant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumerB{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException,InterruptedException{//1.建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明交换机,使用内置交换机//4.声明队列 channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);//5. 消费消息DefaultConsumer defaultConsumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body){//TODOSystem.out.println("ConsumerB 接收到消息:"+newString(body));}}; channel.basicConsume(Constants.WORK_QUEUE,true,defaultConsumer);Thread.sleep(2000);//6.资源释放// channel.close();// connection.close();}}3.3 Publish/Subscribe
核心特征:
- 一条消息被多个消费者接收
- 消息广播到所有绑定队列
工作流程:
代码实现:
/** * 生产者 */importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importrabbitmq.constant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassProducer{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//1.建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明交换机//BuiltinExchangeType.FANOUT:广播类型,将消息交给所有绑定到交换机的队列 channel.exchangeDeclare(Constants.FANOUT_EXCHANGE,BuiltinExchangeType.FANOUT,true);//4.声明队列 channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null); channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);//5.交换机和队列绑定 channel.queueBind(Constants.FANOUT_QUEUE1,Constants.FANOUT_EXCHANGE,""); channel.queueBind(Constants.FANOUT_QUEUE2,Constants.FANOUT_EXCHANGE,"");//6.发送消息for(int i =0; i <10; i++){ channel.basicPublish(Constants.FANOUT_EXCHANGE,"",null,("Hello RabbitMQ!"+ i).getBytes());}System.out.println("消息发送成功");//7.资源释放 channel.close(); connection.close();}}/** * 消费者A */importcom.rabbitmq.client.*;importrabbitmq.constant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumerA{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException,InterruptedException{//1.建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明队列 channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);//4. 消费消息DefaultConsumer defaultConsumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body){//TODOSystem.out.println("接收到消息:"+newString(body));}}; channel.basicConsume(Constants.FANOUT_QUEUE1,true,defaultConsumer);Thread.sleep(2000);}}/** * 消费者B */importcom.rabbitmq.client.*;importrabbitmq.constant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumerB{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException,InterruptedException{//1.建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明队列 channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);//4. 消费消息DefaultConsumer defaultConsumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body){//TODOSystem.out.println("接收到消息:"+newString(body));}}; channel.basicConsume(Constants.FANOUT_QUEUE2,true,defaultConsumer);Thread.sleep(2000);}}3.4 Routing
核心特征:
- 根据路由键选择性接收消息
- 精确匹配路由键
工作流程:
代码实现:
/** * 生产者 */importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importrabbitmq.constant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassProducer{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//1.建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明交换机//BuiltinExchangeType.DIRECT:定向类型,把消息交给符合指定binding key的队列 channel.exchangeDeclare(Constants.DIRECT_EXCHANGE,BuiltinExchangeType.DIRECT,true);//4.声明队列 channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null); channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);//5.交换机和队列绑定 channel.queueBind(Constants.DIRECT_QUEUE1,Constants.DIRECT_EXCHANGE,"a"); channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"a"); channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"b"); channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"c");//6.发送消息for(int i =0; i <3; i++){ channel.basicPublish(Constants.DIRECT_EXCHANGE,"a",null,("Hello RabbitMQ!"+ i).getBytes());}for(int i =0; i <3; i++){ channel.basicPublish(Constants.DIRECT_EXCHANGE,"b",null,("Hello RabbitMQ!"+ i).getBytes());}for(int i =0; i <3; i++){ channel.basicPublish(Constants.DIRECT_EXCHANGE,"c",null,("Hello RabbitMQ!"+ i).getBytes());}System.out.println("消息发送成功");//7.资源释放 channel.close(); connection.close();}}/** * 消费者A */importcom.rabbitmq.client.*;importrabbitmq.constant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumerA{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException,InterruptedException{//1.建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明队列 channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);//4. 消费消息DefaultConsumer defaultConsumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body){//TODOSystem.out.println("接收到消息:"+newString(body));}}; channel.basicConsume(Constants.DIRECT_QUEUE1,true,defaultConsumer);Thread.sleep(2000);}}/** * 消费者B */importcom.rabbitmq.client.*;importrabbitmq.constant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumerB{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException,InterruptedException{//1.建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明队列 channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);//4. 消费消息DefaultConsumer defaultConsumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body){//TODOSystem.out.println("接收到消息:"+newString(body));}}; channel.basicConsume(Constants.DIRECT_QUEUE2,true,defaultConsumer);Thread.sleep(2000);}}3.5 Topics
核心特征:
- 根据路由键选择性接收消息
- 支持通配符匹配
- 通配符规则
- *(星号):匹配一个单词
- #(井号):匹配零个或多个单词
- 单词用点号分隔,如stock.usd.nyse
工作流程:
代码实现:与Routing模式基本一致
/** * 生产者 */importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importrabbitmq.constant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassProducer{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//1.建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明交换机//BuiltinExchangeType.TOPIC:通配符类型,把消息交给符合routing pattern(路由模式)的队列 channel.exchangeDeclare(Constants.TOPIC_EXCHANGE,BuiltinExchangeType.TOPIC,true);//4.声明队列 channel.queueDeclare(Constants.TOPIC_QUEUE1,true,false,false,null); channel.queueDeclare(Constants.TOPIC_QUEUE2,true,false,false,null);//5.交换机和队列绑定 channel.queueBind(Constants.TOPIC_QUEUE1,Constants.TOPIC_EXCHANGE,"*.a.*"); channel.queueBind(Constants.TOPIC_QUEUE2,Constants.TOPIC_EXCHANGE,"*.*.b"); channel.queueBind(Constants.TOPIC_QUEUE2,Constants.TOPIC_EXCHANGE,"c.#");//6.发送消息for(int i =0; i <3; i++){//转发到queue1 channel.basicPublish(Constants.TOPIC_EXCHANGE,"aa.a.f",null,("Hello RabbitMQ!"+ i).getBytes());}for(int i =0; i <3; i++){//转发到queue1和queue2 channel.basicPublish(Constants.TOPIC_EXCHANGE,"aa.a.b",null,("Hello RabbitMQ!"+ i).getBytes());}for(int i =0; i <3; i++){//转发到queue2 channel.basicPublish(Constants.TOPIC_EXCHANGE,"c.a.a.a",null,("Hello RabbitMQ!"+ i).getBytes());}System.out.println("消息发送成功");//7.资源释放 channel.close(); connection.close();}}/** * 消费者A */importcom.rabbitmq.client.*;importrabbitmq.constant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumerA{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException,InterruptedException{//1.建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明队列 channel.queueDeclare(Constants.TOPIC_QUEUE1,true,false,false,null);//4. 消费消息DefaultConsumer defaultConsumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body){//TODOSystem.out.println("接收到消息:"+newString(body));}}; channel.basicConsume(Constants.TOPIC_QUEUE1,true,defaultConsumer);Thread.sleep(2000);}}/** * 消费者B */importcom.rabbitmq.client.*;importrabbitmq.constant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumerB{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException,InterruptedException{//1.建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明队列 channel.queueDeclare(Constants.TOPIC_QUEUE2,true,false,false,null);//4. 消费消息DefaultConsumer defaultConsumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body){//TODOSystem.out.println("接收到消息:"+newString(body));}}; channel.basicConsume(Constants.TOPIC_QUEUE2,true,defaultConsumer);Thread.sleep(2000);}}3.6 RPC
RPC(Remote Procedure Call,远程过程调用):通过网络从远程计算机上请求服务并同步等待服务端的响应核心特征:
- 请求/响应模式
- 实现远程过程调用,使用回调队列和关联ID
工作流程:
- 当客户端启动时,它创建一个独占回调队列
- 对于RPC请求,客户端发送一个具有两个属性的消息:reply_to,它被设置为回调队列,correlation_id,它被设置为每个请求的唯一值。
- 请求被发送到rpc_queue队列
- RPC工作者(又名:服务器)正在等待队列上的请求。当出现请求时,它完成任务,并使用replyTo字段中的队列将带有结果的消息发送回客户机
- 客户端在应答队列上等待数据。当消息出现时,它会检查correlationId属性。如果与请求的值匹配,则将响应返回给应用程序
代码实现:
/** * 客户端 */importcom.rabbitmq.client.*;importrabbitmq.constant.Constants;importjava.io.IOException;importjava.util.UUID;importjava.util.concurrent.ArrayBlockingQueue;importjava.util.concurrent.BlockingQueue;importjava.util.concurrent.TimeoutException;publicclassRpcClient{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException,InterruptedException{//1.建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明队列 channel.queueDeclare(Constants.RPC_REQUEST_QUEUE,true,false,false,null); channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE,true,false,false,null);//4.发送请求String correlationID = UUID.randomUUID().toString();AMQP.BasicProperties properties =newAMQP.BasicProperties().builder().correlationId(correlationID).replyTo(Constants.RPC_RESPONSE_QUEUE).build(); channel.basicPublish("",Constants.RPC_REQUEST_QUEUE,properties,("Hello RabbitMQ!").getBytes());//5.接收响应finalBlockingQueue<String> response =newArrayBlockingQueue<>(1);DefaultConsumer defaultConsumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body){//TODOSystem.out.println("接收到消息:"+newString(body));if(correlationID.equals(properties.getCorrelationId())){if(!response.offer(newString(body))){System.out.println("队列满");}}}}; channel.basicConsume(Constants.RPC_RESPONSE_QUEUE,true,defaultConsumer);System.out.println("[RPC Client 响应结果]:"+ response.take());}}/** * 服务器 */importcom.rabbitmq.client.*;importrabbitmq.constant.Constants;importjava.io.IOException;importjava.nio.charset.StandardCharsets;importjava.util.concurrent.TimeoutException;publicclassRpcServer{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//1.建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明队列 channel.queueDeclare(Constants.RPC_REQUEST_QUEUE,true,false,false,null); channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE,true,false,false,null); channel.basicQos(1);DefaultConsumer defaultConsumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{//TODOString request =newString(body,StandardCharsets.UTF_8);System.out.println("接收到消息:"+ request);System.out.println("针对请求:"+ request +",响应成功");AMQP.BasicProperties basicProperties =newAMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();//5.发送响应 channel.basicPublish("",Constants.RPC_RESPONSE_QUEUE,basicProperties,request.getBytes());//手动确认 channel.basicAck(envelope.getDeliveryTag(),false);}};//4.接收请求 channel.basicConsume(Constants.RPC_REQUEST_QUEUE,false,defaultConsumer);}}