掌控消息全链路(1)——初识RabbitMQ:从核心概念到六种常用模式全景解析

掌控消息全链路(1)——初识RabbitMQ:从核心概念到六种常用模式全景解析
在这里插入图片描述

🔥我的主页:九转苍翎⭐️个人专栏:《Java SE 》《Java集合框架系统精讲》《MySQL高手之路:从基础到高阶 》《计算机网络 》《Java工程师核心能力体系构建》天行健,君子以自强不息。


Linux操作系统版本:Ubuntu 24.04 LTS

1.Message Queue概述

计算机之间的通信方式主要有两种:同步通信异步通信同步通信(Synchronous Communication):通信双方在严格的时间约束下进行交互。发送方发送请求或数据后,会主动等待并阻塞自身,直到收到接收方的明确响应(成功、失败或超时)才会继续执行后续操作。整个过程像是在进行一场“实时对话”异步通信(Asynchronous Communication):发送方发出请求或消息后,不等待接收方的即时响应,而是立即返回并继续执行后续任务。接收方在准备好结果后,通过某种机制将响应或结果“推送”或“通知”给发送方。整个过程更像是“发送邮件”。

MQ(Message Queue,消息队列)是一种用于实现异步通信的中间件技术。它允许不同组件(例如应用程序、服务或微服务)通过发送和接收消息来进行通信,而无需实时等待响应。核心思想是解耦生产者(发送消息)和消费者(接收消息)之间的关系。其核心功能如下:

削峰填谷(缓冲):当消费者处理能力有限或暂时不可用时,队列可以作为缓冲区,暂存生产者发送的大量消息,避免系统过载或消息丢失。待消费者恢复或扩展后,再逐步消费积压的消息

在这里插入图片描述

异步解耦:生产者和消费者不需要知道对方的存在或状态。生产者只需将消息发送到队列,消费者在准备好时从队列中获取消息进行处理

在这里插入图片描述

2.初识RabbitMQ

AMQP是一个开放标准的应用层协议,设计用于异步、可靠、跨平台的消息传递。它定义了消息的格式、传递机制和路由规则,是构建分布式系统中消息中间件的核心协议

RabbitMQ是采用Erlang语言实现 AMQP(Advanced Message Queuing Protocol,高级消息队列协议) 的消息中间件

2.1 安装

启动RabbitMQ

root@VM-0-7-ubuntu:~# service rabbitmq-server start

安装RabbitMQ管理界面

root@VM-0-7-ubuntu:~# rabbitmq-plugins enable rabbitmq_management

安装RabbitMQ

# 安装RabbitMQ root@VM-0-7-ubuntu:~# apt-get install rabbitmq-server# 确认安装结果 root@VM-0-7-ubuntu:~# systemctl status rabbitmq-server
在这里插入图片描述

安装erlang

# 更新软件包 root@VM-0-7-ubuntu:~# apt-get update# 安装erlang root@VM-0-7-ubuntu:~# apt-get install erlang# 查看erlang版本,输入 "erl" 会启动Erlang 运行时系统 root@VM-0-7-ubuntu:~# erl# 输入 "halt()." 立即停止Erlang运行时系统1> halt(). 
在这里插入图片描述

2.2 访问

  • 5672 端口是 RabbitMQ 的默认 AMQP协议端口。生产者和消费者通过此端口与 RabbitMQ 交互
  • 15672 端口是 RabbitMQ 管理插件的默认 HTTP 端口。通过此端口可以访问 RabbitMQ 的 Web 管理界面,提供图形化的管理功能

添加管理员用户

# 添加用户:add_user ${账号} ${密码} root@VM-0-7-ubuntu:~# rabbitmqctl add_user admin admin# 用户授权:set_user_tags ${账号} ${权限} root@VM-0-7-ubuntu:~# rabbitmqctl set_user_tags admin administrator
登录页面

首页

2.3 工作流程

