RabbitMQ - 第一个 Hello World 程序:SpringBoot 版极简集成

RabbitMQ - 第一个 Hello World 程序:SpringBoot 版极简集成
在这里插入图片描述
👋 大家好,欢迎来到我的技术博客!
📚 在这里,我会分享学习笔记、实战经验与技术思考,力求用简单的方式讲清楚复杂的问题。
🎯 本文将围绕RabbitMQ这个话题展开,希望能为你带来一些启发或实用的参考。
🌱 无论你是刚入门的新手,还是正在进阶的开发者,希望你都能有所收获!

文章目录

RabbitMQ - 第一个 Hello World 程序:SpringBoot 版极简集成

在现代分布式系统架构中,消息队列扮演着至关重要的角色。它不仅能够解耦系统组件,还能实现异步处理、流量削峰、可靠投递等多种功能。而在众多消息队列中间件中,RabbitMQ 凭借其稳定性、易用性以及丰富的功能特性,成为 Java 开发者最常选择的消息中间件之一。

今天,我们将从零开始,用 Spring Boot 构建一个最简单的 RabbitMQ “Hello World” 程序。这个程序虽然简单,但却是理解 RabbitMQ 与 Spring Boot 集成机制的基石。通过本文,你将掌握:

  • RabbitMQ 的基本概念
  • Spring Boot 如何自动配置 RabbitMQ
  • 消息的发送与接收
  • 常见配置项说明
  • 调试与排错技巧

准备好了吗?让我们一起开启 RabbitMQ 的旅程吧!🚀


🐰 什么是 RabbitMQ?

RabbitMQ 是一个开源的消息代理(Message Broker)和队列服务器,基于 AMQP(Advanced Message Queuing Protocol)协议实现。它允许应用程序通过“生产者-消费者”模型进行异步通信。

核心概念

在深入代码之前,先了解几个关键术语:

  • Producer(生产者):发送消息的应用。
  • Consumer(消费者):接收并处理消息的应用。
  • Queue(队列):存储消息的缓冲区,遵循 FIFO(先进先出)原则。
  • Exchange(交换机):接收生产者的消息,并根据规则将消息路由到一个或多个队列。
  • Binding(绑定):建立 Exchange 与 Queue 之间的关联规则。
  • Routing Key(路由键):生产者发送消息时指定的字符串,用于 Exchange 决定如何路由消息。
💡 在最简单的“Hello World”场景中,我们通常使用默认的 Direct Exchange 和一个命名队列,无需显式声明 Exchange 或 Binding。

RabbitMQ 官方文档对这些概念有非常清晰的解释,推荐阅读 RabbitMQ Tutorials


🧱 环境准备

要运行我们的示例,你需要以下环境:

  1. JDK 8+(推荐 JDK 17)
  2. Maven 或 Gradle(本文使用 Maven)
  3. RabbitMQ 服务

安装 RabbitMQ

最简单的方式是使用 Docker:

docker run -d --hostname my-rabbit --name rabbitmq \ -p 5672:5672 -p 15672:15672 \ rabbitmq:3-management 
  • 5672 是 AMQP 协议端口(客户端连接用)
  • 15672 是管理界面端口(浏览器访问)

启动后,你可以通过 http://localhost:15672 访问管理后台,默认账号密码为 guest/guest

✅ 如果你不想用 Docker,也可以直接下载安装包:RabbitMQ Download

🌱 创建 Spring Boot 项目

使用 Spring Initializr 快速生成项目。

选择以下依赖:

  • Spring Web
  • Spring for RabbitMQ

或者直接在 pom.xml 中添加:

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency></dependencies>
📌 注意:spring-boot-starter-amqp 是 Spring Boot 对 RabbitMQ 的官方支持模块,内部封装了 spring-rabbit,提供了自动配置、模板类、监听器等便利功能。

⚙️ 配置 RabbitMQ 连接

application.yml 中添加 RabbitMQ 连接信息:

spring:rabbitmq:host: localhost port:5672username: guest password: guest virtual-host: / 
🔐 Virtual Host(虚拟主机) 是 RabbitMQ 的命名空间机制,类似数据库中的 schema。默认 / 即可。

