跳到主要内容
极客日志极客日志面向AI+效率的开发者社区
首页博客GitHub 精选镜像工具UI配色美学隐私政策关于联系
搜索内容 / 工具 / 仓库 / 镜像...⌘K搜索
注册
博客列表
Javajava

Spring Boot 集成 Eclipse Mosquitto MQTT 实战

综述由AI生成本文演示了如何在 Spring Boot 项目中通过 Eclipse Paho 客户端集成 Mosquitto MQTT 服务。主要涵盖 Maven 依赖配置、YAML 连接参数设置、MqttClient 生命周期管理及回调处理、以及发布订阅工具类的封装。测试控制器验证了消息收发与自动重连机制,适用于物联网设备通信场景。

修罗发布于 2025/11/14更新于 2026/6/1424 浏览
Spring Boot 集成 Eclipse Mosquitto MQTT 实战

在 Spring Boot 项目中接入 MQTT 协议,Eclipse Paho 是最常用的 Java 客户端库。配合 Mosquitto 服务端,可以快速构建物联网或即时通讯能力。下面以实战方式梳理从依赖引入到功能测试的完整流程。

添加 MQTT 客户端依赖

首先需要在 pom.xml 中引入 Eclipse Paho 的 MQTTv3 客户端依赖,这是目前主流的 MQTT Java 实现:

<!-- MQTT 客户端 -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>

配置 MQTT 连接参数

接下来在 application.yml 中定义连接信息。注意若启用 TLS 加密,地址前缀应改为 ssl://,默认端口通常为 8883。这里我们使用非加密的 TCP 模式:

mqtt:
  # 是否启用
  enable: true
  # Mosquitto 服务地址(非加密端口)
  broker: tcp://localhost:1883
  # 客户端唯一标识(建议加随机数避免冲突)
  client-id: springboot-mqtt-client
  # 认证用户名(Mosquitto 启用认证时必填)
  username: user1
  # 认证密码
  password: 123456
  # 默认 QoS 等级(0/1/2)
  default-qos: 1
  # 心跳间隔(秒)
  keep-alive: 60

实现 MQTT 客户端(发布 + 订阅)

MQTT 客户端配置类

核心在于创建一个配置类来管理 实例的生命周期。这里使用了 确保只有在配置属性存在时才初始化 Bean。

MqttClient
@ConditionalOnBean
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;

@Slf4j
@Configuration
@ConditionalOnBean(MqttProperties.class)
public class MqttConfig {
    private final MqttProperties mqttProp;

    public MqttConfig(MqttProperties mqttProp) {
        this.mqttProp = mqttProp;
    }

    /**
     * 创建 MQTT 客户端实例
     */
    @Bean
    public MqttClient mqttClient() throws MqttException {
        // 客户端 ID 建议添加随机数,避免重复连接
        String clientIdWithRandom = mqttProp.getClientId() + "_" + System.currentTimeMillis();
        MqttClient client = new MqttClient(mqttProp.getBroker(), clientIdWithRandom, new MemoryPersistence());

        // 配置连接参数
        MqttConnectOptions options = new MqttConnectOptions();
        if (StringUtils.hasText(mqttProp.getUsername())) {
            options.setUserName(mqttProp.getUsername());
        }
        if (StringUtils.hasText(mqttProp.getPassword())) {
            options.setPassword(mqttProp.getPassword().toCharArray());
        }
        options.setKeepAliveInterval(mqttProp.getKeepAlive());
        
        // 自动重连
        options.setAutomaticReconnect(true);
        // 不清除会话(保留订阅关系和未确认消息)
        options.setCleanSession(false);

        // 连接回调(处理连接状态)
        client.setCallback(new MqttCallback() {
            /**
             * 连接断开时触发,可在此实现重连逻辑
             */
            @Override
            public void connectionLost(Throwable cause) {
                log.error("MQTT 连接断开,原因:{}", cause.getMessage());
            }

            /**
             * 收到订阅的消息时触发,用于处理业务逻辑(如存储数据到数据库)
             */
            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                // 接收消息回调(订阅的主题有消息时触发)
                String content = new String(message.getPayload());
                log.debug("收到消息 - 主题:{},内容:{}", topic, content);
                // TODO 此处执行具体的业务逻辑
            }

            /**
             * 消息发布完成后触发,可用于确认消息已送达
             */
            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                // 消息发布完成回调
                try {
                    log.debug("消息发布成功,主题:{}", token.getTopics()[0]);
                } catch (Exception e) {
                    log.error("", e);
                }
            }
        });

        // 连接到 Mosquitto
        client.connect(options);
        log.info("MQTT 连接成功:{}", mqttProp.getClientId());
        return client;
    }
}

