跳到主要内容
极客日志极客日志
首页博客AI提示词GitHub精选代理工具
搜索
|注册
博客列表
Javajava

Spring Boot 整合 RabbitMQ 实战

综述由AI生成Spring Boot 整合 RabbitMQ 支持 Fanout、Direct 和 Topic 三种模式。通过配置类或注解方式定义交换机、队列及绑定关系,演示了生产者和消费者的业务代码实现。重点展示了依赖引入、YAML 配置以及消息发送与接收的具体逻辑,适用于基于 Spring AMQP 的消息队列开发场景。

ApiHolic发布于 2026/3/15更新于 2026/4/2614 浏览

本节讲解 RabbitMQ 与 Spring Boot 的整合方式。需提前准备好 Spring 项目基本框架。

整合时有两种方式创建交换机、队列及它们的绑定关系:

  1. 添加配置类(加上@Configuration注解和@Bean注解),适用于 topic 模式、direct 模式,也需要用到@RabbitListener,只起监听队列的作用。
  2. 用注解创建(@RabbitListener),注解中的内容比之增加,用来创建交换机、队列及它们的绑定关系,以及监听队列。

Fanout 模式

1. 引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

然后在 Spring 项目中创建两个模块,一个生产者模块,一个消费者模块。

2. 在 yml/properties 文件进行相应的配置

分别在两个模块中都进行创建。

server:
  port: 8080
spring:
  application:
    name: RabbitMQ-demo
  rabbitmq:
    username: guest
    password: guest
    virtual-host: /
    host: 127.0.0.1
    port: 5672

3. 创建配置类(创建相应的交换机与队列,且绑定关系)

分别在两个模块中都进行创建(防止一方没有配置类启动时报错)。

package com.lx.producer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
 org.springframework.amqp.core.Queue;
 org.springframework.context.annotation.Bean;
 org.springframework.context.annotation.Configuration;


   {
    
    
     FanoutExchange  {
        
          (, , );
    }

    
    
     Queue  {
          (, );
    }

    
     Queue  {
          (, );
    }

    
     Queue  {
          (, );
    }

    
    
     Binding  {
         BindingBuilder.bind(smsQueue()).to(fanoutExchange());
    }

    
     Binding  {
         BindingBuilder.bind(duanxinQueue()).to(fanoutExchange());
    }

    
     Binding  {
         BindingBuilder.bind(emailQueue()).to(fanoutExchange());
    }
}
import
import
import
@Configuration
public
class
RabbitMqConfiguration
// 创建一个 fanout 类型的交换机
@Bean
public
fanoutExchange
()
// 第一个参数是交换机的名字,第二个是是否持久化,第三个是是否自动删除
return
new
FanoutExchange
"fanout_order_exchange"
true
false
// 创建三个队列
@Bean
public
smsQueue
()
return
new
Queue
"sms.fanout.queue"
true
@Bean
public
duanxinQueue
()
return
new
Queue
"duanxin.fanout.queue"
true
@Bean
public
emailQueue
()
return
new
Queue
"email.fanout.queue"
true
// 三个队列分别进行绑定关系
@Bean
public
smsBinding
()
return
@Bean
public
duanxinBinding
()
return
@Bean
public
emailBinding
()
return

4. 编写业务代码

在生产者模块中的 service 层中创建相应的生产信息业务类。

package com.lx.producer.service;

import jakarta.annotation.Resource;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import java.util.UUID;

@Service
public class OrderService {
    @Resource
    private RabbitTemplate rabbitTemplate;

    public void makeOrder(String userId, String productId, Integer numbers) {
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生产成功:" + orderId);
        // 通过 MQ 完成消息的发送
        String exchangeName = "fanout_order_exchange";
        String routingKey = "";
        rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);
    }
}

在消费者模块中创建 3 个相应的消费信息业务类(注意:必须要在类上写上@RabbitListener 注解,表示监听某个队列)。