Spring Boot 会自动读取这些配置,并创建 ConnectionFactoryRabbitTemplateRabbitAdmin 等核心 Bean。


📤 发送消息:RabbitTemplate

RabbitTemplate 是 Spring 提供的用于发送消息的模板类,类似于 RestTemplateJdbcTemplate

创建一个 Controller

// HelloController.javapackagecom.example.demo;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RequestParam;importorg.springframework.web.bind.annotation.RestController;@RestControllerpublicclassHelloController{@AutowiredprivateRabbitTemplate rabbitTemplate;@GetMapping("/send")publicStringsendMessage(@RequestParamString msg){ rabbitTemplate.convertAndSend("hello.queue", msg);return"Message sent: "+ msg;}}

这里我们调用 convertAndSend(String routingKey, Object message) 方法:

  • 第一个参数 "hello.queue" 实际上是 队列名称(在默认 Direct Exchange 下,routing key 与 queue name 相同)
  • 第二个参数是消息内容,可以是任意对象,Spring 会自动序列化为字节
🔄 注意:在 RabbitMQ 中,默认 Exchange 是一个特殊的 Direct Exchange,它会将消息路由到与 routing key 同名的队列。因此,当我们发送到 "hello.queue" 时,RabbitMQ 会尝试将消息投递到名为 hello.queue 的队列。

📥 接收消息:@RabbitListener

消息的消费通过 @RabbitListener 注解实现,非常简洁。

创建一个 Listener

// HelloListener.javapackagecom.example.demo;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@ComponentpublicclassHelloListener{@RabbitListener(queues ="hello.queue")publicvoidreceiveMessage(String message){System.out.println("【收到消息】: "+ message);}}
  • @RabbitListener(queues = "hello.queue") 表示监听名为 hello.queue 的队列
  • 方法参数 String message 会被 Spring 自动反序列化
🧠 Spring 默认使用 SimpleMessageConverter,它支持 Stringbyte[]Serializable 对象。对于 JSON,我们可以稍后自定义 MessageConverter

▶️ 启动并测试

  1. 启动 Spring Boot 应用
  2. 访问 http://localhost:8080/send?msg=Hello%20RabbitMQ

你会在控制台看到:

【收到消息】: Hello RabbitMQ 

