RabbitMQ - 第一个 Hello World 程序:SpringBoot 版极简集成

RabbitMQ - 第一个 Hello World 程序:SpringBoot 版极简集成
在这里插入图片描述
👋 大家好,欢迎来到我的技术博客!
📚 在这里,我会分享学习笔记、实战经验与技术思考,力求用简单的方式讲清楚复杂的问题。
🎯 本文将围绕RabbitMQ这个话题展开,希望能为你带来一些启发或实用的参考。
🌱 无论你是刚入门的新手,还是正在进阶的开发者,希望你都能有所收获!

文章目录

RabbitMQ - 第一个 Hello World 程序:SpringBoot 版极简集成

在现代分布式系统架构中,消息队列扮演着至关重要的角色。它不仅能够解耦系统组件,还能实现异步处理、流量削峰、可靠投递等多种功能。而在众多消息队列中间件中,RabbitMQ 凭借其稳定性、易用性以及丰富的功能特性,成为 Java 开发者最常选择的消息中间件之一。

今天,我们将从零开始,用 Spring Boot 构建一个最简单的 RabbitMQ “Hello World” 程序。这个程序虽然简单,但却是理解 RabbitMQ 与 Spring Boot 集成机制的基石。通过本文,你将掌握:

  • RabbitMQ 的基本概念
  • Spring Boot 如何自动配置 RabbitMQ
  • 消息的发送与接收
  • 常见配置项说明
  • 调试与排错技巧

准备好了吗?让我们一起开启 RabbitMQ 的旅程吧!🚀


🐰 什么是 RabbitMQ?

RabbitMQ 是一个开源的消息代理(Message Broker)和队列服务器,基于 AMQP(Advanced Message Queuing Protocol)协议实现。它允许应用程序通过“生产者-消费者”模型进行异步通信。

核心概念

在深入代码之前,先了解几个关键术语:

  • Producer(生产者):发送消息的应用。
  • Consumer(消费者):接收并处理消息的应用。
  • Queue(队列):存储消息的缓冲区,遵循 FIFO(先进先出)原则。
  • Exchange(交换机):接收生产者的消息,并根据规则将消息路由到一个或多个队列。
  • Binding(绑定):建立 Exchange 与 Queue 之间的关联规则。
  • Routing Key(路由键):生产者发送消息时指定的字符串,用于 Exchange 决定如何路由消息。
💡 在最简单的“Hello World”场景中,我们通常使用默认的 Direct Exchange 和一个命名队列,无需显式声明 Exchange 或 Binding。

RabbitMQ 官方文档对这些概念有非常清晰的解释,推荐阅读 RabbitMQ Tutorials


🧱 环境准备

要运行我们的示例,你需要以下环境:

  1. JDK 8+(推荐 JDK 17)
  2. Maven 或 Gradle(本文使用 Maven)
  3. RabbitMQ 服务

安装 RabbitMQ

最简单的方式是使用 Docker:

docker run -d --hostname my-rabbit --name rabbitmq \ -p 5672:5672 -p 15672:15672 \ rabbitmq:3-management 
  • 5672 是 AMQP 协议端口(客户端连接用)
  • 15672 是管理界面端口(浏览器访问)

启动后,你可以通过 http://localhost:15672 访问管理后台,默认账号密码为 guest/guest

✅ 如果你不想用 Docker,也可以直接下载安装包:RabbitMQ Download

🌱 创建 Spring Boot 项目

使用 Spring Initializr 快速生成项目。

选择以下依赖:

  • Spring Web
  • Spring for RabbitMQ

或者直接在 pom.xml 中添加:

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency></dependencies>
📌 注意:spring-boot-starter-amqp 是 Spring Boot 对 RabbitMQ 的官方支持模块,内部封装了 spring-rabbit,提供了自动配置、模板类、监听器等便利功能。

⚙️ 配置 RabbitMQ 连接

application.yml 中添加 RabbitMQ 连接信息:

spring:rabbitmq:host: localhost port:5672username: guest password: guest virtual-host: / 
🔐 Virtual Host(虚拟主机) 是 RabbitMQ 的命名空间机制,类似数据库中的 schema。默认 / 即可。

Spring Boot 会自动读取这些配置,并创建 ConnectionFactoryRabbitTemplateRabbitAdmin 等核心 Bean。