package com.lx.consumer.service;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
@RabbitListener(queues = {"duanxin.fanout.queue"})
public class FanoutDuanxinConsumer {
    @RabbitHandler
    public void receiveMessage(String message) {
        System.out.println("duanxin.fanout---接收到的信息是:" + message);
    }
}
package com.lx.consumer.service;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
@RabbitListener(queues = {"email.fanout.queue"})
public class FanoutEmailConsumer {
    @RabbitHandler
    public void receiveMessage(String message) {
        System.out.println("email.fanout---接收到的信息是:" + message);
    }
}
package com.lx.consumer.service;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
@RabbitListener(queues = {"sms.fanout.queue"})
public class FanoutSmsConsumer {
    @RabbitHandler
    public void receiveMessage(String message) {
        System.out.println("sms.fanout---接收到的信息是:" + message);
    }
}

最后先启动 RabbitMQ,再启动两个项目,我们就能在消费者中得到生产者发送的消息了。

Direct 模式

Direct 模式与 Fanout 模式的代码差不多,只是要把配置类和生产者的业务处理类修改一下。

配置类修改如下(把刚才 Fanout 命名都改为 Direct 了)。

package com.lx.producer.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfiguration {
    // 创建一个 direct 类型的交换机
    @Bean
    public DirectExchange directExchangeExchange() {
        // 第一个参数是交换机的名字,第二个是是否持久化,第三个是是否自动删除
        return new DirectExchange("direct_order_exchange", true, false);
    }

    // 创建三个队列
    @Bean
    public Queue smsQueue() {
        return new Queue("sms.direct.queue", true);
    }

    @Bean
    public Queue duanxinQueue() {
        return new Queue("duanxin.direct.queue", true);
    }

    @Bean
    public Queue emailQueue() {
        return new Queue("email.direct.queue", true);
    }

    // 三个队列分别进行绑定关系
    @Bean
    public Binding smsBinding() {
        return BindingBuilder.bind(smsQueue()).to(directExchangeExchange()).with("sms");
    }

    @Bean
    public Binding duanxinBinding() {
        return BindingBuilder.bind(duanxinQueue()).to(directExchangeExchange()).with("duanxin");
    }

    @Bean
    public Binding emailBinding() {
        return BindingBuilder.bind(emailQueue()).to(directExchangeExchange()).with("email");
    }
}

改成创建 direct 类型的交换机以及路由与队列绑定时加了 key。

生产者业务类。

package com.lx.producer.service;

import jakarta.annotation.Resource;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import java.util.UUID;

@Service
public class OrderService {
    @Resource
    private RabbitTemplate rabbitTemplate;

    public void makeOrder(String userId, String productId, Integer numbers) {
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生产成功:" + orderId);
        // 通过 MQ 完成消息的发送
        String exchangeName = "direct_order_exchange";
        String routingKey1 = "sms";
        String routingKey2 = "duanxin";
        rabbitTemplate.convertAndSend(exchangeName, routingKey1, orderId);
        rabbitTemplate.convertAndSend(exchangeName, routingKey2, orderId);
    }
}

其余的代码不要变,把所有名字从 Fanout 改为 Direct 就行。

Topic 模式

Topic 模式中我们使用注解的方式来创建交换机、队列以及它们之间的绑定关系,只需要把消费者业务代码及生产者业务代码修改下即可。

消费者业务代码(修改@RabbitListener 注解)。

package com.lx.consumer.service;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Service;

@Service
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(value = "duanxin.topic.queue", durable = "true", autoDelete = "false"),
    exchange = @Exchange(value = "topic_order_exchange", type = ExchangeTypes.TOPIC),
    key = "#.duanxin.#"
))
public class TopicDuanxinConsumer {
    @RabbitHandler
    public void receiveMessage(String message) {
        System.out.println("duanxin.topic---接收到的信息是:" + message);
    }
}
package com.lx.consumer.service;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Service;

