Spring Boot 集成 Eclipse Mosquitto

Spring Boot 集成 Eclipse Mosquitto

文章目录

添加 MQTT 客户端依赖

在 Spring Boot 项目的 pom.xml 中添加 Eclipse Paho MQTT 客户端依赖(主流的 MQTT Java 客户端):

<!-- MQTT 客户端 --><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency>

配置 MQTT 连接参数

application.yml(或 application.properties)中配置 Mosquitto 连接信息:

mqtt:# 是否启用enable:true# Mosquitto 服务地址(非加密端口),若启用 TLS 加密,使用 ssl://localhost:8883broker: tcp://localhost:1883# 客户端唯一标识(建议加随机数避免冲突)client-id: springboot-mqtt-client # 认证用户名(Mosquitto 启用认证时必填)username: user1 # 认证密码password:123456# 默认 QoS 等级(0/1/2)defalut-qos:1# 心跳间隔(秒)keep-alive:60

实现 MQTT 客户端(发布 + 订阅)

MQTT 客户端配置类

配置类

importlombok.extern.slf4j.Slf4j;importorg.eclipse.paho.client.mqttv3.*;importorg.eclipse.paho.client.mqttv3.persist.MemoryPersistence;importorg.springframework.boot.autoconfigure.condition.ConditionalOnBean;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.util.StringUtils;@Slf4j@Configuration@ConditionalOnBean(MqttProperties.class)publicclassMqttConfig{privatefinalMqttProperties mqttProp;publicMqttConfig(MqttProperties mqttProp){this.mqttProp = mqttProp;}/** * 创建 MQTT 客户端实例 */@BeanpublicMqttClientmqttClient()throwsMqttException{// 客户端 ID 建议添加随机数,避免重复连接String clientIdWithRandom = mqttProp.getClientId()+"_"+System.currentTimeMillis();MqttClient client =newMqttClient(mqttProp.getBroker(), clientIdWithRandom,newMemoryPersistence());// 配置连接参数MqttConnectOptions options =newMqttConnectOptions();if(StringUtils.hasText((mqttProp.getUsername()))) options.setUserName(mqttProp.getUsername());if(StringUtils.hasText((mqttProp.getPassword()))) options.setPassword(mqttProp.getPassword().toCharArray()); options.setKeepAliveInterval(mqttProp.getKeepAlive());// 自动重连 options.setAutomaticReconnect(true);// 不清除会话(保留订阅关系和未确认消息) options.setCleanSession(false);// 连接回调(处理连接状态) client.setCallback(newMqttCallback(){/** * 连接断开时触发,可在此实现重连逻辑 */@OverridepublicvoidconnectionLost(Throwable cause){ log.error("MQTT 连接断开,原因:{}", cause.getMessage());}/** * 收到订阅的消息时触发,用于处理业务逻辑(如存储数据到数据库) */@OverridepublicvoidmessageArrived(String topic,MqttMessage message)throwsException{// 接收消息回调(订阅的主题有消息时触发)String content =newString(message.getPayload()); log.debug("收到消息 - 主题:{},内容:{}", topic, content);// TODO 业务逻辑}/** * 消息发布完成后触发,可用于确认消息已送达 */@OverridepublicvoiddeliveryComplete(IMqttDeliveryToken token){// 消息发布完成回调try{ log.debug("消息发布成功,主题:{}", token.getTopics()[0]);}catch(Exception e){ log.error("", e);}}});// 连接到 Mosquitto client.connect(options); log.info("MQTT 连接成功:{}", mqttProp.getClientId());return client;}}

配置实体类

importlombok.Data;importorg.springframework.boot.autoconfigure.condition.ConditionalOnProperty;importorg.springframework.boot.context.properties.ConfigurationProperties;importorg.springframework.cloud.context.config.annotation.RefreshScope;importorg.springframework.stereotype.Component;@Data@Component@RefreshScope@ConfigurationProperties(prefix ="mqtt")@ConditionalOnProperty(name ="mqtt.enable", havingValue ="true")publicclassMqttProperties{/** * 是否启用 */privateboolean enable;/** * Mosquitto 服务地址(非加密端口)。若启用 TLS 加密,使用 ssl://localhost:8883 */privateString broker;/** * 客户端唯一标识(建议加随机数避免冲突) */privateString clientId;/** * 认证用户名(Mosquitto 启用认证时必填) */privateString username;/** * 认证密码 */privateString password;/** * 默认 QoS 等级(0/1/2),非关键数据用 QoS 0,重要状态用 QoS 1,核心控制指令用 QoS 2 */privateint defaultQos;/** * 心跳间隔(秒) */privateint keepAlive =60;}

发布和订阅工具类

消息订阅工具类

importlombok.extern.slf4j.Slf4j;importorg.eclipse.paho.client.mqttv3.MqttClient;importorg.eclipse.paho.client.mqttv3.MqttException;importorg.springframework.boot.autoconfigure.condition.ConditionalOnBean;importorg.springframework.stereotype.Component;/** * MQTT 消息订阅工具类 */@Slf4j@Component@ConditionalOnBean(MqttProperties.class)publicclassMqttSubscriber{privatefinalMqttClient mqttClient;privatefinalMqttProperties mqttProp;publicMqttSubscriber(MqttClient mqttClient,MqttProperties mqttProp){this.mqttClient = mqttClient;this.mqttProp = mqttProp;}/** * 订阅指定主题 * @param topic 主题(支持通配符,如 sensor/+) */publicvoidsubscribe(String topic)throwsMqttException{subscribe(topic, mqttProp.getDefaultQos());}/** * 订阅指定主题(自定义QoS) * @param topic 主题 * @param qos QoS等级 */publicvoidsubscribe(String topic,int qos)throwsMqttException{if(!mqttClient.isConnected()){ mqttClient.reconnect();} mqttClient.subscribe(topic, qos); log.info("已订阅主题:{},QoS等级:{}", topic, qos);}/** * 取消订阅主题 * @param topic 主题 */publicvoidunsubscribe(String topic)throwsMqttException{ mqttClient.unsubscribe(topic); log.info("已取消订阅主题:{}", topic);}}

消息发布工具类

importlombok.extern.slf4j.Slf4j;importorg.eclipse.paho.client.mqttv3.MqttClient;importorg.eclipse.paho.client.mqttv3.MqttException;importorg.eclipse.paho.client.mqttv3.MqttMessage;importorg.springframework.boot.autoconfigure.condition.ConditionalOnBean;importorg.springframework.stereotype.Component;/** * MQTT 消息发布工具类 */@Slf4j@Component@ConditionalOnBean(MqttProperties.class)publicclassMqttPublisher{privatefinalMqttClient mqttClient;privatefinalMqttProperties mqttProp;publicMqttPublisher(MqttClient mqttClient,MqttProperties mqttProp){this.mqttClient = mqttClient;this.mqttProp = mqttProp;}/** * 发布消息到指定主题 * @param topic 主题 * @param content 消息内容 */publicvoidpublish(String topic,String content)throwsMqttException{publish(topic, content, mqttProp.getDefaultQos());}/** * 发布消息到指定主题(自定义QoS) * @param topic 主题 * @param content 消息内容 * @param qos QoS等级 */publicvoidpublish(String topic,String content,int qos)throwsMqttException{if(!mqttClient.isConnected()){ mqttClient.reconnect();// 若断开连接,尝试重连} log.debug("发布消息,主题:{},内容:{}, QoS等级:{}", topic, content, qos);MqttMessage message =newMqttMessage(content.getBytes()); message.setQos(qos); mqttClient.publish(topic, message);}}

测试 MQTT 功能

创建一个测试控制器,验证消息发布和订阅:

importcom.blackcrow.test.mqtt.config.MqttPublisher;importcom.blackcrow.test.mqtt.config.MqttSubscriber;importorg.eclipse.paho.client.mqttv3.MqttException;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RequestParam;importorg.springframework.web.bind.annotation.RestController;@RestControllerpublicclassMqttTestController{@AutowiredprivateMqttPublisher mqttPublisher;@AutowiredprivateMqttSubscriber mqttSubscriber;@Value("${mqtt.default-topic:topic/temp}")privateString defaultTopic;/** * 订阅主题 */@GetMapping("/subscribe")publicStringsubscribe(@RequestParam(required =false)String topic){try{String targetTopic = topic !=null? topic : defaultTopic; mqttSubscriber.subscribe(targetTopic);return"订阅成功:"+ targetTopic;}catch(MqttException e){return"订阅失败:"+ e.getMessage();}}/** * 发布消息 */@GetMapping("/publish")publicStringpublish(@RequestParam(required =false)String topic,@RequestParamString message){try{String targetTopic = topic !=null? topic : defaultTopic; mqttPublisher.publish(targetTopic, message);return"发布成功:主题="+ targetTopic +",消息="+ message;}catch(MqttException e){return"发布失败:"+ e.getMessage();}}}

Read more

MySQL:事务的理解

MySQL:事务的理解

一、CURD不加控制,会有什么问题  (1)因为,MySQL里面存的是数据,所以很有可能会被多个客户访问,所以mysqld可能一次会接受到多个关于CURD的请求。(2)且mysql内部是采用多线程来完成数据存储等相关工作的,所以必然会存在对数据并发访问的场景      ——>会导致一些多请求并发可能产生的异常结果        比如同行转账,按道理是我减100,你加100,但是因为我是同行所以用的是一张数据库的表,可能我减100的时候还没做完网络或者数据库出问题等其他原因导致没有给你加100,那么整个操作就会出现一个中间过程(我减了但是你没有加),这就有问题,在这种情况下我们允许异常产生,一旦操作没有完成我们应该把减掉的100再加回来,就好像什么都没做,等待下次合适的时候再去转账。这就相当于转账之后不要有中间过程,而是在转的时候一旦出现异常就直接进行回滚,因为不回滚的话就会有问题,必须得回滚保证和初始的状态一样,这就叫我们的回滚操作。在高并发的场景下数据或多或少都会出现这样的问题,所以这也就要求mysql必须要有针对这类问题的解决方案。 二、CURD满足什么属性,能解决上述

By Ne0inhk
华为OD机试双机位C卷:采购订单 (Py/Java/C/C++/Js/Go)

华为OD机试双机位C卷:采购订单 (Py/Java/C/C++/Js/Go)

采购订单 华为OD机试双机位C卷 - 华为OD上机考试双机位C卷 100分题型 华为OD机试双机位c卷真题目录点击查看: 华为OD机试双机位C卷真题题库目录|机考题库 + 算法考点详解 题目描述 在一个采购系统中,采购申请(PR)需要经过审批后才能生成采购订单(PO)。每个PR包含商品的单价(假设相同商品的单价一定是一样的)及数量信息。系统要求对商品进行分类处理:单价高于100元的商品需要单独处理,单价低于或等于100元的相同商品可以合并到同一采购订单PO中。针对单价低于100的小额订单,如果量大可以打折购买。 具体规则如下: 如果PR状态为"审批通过",则将其商品加入到PO中。如果PR的状态为"审批拒绝"或"待审批",则忽略改PR。 对于单价高于100元的商品,每个商品单独生成一条PO记录。对于单价低于100元的商品,将相同商品的数量合并到一条PO记录中。 如果商品单价<100且商品数量>=100,则单价打9折。 输入描述 第一行包含整数N,

By Ne0inhk
本文介绍如何利用Trae国际版的Agent Skill功能大幅提升Java后端开发效率,特别针对Spring Cloud微服务架构,包含完整的实战案例、代码示例和最佳实践。

本文介绍如何利用Trae国际版的Agent Skill功能大幅提升Java后端开发效率,特别针对Spring Cloud微服务架构,包含完整的实战案例、代码示例和最佳实践。

如何在Trae国际版中使用Agent Skill提升Java后端开发效率 引言 对于Java后端开发者,尤其是Spring Cloud微服务架构的使用者来说,日常工作中充满了重复的样板代码编写、繁琐的配置管理和复杂的调试工作。Trae国际版的Agent Skill功能就像是为Java开发者量身打造的"瑞士军刀",能够自动化这些重复劳动,让我们专注于更有创造性的架构设计和业务逻辑实现。 本文将结合Java后端开发的实际场景,特别是Spring Cloud微服务架构,详细介绍如何使用Trae国际版的Agent Skill大幅提升开发效率。 一、Trae国际版Agent Skill简介 1.1 什么是Agent Skill Agent Skill是Trae国际版中一种模块化的AI能力扩展机制,每个Skill都是一个专注于特定领域的"智能助手"。对于Java开发者来说,这些Skill可以理解为精通Java生态的"虚拟专家",能够处理从代码生成到架构设计的各种任务。 1.2 适合Java开发者的核心Skill * Spring Boot代码生成器:快速生成符合最佳实践的Sp

By Ne0inhk
【抽奖系统开发实战】Spring Boot 项目的用户模块设计:注册登录、权限管控与敏感数据加密

【抽奖系统开发实战】Spring Boot 项目的用户模块设计:注册登录、权限管控与敏感数据加密

文章目录 * 一、注册 * 1.1 敏感字段加密 * 1.2 用户注册 * 1.3 TypeHandler * 二、控制层通用异常处理 * 三、登录 * 3.1 发送验证码 * 3.2 Redis的配置与使用 * > 核心工具类`RedisUtil` * 3.3 JWT * > JWT 令牌介绍 * > 核心工具类`JWTUtil` * 3.4 管理员登录 * 四、强制登录 * 4.1 前端处理 * 4.2 后端处理 * 五、用户管理 * 5.1 后台管理页面

By Ne0inhk