📤 发送消息:RabbitTemplate

RabbitTemplate 是 Spring 提供的用于发送消息的模板类,类似于 RestTemplateJdbcTemplate

创建一个 Controller

// HelloController.javapackagecom.example.demo;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RequestParam;importorg.springframework.web.bind.annotation.RestController;@RestControllerpublicclassHelloController{@AutowiredprivateRabbitTemplate rabbitTemplate;@GetMapping("/send")publicStringsendMessage(@RequestParamString msg){ rabbitTemplate.convertAndSend("hello.queue", msg);return"Message sent: "+ msg;}}

这里我们调用 convertAndSend(String routingKey, Object message) 方法:

  • 第一个参数 "hello.queue" 实际上是 队列名称(在默认 Direct Exchange 下,routing key 与 queue name 相同)
  • 第二个参数是消息内容,可以是任意对象,Spring 会自动序列化为字节
🔄 注意:在 RabbitMQ 中,默认 Exchange 是一个特殊的 Direct Exchange,它会将消息路由到与 routing key 同名的队列。因此,当我们发送到 "hello.queue" 时,RabbitMQ 会尝试将消息投递到名为 hello.queue 的队列。

📥 接收消息:@RabbitListener

消息的消费通过 @RabbitListener 注解实现,非常简洁。

创建一个 Listener

// HelloListener.javapackagecom.example.demo;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@ComponentpublicclassHelloListener{@RabbitListener(queues ="hello.queue")publicvoidreceiveMessage(String message){System.out.println("【收到消息】: "+ message);}}
  • @RabbitListener(queues = "hello.queue") 表示监听名为 hello.queue 的队列
  • 方法参数 String message 会被 Spring 自动反序列化
🧠 Spring 默认使用 SimpleMessageConverter,它支持 Stringbyte[]Serializable 对象。对于 JSON,我们可以稍后自定义 MessageConverter

▶️ 启动并测试

  1. 启动 Spring Boot 应用
  2. 访问 http://localhost:8080/send?msg=Hello%20RabbitMQ

你会在控制台看到:

【收到消息】: Hello RabbitMQ 