配置实体类

对应的属性映射类,支持动态刷新:

import lombok.Data;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Component;

@Data
@Component
@RefreshScope
@ConfigurationProperties(prefix = "mqtt")
@ConditionalOnProperty(name = "mqtt.enable", havingValue = "true")
public class MqttProperties {
    /**
     * 是否启用
     */
    private boolean enable;

    /**
     * Mosquitto 服务地址(非加密端口)。若启用 TLS 加密,使用 ssl://localhost:8883
     */
    private String broker;

    /**
     * 客户端唯一标识(建议加随机数避免冲突)
     */
    private String clientId;

    /**
     * 认证用户名(Mosquitto 启用认证时必填)
     */
    private String username;

    /**
     * 认证密码
     */
    private String password;

    /**
     * 默认 QoS 等级(0/1/2),非关键数据用 QoS 0,重要状态用 QoS 1,核心控制指令用 QoS 2
     */
    private int defaultQos;

    /**
     * 心跳间隔(秒)
     */
    private int keepAlive = 60;
}
发布和订阅工具类

为了业务层解耦,我们将订阅和发布逻辑封装为独立的组件。

消息订阅工具类

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.stereotype.Component;

/**
 * MQTT 消息订阅工具类
 */
@Slf4j
@Component
@ConditionalOnBean(MqttProperties.class)
public class MqttSubscriber {
    private final MqttClient mqttClient;
    private final MqttProperties mqttProp;

    public MqttSubscriber(MqttClient mqttClient, MqttProperties mqttProp) {
        this.mqttClient = mqttClient;
        this.mqttProp = mqttProp;
    }

    /**
     * 订阅指定主题
     * @param topic 主题(支持通配符,如 sensor/+)
     */
    public void subscribe(String topic) throws MqttException {
        subscribe(topic, mqttProp.getDefaultQos());
    }

    /**
     * 订阅指定主题(自定义 QoS)
     * @param topic 主题
     * @param qos QoS 等级
     */
    public void subscribe(String topic, int qos) throws MqttException {
        if (!mqttClient.isConnected()) {
            mqttClient.reconnect();
        }
        mqttClient.subscribe(topic, qos);
        log.info("已订阅主题:{},QoS 等级:{}", topic, qos);
    }

    /**
     * 取消订阅主题
     * @param topic 主题
     */
    public void unsubscribe(String topic) throws MqttException {
        mqttClient.unsubscribe(topic);
        log.info("已取消订阅主题:{}", topic);
    }
}

消息发布工具类

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.stereotype.Component;

/**
 * MQTT 消息发布工具类
 */
@Slf4j
@Component
@ConditionalOnBean(MqttProperties.class)
public class MqttPublisher {
    private final MqttClient mqttClient;
    private final MqttProperties mqttProp;

    public MqttPublisher(MqttClient mqttClient, MqttProperties mqttProp) {
        this.mqttClient = mqttClient;
        this.mqttProp = mqttProp;
    }

    /**
     * 发布消息到指定主题
     * @param topic 主题
     * @param content 消息内容
     */
    public void publish(String topic, String content) throws MqttException {
        publish(topic, content, mqttProp.getDefaultQos());
    }

    /**
     * 发布消息到指定主题(自定义 QoS)
     * @param topic 主题
     * @param content 消息内容
     * @param qos QoS 等级
     */
    public void publish(String topic, String content, int qos) throws MqttException {
        if (!mqttClient.isConnected()) {
            mqttClient.reconnect(); // 若断开连接,尝试重连
        }
        log.debug("发布消息,主题:{},内容:{}, QoS 等级:{}", topic, content, qos);
        MqttMessage message = new MqttMessage(content.getBytes());
        message.setQos(qos);
        mqttClient.publish(topic, message);
    }
}
测试 MQTT 功能

最后编写一个 Controller 来验证接口连通性。实际开发中请根据项目包结构调整导入路径:

import com.blackcrow.test.mqtt.config.MqttPublisher;
import com.blackcrow.test.mqtt.config.MqttSubscriber;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MqttTestController {
    @Autowired
    private MqttPublisher mqttPublisher;
    @Autowired
    private MqttSubscriber mqttSubscriber;
    
    @Value("${mqtt.default-topic:topic/temp}")
    private String defaultTopic;

    /**
     * 订阅主题
     */
    @GetMapping("/subscribe")
    public String subscribe(@RequestParam(required = false) String topic) {
        try {
            String targetTopic = topic != null ? topic : defaultTopic;
            mqttSubscriber.subscribe(targetTopic);
            return "订阅成功:" + targetTopic;
        } catch (MqttException e) {
            return "订阅失败:" + e.getMessage();
        }
    }

    /**
     * 发布消息
     */
    @GetMapping("/publish")
    public String publish(@RequestParam(required = false) String topic, @RequestParam String message) {
        try {
            String targetTopic = topic != null ? topic : defaultTopic;
            mqttPublisher.publish(targetTopic, message);
            return "发布成功:主题=" + targetTopic + ",消息=" + message;
        } catch (MqttException e) {
            return "发布失败:" + e.getMessage();
        }
    }
}

通过以上步骤,即可在 Spring Boot 应用中稳定运行 MQTT 通信。记得在生产环境中妥善管理密码等敏感信息,并监控连接状态。

目录

  1. 添加 MQTT 客户端依赖
  2. 配置 MQTT 连接参数
  3. 是否启用
  4. Mosquitto 服务地址(非加密端口)
  5. 客户端唯一标识(建议加随机数避免冲突)
  6. 认证用户名(Mosquitto 启用认证时必填)
  7. 认证密码
  8. 默认 QoS 等级(0/1/2)
  9. 心跳间隔(秒)
  10. 实现 MQTT 客户端(发布 + 订阅)
  11. MQTT 客户端配置类
  12. 发布和订阅工具类
  13. 测试 MQTT 功能
  • 免费图片AI生成工具免费生成了解详情
  • Magick API 一键接入全球大模型注册送1000万token查看
  • 免费图片视频在线生成30秒,将你的创意变成现实开始设计
  • X/Twitter免费视频下载器免登陆无限额度免费视频解析下载了解详情
  • 100+免费在线小游戏爽一把
极客日志微信公众号二维码

微信扫一扫,关注极客日志

微信公众号「极客日志V2」,在微信中扫描左侧二维码关注。展示文案:极客日志V2 zeeklog

更多推荐文章

查看全部
  • AI 大模型学习路线:从入门到实战指南
  • C++ 红黑树封装实战:从零实现 Map 与 Set
  • Java 动态代理核心原理与实战对比
  • 二分算法实战:A-B 数对与高考志愿匹配
  • 基于 Leaflet Trackplayer 的 WebGIS 高速轨迹可视化实战
  • 一文说清ESP32 Arduino在智能家居中的核心应用要点
  • PCL 点云处理算法与函数汇总(C++ 版)
  • 哈希表实现原理与代码详解
  • 基于 FastAPI 自动构建 SSE MCP 服务器
  • JavaScript 基础语法与核心概念详解
  • 大模型生成逻辑深度解析:从预训练到多模态应用
  • ChatGPT GPTs 安全指南:如何防止提示词与知识库泄露
  • 算法基础:二分答案应用——木材加工与砍树
  • 模拟算法实战:铺地毯、回文日期与扫雷题解
  • OpenAkita:自我进化的开源 AI 助手框架
  • Llama-Factory与PyTorch版本兼容性问题排查手册
  • 自然语言处理在金融领域的应用与实战
  • AI 时代技术民主化:文科生为何成最大受益者
  • 腾讯混元图像 3.0 图生图模型开源,LMArena 评测跻身全球第一梯队
  • Spring AI Alibaba 短期记忆与长期记忆实现原理

相关免费在线工具

  • Keycode 信息

    查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online

  • Escape 与 Native 编解码

    JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online

  • JavaScript / HTML 格式化

    使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online

  • JavaScript 压缩与混淆

    Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online

  • Base64 字符串编码/解码

    将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online

  • Base64 文件转换器

    将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online