@Service
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(value = "topic.email.queue", durable = "true", autoDelete = "false"),
    exchange = @Exchange(value = "topic_order_exchange", type = ExchangeTypes.TOPIC),
    key = "*.email.#"
))
public class TopicEmailConsumer {
    @RabbitHandler
    public void receiveMessage(String message) {
        System.out.println("email.topic---接收到的信息是:" + message);
    }
}
package com.lx.consumer.service;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Service;

@Service
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(value = "sms.topic.queue", durable = "true", autoDelete = "false"),
    exchange = @Exchange(value = "topic_order_exchange", type = ExchangeTypes.TOPIC),
    key = "com.#"
))
public class TopicSmsConsumer {
    @RabbitHandler
    public void receiveMessage(String message) {
        System.out.println("sms.topic---接收到的信息是:" + message);
    }
}

生产者业务代码(只修改 routing key 就行,通配符模式)。

package com.lx.producer.service;

import jakarta.annotation.Resource;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import java.util.UUID;

@Service
public class OrderService {
    @Resource
    private RabbitTemplate rabbitTemplate;

    public void makeOrder(String userId, String productId, Integer numbers) {
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生产成功:" + orderId);
        // 通过 MQ 完成消息的发送
        String exchangeName = "topic_order_exchange";
        String routingKey1 = "com.email.duanxin";
        // String routingKey2 = "duanxin";
        rabbitTemplate.convertAndSend(exchangeName, routingKey1, orderId);
        // rabbitTemplate.convertAndSend(exchangeName, routingKey2, orderId);
    }
}

目录

  1. Fanout 模式
  2. 1. 引入依赖
  3. 2. 在 yml/properties 文件进行相应的配置
  4. 3. 创建配置类(创建相应的交换机与队列,且绑定关系)
  5. 4. 编写业务代码
  6. Direct 模式
  7. Topic 模式
  • 💰 8折买阿里云服务器限时8折了解详情
  • 💰 8折买阿里云服务器限时8折购买
  • 🦞 5分钟部署阿里云小龙虾了解详情
  • 🤖 一键搭建Deepseek满血版了解详情
  • 一键打造专属AI 智能体了解详情
极客日志微信公众号二维码

微信扫一扫,关注极客日志

微信公众号「极客日志V2」,在微信中扫描左侧二维码关注。展示文案:极客日志V2 zeeklog

更多推荐文章

查看全部
  • 机器人调试学习规划
  • C/C++ 动态规划:二维路径问题实战
  • 国内外 AI 大模型对比及国产 AI 核心优势解析
  • Android 位置权限组陷阱与数组二分查找索引坑
  • NVIDIA DGX Spark 部署 Stable Diffusion 3.5 与 ComfyUI
  • 大模型 RAG 中关键字检索的实现与实战
  • OpenClaw:开源 AI 智能体框架的技术架构与实战指南
  • OpenClaw macOS 安装与配置指南
  • 线性代数导引:可构造数域 K
  • GitHub 上值得关注的计算机视觉开源项目
  • FPGA 嵌入式块存储器 RAM:原理与实现指南
  • Eclipse代码提示功能设置(Java & C/C++)
  • VSCode 远程连接 Linux 配置:离线安装与免密登录
  • 使用 frontend-design Skill 提升大模型前端设计审美
  • Python 容器性能优化:列表、集合与字典的高效用法
  • Google AI Studio 生成代码打包 Android APK 教程
  • OpenClaw 在 Ubuntu 22.04 部署及对接百炼模型指南
  • Claude Opus 4.6 上线 DigitalOcean:支持百万上下文推理
  • C++大模型 SDK 开发:流式交互协议 SSE 解析与 httplib 实现原理
  • ModelSim 仿真软件安装与使用指南

相关免费在线工具

  • Keycode 信息

    查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online

  • Escape 与 Native 编解码

    JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online

  • JavaScript / HTML 格式化

    使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online

  • JavaScript 压缩与混淆

    Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online

  • Base64 字符串编码/解码

    将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online

  • Base64 文件转换器

    将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online