【Java 微服务中间件】RabbitMQ 全方位解析:同步异步对比、SpringAMQT基础入门、实战、交换机类型及消息处理详解

【Java 微服务中间件】RabbitMQ 全方位解析:同步异步对比、SpringAMQT基础入门、实战、交换机类型及消息处理详解

文章目录

一、同步异步调用介绍

(1) 同步

解读: 案例:支付服务》〉》〉》扣余额(需要等待结果) 扩展功能: 支付成功发短信 增加积分 优点: 可以立即得到结果的响应 缺点: 拓展性差(每增加一个功能都要修改之前的代码) 性能下降(同步需要等待结果,没有得到结果就需要一直等待,占用线程) 级联失败问题(在同步中的一个环节出现问题,可能后续的服务都出现问题) 

(2) 异步

消息代理可以将消息同时发送给交易、通知、积分服务
综上,异步调用的优势包括: ● 耦合度更低 ● 性能更好 ● 业务拓展性强 ● 故障隔离,避免级联失败 当然,异步通信也并非完美无缺,它存在下列缺点: ● 完全依赖于Broker的可靠性、安全性和性能 ● 架构复杂,后期维护和调试麻烦 

二、RabbitMQ基本介绍

RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:
Messaging that just works — RabbitMQ
接下来,我们就学习它的基本概念和基础用法。

三、快速入门

快速入门

需求:在rabbitmq的控制台完成下列操作:
  • 新建队列hello.queue1和hello.queue2
  • 向默认的amp.fanout交换机发送一条消息
  • 查看消息是否到达hello.queue1和hello.queue2
    • 已经接收到了消息

四、虚拟主机(数据隔离)

virtual host的隔离特性,将不同项目隔离

五、java客户端实战

(1)名词解释:AMQT

由于RabbitMQ采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互。并且RabbitMQ官方也提供了各种不同语言的客户端。
但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。

SpringAmqp的官方地址:
Spring AMQP
SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息

(2)快速入门:通过队列直接发送接收消息

流程图:

springamqp如何收发消息?

实现流程:引入spring-boot-starter-amgp依赖创建publish模块和consumer模块配置rabbitmg服务端信息利用rabbittemplate发送消息利用@rabbitlistener注解声明要监听的队列,监听消息
  1. 引入spring-boot-starter-amgp依赖
<!-- rabbitmq核心依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
  1. 配置rabbitmg服务端信息
# bpulish 和consumer模块的配置一致spring:rabbitmq:host: localhost # 主机port:5672# 主机端口username: test # 用户password: test # 密码virtual-host: test # 虚拟主机
  1. 利用rabbittemplate发送消息(publish模块)