RabbitMQ 组件邮局系统类比作用
Broker(代理)整个邮局整个消息队列系统,包含所有组件
Virtual Host(虚拟主机)不同地域的邮局分局逻辑隔离,每个 vhost 有独立的用户/权限/队列
Connection(连接)电话专线生产者和消费者之间的 TCP 连接
Channel(信道)分机电话虚拟连接,复用同一个 TCP 连接
Exchange(交换机)邮件分拣中心接收消息并决定路由到哪个队列
Queue(队列)收件人的邮箱存储消息等待消费者取走
Binding(绑定)分拣规则交换机和队列之间的路由规则
在这里插入图片描述
第一阶段:连接建立建立 TCP 连接生产者应用程序首先与 Broker 建立 TCP 连接,默认端口为 5672连接需要指定虚拟主机、用户名和密码创建信道在已建立的 TCP 连接上创建一个或多个信道信道是虚拟连接,共享同一个 TCP 连接。每个信道有独立的通信流,相互隔离第二阶段:资源声明声明交换机:指定名称、类型和属性声明队列:指定名称和属性创建绑定:将队列绑定到交换机,并指定绑定键(Binding Key)第三阶段:消息发送发送消息到交换机:将消息发布到指定的交换机。消息包含:交换机名称、路由键、消息体、消息属性交换机路由消息:交换机接收消息后,根据其类型和绑定规则进行路由第四阶段:路由处理成功路由:队列接收并存储消息,等待消费者拉取或推送消息给消费者

3.RabbitMQ六种工作模式

3.1 Simple

核心特征:

  • 最简单的消息队列模式
  • 一个生产者,一个消费者,一个队列

工作流程:

在这里插入图片描述


代码实现:

/** * 生产者 */importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importrabbitmq.constant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassProducer{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//1.建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明交换机,使用内置交换机//4.声明队列/* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) queue:队列名称 durable:可持久化 exclusive:是否独占 autoDelete:是否自动删除 arguments:参数,暂不介绍 */ channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);//5.发送消息for(int i =0; i <10; i++){ channel.basicPublish("",Constants.WORK_QUEUE,null,("Hello RabbitMQ!"+ i).getBytes());}System.out.println("消息发送成功");//6.资源释放 channel.close(); connection.close();}}/** * 消费者 */importcom.rabbitmq.client.*;importrabbitmq.constant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumer{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException,InterruptedException{//1.建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明交换机,使用内置交换机//4.声明队列/* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) queue:队列名称 durable:可持久化 exclusive:是否独占 autoDelete:是否自动删除 arguments:参数,暂不介绍 */ channel.queueDeclare("first",true,false,false,null);//5. 消费消息DefaultConsumer defaultConsumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body){//TODOSystem.out.println("接收到消息:"+newString(body));}}; channel.basicConsume(Constants.WORK_QUEUE,true,defaultConsumer);Thread.sleep(2000);//6.资源释放 channel.close(); connection.close();}}

3.2 Work Queues

核心特征:

  • 多个消费者竞争消费同一个队列
  • 轮询分发(默认):每个消费者依次接收消息

工作流程:

在这里插入图片描述


代码实现:

/** * 生产者 */importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importrabbitmq.constant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassProducer{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//1.建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明交换机,使用内置交换机//4.声明队列 channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);//5.发送消息for(int i =0; i <10; i++){ channel.basicPublish("",Constants.WORK_QUEUE,null,("Hello RabbitMQ!:"+ i).getBytes());}System.out.println("消息发送成功");//6.资源释放 channel.close(); connection.close();}}/** * 消费者A */importcom.rabbitmq.client.*;importrabbitmq.constant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumerA{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException,InterruptedException{//1.建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明交换机,使用内置交换机//4.声明队列 channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);//5. 消费消息DefaultConsumer defaultConsumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body){//TODOSystem.out.println("ConsumerA 接收到消息:"+newString(body));}}; channel.basicConsume(Constants.WORK_QUEUE,true,defaultConsumer);Thread.sleep(2000);//6.资源释放// channel.close();// connection.close();}}/** * 消费者B */importcom.rabbitmq.client.*;importrabbitmq.constant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumerB{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException,InterruptedException{//1.建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明交换机,使用内置交换机//4.声明队列 channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);//5. 消费消息DefaultConsumer defaultConsumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body){//TODOSystem.out.println("ConsumerB 接收到消息:"+newString(body));}}; channel.basicConsume(Constants.WORK_QUEUE,true,defaultConsumer);Thread.sleep(2000);//6.资源释放// channel.close();// connection.close();}}

3.3 Publish/Subscribe

核心特征:

  • 一条消息被多个消费者接收
  • 消息广播到所有绑定队列

工作流程:

在这里插入图片描述


代码实现:

/** * 生产者 */importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importrabbitmq.constant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassProducer{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//1.建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明交换机//BuiltinExchangeType.FANOUT:广播类型,将消息交给所有绑定到交换机的队列 channel.exchangeDeclare(Constants.FANOUT_EXCHANGE,BuiltinExchangeType.FANOUT,true);//4.声明队列 channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null); channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);//5.交换机和队列绑定 channel.queueBind(Constants.FANOUT_QUEUE1,Constants.FANOUT_EXCHANGE,""); channel.queueBind(Constants.FANOUT_QUEUE2,Constants.FANOUT_EXCHANGE,"");//6.发送消息for(int i =0; i <10; i++){ channel.basicPublish(Constants.FANOUT_EXCHANGE,"",null,("Hello RabbitMQ!"+ i).getBytes());}System.out.println("消息发送成功");//7.资源释放 channel.close(); connection.close();}}/** * 消费者A */importcom.rabbitmq.client.*;importrabbitmq.constant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumerA{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException,InterruptedException{//1.建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明队列 channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);//4. 消费消息DefaultConsumer defaultConsumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body){//TODOSystem.out.println("接收到消息:"+newString(body));}}; channel.basicConsume(Constants.FANOUT_QUEUE1,true,defaultConsumer);Thread.sleep(2000);}}/** * 消费者B */importcom.rabbitmq.client.*;importrabbitmq.constant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumerB{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException,InterruptedException{//1.建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明队列 channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);//4. 消费消息DefaultConsumer defaultConsumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body){//TODOSystem.out.println("接收到消息:"+newString(body));}}; channel.basicConsume(Constants.FANOUT_QUEUE2,true,defaultConsumer);Thread.sleep(2000);}}

3.4 Routing

核心特征:

  • 根据路由键选择性接收消息
  • 精确匹配路由键

工作流程:

在这里插入图片描述


代码实现:

/** * 生产者 */importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importrabbitmq.constant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassProducer{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//1.建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明交换机//BuiltinExchangeType.DIRECT:定向类型,把消息交给符合指定binding key的队列 channel.exchangeDeclare(Constants.DIRECT_EXCHANGE,BuiltinExchangeType.DIRECT,true);//4.声明队列 channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null); channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);//5.交换机和队列绑定 channel.queueBind(Constants.DIRECT_QUEUE1,Constants.DIRECT_EXCHANGE,"a"); channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"a"); channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"b"); channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"c");//6.发送消息for(int i =0; i <3; i++){ channel.basicPublish(Constants.DIRECT_EXCHANGE,"a",null,("Hello RabbitMQ!"+ i).getBytes());}for(int i =0; i <3; i++){ channel.basicPublish(Constants.DIRECT_EXCHANGE,"b",null,("Hello RabbitMQ!"+ i).getBytes());}for(int i =0; i <3; i++){ channel.basicPublish(Constants.DIRECT_EXCHANGE,"c",null,("Hello RabbitMQ!"+ i).getBytes());}System.out.println("消息发送成功");//7.资源释放 channel.close(); connection.close();}}/** * 消费者A */importcom.rabbitmq.client.*;importrabbitmq.constant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumerA{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException,InterruptedException{//1.建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明队列 channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);//4. 消费消息DefaultConsumer defaultConsumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body){//TODOSystem.out.println("接收到消息:"+newString(body));}}; channel.basicConsume(Constants.DIRECT_QUEUE1,true,defaultConsumer);Thread.sleep(2000);}}/** * 消费者B */importcom.rabbitmq.client.*;importrabbitmq.constant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumerB{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException,InterruptedException{//1.建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明队列 channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);//4. 消费消息DefaultConsumer defaultConsumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body){//TODOSystem.out.println("接收到消息:"+newString(body));}}; channel.basicConsume(Constants.DIRECT_QUEUE2,true,defaultConsumer);Thread.sleep(2000);}}

3.5 Topics

核心特征:

  • 根据路由键选择性接收消息
  • 支持通配符匹配
  • 通配符规则
    • *(星号):匹配一个单词
    • #(井号):匹配零个或多个单词
    • 单词用点号分隔,如stock.usd.nyse

工作流程:

在这里插入图片描述


代码实现:与Routing模式基本一致

/** * 生产者 */importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importrabbitmq.constant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassProducer{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//1.建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明交换机//BuiltinExchangeType.TOPIC:通配符类型,把消息交给符合routing pattern(路由模式)的队列 channel.exchangeDeclare(Constants.TOPIC_EXCHANGE,BuiltinExchangeType.TOPIC,true);//4.声明队列 channel.queueDeclare(Constants.TOPIC_QUEUE1,true,false,false,null); channel.queueDeclare(Constants.TOPIC_QUEUE2,true,false,false,null);//5.交换机和队列绑定 channel.queueBind(Constants.TOPIC_QUEUE1,Constants.TOPIC_EXCHANGE,"*.a.*"); channel.queueBind(Constants.TOPIC_QUEUE2,Constants.TOPIC_EXCHANGE,"*.*.b"); channel.queueBind(Constants.TOPIC_QUEUE2,Constants.TOPIC_EXCHANGE,"c.#");//6.发送消息for(int i =0; i <3; i++){//转发到queue1 channel.basicPublish(Constants.TOPIC_EXCHANGE,"aa.a.f",null,("Hello RabbitMQ!"+ i).getBytes());}for(int i =0; i <3; i++){//转发到queue1和queue2 channel.basicPublish(Constants.TOPIC_EXCHANGE,"aa.a.b",null,("Hello RabbitMQ!"+ i).getBytes());}for(int i =0; i <3; i++){//转发到queue2 channel.basicPublish(Constants.TOPIC_EXCHANGE,"c.a.a.a",null,("Hello RabbitMQ!"+ i).getBytes());}System.out.println("消息发送成功");//7.资源释放 channel.close(); connection.close();}}/** * 消费者A */importcom.rabbitmq.client.*;importrabbitmq.constant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumerA{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException,InterruptedException{//1.建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明队列 channel.queueDeclare(Constants.TOPIC_QUEUE1,true,false,false,null);//4. 消费消息DefaultConsumer defaultConsumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body){//TODOSystem.out.println("接收到消息:"+newString(body));}}; channel.basicConsume(Constants.TOPIC_QUEUE1,true,defaultConsumer);Thread.sleep(2000);}}/** * 消费者B */importcom.rabbitmq.client.*;importrabbitmq.constant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumerB{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException,InterruptedException{//1.建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明队列 channel.queueDeclare(Constants.TOPIC_QUEUE2,true,false,false,null);//4. 消费消息DefaultConsumer defaultConsumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body){//TODOSystem.out.println("接收到消息:"+newString(body));}}; channel.basicConsume(Constants.TOPIC_QUEUE2,true,defaultConsumer);Thread.sleep(2000);}}

3.6 RPC

RPC(Remote Procedure Call,远程过程调用):通过网络从远程计算机上请求服务并同步等待服务端的响应

核心特征:

  • 请求/响应模式
  • 实现远程过程调用,使用回调队列和关联ID

工作流程:

在这里插入图片描述
  • 当客户端启动时,它创建一个独占回调队列
  • 对于RPC请求,客户端发送一个具有两个属性的消息:reply_to,它被设置为回调队列,correlation_id,它被设置为每个请求的唯一值。
  • 请求被发送到rpc_queue队列
  • RPC工作者(又名:服务器)正在等待队列上的请求。当出现请求时,它完成任务,并使用replyTo字段中的队列将带有结果的消息发送回客户机
  • 客户端在应答队列上等待数据。当消息出现时,它会检查correlationId属性。如果与请求的值匹配,则将响应返回给应用程序

代码实现:

/** * 客户端 */importcom.rabbitmq.client.*;importrabbitmq.constant.Constants;importjava.io.IOException;importjava.util.UUID;importjava.util.concurrent.ArrayBlockingQueue;importjava.util.concurrent.BlockingQueue;importjava.util.concurrent.TimeoutException;publicclassRpcClient{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException,InterruptedException{//1.建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明队列 channel.queueDeclare(Constants.RPC_REQUEST_QUEUE,true,false,false,null); channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE,true,false,false,null);//4.发送请求String correlationID = UUID.randomUUID().toString();AMQP.BasicProperties properties =newAMQP.BasicProperties().builder().correlationId(correlationID).replyTo(Constants.RPC_RESPONSE_QUEUE).build(); channel.basicPublish("",Constants.RPC_REQUEST_QUEUE,properties,("Hello RabbitMQ!").getBytes());//5.接收响应finalBlockingQueue<String> response =newArrayBlockingQueue<>(1);DefaultConsumer defaultConsumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body){//TODOSystem.out.println("接收到消息:"+newString(body));if(correlationID.equals(properties.getCorrelationId())){if(!response.offer(newString(body))){System.out.println("队列满");}}}}; channel.basicConsume(Constants.RPC_RESPONSE_QUEUE,true,defaultConsumer);System.out.println("[RPC Client 响应结果]:"+ response.take());}}/** * 服务器 */importcom.rabbitmq.client.*;importrabbitmq.constant.Constants;importjava.io.IOException;importjava.nio.charset.StandardCharsets;importjava.util.concurrent.TimeoutException;publicclassRpcServer{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//1.建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明队列 channel.queueDeclare(Constants.RPC_REQUEST_QUEUE,true,false,false,null); channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE,true,false,false,null); channel.basicQos(1);DefaultConsumer defaultConsumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{//TODOString request =newString(body,StandardCharsets.UTF_8);System.out.println("接收到消息:"+ request);System.out.println("针对请求:"+ request +",响应成功");AMQP.BasicProperties basicProperties =newAMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();//5.发送响应 channel.basicPublish("",Constants.RPC_RESPONSE_QUEUE,basicProperties,request.getBytes());//手动确认 channel.basicAck(envelope.getDeliveryTag(),false);}};//4.接收请求 channel.basicConsume(Constants.RPC_REQUEST_QUEUE,false,defaultConsumer);}}

Read more

MySQL 调优

MySQL 调优

🧑 博主简介:ZEEKLOG博客专家,历代文学网(PC端可以访问:https://literature.sinhy.com/#/literature?__c=1000,移动端可微信小程序搜索“历代文学”)总架构师,15年工作经验,精通Java编程,高并发设计,Springboot和微服务,熟悉Linux,ESXI虚拟化以及云原生Docker和K8s,热衷于探索科技的边界,并将理论知识转化为实际应用。保持对新技术的好奇心,乐于分享所学,希望通过我的实践经历和见解,启发他人的创新思维。在这里,我希望能与志同道合的朋友交流探讨,共同进步,一起在技术的世界里不断学习成长。 技术合作请加本人wx(注明来自ZEEKLOG):foreast_sea MySQL 调优 SQL 优化步骤 当面对一个需要优化的 SQL 时,我们有哪几种排查思路呢? 通过 show status 命令了解 SQL 执行次数 首先,我们可以使用

By Ne0inhk
【金仓数据库】ksql 指南(五) —— 创建与管理索引和视图(KingbaseES 查询优化核心)

【金仓数据库】ksql 指南(五) —— 创建与管理索引和视图(KingbaseES 查询优化核心)

引言 掌握表的基本运作之后,若想优化查询效率并简化数据访问,就要去学习“索引”和“视图”的运用,索引类似于“书籍目录”,可以极大地加快查询速度;视图类似“数据窗口”,能够隐藏复杂的查询逻辑,还能控制数据的可见性。本文就“ksql命令行操作索引与视图”展开论述,把从“作用到创建,再到查看,维持直至删除”的全过程拆解成实际操作步骤,并结合例子和避坑提示,以使初学者能够领悟并付诸实行。 文章目录 * 引言 * 一、前置准备:确认操作基础(衔接前文,确保连贯) * 1.1 1. 连接数据库并切换目标模式 * 1.2 2. 插入测试数据(用于验证索引 / 视图效果) * 二、索引管理:给表 “加目录”,加速查询 * 2.1 1.

By Ne0inhk
【MySQL】第七节—表的增删改查,吃透这篇就够了(上)

【MySQL】第七节—表的增删改查,吃透这篇就够了(上)

Hello,我是云边有个稻草人,表的增删改查,ACTION! 《MySQL》本篇文章所属专栏—持续更新中—欢迎订阅! 目录 一、Create  1.1 单行数据 + 全列插入  1.2 多行数据 + 指定列插入  1.3 插入否则更新  1.4 替换  二、Retrieve(查询) 2.1 select 列  【全列查询】 【指定列查询】 【查询字段为表达式】 【distinct 结果去重】 2.2 where条件  【英语不及格的同学及英语成绩 ( < 60 )】 【语文成绩在 [80, 90] 分的同学及语文成绩】 【数学成绩是 58 或者

By Ne0inhk
Flutter for OpenHarmony:tostore 鸿蒙原生 KV 数据库,支持 SQL 与 NoSQL 混合存储(全能型数据引擎) 深度解析与鸿蒙适配指南

Flutter for OpenHarmony:tostore 鸿蒙原生 KV 数据库,支持 SQL 与 NoSQL 混合存储(全能型数据引擎) 深度解析与鸿蒙适配指南

欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.ZEEKLOG.net 前言 在移动应用开发中,数据持久化(Data Persistence)永远是架构设计中不可或缺的一环。无论是保存用户的登录状态、偏好设置,还是缓存新闻列表、聊天记录,选择一个合适的数据库往往决定了 App 的运行流畅度和开发效率。 在 Flutter 生态中,我们熟知的数据库方案琳琅满目: * Shared Preferences: 轻量级,但只适合存简单的 Key-Value,性能较差,且不支持复杂查询。 * Sqflite: 基于 SQLite 的封装,功能强大且稳定,但它是关系型数据库,Schema 变更(数库迁移)极其繁琐,且需要编写大量的 SQL 语句或依赖复杂的 ORM。 * Hive: 纯 Dart 编写的 NoSQL 数据库,速度极快(

By Ne0inhk