同时,在 RabbitMQ 管理界面(http://localhost:15672)的 Queues 标签页中,可以看到 hello.queue 队列已被自动创建(因为 RabbitAdmin 默认启用自动声明)。


🧩 自动声明队列:为什么能成功?

你可能会疑惑:我们并没有显式创建 hello.queue 队列,为什么消息能被成功接收?

这得益于 Spring Boot 的 自动声明机制

@RabbitListener 注解指定了一个队列名,而该队列在 RabbitMQ 中不存在时,Spring 会自动向 Broker 声明(Declare)该队列。

显式声明队列(推荐做法)

虽然自动声明很方便,但在生产环境中,建议显式声明队列,以便控制其属性(如持久化、最大长度等)。

// RabbitConfig.javapackagecom.example.demo;importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitConfig{@BeanpublicQueuehelloQueue(){returnnewQueue("hello.queue",true);// true 表示队列持久化}}
  • new Queue("name", durable):第二个参数表示是否持久化(重启 RabbitMQ 后队列是否保留)
  • 其他构造参数还包括:exclusive(排他性)、autoDelete(自动删除)等
💾 持久化很重要!如果队列不持久化,RabbitMQ 重启后队列会消失,导致消息丢失。

📊 消息流转流程图

让我们用 Mermaid 图来可视化整个消息传递过程:

消费者 (@RabbitListener)RabbitMQ生产者 (Controller)消费者 (@RabbitListener)RabbitMQ生产者 (Controller)发送消息到 hello.queue消息入队 (Queue: hello.queue)推送消息处理消息 (打印日志)

这个流程展示了典型的“点对点”通信模型:一个生产者发送,一个消费者接收。


🧪 深入理解:消息是如何被序列化的?

默认情况下,RabbitTemplate 使用 SimpleMessageConverter,它会将:

  • String → UTF-8 字节
  • byte[] → 原样传输
  • Serializable 对象 → Java 序列化

但 Java 原生序列化存在兼容性问题,且不可读。因此,强烈建议使用 JSON

配置 JSON 消息转换器

// RabbitConfig.java (追加)importorg.springframework.amqp.support.converter.Jackson2JsonMessageConverter;importorg.springframework.amqp.support.converter.MessageConverter;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitConfig{@BeanpublicQueuehelloQueue(){returnnewQueue("hello.queue",true);}@BeanpublicMessageConverterjsonMessageConverter(){returnnewJackson2JsonMessageConverter();}}

现在,你可以发送任意 Java 对象:

// 定义消息体publicclassGreeting{privateString content;publicGreeting(String content){this.content = content;}// getter/setter}// Controller 中@GetMapping("/sendObj")publicStringsendObject(){Greeting greeting =newGreeting("Hello from Object!"); rabbitTemplate.convertAndSend("hello.queue", greeting);return"Object sent";}

对应的 Listener 也要调整:

@RabbitListener(queues ="hello.queue")publicvoidreceiveObject(Greeting greeting){System.out.println("【收到对象】: "+ greeting.getContent());}
✅ 使用 JSON 不仅跨语言友好,还便于调试和监控。

🛡️ 消息确认机制(Acknowledgement)

默认情况下,Spring 的 @RabbitListener 使用 自动确认(Auto Ack) 模式:一旦消息被投递给消费者方法,RabbitMQ 就认为消息已成功处理,并从队列中删除。

但如果消费者在处理过程中崩溃,消息就会丢失!

启用手动确认

application.yml 中配置:

spring:rabbitmq:listener:simple:acknowledge-mode: manual 

然后在 Listener 中手动确认:

importcom.rabbitmq.client.Channel;importorg.springframework.amqp.support.AmqpHeaders;importorg.springframework.messaging.handler.annotation.Header;@RabbitListener(queues ="hello.queue")publicvoidreceiveMessage(String message,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag){try{System.out.println("处理消息: "+ message);// 模拟业务逻辑if("error".equals(message)){thrownewRuntimeException("模拟异常");}// 手动确认 channel.basicAck(deliveryTag,false);}catch(Exception e){try{// 拒绝消息,不重新入队 channel.basicNack(deliveryTag,false,false);}catch(IOException ex){ ex.printStackTrace();}}}
  • basicAck:确认消息处理成功
  • basicNack:拒绝消息,第三个参数 requeue 决定是否重新入队
⚠️ 在手动确认模式下,必须显式调用 ack/nack,否则消息会一直“未确认”,占用内存,最终可能导致 RabbitMQ 崩溃。

🔄 重试与死信队列(DLQ)

即使有手动确认,我们仍可能遇到临时故障(如数据库连接失败)。此时,我们希望消息能重试几次,若仍失败,则转入死信队列(Dead Letter Queue) 供人工处理。

配置重试机制

spring:rabbitmq:listener:simple:retry:enabled:truemax-attempts:3initial-interval:1000

Spring 会自动在异常时重试最多 3 次,间隔 1 秒。

死信队列配置

  1. 声明死信交换机和队列
  2. 主队列绑定死信交换机
@BeanpublicQueuedlq(){returnQueueBuilder.durable("hello.dlq").build();}@BeanpublicDirectExchangedlx(){returnnewDirectExchange("hello.dlx");}@BeanpublicBindingdlqBinding(){returnBindingBuilder.bind(dlq()).to(dlx()).with("hello.dlq");}@BeanpublicQueuehelloQueue(){returnQueueBuilder.durable("hello.queue").withArgument("x-dead-letter-exchange","hello.dlx").withArgument("x-dead-letter-routing-key","hello.dlq").build();}

当消息重试 3 次仍失败,就会被路由到 hello.dlq 队列。

你可以再写一个 Listener 监听 DLQ:

@RabbitListener(queues ="hello.dlq")publicvoidhandleDlq(String message){System.err.println("【死信队列】: "+ message);// 报警、记录日志、人工干预等}

📈 性能优化:并发消费者

默认情况下,每个 @RabbitListener 只启动一个消费者线程。如果消息量大,处理慢,会导致积压。

可以通过配置增加并发数:

spring:rabbitmq:listener:simple:concurrency:5max-concurrency:10
  • concurrency:初始消费者数量
  • max-concurrency:最大消费者数量(动态扩容)
🚀 高并发场景下,合理设置并发数能显著提升吞吐量。

🧪 单元测试:如何测试 RabbitMQ?

虽然 RabbitMQ 是外部依赖,但我们仍可以编写集成测试。

使用 Testcontainers(推荐)

@SpringBootTest@TestcontainersclassRabbitMQIntegrationTest{@ContainerstaticRabbitMQContainer rabbitMQ =newRabbitMQContainer("rabbitmq:3-management");@DynamicPropertySourcestaticvoidconfigureProperties(DynamicPropertyRegistry registry){ registry.add("spring.rabbitmq.host", rabbitMQ::getHost); registry.add("spring.rabbitmq.port", rabbitMQ::getAmqpPort); registry.add("spring.rabbitmq.username", rabbitMQ::getAdminUsername); registry.add("spring.rabbitmq.password", rabbitMQ::getAdminPassword);}@AutowiredprivateRabbitTemplate rabbitTemplate;@TestvoidtestSendMessage()throwsInterruptedException{// 发送消息 rabbitTemplate.convertAndSend("test.queue","Hello Test");// 等待消费者处理(实际项目中应使用 CountDownLatch 或 Awaitility)Thread.sleep(1000);// 验证逻辑(例如检查数据库、日志等)}}
🧪 Testcontainers 能在测试时启动真实的 RabbitMQ 容器,确保测试环境与生产一致。

🛠️ 常见问题排查

1. 消息发送成功,但消费者没收到?

  • 检查队列名称是否一致
  • 查看 RabbitMQ 管理界面,确认消息是否在队列中
  • 检查 Listener 是否被 Spring 扫描到(@Component

2. 启动时报连接拒绝?

  • 确认 RabbitMQ 服务已启动
  • 检查 hostportusernamepassword 是否正确
  • 防火墙是否放行 5672 端口

3. 消息乱码?

  • 确保生产者和消费者使用相同的 MessageConverter
  • JSON 场景下,检查对象字段是否匹配

4. 消息丢失?

  • 队列是否持久化?(durable=true
  • 消息是否标记为持久化?(MessageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
  • 消费者是否启用了手动确认?
📚 更多排错技巧可参考 RabbitMQ Troubleshooting Guide

🌐 RabbitMQ 与其他消息队列对比

特性RabbitMQKafkaRocketMQ
协议AMQP自定义自定义
延迟消息插件支持不支持原生支持
事务支持不支持支持
吞吐量中等极高
易用性⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐

RabbitMQ 适合复杂路由、低延迟、高可靠性的场景,如订单处理、通知系统等。


🧩 扩展:发布/订阅模式

除了点对点,RabbitMQ 还支持发布/订阅(Pub/Sub)。

示例:广播消息

// 声明 Fanout Exchange@BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange("broadcast.exchange");}@BeanpublicQueueemailQueue(){returnnewQueue("email.queue");}@BeanpublicQueuesmsQueue(){returnnewQueue("sms.queue");}// 绑定@BeanpublicBindingbindEmail(){returnBindingBuilder.bind(emailQueue()).to(fanoutExchange());}@BeanpublicBindingbindSms(){returnBindingBuilder.bind(smsQueue()).to(fanoutExchange());}

发送消息:

rabbitTemplate.convertAndSend("broadcast.exchange","","Hello All!");

两个 Listener 都会收到消息。

broadcast.exchange

Producer

Fanout Exchange

email.queue

sms.queue

Email Consumer

SMS Consumer


🏁 总结

通过这个极简的 “Hello World” 程序,我们掌握了:

  • Spring Boot 与 RabbitMQ 的无缝集成
  • 消息的发送与接收
  • 队列的声明与配置
  • JSON 序列化、手动确认、重试、死信队列等高级特性

虽然只是一个入门示例,但它涵盖了 RabbitMQ 在 Spring Boot 中的核心用法。后续你可以在此基础上扩展:

  • 消息追踪(如集成 Sleuth)
  • 监控告警(如 Prometheus + Grafana)
  • 集群部署与高可用
🌟 记住:消息队列不是银弹,但它是构建弹性、可扩展系统的关键工具。

希望这篇博客能为你打开 RabbitMQ 的大门。Happy Coding!🎉


🔗 参考资料


🙌 感谢你读到这里!
🔍 技术之路没有捷径,但每一次阅读、思考和实践,都在悄悄拉近你与目标的距离。
💡 如果本文对你有帮助,不妨 👍 点赞、📌 收藏、📤 分享 给更多需要的朋友!
💬 欢迎在评论区留下你的想法、疑问或建议,我会一一回复,我们一起交流、共同成长 🌿
🔔 关注我,不错过下一篇干货!我们下期再见!✨

Read more

如何在分布式环境中实现高可靠性分布式锁

如何在分布式环境中实现高可靠性分布式锁

目录 一、简单了解分布式锁 (一)分布式锁:应对分布式环境的同步挑战 (二)分布式锁的实现方式 (三)分布式锁的使用场景 (四)分布式锁需满足的特点 二、Redis 实现分布式锁的基本思路(粗糙实现版本) (一)实现步骤 (二)基本代码展示 (三)上述实现的缺陷 三、健壮分布式锁聚焦 (一)误删问题的分析 问题说明 解决方案 具体实现步骤 具体代码实现 (二)原子性保证 问题场景 解决方案:使用 Lua 脚本 设置锁并设置过期时间(原子操作) 释放锁(原子操作) Java 调用 Lua 脚本 (三)超时自动解锁 问题描述 传统解决方案 改进方案:

By Ne0inhk
《剖析 Linux 文件系统:架构、原理与实战操作指南》

《剖析 Linux 文件系统:架构、原理与实战操作指南》

前引:文件系统是 Linux 系统的 “骨架”—— 它不仅决定了文件如何存储、读取,更直接影响系统的稳定性与性能。无论是 EXT4、XFS 等常见文件系统,还是 “挂载”“分区” 等核心操作,背后都有一套严谨的工作机制。本文将拆解 Linux 文件系统的底层架构,详解 inode、超级块、目录项的作用,同时搭配格式化、挂载配置、磁盘检查等实战案例,帮你打通 “理论” 与 “实操” 的壁垒! 新建一个文件,系统要做什么? 分配 inode ,形成该文件的所有属性然后保存在磁盘,同时和文件名关联 删除一个文件,系统要做什么? 查看引用计数,符合要求就将数据块和对应的 inode 取消关联,改变文件状态,允许覆盖 查找/修改一个文件,系统要做什么? 根据

By Ne0inhk

一文读懂Spring AOP:手把手教你优雅实现“无侵入”代码增强

目录 1.什么是Spring AOP? 2.SpringAOP优点与上手 Spring AOP 的核心术语 3.通知类型注解 4.@PointCut+@Order 5.切点表达式 6.代理模式 7.Spring AOP原理 1.什么是Spring AOP? AOP=>面向切片编程思想,是一种对一类问题集中处理的思想,比如拦截器,统一返回结果管理,统一异常处理,登录校验......如果使用OOP(面向结果编程)会让相同的代码重复多次出现,业务方法中混杂着非核心的逻辑。 Spring AOP就是为了解决这些问题存在,是AOP思想的其中一种实现方式 2.SpringAOP优点与上手 优点: * 不影响原有代码,解耦 * 便于维护功能 * 提高开发效率 * 减少重复代码 快速上手SpringAOP 编写一个使用SpringAOP计算所有方法的运行时长的例子 1.

By Ne0inhk
Windows安装RabbitMQ保姆级教程(图文详解)

Windows安装RabbitMQ保姆级教程(图文详解)

文章目录 * 前言 * 准备工作 * 系统要求 * 安装概述 * 第一步:下载Erlang * 1.1 访问Erlang官网 * 1.2 下载安装包 * 第二步:安装Erlang * 2.1 运行安装程序 * 2.2 安装向导 * 2.3 配置Erlang环境变量 * 2.4 验证环境变量配置 * 第三步:下载RabbitMQ * 3.1 访问RabbitMQ官网 * 3.2 选择Windows安装包 * 第四步:安装RabbitMQ * 4.1 运行安装程序 * 4.2 安装过程 * 4.3 安装完成 * 4.4 配置RabbitMQ环境变量 * 4.

By Ne0inhk