同时,在 RabbitMQ 管理界面(http://localhost:15672)的 Queues 标签页中,可以看到 hello.queue 队列已被自动创建(因为 RabbitAdmin 默认启用自动声明)。


🧩 自动声明队列:为什么能成功?

你可能会疑惑:我们并没有显式创建 hello.queue 队列,为什么消息能被成功接收?

这得益于 Spring Boot 的 自动声明机制

@RabbitListener 注解指定了一个队列名,而该队列在 RabbitMQ 中不存在时,Spring 会自动向 Broker 声明(Declare)该队列。

显式声明队列(推荐做法)

虽然自动声明很方便,但在生产环境中,建议显式声明队列,以便控制其属性(如持久化、最大长度等)。

// RabbitConfig.javapackagecom.example.demo;importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitConfig{@BeanpublicQueuehelloQueue(){returnnewQueue("hello.queue",true);// true 表示队列持久化}}
  • new Queue("name", durable):第二个参数表示是否持久化(重启 RabbitMQ 后队列是否保留)
  • 其他构造参数还包括:exclusive(排他性)、autoDelete(自动删除)等
💾 持久化很重要!如果队列不持久化,RabbitMQ 重启后队列会消失,导致消息丢失。

📊 消息流转流程图

让我们用 Mermaid 图来可视化整个消息传递过程:

消费者 (@RabbitListener)RabbitMQ生产者 (Controller)消费者 (@RabbitListener)RabbitMQ生产者 (Controller)发送消息到 hello.queue消息入队 (Queue: hello.queue)推送消息处理消息 (打印日志)

这个流程展示了典型的“点对点”通信模型:一个生产者发送,一个消费者接收。


🧪 深入理解:消息是如何被序列化的?

默认情况下,RabbitTemplate 使用 SimpleMessageConverter,它会将:

  • String → UTF-8 字节
  • byte[] → 原样传输
  • Serializable 对象 → Java 序列化

但 Java 原生序列化存在兼容性问题,且不可读。因此,强烈建议使用 JSON

配置 JSON 消息转换器

// RabbitConfig.java (追加)importorg.springframework.amqp.support.converter.Jackson2JsonMessageConverter;importorg.springframework.amqp.support.converter.MessageConverter;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitConfig{@BeanpublicQueuehelloQueue(){returnnewQueue("hello.queue",true);}@BeanpublicMessageConverterjsonMessageConverter(){returnnewJackson2JsonMessageConverter();}}

现在,你可以发送任意 Java 对象:

// 定义消息体publicclassGreeting{privateString content;publicGreeting(String content){this.content = content;}// getter/setter}// Controller 中@GetMapping("/sendObj")publicStringsendObject(){Greeting greeting =newGreeting("Hello from Object!"); rabbitTemplate.convertAndSend("hello.queue", greeting);return"Object sent";}

对应的 Listener 也要调整:

@RabbitListener(queues ="hello.queue")publicvoidreceiveObject(Greeting greeting){System.out.println("【收到对象】: "+ greeting.getContent());}
✅ 使用 JSON 不仅跨语言友好,还便于调试和监控。

🛡️ 消息确认机制(Acknowledgement)

默认情况下,Spring 的 @RabbitListener 使用 自动确认(Auto Ack) 模式:一旦消息被投递给消费者方法,RabbitMQ 就认为消息已成功处理,并从队列中删除。

但如果消费者在处理过程中崩溃,消息就会丢失!

启用手动确认

application.yml 中配置:

spring:rabbitmq:listener:simple:acknowledge-mode: manual 

然后在 Listener 中手动确认:

importcom.rabbitmq.client.Channel;importorg.springframework.amqp.support.AmqpHeaders;importorg.springframework.messaging.handler.annotation.Header;@RabbitListener(queues ="hello.queue")publicvoidreceiveMessage(String message,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag){try{System.out.println("处理消息: "+ message);// 模拟业务逻辑if("error".equals(message)){thrownewRuntimeException("模拟异常");}// 手动确认 channel.basicAck(deliveryTag,false);}catch(Exception e){try{// 拒绝消息,不重新入队 channel.basicNack(deliveryTag,false,false);}catch(IOException ex){ ex.printStackTrace();}}}
  • basicAck:确认消息处理成功
  • basicNack:拒绝消息,第三个参数 requeue 决定是否重新入队
⚠️ 在手动确认模式下,必须显式调用 ack/nack,否则消息会一直“未确认”,占用内存,最终可能导致 RabbitMQ 崩溃。

🔄 重试与死信队列(DLQ)

即使有手动确认,我们仍可能遇到临时故障(如数据库连接失败)。此时,我们希望消息能重试几次,若仍失败,则转入死信队列(Dead Letter Queue) 供人工处理。

配置重试机制

spring:rabbitmq:listener:simple:retry:enabled:truemax-attempts:3initial-interval:1000

Spring 会自动在异常时重试最多 3 次,间隔 1 秒。

死信队列配置

  1. 声明死信交换机和队列
  2. 主队列绑定死信交换机
@BeanpublicQueuedlq(){returnQueueBuilder.durable("hello.dlq").build();}@BeanpublicDirectExchangedlx(){returnnewDirectExchange("hello.dlx");}@BeanpublicBindingdlqBinding(){returnBindingBuilder.bind(dlq()).to(dlx()).with("hello.dlq");}@BeanpublicQueuehelloQueue(){returnQueueBuilder.durable("hello.queue").withArgument("x-dead-letter-exchange","hello.dlx").withArgument("x-dead-letter-routing-key","hello.dlq").build();}

当消息重试 3 次仍失败,就会被路由到 hello.dlq 队列。

你可以再写一个 Listener 监听 DLQ:

@RabbitListener(queues ="hello.dlq")publicvoidhandleDlq(String message){System.err.println("【死信队列】: "+ message);// 报警、记录日志、人工干预等}

📈 性能优化:并发消费者

默认情况下,每个 @RabbitListener 只启动一个消费者线程。如果消息量大,处理慢,会导致积压。

可以通过配置增加并发数:

spring:rabbitmq:listener:simple:concurrency:5max-concurrency:10
  • concurrency:初始消费者数量
  • max-concurrency:最大消费者数量(动态扩容)
🚀 高并发场景下,合理设置并发数能显著提升吞吐量。

🧪 单元测试:如何测试 RabbitMQ?

虽然 RabbitMQ 是外部依赖,但我们仍可以编写集成测试。

使用 Testcontainers(推荐)

@SpringBootTest@TestcontainersclassRabbitMQIntegrationTest{@ContainerstaticRabbitMQContainer rabbitMQ =newRabbitMQContainer("rabbitmq:3-management");@DynamicPropertySourcestaticvoidconfigureProperties(DynamicPropertyRegistry registry){ registry.add("spring.rabbitmq.host", rabbitMQ::getHost); registry.add("spring.rabbitmq.port", rabbitMQ::getAmqpPort); registry.add("spring.rabbitmq.username", rabbitMQ::getAdminUsername); registry.add("spring.rabbitmq.password", rabbitMQ::getAdminPassword);}@AutowiredprivateRabbitTemplate rabbitTemplate;@TestvoidtestSendMessage()throwsInterruptedException{// 发送消息 rabbitTemplate.convertAndSend("test.queue","Hello Test");// 等待消费者处理(实际项目中应使用 CountDownLatch 或 Awaitility)Thread.sleep(1000);// 验证逻辑(例如检查数据库、日志等)}}
🧪 Testcontainers 能在测试时启动真实的 RabbitMQ 容器,确保测试环境与生产一致。

🛠️ 常见问题排查

1. 消息发送成功,但消费者没收到?

  • 检查队列名称是否一致
  • 查看 RabbitMQ 管理界面,确认消息是否在队列中
  • 检查 Listener 是否被 Spring 扫描到(@Component

2. 启动时报连接拒绝?

  • 确认 RabbitMQ 服务已启动
  • 检查 hostportusernamepassword 是否正确
  • 防火墙是否放行 5672 端口

3. 消息乱码?

  • 确保生产者和消费者使用相同的 MessageConverter
  • JSON 场景下,检查对象字段是否匹配

4. 消息丢失?

  • 队列是否持久化?(durable=true
  • 消息是否标记为持久化?(MessageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
  • 消费者是否启用了手动确认?
📚 更多排错技巧可参考 RabbitMQ Troubleshooting Guide

🌐 RabbitMQ 与其他消息队列对比

特性RabbitMQKafkaRocketMQ
协议AMQP自定义自定义
延迟消息插件支持不支持原生支持
事务支持不支持支持
吞吐量中等极高
易用性⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐

RabbitMQ 适合复杂路由、低延迟、高可靠性的场景,如订单处理、通知系统等。


🧩 扩展:发布/订阅模式

除了点对点,RabbitMQ 还支持发布/订阅(Pub/Sub)。

示例:广播消息

// 声明 Fanout Exchange@BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange("broadcast.exchange");}@BeanpublicQueueemailQueue(){returnnewQueue("email.queue");}@BeanpublicQueuesmsQueue(){returnnewQueue("sms.queue");}// 绑定@BeanpublicBindingbindEmail(){returnBindingBuilder.bind(emailQueue()).to(fanoutExchange());}@BeanpublicBindingbindSms(){returnBindingBuilder.bind(smsQueue()).to(fanoutExchange());}

发送消息:

rabbitTemplate.convertAndSend("broadcast.exchange","","Hello All!");

两个 Listener 都会收到消息。

broadcast.exchange

Producer

Fanout Exchange

email.queue

sms.queue

Email Consumer

SMS Consumer


🏁 总结

通过这个极简的 “Hello World” 程序,我们掌握了:

  • Spring Boot 与 RabbitMQ 的无缝集成
  • 消息的发送与接收
  • 队列的声明与配置
  • JSON 序列化、手动确认、重试、死信队列等高级特性

虽然只是一个入门示例,但它涵盖了 RabbitMQ 在 Spring Boot 中的核心用法。后续你可以在此基础上扩展:

  • 消息追踪(如集成 Sleuth)
  • 监控告警(如 Prometheus + Grafana)
  • 集群部署与高可用
🌟 记住:消息队列不是银弹,但它是构建弹性、可扩展系统的关键工具。

希望这篇博客能为你打开 RabbitMQ 的大门。Happy Coding!🎉


🔗 参考资料


🙌 感谢你读到这里!
🔍 技术之路没有捷径,但每一次阅读、思考和实践,都在悄悄拉近你与目标的距离。
💡 如果本文对你有帮助,不妨 👍 点赞、📌 收藏、📤 分享 给更多需要的朋友!
💬 欢迎在评论区留下你的想法、疑问或建议,我会一一回复,我们一起交流、共同成长 🌿
🔔 关注我,不错过下一篇干货!我们下期再见!✨

Read more

从零敲开 MySQL 的大门:库与表的基础操作实战(保姆级入门指南)

从零敲开 MySQL 的大门:库与表的基础操作实战(保姆级入门指南)

🔥海棠蚀omo:个人主页                 ❄️个人专栏:《初识数据结构》,《C++:从入门到实践》,《Linux:从零基础到实践》,《Linux网络:从不懂到不会》,《MySQL:新手入门指南》                 ✨追光的人,终会光芒万丈 博主简介: 目录 一.库的操作 1.1创建数据库 1.1.1编码集和校验集 1.1.2校验规则对数据库的影响 1.2操纵数据库 1.2.1查看数据库 1.2.2修改数据库 1.2.3删除数据库 1.3数据库的备份与恢复 二.表的操作 2.1创建表 2.2查看表结构 2.3修改表 2.3.1ADD操作

By Ne0inhk

PostgresSQL(安装教程及初始使用)

目录 LINUX安装教程 1.点击官网,进入下载页面 2.选择适合版本 3.获取下载命令 4.修改postgres账号密码 4.1进入PostgreSQL命令行 4.2启动SQL shell 4.3修改密码 4.4配置远程访问 4.5修改IP绑定 4.6 使用navicat登录pgsql PostgreSQL的基本使用 登录 数据库操作 数据表操作 LINUX安装教程 PGSQL官方网站:https://www.postgresql.org/ 1.点击官网,进入下载页面 2.选择适合版本 有两种安装方式(一、社区yum安装 二、源码包编译安装 生产环境下更推荐编译安装,本文由于是测试教学,所以在yum安装下执行操作 ) 我是用的是Linux CentOS

By Ne0inhk
精选了几道MySQL的大厂面试题,被提问的几率很高!

精选了几道MySQL的大厂面试题,被提问的几率很高!

🎥 作者简介: ZEEKLOG\阿里云\腾讯云\华为云开发社区优质创作者,专注分享大数据、Python、数据库、人工智能等领域的优质内容 🌸个人主页:长风清留杨的博客 🍃形式准则: 无论成就大小,都保持一颗谦逊的心,尊重他人,虚心学习。 ✨推荐专栏:Python入门到入魔,Mysql入门到入魔,Python入门基础大全,Flink入门到实战 🍂若缘分至此,无法再续相逢,愿你朝朝暮暮,皆有安好,晨曦微露道早安,日中炽热说午安,星河长明寄晚安🍂 MySQL面试题:如何存储IP地址? 面试官提出的问题 在MySQL中,IP地址的存储是一个常见的问题。请问你有哪些方法可以存储IP地址?并请详细解释每种方法的优缺点 问题的重点 1. 了解IP地址的存储方法。 2. 掌握不同存储方法的优缺点。 3. 能够设计合理的表结构和编写相应的SQL代码。 面试者如何回答 在MySQL中,存储IP地址通常有几种常见的方法,每种方法都有其特定的应用场景和优缺点。 方法一:使用VARCHAR类型存储 最直接的方法是将IP地址作为字符串存储。IPv4地址通常用点分十进制表示

By Ne0inhk

Node.js完全指南:从入门到精通

一、Node.js基础概念 1.1 什么是Node.js? Node.js是一个基于Chrome V8引擎的JavaScript运行环境,让JavaScript可以在服务器端运行。它使用事件驱动、非阻塞I/O模型,使其轻量且高效。 1.2 Node.js的历史 * 2009年:Ryan Dahl创建了Node.js * 2010年:NPM(Node Package Manager)诞生 * 2011年:npm 1.0发布 * 2015年:Node.js基金会成立 * 2016年:引入长期支持(LTS)版本 * 至今:持续快速发展,广泛应用于后端开发 1.3 Node.js的特点 * 单线程:使用单线程事件循环,避免线程切换开销

By Ne0inhk