packagecn.varin.rabbitmq.publish;importorg.junit.jupiter.api.Test;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.boot.test.context.SpringBootTest;importjavax.annotation.Resource;importjava.time.LocalDateTime;importstaticorg.junit.jupiter.api.Assertions.*;@SpringBootTestclassPublishTestTest{@ResourceprivateRabbitTemplate rabbitTemplate;/** * 直接向队列中发送消息 testQueue为队列名称 */@Testvoidsend(){ rabbitTemplate.convertAndSend("testQueue","hello");}}
  1. 利用@rabbitlistener注解声明要监听的队列,监听消息(consumer模块
packagecn.varin.rabbitmq.consumer;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@Slf4j@ComponentpublicclassQueueConsumer{/** * 直接接收队列的消息 * @param message */@RabbitListener(queues ={"testQueue"})publicvoidgetMessage(String message){ log.info("message:{}", message);}}

六、WorkQueues模型

解释:WorkQueues模型就是在一个队列绑定了多个消费者

解决问题:当生产者生产过多的消息,导致消息堆积的时候,多个消费者可以摊消息,从而提供消息的处理速度。

缺点:在默认情况下每个consumer消费消息时,是进行轮询等待的(你一个,我一个,平均分),这样的话如果某一consumer性能比较差的话,就会增加处理消息的时间。

优化:添加prefetch配置,将他设置为1

解释:表示每次只能获取一条消息,处理完成才能获取下一个消息
1. 测试一:一个队列多个消费者同时消费(Work模式)
建立simple.work队列
发送消息
@TestvoidworkQueueTest()throwsInterruptedException{for(int i =1; i <=10; i++){ rabbitTemplate.convertAndSend("simple.work","消息条数"+i);}}
接收消息(三个consumer)
/** * 模拟work模型 */@RabbitListener(queues ="simple.work")publicvoidgetWorkMessage(String message)throwsInterruptedException{ log.info("message1:{}", message);}@RabbitListener(queues ="simple.work")publicvoidgetWorkMessage2(String message)throwsInterruptedException{ log.info("message2:{}", message);}
效果
2. 测试二:一个队列多个消费者同时消费(Work模式prefetch版本)
  1. consumer模块添加配置
server:port:9999spring:rabbitmq:host: varin.cn # 主机port:5672# 主机端口username: test # 用户password: test # 密码virtual-host: test # 虚拟主机listener:simple:prefetch:1#每次只能获取一条消息,处理完成才能获取下一个消息
  1. 添加consumer延时时间(模拟性能不同)
@RabbitListener(queues ="simple.work")publicvoidgetWorkMessage(String message)throwsInterruptedException{ log.info("message1:{}", message);Thread.sleep(1000);}@RabbitListener(queues ="simple.work")publicvoidgetWorkMessage2(String message)throwsInterruptedException{ log.info("message2:{}", message);Thread.sleep(3000);
  1. 测试效果
结果中可以看出:

message1消费了7条消息

message2消费了3条消息
3. 总结
Work模型的使用:多个消费者绑定到一个队列,同一条消息只会被一个消费者处理通过设置prefetch来控制消费者预取的消息数量

七、交换机类型(Exchange)

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

交换机的类型有四种:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
  • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
  • Headers:头匹配,基于MQ的消息头匹配,用的较少。

八、Fanout类型交换机

作用:消息发送到该类型的交换机,它会将消息通过广播的方式转发的它绑定的队列

1. Fanout交换机案例实现

  • 实现流程图
  • 创建一个名为 simple.fanout的交换机,类型是Fanout

  • 创建两个队列fanout.queue1fanout.queue2,绑定到交换机simple.fanout
    • 创建
- 绑定 
  • publish发送
@TestvoidfanoutExchangeToQueueTest()throwsInterruptedException{/** * 参数一:交换机名称 * 参数三:消息内容 */ rabbitTemplate.convertAndSend("simple.fanout","","fanoutExchangeToQueueTest");}
  • consumer接收
@RabbitListener(queues ="fanout.queue1")publicvoidgetFanoutMessage1(String message)throwsInterruptedException{ log.info("fanout.queue1,message:{}", message);}@RabbitListener(queues ="fanout.queue2")publicvoidgetFanoutMessage2(String message)throwsInterruptedException{ log.info("fanout.queue2,message:{}", message);}
  • 结果

2. 总结

交换机的作用是什么?

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • 不能缓存消息,路由失败,消息丢失
  • FanoutExchange的会将消息路由到每个绑定的队列

九、Direct类型交换机

作用:通过RoutingKey与队列进行绑定,根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

1. Direct交换机案例实现

  1. 需求流程图:
  1. 声明一个名为simple.direct的交换机
  1. 声明队列direct.queue1,绑定simple.directbindingKeybludred
  1. 声明队列direct.queue2,绑定simple.directbindingKeyyellowred
  1. 在publisher中编写测试方法,向simple.direct发送消息
@TestvoiddirectExchangeToQueueTest()throwsInterruptedException{ rabbitTemplate.convertAndSend("simple.direct","red","red"); rabbitTemplate.convertAndSend("simple.direct","yellow","yellow"); rabbitTemplate.convertAndSend("simple.direct","blue","blue");}
  1. consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
/** * direct交换机 */@RabbitListener(queues ="direct.queue1")publicvoidgetDirectMessage1(String message)throwsInterruptedException{ log.info("direct.queue1,message:{}", message);}@RabbitListener(queues ="direct.queue2")publicvoidgetDirectMessage2(String message)throwsInterruptedException{ log.info("direct.queue2,message:{}", message);}
  1. 效果图

2. 总结

描述下Direct交换机与Fanout交换机的差异?

  • Fanout交换机将消息路由给每一个与之绑定的队列
  • Direct交换机根据RoutingKey判断路由给哪个队列
  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

十、Topic交换机

作用:topic交换机和direct交换机使用方法一致,不同点就是topic交换机在设置路由名称时,可以使用通配符代替。

通配符:#:匹配一个或多个词*:匹配不多不少恰好1个词

1. Topic交换机案例实现

解释:

  • topic.queue1:绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括:
    • china.news
    • china.weather
  • topic.queue2:绑定的是#.news ,凡是以 .news结尾的 routing key 都会被匹配。包括:
    • china.news
    • japan.news
  1. 声明一个名为simple.topic的交换机
  1. 声明队列topic.queue1,绑定china.#
  1. 声明队列topic.queue2,绑定#.news
  1. 在publisher中编写测试方法,向simple.topic发送消息
@TestvoidtopicExchangeToQueueTest()throwsInterruptedException{ rabbitTemplate.convertAndSend("simple.topic","chain.abc","chain.abc"); rabbitTemplate.convertAndSend("simple.topic","chain.news","chain.news"); rabbitTemplate.convertAndSend("simple.topic","fujian.news","fujian.news"); rabbitTemplate.convertAndSend("simple.topic","chain.fujian","chain.fujian");}
  1. consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
/** * topic交换机 */@RabbitListener(queues ="topic.queue1")publicvoidgettopicMessage1(String message)throwsInterruptedException{ log.info("topic.queue1,message:{}", message);}@RabbitListener(queues ="topic.queue2")publicvoidgettopicMessage2(String message)throwsInterruptedException{ log.info("topic.queue2,message:{}", message);
  1. 效果

2. 总结

描述下Direct交换机与Topic交换机的差异?

  • Topic交换机接收的消息RoutingKey必须是多个单词,以 **.** 分割
  • Topic交换机与队列绑定时的bindingKey可以指定通配符
  • #:代表0个或多个词
  • *:代表1个词

十一、声明队列和交换机

由于之前都是使用web控制台的方式创建队列和交换机的,SpirngAMQT其实提供了代码自定义的方式。

编码方式声明

(1)fanout示例

packagecn.varin.rabbitmq.config;importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassFanoutConfig{/** * 创建fanout交换机 */@BeanpublicFanoutExchangefanoutExchange(){returnExchangeBuilder.fanoutExchange("customize.fanout").build();}@BeanpublicQueuequeue1(){returnnewQueue("customize.queue1");}// 绑定@BeanpublicBindingbinding1(){returnBindingBuilder.bind(queue1()).to(fanoutExchange());}}

(2)direct示例

注意:如果有多个routingKey需要绑定的话,就需要创建多个Binding

packagecn.varin.rabbitmq.config;importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassDirectConfig{/** * 创建Direct交换机 */@BeanpublicDirectExchangedirectExchange(){returnExchangeBuilder.directExchange("customize.direct").build();}@BeanpublicQueuedirectQueue1(){returnnewQueue("customize.direct.queue1");}// 绑定@BeanpublicBindingdirectBinding1(){/** * 目的地 * 类型 * 交换机 * routingkey */returnnewBinding("customize.direct.queue1",Binding.DestinationType.QUEUE,"customize.direct","red",null);}}

基于注解声明

(1)Fanout交换机
/** *基于注解声明fanout */@RabbitListener( bindings =@QueueBinding( value =@Queue(name ="test.queue1"),// 创建队列// 创建交换机并且指定类型 exchange =@Exchange(name ="test.fanout",type =ExchangeTypes.FANOUT)))publicvoidgetFanoutMessage(String message){System.out.println(message);}
(2)Direct交换机
/** *基于注解声明dirext */@RabbitListener( bindings =@QueueBinding( value =@Queue(name ="test.direct.queue1"), exchange =@Exchange(name ="test.direct.exchange",type =ExchangeTypes.DIRECT), key ={"routing1,routing2"}))publicvoidgetDirectMessage(String message){System.out.println(message);}
(3) Topic交换机
/** *基于注解声明topic */@RabbitListener( bindings =@QueueBinding( value =@Queue(name ="test.topic.queue1"), exchange =@Exchange(name ="test.topic.exchange",type =ExchangeTypes.TOPIC), key ={"#.topic"}))publicvoidgetTopicMessage(String message){System.out.println(message);}

十二、消息转换器

( 1 )默认转换器测试

默认情况下Spring采用的序列化方式是JDK序列化存在问题:可能存在安全漏洞,以及序列化话后占用空间大

示例

  • 建立一个队列:customize.queue
  • 发送消息
@TestvoidToQueueTest()throwsInterruptedException{HashMap<String,Object> map =newHashMap<>(); map.put("id",1); rabbitTemplate.convertAndSend("customize.queue",map);}
  • 查看效果

( 2 )配置JSON转换器

注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。
  • 引入依赖
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>
  • 编写json配置Bean
packagecn.varin.rabbitmq.config;importorg.springframework.amqp.support.converter.Jackson2JsonMessageConverter;importorg.springframework.amqp.support.converter.MessageConverter;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassJsonConfig{@BeanpublicMessageConvertermessageConverter(){// 创建Jackson2JsonMessageConverter实例Jackson2JsonMessageConverter jackson2JsonMessageConverter =newJackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息 jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;}}
  • 效果

Read more

《算法题讲解指南:优选算法-位运算》--35.两个整数之和,36.只出现一次的数字 ||,37.消失的两个数字

《算法题讲解指南:优选算法-位运算》--35.两个整数之和,36.只出现一次的数字 ||,37.消失的两个数字

🔥小叶-duck:个人主页 ❄️个人专栏:《Data-Structure-Learning》 《C++入门到进阶&自我学习过程记录》《算法题讲解指南》--从优选到贪心 ✨未择之路,不须回头 已择之路,纵是荆棘遍野,亦作花海遨游 目录 35.两个整数之和 题目链接: 题目描述: 题目示例: 解法(位运算): 算法思路: C++算法代码: 算法总结及流程解析: 36.只出现一次的数字 || 题目链接: 题目描述: 题目示例: 解法(比特位计数): 算法思路: C++算法代码: 算法总结及流程解析: 38. 消失的两个数字 题目链接: 题目描述: 题目示例: 解法(位运算): 算法思路: C++算法代码: 算法总结及流程解析: 结束语

【算法进阶】滑动窗口与前缀和:从“和为 K”到“最小覆盖子串”的极限挑战

【算法进阶】滑动窗口与前缀和:从“和为 K”到“最小覆盖子串”的极限挑战

【算法进阶】滑动窗口与前缀和:从“和为 K”到“最小覆盖子串”的极限挑战 我的主页:寻星探路个人专栏:《JAVA(SE)----如此简单!!! 》《从青铜到王者,就差这讲数据结构!!!》 《数据库那些事!!!》《JavaEE 初阶启程记:跟我走不踩坑》 《JavaEE 进阶:从架构到落地实战 》《测试开发漫谈》 《测开视角・力扣算法通关》《从 0 到 1 刷力扣:算法 + 代码双提升》 《Python 全栈测试开发之路》没有人天生就会编程,但我生来倔强!!! 寻星探路的个人简介: 1. 题目一:和为 K 的子数组 (LeetCode 560) 题目背景 这道题虽然被归类在子串/子数组中,但它是**“滑动窗口”思想的变体—

计算机毕业设计源码:Python58同城租房数据分析可视化系统 Django框架 可视化 Requests爬虫 房子 租房 房屋 数据分析 大模型 大数据(建议收藏)✅

计算机毕业设计源码:Python58同城租房数据分析可视化系统 Django框架 可视化 Requests爬虫 房子 租房 房屋 数据分析 大模型 大数据(建议收藏)✅

博主介绍:✌全网粉丝50W+,前互联网大厂软件研发、集结硕博英豪成立软件开发工作室,专注于计算机相关专业项目实战6年之久,累计开发项目作品上万套。凭借丰富的经验与专业实力,已帮助成千上万的学生顺利毕业,选择我们,就是选择放心、选择安心毕业✌ > 🍅想要获取完整文章或者源码,或者代做,拉到文章底部即可与我联系了。🍅 点击查看作者主页,了解更多项目! 🍅感兴趣的可以先收藏起来,点赞、关注不迷路,大家在毕设选题,项目以及论文编写等相关问题都可以给我留言咨询,希望帮助同学们顺利毕业 。🍅 1、毕业设计:2026年计算机专业毕业设计选题汇总(建议收藏)✅ 2、最全计算机大数据专业毕业设计选题大全(建议收藏)✅ 1、项目介绍 技术栈 Python语言、Django框架、MySQL数据库、Echarts可视化工具、requests爬虫框架,用于58同城租房数据的采集清洗、多维度分析与可视化展示。 功能模块 · 租房数据可视化大屏 · 租房数据管理 · 系统首页 · 租房数据条件查询 · 评论功能 · 租房数据展示 项目介绍 本系统基于Python语言与Django框架开发,

基于Python豆瓣电影数据可视化分析设计与实现

基于Python豆瓣电影数据可视化分析设计与实现

在电影数据繁杂且难以有效利用的背景下,本研究旨在搭建豆瓣电影数据可视化分析体系。该系统功能涵盖数据采集、处理、分析与展示,通过Python爬虫技术采集数据,运用Pandas库处理数据,借助Matplotlib、Seaborn等库实现数据可视化。系统采用分层架构设计,保障高效运行与可扩展性。经测试,系统稳定可靠,能为电影爱好者提供精准影片推荐,帮助其提升观影体验;为电影行业从业者在影片制作、发行、营销等方面提供数据支撑,助力行业精细化运营;为学术研究提供数据资源与研究思路,推动电影相关学科发展。总之,该系统在多领域具有重要应用价值,有效解决了电影数据利用难题。 关键词:电影数据可视化;Python爬虫;数据处理;数据可视化技术 研究的目的与意义 研究的目的 在信息爆炸的当下,互联网中蕴藏着海量数据,电影领域亦不例外。豆瓣作为国内极具影响力的电影分享与评论平台,积累了丰富且多元的电影数据,涵盖电影基本信息、用户评分、评论以及各类标签等。这些数据宛如一座宝藏,蕴含着用户行为模式、电影市场走向、大众审美偏好等有价值的信息。然而,原始数据往往繁杂无序,如同未经雕琢的璞玉,难以直接发挥其作

阿里云全品类 8 折券限时领,建站 / AI / 存储通用 立即领取