详解RabbitMQ工作模式之RPC通信模式

详解RabbitMQ工作模式之RPC通信模式

目录

RPC通信模式

概述

工作流程

特点

应用场景

代码案例

引入依赖

常量类

编写客户端代码

编写服务端代码

运行程序(先运行客户端,再运行服务端)


RPC通信模式

概述
在RabbitMQ中,RPC模式通过消息队列实现远程调用功能。客户端(生产者)发送消息到消费队列,服务端(消费者)进行消息消费并执行相应的程序,然后将结果发送到回调队列供客户端使用。这是一种双向的生产消费模式,其中客户端既是生产者又是消费者,服务端则专注于处理消息并生成响应。

在RPC通信的过程中, 没有⽣产者和消费者, ⽐较像咱们RPC远程调⽤, ⼤概就是通过两个队列实现了⼀个可回调的过程.

工作流程

1.客户端发送请求:

客户端连接到RabbitMQ服务器。
客户端声明一个用于发送RPC请求的队列(通常是固定的,如rpc_queue)。
客户端创建一个临时的回调队列,并在发送请求时,将回调队列的名称作为消息属性(reply_to)发送给交换机。
客户端为每个请求生成一个唯一的correlation_id,并将其作为消息属性发送,以便在接收响应时能够匹配请求与响应。

2.交换机路由请求:

交换机接收到RPC请求后,根据路由键将请求路由到服务端监听的队列。

3.服务端处理请求:

服务端(消费者)从队列中接收请求。
服务端处理请求,并生成响应。
服务端将响应发送到客户端指定的回调队列,并在消息属性中设置相同的correlation_id。

4.客户端接收响应:

客户端监听其回调队列以接收响应。
当接收到响应时,客户端检查correlation_id以确定响应是否与之前的请求匹配。
如果匹配,客户端处理响应;如果不匹配,客户端可能丢弃该响应。
特点
1.解耦:客户端和服务端之间不需要直接通信,降低了系统间的耦合度。
2.灵活性:支持多种语言和平台之间的远程调用。
3.可扩展性:通过增加服务端(消费者)的数量,可以轻松扩展RPC服务。
4.性能开销:由于涉及到网络传输和消息队列的处理,RPC调用的性能通常低于本地调用。
5.复杂性:需要处理消息队列的可靠性、持久性、消息确认等复杂问题。
6.安全性:远程调用可能面临更多的安全风险,如消息篡改、中间人攻击等。
应用场景
RabbitMQ的RPC通信模式适用于需要远程调用服务的场景,如分布式系统中的服务调用、微服务架构中的服务通信等。通过RabbitMQ的消息队列机制,可以实现跨系统、跨语言的远程调用,提高系统的灵活性和可扩展性。
代码案例
引入依赖
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.21.0</version> </dependency>
常量类
public class Constants { public static final String HOST = "47.98.109.138"; public static final int PORT = 5672; public static final String USER_NAME = "study"; public static final String PASSWORD = "study"; public static final String VIRTUAL_HOST = "aaa"; //rpc 模式 public static final String RPC_REQUEST_QUEUE = "rpc.request.queue"; public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue"; }
编写客户端代码
import com.rabbitmq.client.*; import rabbitmq.constant.Constants; import java.io.IOException; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeoutException; /** * rpc 客户端 * 1. 发送请求 * 2. 接收响应 */ public class RpcClient { public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //1. 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); //需要提前开放端口号 connectionFactory.setUsername(Constants.USER_NAME);//账号 connectionFactory.setPassword(Constants.PASSWORD); //密码 connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机 Connection connection = connectionFactory.newConnection(); //2. 开启信道 Channel channel = connection.createChannel(); channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null); channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null); //3. 发送请求 String msg = "hello rpc..."; //设置请求的唯一标识 String correlationID = UUID.randomUUID().toString(); //设置请求的相关属性 AMQP.BasicProperties props = new AMQP.BasicProperties().builder() .correlationId(correlationID) .replyTo(Constants.RPC_RESPONSE_QUEUE) .build(); channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes()); //4. 接收响应 //使用阻塞队列, 来存储响应信息 final BlockingQueue<String> response = new ArrayBlockingQueue<>(1); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String respMsg = new String(body); System.out.println("接收到回调消息: "+ respMsg); if (correlationID.equals(properties.getCorrelationId())){ //如果correlationID校验一致 response.offer(respMsg); } } }; channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer); String result = response.take(); System.out.println("[RPC Client 响应结果]:"+ result); } }
编写服务端代码
import com.rabbitmq.client.*; import rabbitmq.constant.Constants; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * RPC server * 1. 接收请求 * 2. 发送响应 */ public class RpcServer { public static void main(String[] args) throws IOException, TimeoutException { //1. 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); //需要提前开放端口号 connectionFactory.setUsername(Constants.USER_NAME);//账号 connectionFactory.setPassword(Constants.PASSWORD); //密码 connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机 Connection connection = connectionFactory.newConnection(); //2. 开启信道 Channel channel = connection.createChannel(); //3. 接收请求 channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String request = new String(body,"UTF-8"); System.out.println("接收到请求:"+ request); String response = "针对request:"+ request +", 响应成功"; AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder() .correlationId(properties.getCorrelationId()) .build(); channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes()); channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer); } }
运行程序(先运行客户端,再运行服务端)

可以在管理界面看到其中一个队列中有1条消息

我们可以看到,服务端接收到了消息并给客户端发送了响应,与预期符合。

Read more

AI艺术社区推荐:5个Stable Diffusion云端协作平台

AI艺术社区推荐:5个Stable Diffusion云端协作平台 你是否也遇到过这样的情况:社团成员各自用本地电脑跑Stable Diffusion,结果有人显卡不够、有人环境配不起来,作品风格五花八门,想一起搞个联合创作项目却根本没法同步?别急——这正是我们今天要解决的问题。 随着AI绘画的普及,越来越多的艺术社团开始尝试用Stable Diffusion进行集体创作。但传统的单机模式已经跟不上节奏了。真正的未来,在于云端协作:所有人共享模型、提示词、参数配置,实时查看彼此生成进度,还能一键部署展示空间。听起来很复杂?其实现在已经有多个成熟的云端Stable Diffusion协作平台,专为团队设计,支持多人在线编辑、版本管理、资源共用,甚至能直接对外发布Web服务。 本文将结合ZEEKLOG星图提供的算力资源和预置镜像能力,为你盘点5个最适合艺术社团使用的Stable Diffusion云端协作平台。这些平台都具备以下特点: * 支持一键部署Stable Diffusion WebUI或ComfyUI * 提供GPU加速(如A100/V100等),确保出图流畅 *

By Ne0inhk

Claude部署(copilot反向代理)

一、教育邮箱认证 1、进行教育邮箱认证可免费使用claude pro 2年,有机会的话可以进行认证,无法教育认证的话只能花钱充claude的会员了,如何进行教育认证可观看该Up的视频 超简单一次通过Github学生认证,逐步详细视频教程_哔哩哔哩_bilibili 2、教育认证通过后在GitHub个人主页下的Copilot/Features中开启Copilot Pro 二、服务器上配置Copilot反向代理 1、配置nodejs环境 在官网https://nodejs.org/en/download/package-manager,下载nodejs安装包(Linux) 下载完成后将压缩包传到服务器上进行解压,目录如下 创建软连接,使得在任意目录下都可以试用直接使用node命令和npm命令 ln -s /root/node-v24.13.1-linux-x64/bin/node /usr/local/bin/node ln -s /root/node-v24.13.

By Ne0inhk

AnythingLLM集成Whisper实战:如何实现高效语音转文本处理

快速体验 在开始今天关于 AnythingLLM集成Whisper实战:如何实现高效语音转文本处理 的探讨之前,我想先分享一个最近让我觉得很有意思的全栈技术挑战。 我们常说 AI 是未来,但作为开发者,如何将大模型(LLM)真正落地为一个低延迟、可交互的实时系统,而不仅仅是调个 API? 这里有一个非常硬核的动手实验:基于火山引擎豆包大模型,从零搭建一个实时语音通话应用。它不是简单的问答,而是需要你亲手打通 ASR(语音识别)→ LLM(大脑思考)→ TTS(语音合成)的完整 WebSocket 链路。对于想要掌握 AI 原生应用架构的同学来说,这是个绝佳的练手项目。 从0到1构建生产级别应用,脱离Demo,点击打开 从0打造个人豆包实时通话AI动手实验 AnythingLLM集成Whisper实战:如何实现高效语音转文本处理 语音转文本(ASR)在现代应用中越来越重要,但很多开发者在实际部署时都会遇到效率瓶颈。本文将详细介绍如何将Whisper语音识别模型高效集成到AnythingLLM中,解决这些性能问题。 当前语音转文本的痛点分析 1. 处理延迟高:传统

By Ne0inhk

免费开源!Qwen-Image-Edit-2511本地部署全流程

免费开源!Qwen-Image-Edit-2511本地部署全流程 你是否试过用AI修图,结果人物脸型变了、衣服颜色跑偏、背景线条扭曲?或者想给产品图换材质,却反复生成出完全不像原图的“抽象派”版本?别急——Qwen-Image-Edit-2511来了。这不是又一个参数微调的“小升级”,而是真正解决图像编辑中“失真、漂移、不一致”三大顽疾的实用型模型。它不开玩笑:能稳住人脸结构、锁住品牌标识、保持多人合影的姿态逻辑,还能让工业设计草图的圆角半径、倒角过渡、投影方向都经得起放大审视。 更关键的是:它完全开源,无需API密钥,不依赖云端排队,一台带NVIDIA显卡的普通工作站就能跑起来。本文不讲论文、不堆参数,只带你从零开始,在本地完整部署Qwen-Image-Edit-2511,实测图片编辑效果,避开所有常见坑——包括ComfyUI路径错乱、LoRA加载失败、端口冲突、显存溢出等真实问题。全程使用中文界面、中文提示词、中文报错排查,小白也能照着操作成功。 1. 为什么这次部署值得你花30分钟? 很多人看到“本地部署”就下意识觉得麻烦:

By Ne0inhk