Java 大视界 -- Java 大数据在智能家居环境监测与智能调节中的应用拓展(423)
Java 大视界 -- Java 大数据在智能家居环境监测与智能调节中的应用拓展(423)
- 引言:
- 快速上手指南:3 步跑通智能家居 Demo(新手友好)
- 正文:
- 结束语:技术的温度,藏在细节里
- 🗳️参与投票和联系我:
文章来源公众号:青云交
引言:
嘿,亲爱的 Java 和 大数据爱好者们,大家好!我是ZEEKLOG(全区域)四榜榜首青云交!上周去朋友家做客,刚进门就被扑面而来的闷热感包围 —— 他家的智能空调显示 “26℃”,但实际体感却像 30℃;加湿器明明开着,湿度计却显示只有 35%(体感舒适湿度是 40%-60%)。朋友无奈地说:“这些智能设备都是‘各自为战’,空调不知道加湿器的状态,加湿器也不管窗外的雾霾,说是‘智能’,其实就是个远程开关。”
这不是个例。根据 IDC《2024 年 Q1 中国智能家居市场跟踪报告》,国内智能家居设备渗透率已达 42%,但超过 60% 的用户认为 “设备联动性差”“决策不智能”—— 本质上,这是 “数据孤岛” 和 “决策滞后” 的问题。传统智能家居只做了 “设备连接”,却没做 “数据融合与智能决策”。
而 Java,作为深耕企业级应用 28 年的技术栈,凭借其跨平台性、分布式处理能力和成熟的生态,正在成为破解这一困境的关键:用 MQTT 协议采集多设备数据,靠 Spark Streaming 处理实时环境指标,借 Spring Boot 构建稳定的决策引擎,最终让 “被动响应” 的智能家居变成 “主动预判” 的智慧空间。本文是我主导 3 个智能家居项目后的实战总结 —— 从第一个项目踩的 “设备字段不统一” 坑,到第三个项目实现 “老人房自动调温” 的暖心效果,带你一步步看 Java 大数据如何让家 “更懂你”。
快速上手指南:3 步跑通智能家居 Demo(新手友好)
很多读者私信我说 “想试试但怕环境复杂”,这里我整理了最简化的 Demo 跑通步骤,新手 30 分钟就能搞定,用自己的电脑就能模拟设备数据和决策流程:
Step 1:环境准备(必装软件清单)
| 软件名称 | 版本要求 | 安装要点 | 下载地址(官方链接) |
|---|---|---|---|
| JDK | 17 | 配置环境变量 JAVA_HOME,验证命令:java -version | https://www.oracle.com/java/technologies/downloads/#java17 |
| Apache Spark | 3.5.0 | 下载 “pre-built for Apache Hadoop 3.3 and later” 版本,解压即可 | https://spark.apache.org/downloads.html |
| EMQX | 5.0 | 启动命令:./bin/emqx start(Linux/Mac)或emqx start(Windows) | https://www.emqx.io/zh/downloads?product=broker |
| MySQL | 8.0 | 创建数据库smarthome_db,执行文中sql-scripts下的表结构脚本 | https://dev.mysql.com/downloads/mysql/ |
| Postman | 最新版 | 用于测试接口 | https://www.postman.com/downloads/ |
我踩过的坑:Spark 3.5.0 不兼容 JDK 21,新手别装太高版本 JDK;EMQX 默认端口 1883(MQTT)、8083(Dashboard),启动后访问http://localhost:8083能看到控制台就说明成功。
Step 2:代码运行(按顺序执行)
- 启动 MQTT Broker:打开 EMQX 控制台,创建用户名
collector_rw、密码Mqtt@Smarthome2024(和代码中一致);
启动决策引擎:
cd decision-engine-module mvn spring-boot:run 看到 “✅ Drools 决策规则容器初始化完成” 说明成功。
提交 Spark Streaming 任务(本地模式):
cd streaming-process-module mvn clean package spark-submit --class com.smarthome.streaming.SmartHomeStreamProcessor --master local[*] target/spark-streaming-processor-1.0.0.jar 本地模式用local[*],表示用所有 CPU 核心;
运行数据采集模块:
cd data-collect-module mvn clean package java -jar target/data-collect-module-1.0.0.jar 看到 “✅ MQTT 数据采集客户端初始化完成” 说明成功;
Step 3:效果验证(用 Postman 模拟数据)
- 查看 Spark 处理结果:Spark 控制台会打印房间指标,如 “room1|avg_temperature:28.0|avg_humidity:35.0|anomaly_level:medium”;
- 查看决策结果:决策引擎日志会显示 “规则触发 [加湿]:房间 room1 湿度过低(当前 35.0%),加湿器开启至 50%”。
**模拟设备数据:**发送 POST 请求到 EMQX 的http://localhost:8083/api/v5/mqtt/publish(EMQX API),Body 如下:
{"topic":"smarthome/device/sensor/room1","payload":"{\"device_id\":\"room1_sensor01\",\"temp\":28,\"hum\":35,\"illum\":500}","qos":1}headers 加Authorization:Basic YWRtaW46cGFzc3dvcmQ=(默认账号 admin,密码 public);
到这里,你就成功跑通了 “设备数据采集→实时处理→智能决策” 的完整流程!接下来可以继续看正文的详细技术细节和项目案例。
正文:
智能家居的核心是 “感知 - 分析 - 决策 - 执行” 的闭环,而 Java 大数据正是打通这个闭环的 “神经中枢”。下文会先拆解智能家居的真实痛点(结合我踩过的坑),详解 Java 技术栈选型的底层逻辑,再提供可直接复用的核心模块代码(附完整依赖和部署步骤),最后通过 C 市高端小区案例展示落地效果,并拓展 AI 学习、跨空间联动等未来方向 —— 全程贯穿 “技术细节 + 业务温度”,让你看完既能搞定代码,又能理解如何让技术落地到用户生活。
一、智能家居环境监测与调节的核心痛点
在做第一个智能家居项目(某地产商的精装房配套系统)时,我用 2 周时间访谈了 50 位用户和 10 家设备厂商,总结出 3 类阻碍 “真智能” 的核心痛点,这些痛点也成了后续技术选型的 “指挥棒”。
1.1 设备数据的 “异构化” 困境
智能家居设备来自不同厂商,数据格式、传输协议差异极大,就像 “说着不同语言的士兵”,根本无法协同作战。
1.1.1 多源数据的 “协议壁垒”
| 设备类型 | 数据内容 | 传输协议 | 数据格式 | 更新频率 | 核心问题 |
|---|---|---|---|---|---|
| 智能空调 | 温度、湿度、运行模式 | MQTT | JSON | 5 秒 / 次 | 不同品牌 JSON 字段不一致(如 “temp” vs “temperature”“wind_speed” vs “fan_level”) |
| 空气净化器 | PM2.5、CO₂浓度、滤芯寿命 | HTTP | XML | 10 秒 / 次 | HTTP 轮询效率低,实时性差,某品牌净化器 10 秒才传一次数据,雾霾来了反应慢 |
| 温湿度传感器 | 房间温湿度、光照强度 | ZigBee | 二进制流 | 2 秒 / 次 | 二进制解析复杂,初期用 Python 解析经常丢包,后来换 Java 才解决 |
| 智能窗帘 | 开合度、电机状态 | Wi-Fi | 自定义协议 | 1 次 / 操作 | 协议不开放,某厂商窗帘只能用他们的 APP 控制,数据拿不出来 |
我至今记得第一个项目上线前的紧急调试:某批空调的 “温度” 字段是 “temp_val”,而我们代码里写的是 “temperature”,导致大屏显示 “null”,连夜改代码加字段映射才解决 —— 这也是后来必须做 “数据标准化” 的直接原因。
1.1.2 数据规模的 “爆发式增长”
以一个 3 室 2 厅的家庭为例,若部署 15 个智能设备(空调 ×2、净化器 ×1、温湿度传感器 ×6、窗帘 ×2、智能插座 ×4),数据规模如下:
- 实时数据:日均 100MB(主要是传感器和空调数据);
- 历史数据:按保存 1 年计算,约 36GB(含环境指标、用户操作记录);
- 结构化数据:用户配置、设备属性等约 50MB。
传统的单机数据库(如 SQLite)根本扛不住 —— 第二个项目中,某小区物业反映 “环境监测大屏延迟超 30 秒”,查下来是 SQLite 单表数据超 1000 万条,查询一次要 25 秒,完全失去了实时调节的意义。
1.2 实时调节的 “滞后性” 痛点
1.2.1 决策响应的 “秒级差距”
用户回家前 10 分钟,想让空调自动调到 24℃,但传统智能家居需要手动在 APP 上预约 —— 这是 “被动响应”;而理想的状态是:系统通过手机定位 + 历史行为分析,提前 10 分钟启动空调,用户进门就能享受到 “刚刚好” 的温度 —— 这需要 “秒级决策”。
第二个项目中,初期用 “Java 定时器 + MySQL 查询” 做决策,响应延迟 8 秒,用户反馈 “刚进门还是热的”;后来换成 Spark Streaming,把延迟降到 2 秒内,用户满意度立刻提升了 30%。
1.2.2 调节策略的 “经验主义”
传统智能家居的调节策略靠 “固定阈值”(如温度 > 26℃开空调),但忽略了 “环境联动” 和 “用户习惯”。比如:
- 同样是 26℃,湿度 60%(闷热)和湿度 30%(干爽)的体感完全不同,但传统系统只会按温度阈值操作;
- 用户 A(老人)喜欢睡前开 25℃,用户 B(年轻人)喜欢开 23℃,但系统给所有人都设 24℃,老人觉得冷,年轻人觉得热。
我在访谈时,一位阿姨说:“我家空调半夜总自己关,后来才知道是系统设了‘26℃自动关’,但我睡觉时喜欢盖厚被子,26℃刚好,关了就冷醒 —— 这哪是智能,是‘智障’。”
1.3 隐私安全的 “敏感性” 挑战
环境监测涉及大量用户隐私数据(如 “几点回家”“几点睡觉”“房间有人没人”),但很多智能家居平台存在 “数据明文传输”“存储不安全” 的问题。
第三个项目做安全测试时,我们用抓包工具轻易就获取到了某品牌温湿度传感器的明文数据 —— 从 “晚上 10 点湿度突然下降” 能推断出 “用户开了空调,可能准备睡觉了”;从 “周末上午 9 点房间有光照变化” 能推断出 “用户起床拉开了窗帘”。这些数据一旦泄露,后果不堪设想 —— 这也是后来所有数据传输都加 SSL 加密、存储用脱敏的原因。
二、Java 大数据技术栈的选型逻辑
技术选型不是 “选最酷的”,而是 “选最能解决问题的”。我在 3 个项目中对比了多套技术栈,最终确定了以 Java 为核心的方案,下面详解选型逻辑(含当时团队的争论和决策过程)。
2.1 技术栈选型对比与决策
| 技术模块 | 候选技术 1 | 候选技术 2 | 最终选型 | 选型理由(贴合智能家居需求) |
|---|---|---|---|---|
| 实时数据采集 | Eclipse Paho(Java) | Mosquitto(C) | Eclipse Paho 1.2.5 | Java 开发的 MQTT 客户端,与后续 Spring Boot 架构无缝衔接;支持 SSL 加密,解决隐私安全问题;当时团队 Java 熟,开发效率高 |
| 实时数据处理 | Apache Spark Streaming | Apache Flink | Spark Streaming 3.5.0 | 智能家居实时处理以 “微批处理” 为主(5 秒一批),Spark 足够满足;Flink 运维成本比 Spark 高 30%,中小项目没必要 |
| 设备联动引擎 | Spring Boot 3.2.0 | Node.js | Spring Boot 3.2.0 | 企业级稳定性强,支持复杂的规则引擎;依赖注入便于扩展设备类型,新增窗帘控制只需加个 Service |
| 数据存储 | Apache Cassandra | MySQL(分库分表) | Cassandra 4.1.3 | 适合存储海量时序数据(环境指标按时间戳递增);支持多节点部署,避免单点故障,某小区曾因 MySQL 宕机导致 1 小时无法控制设备 |
| 可视化展示 | ECharts 5.4.3 | Highcharts | ECharts 5.4.3 | 开源免费,支持动态时序图;可与 Java 后端无缝对接,某项目用 ECharts 做的大屏支持 “点击房间看详情” 交互 |
当时的选型争论:团队里有个年轻工程师坚持用 Flink,说 “流处理更先进”,但我算了一笔账:Flink 的运维需要额外学 Checkpoint 配置、状态后端管理,团队要花 1 个月学习,而 Spark Streaming 我们熟,2 周就能开发完;而且智能家居的实时需求是 “5 秒内响应”,Spark 的微批处理完全能满足 —— 技术选型要 “量力而行”,不是越先进越好。
2.2 系统整体架构
下面的架构图是根据第三个项目(C 市高端小区,500 户)绘制的,采用纵向布局,每个模块加了业务图标和详细说明,节点 padding 调大至 15px,确保文字不拥挤,颜色按 “感知 - 传输 - 处理 - 存储 - 应用 - 执行” 分层,清晰易读:
架构设计的核心思考:我在设计时重点考虑了 “扩展性” 和 “容错性”。比如传输层用 EMQX 作为 MQTT Broker,支持百万级设备连接,后续小区扩容到 2000 户也不用换架构;存储层用 Cassandra 存时序数据,因为它按时间分区,查 “近 24 小时温度” 比 MySQL 快 10 倍 —— 第三个项目中,这个架构支撑了 500 户家庭、7500 个设备的稳定运行,日均处理数据 30GB,无一次宕机。
三、Java 大数据核心模块的实战实现
这部分是全文的 “干货仓库”,我会提供第三个项目中 3 个核心模块的完整代码(已脱敏),每个代码块都附详细注释和踩坑经验,还补充了依赖配置和部署命令 —— 这些代码都是生产环境验证过的,你复制后改改配置就能对接自家的智能家居设备。
3.1 模块 1:实时数据采集与标准化(MQTT + 协议转换)
数据采集是第一步,必须解决 “协议不统一” 和 “字段混乱” 的问题。下面是我用 Java 写的 MQTT 客户端和数据标准化服务,包含 SSL 加密、自动重连等生产级特性。
3.1.1 第一步:核心依赖配置(pom.xml)
<?xml version="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.smarthome</groupId><artifactId>data-collect-module</artifactId><version>1.0.0</version><name>智能家居数据采集模块</name><dependencies><!-- 1. MQTT客户端(Eclipse Paho,Java生态最成熟) --><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency><!-- 2. JSON解析(阿里巴巴FastJSON2,性能比Jackson快30%) --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson2</artifactId><version>2.0.41</version></dependency><!-- 3. Kafka客户端(用于转发数据到Spark) --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version></dependency><!-- 4. 日志依赖(SLF4J+Logback,生产环境标配) --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>2.0.9</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.4.8</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>17</source><target>17</target><encoding>UTF-8</encoding></configuration></plugin></plugins></build></project>3.1.2 第二步:MQTT 数据采集客户端(含 SSL 配置)
packagecom.smarthome.collect;importorg.eclipse.paho.client.mqttv3.*;importorg.eclipse.paho.client.mqttv3.persist.MemoryPersistence;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importcom.alibaba.fastjson2.JSONObject;importjava.util.Properties;/** * MQTT设备数据采集客户端(C市高端小区项目生产用,2024年4月上线) * 功能:1. 连接EMQX Broker(支持SSL加密) 2. 订阅多设备主题 3. 接收数据并转发到标准化服务 * 部署说明:运行前需将SSL证书放到resources/cert目录下 */publicclassMqttDataCollectorimplementsMqttCallback{privatestaticfinalLogger log =LoggerFactory.getLogger(MqttDataCollector.class);// -------------------------- 生产环境配置(建议放Nacos配置中心) --------------------------privatestaticfinalString MQTT_BROKER ="ssl://emqx.smarthome.com:8883";// SSL端口8883privatestaticfinalString MQTT_CLIENT_ID ="java-collector-"+System.currentTimeMillis();// 客户端ID唯一,避免冲突privatestaticfinalString MQTT_USERNAME ="collector_rw";privatestaticfinalString MQTT_PASSWORD ="Mqtt@Smarthome2024";// 生产环境用强密码// 订阅的设备主题(支持通配符,#表示所有子主题)privatestaticfinalString[] SUBSCRIBE_TOPICS ={"smarthome/device/aircon/#",// 空调主题(如smarthome/device/aircon/room1)"smarthome/device/sensor/#",// 传感器主题"smarthome/device/purifier/#"// 净化器主题};privatestaticfinalint QOS =1;// 消息质量(1=至少一次送达,确保数据不丢)// Kafka配置(转发标准化前的数据)privatestaticfinalString KAFKA_BROKERS ="kafka.smarthome.com:9092";privatestaticfinalString KAFKA_TOPIC_RAW ="smarthome_raw_data";privateMqttClient mqttClient;privateKafkaProducer kafkaProducer;// 自定义Kafka生产者,代码见下文// 初始化MQTT客户端(含SSL配置)publicvoidinit()throwsMqttException{// 1. 初始化Kafka生产者 kafkaProducer =newKafkaProducer(KAFKA_BROKERS);// 2. 内存持久化(避免磁盘存储,适合临时数据)MemoryPersistence persistence =newMemoryPersistence(); mqttClient =newMqttClient(MQTT_BROKER, MQTT_CLIENT_ID, persistence);// 3. 连接配置(生产环境必备参数)MqttConnectOptions connOpts =newMqttConnectOptions(); connOpts.setUserName(MQTT_USERNAME); connOpts.setPassword(MQTT_PASSWORD.toCharArray()); connOpts.setAutomaticReconnect(true);// 【关键】网络断开后自动重连,避免人工干预 connOpts.setConnectionTimeout(10);// 连接超时10秒 connOpts.setKeepAliveInterval(30);// 心跳间隔30秒,证明客户端在线 connOpts.setCleanSession(false);// 不清除会话,重连后能收到离线消息// 4. SSL配置(解决隐私安全问题)Properties sslProps =newProperties(); sslProps.put("ssl.trustStore","src/main/resources/cert/emqx-truststore.jks");// 信任证书 sslProps.put("ssl.trustStorePassword","truststore123");// 证书密码 connOpts.setSSLProperties(sslProps);// 5. 设置回调函数(接收消息、处理连接断开) mqttClient.setCallback(this);// 6. 连接Broker并订阅主题 mqttClient.connect(connOpts);for(String topic : SUBSCRIBE_TOPICS){ mqttClient.subscribe(topic, QOS); log.info("✅ 订阅设备主题成功:{}", topic);} log.info("✅ MQTT数据采集客户端初始化完成(SSL加密连接)");}// 接收设备消息(核心方法:数据从设备到系统的入口)@OverridepublicvoidmessageArrived(String topic,MqttMessage message)throwsException{String rawData =newString(message.getPayload(),"UTF-8"); log.debug("📥 收到设备消息|主题:{}|QOS:{}|内容:{}", topic, message.getQos(), rawData);try{// 转发到Kafka(后续由数据标准化服务消费) kafkaProducer.send(KAFKA_TOPIC_RAW, topic, rawData);}catch(Exception e){ log.error("❌ 设备消息转发Kafka失败|主题:{}|内容:{}", topic, rawData, e);// 失败重试逻辑(生产环境可加定时重试队列)}}// 连接断开处理(自动重连后会重新订阅主题)@OverridepublicvoidconnectionLost(Throwable cause){ log.error("⚠️ MQTT连接断开,正在自动重连...", cause);}// 消息发布完成处理(仅发布消息时用,采集端暂不用)@OverridepublicvoiddeliveryComplete(IMqttDeliveryToken token){}// 关闭客户端(程序退出时调用)publicvoidclose()throwsMqttException{if(mqttClient !=null&& mqttClient.isConnected()){ mqttClient.disconnect(); mqttClient.close(); log.info("🔚 MQTT客户端已关闭");}if(kafkaProducer !=null){ kafkaProducer.close(); log.info("🔚 Kafka生产者已关闭");}}// 测试入口(本地运行可直接启动)publicstaticvoidmain(String[] args){MqttDataCollector collector =newMqttDataCollector();try{ collector.init();// 保持客户端运行(生产环境用守护线程)Thread.currentThread().join();}catch(Exception e){ log.error("❌ MQTT客户端初始化失败", e);System.exit(1);}}}3.1.3 第三步:Kafka 生产者工具类(转发数据用)
packagecom.smarthome.collect;importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.clients.producer.ProducerRecord;importorg.apache.kafka.common.serialization.StringSerializer;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importjava.util.Properties;/** * Kafka生产者工具类(用于转发设备原始数据) */publicclassKafkaProducer{privatestaticfinalLogger log =LoggerFactory.getLogger(KafkaProducer.class);privateorg.apache.kafka.clients.producer.KafkaProducer<String,String> producer;publicKafkaProducer(String brokers){// Kafka配置Properties props =newProperties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); props.put(ProducerConfig.ACKS_CONFIG,"1");// 至少一个副本确认,平衡性能和可靠性 props.put(ProducerConfig.RETRIES_CONFIG,3);// 失败重试3次 props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);// 批量发送大小(16KB)this.producer =neworg.apache.kafka.clients.producer.KafkaProducer<>(props);}// 发送消息(key用topic,便于后续分区消费)publicvoidsend(String topic,String key,String value){ProducerRecord<String,String>record=newProducerRecord<>(topic, key, value); producer.send(record,(metadata, exception)->{if(exception !=null){ log.error("❌ Kafka消息发送失败|topic:{}|key:{}", topic, key, exception);}else{ log.debug("📤 Kafka消息发送成功|topic:{}|partition:{}|offset:{}", metadata.topic(), metadata.partition(), metadata.offset());}});}publicvoidclose(){if(producer !=null){ producer.close();}}}3.1.4 第四步:数据标准化服务(统一字段格式)
packagecom.smarthome.collect;importcom.alibaba.fastjson2.JSONObject;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importjava.util.HashMap;importjava.util.Map;/** * 设备数据标准化服务(解决不同厂商字段不一致问题,C市项目核心工具类) * 设计思路:通过字段映射表统一格式,新增设备仅需扩展映射,无需修改业务逻辑 * 示例:空调数据"temp_val":25 → 标准化为"temperature":25.0 */publicclassDataStandardizer{// 日志实例(SLF4J标准用法,避免硬编码日志实现)privatestaticfinalLogger log =LoggerFactory.getLogger(DataStandardizer.class);// 常量定义:滤芯默认寿命(天),统一维护便于后续修改privatestaticfinalint FILTER_LIFE_DEFAULT_DAYS =180;// -------------------------- 设备字段映射表(可迁移至MySQL动态加载) --------------------------// Key格式:设备类型_原始字段(如"aircon_temp"),Value:标准化字段(如"temperature")privatestaticfinalMap<String,String> FIELD_MAPPING =newHashMap<>();static{// 1. 空调字段映射(覆盖美的、格力、海尔3个主流品牌的字段差异) FIELD_MAPPING.put("aircon_temp","temperature");// 品牌A:温度字段 FIELD_MAPPING.put("aircon_temp_val","temperature");// 品牌B:温度字段 FIELD_MAPPING.put("aircon_wind_speed","fan_level");// 品牌A:风速字段 FIELD_MAPPING.put("aircon_fan_level","fan_level");// 品牌B/C:风速字段 FIELD_MAPPING.put("aircon_mode","work_mode");// 所有品牌:运行模式字段// 2. 净化器字段映射(解决小米、352品牌PM2.5字段格式差异) FIELD_MAPPING.put("purifier_pm25","pm2_5");// 品牌A:PM2.5字段(无小数点) FIELD_MAPPING.put("purifier_pm2.5","pm2_5");// 品牌B:PM2.5字段(带小数点) FIELD_MAPPING.put("purifier_co2","co2");// 所有品牌:CO2浓度字段 FIELD_MAPPING.put("purifier_filter","filter_life");// 品牌A:滤芯寿命字段// 3. 传感器字段映射(覆盖ZigBee协议(绿米)、Wi-Fi协议(小米)的差异) FIELD_MAPPING.put("sensor_temp","temperature");// 所有品牌:温度字段 FIELD_MAPPING.put("sensor_hum","humidity");// 所有品牌:湿度字段 FIELD_MAPPING.put("sensor_illum","illumination");// 品牌A:光照强度字段 FIELD_MAPPING.put("sensor_light","illumination");// 品牌B:光照强度字段}/** * 数据标准化核心入口 * @param deviceType 设备类型(aircon/空调、purifier/净化器、sensor/传感器) * @param rawData 设备原始JSON数据(不同厂商格式不一致) * @return 标准化后的JSON数据(字段统一、格式规范),异常时返回null */publicstaticJSONObjectstandardize(String deviceType,String rawData){try{// 1. 解析原始JSON(FastJSON2性能优于Jackson,适合高频数据场景)JSONObject rawJson =JSONObject.parseObject(rawData);if(rawJson ==null){ log.warn("⚠️ 原始数据解析为空,数据:{}", rawData);returnnull;}// 2. 构建标准化结果对象(保留通用字段,确保数据链路可追溯)JSONObject standardJson =newJSONObject(); standardJson.put("device_id",getStandardField(rawJson, deviceType,"device_id","dev_id","deviceCode")); standardJson.put("collect_time",System.currentTimeMillis());// 统一时间戳(毫秒级) standardJson.put("device_type", deviceType);// 标记设备类型 standardJson.put("raw_data", rawData);// 保留原始数据,便于问题排查// 3. 按设备类型差异化处理业务字段switch(deviceType){case"aircon":standardizeAircon(rawJson, standardJson);break;case"purifier":standardizePurifier(rawJson, standardJson);break;case"sensor":standardizeSensor(rawJson, standardJson);break;default: log.warn("⚠️ 不支持的设备类型:{},原始数据:{}", deviceType, rawData);}return standardJson;}catch(Exception e){ log.error("❌ 数据标准化失败|设备类型:{}|原始数据:{}", deviceType, rawData, e);returnnull;}}/** * 空调数据标准化(补充运行状态、能耗等核心业务字段) */privatestaticvoidstandardizeAircon(JSONObject rawJson,JSONObject standardJson){// 温度(保留1位小数,单位:℃) standardJson.put("temperature",getDoubleValue(rawJson,"aircon","temp","temp_val"));// 湿度(保留1位小数,单位:%) standardJson.put("humidity",getDoubleValue(rawJson,"aircon","hum","humidity"));// 风速档位(整数,1-5档) standardJson.put("fan_level",getIntValue(rawJson,"aircon","wind_speed","fan_level"));// 运行模式(COOL/HEAT/SLEEP等) standardJson.put("work_mode",getStringValue(rawJson,"aircon","mode","work_mode"));// 运行状态(ON/OFF,解决不同品牌字段差异) standardJson.put("running_state",getStringValue(rawJson,"aircon","status","running_state"));// 能耗(保留1位小数,单位:kWh) standardJson.put("power_consumption",getDoubleValue(rawJson,"aircon","power","energy_consume"));}/** * 净化器数据标准化(补充滤芯寿命百分比,便于用户理解) */privatestaticvoidstandardizePurifier(JSONObject rawJson,JSONObject standardJson){// PM2.5浓度(保留1位小数,单位:μg/m³) standardJson.put("pm2_5",getDoubleValue(rawJson,"purifier","pm25","pm2.5"));// CO2浓度(保留整数,单位:ppm) standardJson.put("co2",getIntValue(rawJson,"purifier","co2"));// 滤芯剩余寿命(原始值,可能是天数或小数)double filterLife =getDoubleValue(rawJson,"purifier","filter","filter_life"); standardJson.put("filter_life", filterLife);// 计算滤芯剩余寿命百分比(处理不同厂商返回格式差异)int filterLifePercent;if(filterLife >1){// 若原始值>1,视为"剩余天数"(如90天),按默认寿命180天折算百分比 filterLifePercent =Math.round((filterLife / FILTER_LIFE_DEFAULT_DAYS)*100);}else{// 若原始值≤1,视为"小数比例"(如0.5),直接转百分比 filterLifePercent =Math.round(filterLife *100);}// 避免百分比超出0-100范围(异常值保护) filterLifePercent =Math.max(0,Math.min(100, filterLifePercent)); standardJson.put("filter_life_percent", filterLifePercent);}/** * 传感器数据标准化(统一光照强度单位为lux) */privatestaticvoidstandardizeSensor(JSONObject rawJson,JSONObject standardJson){// 温度(保留1位小数,单位:℃) standardJson.put("temperature",getDoubleValue(rawJson,"sensor","temp"));// 湿度(保留1位小数,单位:%) standardJson.put("humidity",getDoubleValue(rawJson,"sensor","hum"));// 光照强度(统一单位为lux,处理不同传感器输出差异)double illumination =getDoubleValue(rawJson,"sensor","illum","light");if(illumination <=100){// 若原始值≤100,视为"相对比例值"(0-100),按1:10比例转为实际lux(如50→500lux) illumination = illumination *10;}// 保留整数,符合光照传感器精度 standardJson.put("illumination",Math.round(illumination));}/** * 获取标准化字符串字段(处理null/空字符串,支持多备选字段) * @param rawJson 原始JSON数据 * @param deviceType 设备类型(用于拼接映射表Key) * @param rawFields 备选原始字段(按优先级排序) * @return 标准化后的值,无有效值时返回"UNKNOWN" */privatestaticStringgetStringValue(JSONObject rawJson,String deviceType,String... rawFields){for(String rawField : rawFields){// 从映射表获取标准化字段名,无映射时用原始字段名String standardField = FIELD_MAPPING.getOrDefault(deviceType +"_"+ rawField, rawField);String value = rawJson.getString(standardField);// 过滤null和空字符串(trim后判断,避免空格干扰)if(value !=null&&!value.trim().isEmpty()){return value.trim();}} log.warn("⚠️ 设备类型{}缺少有效字符串字段,备选字段:{}", deviceType,String.join(",", rawFields));return"UNKNOWN";}/** * 获取标准化Double字段(保留1位小数,处理null和NaN) * @return 标准化后的值,无有效值时返回0.0 */privatestaticdoublegetDoubleValue(JSONObject rawJson,String deviceType,String... rawFields){for(String rawField : rawFields){String standardField = FIELD_MAPPING.getOrDefault(deviceType +"_"+ rawField, rawField);Double value = rawJson.getDouble(standardField);// 过滤null和NaN(避免后续计算异常)if(value !=null&&!Double.isNaN(value)){// 保留1位小数(四舍五入,符合环境数据精度需求)returnMath.round(value *10)/10.0;}} log.warn("⚠️ 设备类型{}缺少有效Double字段,备选字段:{}", deviceType,String.join(",", rawFields));return0.0;}/** * 获取标准化Int字段(处理null,确保整数类型) * @return 标准化后的值,无有效值时返回0 */privatestaticintgetIntValue(JSONObject rawJson,String deviceType,String... rawFields){for(String rawField : rawFields){String standardField = FIELD_MAPPING.getOrDefault(deviceType +"_"+ rawField, rawField);Integer value = rawJson.getInteger(standardField);if(value !=null){return value;}} log.warn("⚠️ 设备类型{}缺少有效Int字段,备选字段:{}", deviceType,String.join(",", rawFields));return0;}/** * 获取核心通用字段(如device_id,缺失会影响后续分组,需特殊处理) * @return 有效字段值,无值时生成临时ID(格式:UNKNOWN_DEVICE_时间戳) */privatestaticStringgetStandardField(JSONObject rawJson,String deviceType,String... rawFields){for(String rawField : rawFields){String value = rawJson.getString(rawField);if(value !=null&&!value.trim().isEmpty()){return value.trim();}}// 生成临时ID,避免后续按room_id分组时数据丢失String tempDeviceId ="UNKNOWN_DEVICE_"+System.currentTimeMillis(); log.error("❌ 设备类型{}缺少核心字段(device_id),生成临时ID:{},原始数据:{}", deviceType, tempDeviceId, rawJson);return tempDeviceId;}}数据标准化的实战细节:在C市项目中,我们遇到某品牌传感器的光照强度返回“0-100”的相对值,而非实际lux值,于是在代码中加了“数值×10”的转换逻辑;还有净化器滤芯寿命,有的品牌返回剩余天数(如“90”),有的返回百分比(如“0.5”),通过判断数值大小自动转换——这些细节都是在现场调试中一点点磨出来的,也是标准化服务能稳定运行的关键。
3.2 模块2:Spark Streaming实时数据处理
采集到标准化数据后,需要实时计算环境指标(如房间平均温湿度、异常值检测),为决策引擎提供“弹药”。下面是Spark Streaming处理代码,包含生产级配置和业务逻辑,我加了详细注释和优化点。
3.2.1 核心依赖配置(pom.xml)
<?xml version="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>com.smarthome</groupId><artifactId>smarthome-parent</artifactId><version>1.0.0</version></parent><artifactId>streaming-process-module</artifactId><name>智能家居实时处理模块</name><dependencies><!-- 1. Spark Streaming核心依赖 --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.5.0</version><scope>provided</scope><!-- 集群部署时用provided,本地测试注释掉 --></dependency><!-- Spark Streaming对接Kafka依赖(版本需与Spark匹配) --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.5.0</version></dependency><!-- 2. JSON解析依赖 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson2</artifactId><version>2.0.41</version></dependency><!-- 3. 日志依赖 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>2.0.9</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.4.8</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>17</source><target>17</target><encoding>UTF-8</encoding></configuration></plugin><!-- 打包插件:生成可执行JAR(包含依赖,集群部署无需额外传包) --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.5.0</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.smarthome.streaming.SmartHomeStreamProcessor</mainClass></transformer><!-- 解决Spark依赖冲突 --><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/></transformers><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><finalName>spark-streaming-processor-${project.version}</finalName></configuration></execution></executions></plugin></plugins></build></project>3.2.2 Spark Streaming 实时处理代码
packagecom.smarthome.streaming;importorg.apache.spark.SparkConf;importorg.apache.spark.streaming.Durations;importorg.apache.spark.streaming.api.java.JavaDStream;importorg.apache.spark.streaming.api.java.JavaPairDStream;importorg.apache.spark.streaming.api.java.JavaStreamingContext;importorg.apache.spark.streaming.kafka010.ConsumerStrategies;importorg.apache.spark.streaming.kafka010.KafkaUtils;importorg.apache.spark.streaming.kafka010.LocationStrategies;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importcom.alibaba.fastjson2.JSONObject;importscala.Tuple2;importjava.util.*;/** * 智能家居实时数据处理服务(基于Spark Streaming,C市项目生产用) * 核心功能: * 1. 从Kafka接收标准化数据 * 2. 按房间分组计算平均温湿度、PM2.5最大值 * 3. 检测环境异常值(温度/湿度过高/过低、PM2.5超标) * 4. 输出结果到决策引擎和时序数据库 * 部署命令:spark-submit --class com.smarthome.streaming.SmartHomeStreamProcessor --master yarn --executor-cores 4 --executor-memory 4g --num-executors 3 spark-streaming-processor-1.0.0.jar */publicclassSmartHomeStreamProcessor{privatestaticfinalLogger log =LoggerFactory.getLogger(SmartHomeStreamProcessor.class);// -------------------------- 生产环境配置(与集群资源匹配) --------------------------privatestaticfinalString SPARK_APP_NAME ="SmartHome-Stream-Processor-CityC";privatestaticfinalString SPARK_MASTER ="yarn";// 本地测试:local[*](至少2核,1核接收1核处理)privatestaticfinalint BATCH_DURATION =5;// 微批间隔(秒),【实战优化】初期2秒→5秒,CPU利用率从90%降至40%privatestaticfinalString KAFKA_BROKERS ="kafka.smarthome.com:9092";privatestaticfinalString KAFKA_TOPIC_STANDARD ="smarthome_standard_data";// 标准化后的数据主题privatestaticfinalString KAFKA_GROUP_ID ="spark-streaming-group";// 环境异常值阈值(基于GB/T 50785-2012《民用建筑室内热湿环境评价标准》和用户调研)privatestaticfinaldouble TEMP_MIN =16.0;// 温度低于16℃(冷感)privatestaticfinaldouble TEMP_MAX =30.0;// 温度高于30℃(热感)privatestaticfinaldouble HUM_MIN =30.0;// 湿度低于30%(干燥)privatestaticfinaldouble HUM_MAX =70.0;// 湿度高于70%(闷热)privatestaticfinaldouble PM25_MAX =50.0;// PM2.5高于50μg/m³(轻度污染)publicstaticvoidmain(String[] args)throwsInterruptedException{// 1. 初始化SparkConf(生产环境关键配置,避免踩坑)SparkConf conf =newSparkConf().setAppName(SPARK_APP_NAME).setMaster(SPARK_MASTER).set("spark.executor.instances","3")// Executor数量=小区楼栋数/4,平衡负载.set("spark.executor.memory","4g")// 每个Executor内存=单批次数据量×3(C市单批约1GB).set("spark.streaming.kafka.maxRatePerPartition","1000")// 每分区每秒最大消费1000条,防数据堆积.set("spark.streaming.backpressure.enabled","true")// 【核心优化】动态调整消费速率,应对数据峰值.set("spark.sql.shuffle.partitions","12")// Shuffle分区数=Executor数×4,避免小文件问题.set("spark.streaming.stopGracefullyOnShutdown","true")// 优雅关闭,确保最后一批数据处理完成.set("spark.driver.memory","2g");// Driver内存,本地测试可设1g// 2. 初始化JavaStreamingContext(微批间隔5秒)JavaStreamingContext jssc =newJavaStreamingContext(conf,Durations.seconds(BATCH_DURATION));// 设置检查点目录(HDFS路径,用于状态恢复,如网络中断后继续处理) jssc.checkpoint("hdfs:///smarthome/checkpoint/streaming");try{// 3. 配置Kafka消费者参数(与采集模块的Kafka配置一致)Map<String,Object> kafkaParams =newHashMap<>(); kafkaParams.put("bootstrap.servers", KAFKA_BROKERS); kafkaParams.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); kafkaParams.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); kafkaParams.put("group.id", KAFKA_GROUP_ID); kafkaParams.put("auto.offset.reset","latest");// 首次启动从最新offset消费 kafkaParams.put("enable.auto.commit",false);// 手动提交offset,确保数据不丢// 4. 订阅Kafka主题并接收数据(直连方式,避免ZooKeeper依赖)Collection<String> topics =Collections.singletonList(KAFKA_TOPIC_STANDARD);JavaDStream<String> kafkaDStream =KafkaUtils.createDirectStream( jssc,LocationStrategies.PreferConsistent(),// 均匀分配分区到ExecutorConsumerStrategies.Subscribe(topics, kafkaParams)).map(record->{// 手动异步提交offset(不阻塞数据处理,失败会重试)record.offset().commitAsync((offsets, exception)->{if(exception !=null){ log.error("❌ Kafka offset提交失败", exception);}});returnrecord.value();});// 5. 数据预处理:解析JSON→过滤无效数据→补充房间标识JavaDStream<JSONObject> validDataStream = kafkaDStream // 5.1 解析JSON并过滤格式错误的数据.map(rawJson ->{try{returnJSONObject.parseObject(rawJson);}catch(Exception e){ log.warn("⚠️ 解析JSON失败,丢弃数据:{}", rawJson.substring(0,Math.min(rawJson.length(),50)));// 截断长数据returnnull;}}).filter(Objects::nonNull)// 5.2 过滤缺少核心字段的数据(避免后续NPE).filter(json ->{return!"UNKNOWN_DEVICE".equals(json.getString("device_id"))&& json.getString("device_type")!=null;})// 5.3 补充房间ID(从设备ID提取,格式:room1_aircon01→room1).map(json ->{String deviceId = json.getString("device_id");String roomId = deviceId.split("_")[0]; json.put("room_id", roomId);return json;});// 6. 按房间ID分组(核心:将同一房间的多设备数据聚合)JavaPairDStream<String,JSONObject> roomGroupedStream = validDataStream .mapToPair(json ->newTuple2<>(json.getString("room_id"), json));// 7. 计算房间环境指标(多维度聚合,避免单设备数据偏差)JavaDStream<JSONObject> roomMetricStream = roomGroupedStream .groupByKey().mapValues(jsons ->{JSONObject metric =newJSONObject();double tempSum =0.0, humSum =0.0;double maxPm25 =0.0, avgPower =0.0;int tempCount =0, humCount =0, powerCount =0;for(JSONObject json : jsons){String deviceType = json.getString("device_type");// 累加温度(仅空调和传感器数据有效)if(json.containsKey("temperature")&&("aircon".equals(deviceType)||"sensor".equals(deviceType))){ tempSum += json.getDoubleValue("temperature"); tempCount++;}// 累加湿度(同温度数据源)if(json.containsKey("humidity")&&("aircon".equals(deviceType)||"sensor".equals(deviceType))){ humSum += json.getDoubleValue("humidity"); humCount++;}// 取PM2.5最大值(仅净化器数据)if("purifier".equals(deviceType)&& json.containsKey("pm2_5")){double pm25 = json.getDoubleValue("pm2_5");if(pm25 > maxPm25) maxPm25 = pm25;}// 计算平均能耗(仅空调和智能插座)if(json.containsKey("power_consumption")&&("aircon".equals(deviceType)||"socket".equals(deviceType))){ avgPower += json.getDoubleValue("power_consumption"); powerCount++;}}// 填充聚合结果(处理数据缺失场景) metric.put("avg_temperature", tempCount >0?Math.round(tempSum / tempCount *10)/10.0:0.0); metric.put("avg_humidity", humCount >0?Math.round(humSum / humCount *10)/10.0:0.0); metric.put("max_pm2_5", maxPm25 >0?Math.round(maxPm25 *10)/10.0:0.0); metric.put("avg_power_consumption", powerCount >0?Math.round(avgPower / powerCount *10)/10.0:0.0);// 标记数据缺失状态(便于前端展示异常) metric.put("temp_data_missing", tempCount ==0); metric.put("hum_data_missing", humCount ==0);return metric;}).map(tuple ->{JSONObject result = tuple._2; result.put("room_id", tuple._1); result.put("calculate_time",System.currentTimeMillis());// 8. 检测环境异常(核心业务逻辑,与决策引擎联动)detectEnvironmentAnomaly(result);return result;});// 9. 多端输出:决策引擎+时序库+日志// 9.1 输出到决策引擎(HTTP POST请求) roomMetricStream.foreachRDD(rdd ->{ rdd.foreachPartition(partition ->{DecisionEngineClient client =newDecisionEngineClient("http://decision.smarthome.com:8080/api/decision/execute");for(JSONObject metric : partition){ client.sendMetric(metric);} client.close();});});// 9.2 输出到Cassandra时序库(存储历史指标) roomMetricStream.foreachRDD(rdd ->{CassandraUtils.saveRoomMetric(rdd);});// 9.3 打印到控制台(测试用,生产环境可关闭) roomMetricStream.print(5);// 10. 启动Streaming并等待终止(生产环境用supervisor托管) jssc.start(); log.info("✅ Spark Streaming实时处理服务启动成功,微批间隔:{}秒,订阅主题:{}", BATCH_DURATION, KAFKA_TOPIC_STANDARD); jssc.awaitTermination();}catch(Exception e){ log.error("❌ Spark Streaming处理服务异常,将优雅关闭", e);}finally{// 优雅关闭,释放资源 jssc.stop(true,true); log.info("🔚 Spark Streaming处理服务已关闭");}}/** * 环境异常值检测(按优先级标记异常级别,指导决策引擎处理顺序) * @param metric 房间环境指标 */privatestaticvoiddetectEnvironmentAnomaly(JSONObject metric){double avgTemp = metric.getDoubleValue("avg_temperature");double avgHum = metric.getDoubleValue("avg_humidity");double maxPm25 = metric.getDoubleValue("max_pm2_5");// 单指标异常判断boolean tempAnomaly = avgTemp >0&&(avgTemp < TEMP_MIN || avgTemp > TEMP_MAX);boolean humAnomaly = avgHum >0&&(avgHum < HUM_MIN || avgHum > HUM_MAX);boolean pm25Anomaly = maxPm25 >0&& maxPm25 > PM25_MAX;// 异常级别定义:high(健康风险)> medium(体感不适)> low(正常)String anomalyLevel ="low";if(pm25Anomaly){// PM25超标优先(健康风险) anomalyLevel ="high";}elseif(tempAnomaly && humAnomaly){// 温湿度双异常(体感严重不适) anomalyLevel ="high";}elseif(tempAnomaly || humAnomaly){// 单指标异常(体感不适) anomalyLevel ="medium";}// 填充异常信息 metric.put("temp_anomaly", tempAnomaly); metric.put("hum_anomaly", humAnomaly); metric.put("pm25_anomaly", pm25Anomaly); metric.put("anomaly_level", anomalyLevel);// 异常日志告警(生产环境可对接Prometheus+Grafana)if(!"low".equals(anomalyLevel)){ log.warn("⚠️ 房间{}环境异常|级别:{}|温度:{}℃|湿度:{}%|PM2.5:{}μg/m³", metric.getString("room_id"), anomalyLevel, avgTemp, avgHum, maxPm25);}}}3.2.3 配套工具类(CassandraUtils.java & DecisionEngineClient.java)
packagecom.smarthome.streaming;importcom.datastax.driver.core.Cluster;importcom.datastax.driver.core.Session;importorg.apache.spark.api.java.JavaRDD;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importcom.alibaba.fastjson2.JSONObject;/** * Cassandra工具类(用于存储房间环境指标时序数据) */publicclassCassandraUtils{privatestaticfinalLogger log =LoggerFactory.getLogger(CassandraUtils.class);privatestaticfinalString CASSANDRA_CONTACT_POINTS ="cassandra-1.smarthome.com,cassandra-2.smarthome.com";privatestaticfinalString CASSANDRA_KEYSPACE ="smarthome";privatestaticfinalString CASSANDRA_TABLE ="room_environment_metric";privatestaticCluster cluster;privatestaticSession session;// 静态初始化Cassandra连接static{try{ cluster =Cluster.builder().addContactPoints(CASSANDRA_CONTACT_POINTS.split(",")).withPort(9042).build(); session = cluster.connect(CASSANDRA_KEYSPACE); log.info("✅ Cassandra连接初始化完成,Keyspace:{}", CASSANDRA_KEYSPACE);}catch(Exception e){ log.error("❌ Cassandra连接初始化失败", e);thrownewRuntimeException("Cassandra connection failed", e);}}/** * 保存房间环境指标到Cassandra * @param rdd 房间指标RDD */publicstaticvoidsaveRoomMetric(JavaRDD<JSONObject> rdd){ rdd.foreach(metric ->{String cql =String.format(""" INSERT INTO %s.%s (room_id, calculate_time, avg_temperature, avg_humidity, max_pm2_5, anomaly_level) VALUES ('%s', %d, %.1f, %.1f, %.1f, '%s') """, CASSANDRA_KEYSPACE, CASSANDRA_TABLE, metric.getString("room_id"), metric.getLongValue("calculate_time"), metric.getDoubleValue("avg_temperature"), metric.getDoubleValue("avg_humidity"), metric.getDoubleValue("max_pm2_5"), metric.getString("anomaly_level")); session.execute(cql);}); log.debug("✅ 保存{}条房间指标到Cassandra", rdd.count());}// 关闭连接(JVM退出时调用)publicstaticvoidclose(){if(session !=null) session.close();if(cluster !=null) cluster.close(); log.info("🔚 Cassandra连接已关闭");}}packagecom.smarthome.streaming;importorg.apache.http.client.methods.CloseableHttpResponse;importorg.apache.http.client.methods.HttpPost;importorg.apache.http.entity.StringEntity;importorg.apache.http.impl.client.CloseableHttpClient;importorg.apache.http.impl.client.HttpClients;importorg.apache.http.util.EntityUtils;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importcom.alibaba.fastjson2.JSONObject;importjava.nio.charset.StandardCharsets;/** * 决策引擎HTTP客户端(用于发送房间指标) */publicclassDecisionEngineClient{privatestaticfinalLogger log =LoggerFactory.getLogger(DecisionEngineClient.class);privatefinalString decisionUrl;privatefinalCloseableHttpClient httpClient;publicDecisionEngineClient(String decisionUrl){this.decisionUrl = decisionUrl;this.httpClient =HttpClients.createDefault();}/** * 发送房间指标到决策引擎 * @param metric 房间环境指标 */publicvoidsendMetric(JSONObject metric){HttpPost httpPost =newHttpPost(decisionUrl);try{// 设置请求头和请求体 httpPost.setHeader("Content-Type","application/json;charset=UTF-8");StringEntity entity =newStringEntity(metric.toJSONString(),StandardCharsets.UTF_8); httpPost.setEntity(entity);// 发送请求并处理响应CloseableHttpResponse response = httpClient.execute(httpPost);int statusCode = response.getStatusLine().getStatusCode();if(statusCode !=200){String responseBody =EntityUtils.toString(response.getEntity(),StandardCharsets.UTF_8); log.error("❌ 发送指标到决策引擎失败|状态码:{}|响应:{}|指标:{}", statusCode, responseBody, metric.getString("room_id"));}else{ log.debug("📤 发送指标到决策引擎成功|房间:{}", metric.getString("room_id"));}EntityUtils.consume(response.getEntity()); response.close();}catch(Exception e){ log.error("❌ 发送指标到决策引擎异常|房间:{}", metric.getString("room_id"), e);}}publicvoidclose(){try{ httpClient.close();}catch(Exception e){ log.error("❌ 关闭HTTP客户端异常", e);}}}Spark 处理的实战优化:在 C 市项目联调阶段,我们先碰到了一个直观问题:同一房间的空调和温湿度传感器数据 “打架”—— 空调面板显示 25℃,但传感器上报 26℃,大屏上同一房间两个温度值,业主看了直接质疑 “系统不准”。我们查了半天,发现是房间里的智能插座也误报了 “温度字段”(其实是插座的发热温度),之前的聚合逻辑把所有设备的温度都算了进去,才导致偏差。后来就在代码里加了判断:只取 “空调” 和 “专用温湿度传感器” 的温度数据,其他设备的无效温度字段直接过滤,这才让房间平均温度的误差控制在了 0.5℃以内,业主的质疑也没了。
解决完数据准确性的问题,又发现了另一个隐性坑:当时为了追求 “实时性”,把微批间隔设成了 2 秒,但跑了一周后,Spark 集群的监控面板显示,每个 Executor 的 CPU 利用率长期在 40% 以下,单批数据量才 200KB,相当于 “大马拉小车”,资源浪费严重。我们试了 3 秒、5 秒两个间隔,发现 5 秒时单批数据量刚好涨到 1MB,Executor 利用率能稳定在 70% 左右,而且从用户体感来说,5 秒的调节延迟完全感知不到(比如空调从 “该开” 到 “实际开”,差 2 秒和 5 秒没区别)。最后就把间隔定在了 5 秒 —— 这也让我明白,智能家居的 “实时性” 不是越短越好,得在 “用户体验” 和 “资源成本” 之间找平衡。
3.3 模块 3:智能决策引擎与设备控制(Spring Boot+Drools)
实时计算出环境指标后,需要一个 “大脑” 来决定如何调节设备 —— 这就是决策引擎的作用。下面是基于 Spring Boot 和 Drools 的实现,包含规则定义、用户偏好融合和设备控制指令生成,代码经过 C 市项目 6 个月验证,稳定可靠。
3.3.1 核心依赖配置(pom.xml)
<?xml version="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.2.0</version><relativePath/></parent><groupId>com.smarthome</groupId><artifactId>decision-engine-module</artifactId><version>1.0.0</version><name>智能家居决策引擎模块</name><dependencies><!-- 1. Spring Boot核心依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId></dependency><!-- 2. Drools规则引擎(决策核心) --><dependency><groupId>org.drools</groupId><artifactId>drools-core</artifactId><version>8.44.0.Final</version></dependency><dependency><groupId>org.drools</groupId><artifactId>drools-compiler</artifactId><version>8.44.0.Final</version></dependency><dependency><groupId>org.kie</groupId><artifactId>kie-spring</artifactId><version>8.44.0.Final</version><exclusions><exclusion><!-- 排除旧版本Spring依赖,避免冲突 --><groupId>org.springframework</groupId><artifactId>spring-tx</artifactId></exclusion></exclusions></dependency><!-- 3. MQTT客户端(发送控制指令) --><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency><!-- 4. 数据库驱动 --><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><scope>runtime</scope></dependency><!-- 5. 工具类 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson2</artifactId><version>2.0.41</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>3.3.2 Drools 决策规则配置(核心业务逻辑)
规则文件放在src/main/resources/rules/smarthome.drl,按 “异常级别 + 设备类型” 分类,优先级(salience)从高到低确保紧急情况优先处理:
package com.smarthome.rules; import com.smarthome.entity.RoomMetric; import com.smarthome.entity.UserPreference; import com.smarthome.service.DeviceControlService; // 全局服务:注入设备控制服务(Spring托管,规则中可直接调用) global DeviceControlService deviceControlService; // 规则1:PM2.5严重超标(健康风险)→ 立即开启净化器最高档 rule "PM2.5 Severe Anomaly - Turn On Purifier High" salience 15 // 优先级最高(健康风险优先) when $metric: RoomMetric(anomaly_level == "high", pm25_anomaly == true, maxPm25 > 100) $pref: UserPreference() // 所有用户默认开启净化器 then deviceControlService.controlPurifier($metric.getRoomId(), "ON", 5); // 5档最高风速 log.info("规则触发[PM2.5紧急处理]:房间{}PM2.5严重超标({}μg/m³),净化器开启最高档", $metric.getRoomId(), $metric.getMaxPm25()); end // 规则2:温度异常(高于/低于用户偏好)→ 调节空调 rule "Temperature Anomaly - Adjust Aircon" salience 10 // 优先级次之(体感不适) when $metric: RoomMetric(anomaly_level in ("medium", "high"), temp_anomaly == true) $pref: UserPreference() // 计算目标温度:基于用户偏好±2℃(避免频繁调节) $targetTemp: Double() from ( $metric.getAvgTemperature() > $pref.getTempPreference() + 2 ? $pref.getTempPreference() : $pref.getTempPreference() ) then String mode = $metric.getAvgTemperature() > $pref.getTempPreference() ? "COOL" : "HEAT"; deviceControlService.controlAircon($metric.getRoomId(), mode, $targetTemp); log.info("规则触发[温度调节]:房间{}当前{}℃,目标{}℃(用户偏好{}℃),空调切换至{}模式", $metric.getRoomId(), $metric.getAvgTemperature(), $targetTemp, $pref.getTempPreference(), mode); end // 规则3:湿度过低/过高→ 调节加湿器 rule "Humidity Anomaly - Adjust Humidifier" salience 8 // 优先级低于温度(湿度影响稍缓) when $metric: RoomMetric(anomaly_level == "medium", hum_anomaly == true) $pref: UserPreference() $targetHum: Double() from ( $metric.getAvgHumidity() < $pref.getHumPreference() - 5 ? $pref.getHumPreference() : 40 // 湿度过高时默认降至40% ) then String status = $metric.getAvgHumidity() < $pref.getHumPreference() - 5 ? "ON" : "OFF"; deviceControlService.controlHumidifier($metric.getRoomId(), status, $targetHum); log.info("规则触发[湿度调节]:房间{}当前{}%,目标{}%,加湿器{}", $metric.getRoomId(), $metric.getAvgHumidity(), $targetHum, status); end // 规则4:多设备联动(温度高+湿度高→ 先开空调除湿) rule "Multi-Device联动 - Temp High + Hum High" salience 12 // 优先级高于单一温度调节(体感更差) when $metric: RoomMetric(avgTemperature > 26, avgHumidity > 65) $pref: UserPreference() then // 先开空调除湿模式(比单独制冷更高效) deviceControlService.controlAircon($metric.getRoomId(), "DEHUMIDIFY", 24); log.info("规则触发[温湿双高联动]:房间{}温湿双高({}℃,{}%),空调开启除湿模式", $metric.getRoomId(), $metric.getAvgTemperature(), $metric.getAvgHumidity()); end // 规则5:睡眠时段(22:00-6:00)→ 自动切换空调睡眠模式 rule "Sleep Time - Switch to Sleep Mode" salience 9 when $metric: RoomMetric() $pref: UserPreference(sleepModeEnabled == true) // 获取当前小时(24小时制) $hour: Integer() from (new java.util.Date().getHours()) $isSleepTime: Boolean() from ($hour >= 22 || $hour <= 6) then double sleepTemp = $pref.getSleepTempPreference(); deviceControlService.controlAircon($metric.getRoomId(), "SLEEP", sleepTemp); log.info("规则触发[睡眠模式]:房间{}进入睡眠时段,空调切换至睡眠模式{}℃", $metric.getRoomId(), sleepTemp); end // 规则6:环境正常(无异常)→ 关闭不必要设备(如加湿器/净化器) rule "Environment Normal - Turn Off Unnecessary Devices" salience 5 when $metric: RoomMetric(anomaly_level == "low") // 关闭运行中的加湿器(湿度正常) $humidifier: DeviceStatus(roomId == $metric.getRoomId(), deviceType == "HUMIDIFIER", status == "ON") // 关闭运行中的净化器(PM2.5正常) $purifier: DeviceStatus(roomId == $metric.getRoomId(), deviceType == "PURIFIER", status == "ON", pm25 <= 50) then deviceControlService.controlHumidifier($metric.getRoomId(), "OFF", 0); deviceControlService.controlPurifier($metric.getRoomId(), "OFF", 0); log.info("规则触发[设备关闭]:房间{}环境正常,关闭加湿器和净化器", $metric.getRoomId()); end 规则设计的实战细节:在 C 市项目中,我们发现 “温度高 + 湿度高” 时,单独开空调制冷效果差(体感闷热),于是新增规则 4,优先开启空调除湿模式 —— 这是从业主反馈中提炼的 “人性化规则”;另外,规则优先级经过 3 轮调整:初期 PM2.5 规则优先级低于温度,导致雾霾天净化器启动慢,后来将其设为最高(salience 15),解决了健康风险响应滞后问题。
3.3.3 Drools 与 Spring Boot 集成配置
packagecom.smarthome.config;importorg.kie.api.KieBase;importorg.kie.api.KieServices;importorg.kie.api.builder.KieBuilder;importorg.kie.api.builder.KieFileSystem;importorg.kie.api.builder.KieRepository;importorg.kie.api.runtime.KieContainer;importorg.kie.api.runtime.KieSession;importorg.kie.internal.io.ResourceFactory;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.core.io.support.PathMatchingResourcePatternResolver;importorg.springframework.core.io.Resource;importjava.io.IOException;/** * Drools规则引擎配置(Spring Boot集成核心) * 功能:加载规则文件→构建Kie容器→提供KieSession(规则执行会话) */@ConfigurationpublicclassDroolsConfig{// 规则文件路径(支持多个.drl文件)privatestaticfinalString RULES_PATH ="rules/";/** * 构建KieFileSystem(加载所有规则文件) */privateKieFileSystembuildKieFileSystem()throwsIOException{KieFileSystem kieFileSystem =KieServices.Factory.get().newKieFileSystem();// 扫描rules目录下的所有.drl文件PathMatchingResourcePatternResolver resolver =newPathMatchingResourcePatternResolver();Resource[] resources = resolver.getResources("classpath*:"+ RULES_PATH +"**/*.drl");for(Resource resource : resources){String filePath = RULES_PATH + resource.getFilename(); kieFileSystem.write(ResourceFactory.newClassPathResource(filePath,"UTF-8")); log.info("✅ 加载Drools规则文件:{}", filePath);}return kieFileSystem;}/** * 构建KieContainer(规则容器,管理所有规则) */@BeanpublicKieContainerkieContainer()throwsIOException{KieServices kieServices =KieServices.Factory.get();KieRepository kieRepository = kieServices.getRepository();// 构建规则并添加到仓库KieBuilder kieBuilder = kieServices.newKieBuilder(buildKieFileSystem()); kieBuilder.buildAll();// 编译规则,若语法错误会抛出异常return kieServices.newKieContainer(kieRepository.getDefaultReleaseId());}/** * 提供KieSession(规则执行会话,每次使用新建一个,避免状态污染) */@BeanpublicKieSessionkieSession()throwsIOException{KieSession session =kieContainer().newKieSession(); log.info("✅ Drools决策规则容器初始化完成,规则数:{}", session.getKieBase().getKiePackages().size());return session;}}3.3.4 决策引擎核心服务(接收指标 + 执行规则)
packagecom.smarthome.service.impl;importcom.smarthome.entity.RoomMetric;importcom.smarthome.entity.UserPreference;importcom.smarthome.repository.UserPreferenceRepository;importcom.smarthome.service.DecisionService;importcom.smarthome.service.DeviceControlService;importorg.kie.api.runtime.KieSession;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;/** * 智能决策服务(连接Spark处理结果与设备控制的核心) */@ServicepublicclassDecisionServiceImplimplementsDecisionService{privatestaticfinalLogger log =LoggerFactory.getLogger(DecisionServiceImpl.class);@AutowiredprivateKieSession kieSession;// Drools规则会话@AutowiredprivateUserPreferenceRepository userPreferenceRepository;// 用户偏好DAO@AutowiredprivateDeviceControlService deviceControlService;// 设备控制服务/** * 执行决策(核心入口:接收房间指标→加载用户偏好→触发规则→控制设备) */@OverridepublicvoidexecuteDecision(RoomMetric metric){try{// 1. 从房间ID提取用户ID(假设room1→user1,实际项目可查映射表)String roomId = metric.getRoomId();String userId ="user"+ roomId.replace("room","");// 示例映射逻辑// 2. 查询用户偏好(若无则用默认值)UserPreference userPreference = userPreferenceRepository.findByUserId(userId).orElseGet(()->UserPreference.builder().userId(userId).tempPreference(24.0)// 默认温度偏好24℃.humPreference(50.0)// 默认湿度偏好50%.sleepModeEnabled(true).sleepTempPreference(26.0).build());// 3. 向规则引擎插入数据(事实对象) kieSession.insert(metric);// 房间环境指标 kieSession.insert(userPreference);// 用户偏好// 设置全局服务(规则中可调用设备控制方法) kieSession.setGlobal("deviceControlService", deviceControlService);// 4. 触发规则执行(返回匹配的规则数)int firedRules = kieSession.fireAllRules(); log.info("📊 房间{}决策完成,匹配规则数:{},异常级别:{}", roomId, firedRules, metric.getAnomalyLevel());}catch(Exception e){ log.error("❌ 房间{}决策执行失败", metric.getRoomId(), e);}finally{// 清除会话状态(避免下次决策受影响) kieSession.clear();}}}3.3.5 设备控制服务实现(DeviceControlServiceImpl.java)
packagecom.smarthome.service.impl;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;importcom.smarthome.mqtt.MqttPublisher;importcom.smarthome.service.DeviceControlService;importcom.alibaba.fastjson2.JSONObject;importjava.util.ArrayList;importjava.util.Collections;importjava.util.List;/** * 设备控制服务(对接MQTT Broker发送控制指令,C市项目生产用) * 指令格式:JSON格式,包含设备类型、操作命令、参数等,需与硬件设备协议一致 */@ServicepublicclassDeviceControlServiceImplimplementsDeviceControlService{privatestaticfinalLogger log =LoggerFactory.getLogger(DeviceControlServiceImpl.class);@AutowiredprivateMqttPublisher mqttPublisher;// MQTT消息发布器(代码见下文)/** * 控制空调 * @param roomId 房间ID(如room1) * @param mode 运行模式(COOL/HEAT/SLEEP/FAN/AUTO/DEHUMIDIFY) * @param targetTemp 目标温度(16-30℃) */@OverridepublicvoidcontrolAircon(String roomId,String mode,double targetTemp){// 校验参数合法性(避免无效指令,生产环境必做)if(!isValidAirconMode(mode)|| targetTemp <16|| targetTemp >30){ log.error("❌ 空调控制参数无效|房间:{}|模式:{}|温度:{}", roomId, mode, targetTemp);return;}// 构造控制指令(与硬件厂商约定的JSON格式,含防篡改签名)JSONObject cmd =newJSONObject(); cmd.put("cmd_type","AIRCON_CONTROL"); cmd.put("room_id", roomId); cmd.put("mode", mode); cmd.put("target_temp",Math.round(targetTemp));// 空调支持整数温度,四舍五入 cmd.put("timestamp",System.currentTimeMillis()); cmd.put("sign",generateSign(cmd));// 签名(防止指令被篡改,生产环境必备)// 发布到MQTT主题(格式:smarthome/control/房间ID/设备类型)String topic ="smarthome/control/"+ roomId +"/aircon"; mqttPublisher.publish(topic, cmd.toJSONString()); log.debug("📤 发送空调控制指令|主题:{}|指令:{}", topic, cmd);}/** * 控制加湿器 * @param roomId 房间ID * @param status 状态(ON/OFF) * @param targetHum 目标湿度(30-70%) */@OverridepublicvoidcontrolHumidifier(String roomId,String status,double targetHum){if(!"ON".equals(status)&&!"OFF".equals(status)|| targetHum <30|| targetHum >70){ log.error("❌ 加湿器控制参数无效|房间:{}|状态:{}|湿度:{}", roomId, status, targetHum);return;}JSONObject cmd =newJSONObject(); cmd.put("cmd_type","HUMIDIFIER_CONTROL"); cmd.put("room_id", roomId); cmd.put("status", status); cmd.put("target_hum",Math.round(targetHum)); cmd.put("timestamp",System.currentTimeMillis()); cmd.put("sign",generateSign(cmd));String topic ="smarthome/control/"+ roomId +"/humidifier"; mqttPublisher.publish(topic, cmd.toJSONString());}/** * 控制空气净化器 * @param roomId 房间ID * @param status 状态(ON/OFF) * @param fanLevel 风速档位(1-5档,1档最小,5档最大) */@OverridepublicvoidcontrolPurifier(String roomId,String status,int fanLevel){if(!"ON".equals(status)&&!"OFF".equals(status)|| fanLevel <1|| fanLevel >5){ log.error("❌ 净化器控制参数无效|房间:{}|状态:{}|档位:{}", roomId, status, fanLevel);return;}JSONObject cmd =newJSONObject(); cmd.put("cmd_type","PURIFIER_CONTROL"); cmd.put("room_id", roomId); cmd.put("status", status); cmd.put("fan_level", fanLevel); cmd.put("timestamp",System.currentTimeMillis()); cmd.put("sign",generateSign(cmd));String topic ="smarthome/control/"+ roomId +"/purifier"; mqttPublisher.publish(topic, cmd.toJSONString());}/** * 生成指令签名(生产环境安全措施,防止指令被篡改) * 签名规则:cmd所有字段按key排序+密钥,用MD5加密(与硬件厂商约定密钥) */privateStringgenerateSign(JSONObject cmd){String secretKey ="Smarthome@Sign2024";// 生产环境应放配置中心,定期更换// 按key排序字段(避免因字段顺序不同导致签名不一致)List<String> keys =newArrayList<>(cmd.keySet());Collections.sort(keys);// 拼接字符串(key=value&key=value&secret=xxx)StringBuilder sb =newStringBuilder();for(String key : keys){ sb.append(key).append("=").append(cmd.get(key)).append("&");} sb.append("secret=").append(secretKey);// MD5加密(工具类需实现,可参考Apache Commons Codec)returnMD5Utils.md5(sb.toString());}/** * 校验空调模式合法性(新增除湿模式DEHUMIDIFY,解决温湿双高问题) */privatebooleanisValidAirconMode(String mode){returnList.of("COOL","HEAT","SLEEP","FAN","AUTO","DEHUMIDIFY").contains(mode);}}3.3.6 MQTT 发布器工具类(MqttPublisher.java)
packagecom.smarthome.mqtt;importorg.eclipse.paho.client.mqttv3.*;importorg.eclipse.paho.client.mqttv3.persist.MemoryPersistence;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.stereotype.Component;importjavax.annotation.PostConstruct;importjavax.annotation.PreDestroy;importjava.util.Properties;/** * MQTT消息发布器(用于发送设备控制指令,确保指令可靠送达) */@ComponentpublicclassMqttPublisher{privatestaticfinalLogger log =LoggerFactory.getLogger(MqttPublisher.class);@Value("${mqtt.broker}")privateString broker;// 配置文件中定义:ssl://emqx.smarthome.com:8883@Value("${mqtt.username}")privateString username;// 控制指令专用账号:controller_rw@Value("${mqtt.password}")privateString password;// 生产环境强密码@Value("${mqtt.client-id.publisher}")privateString clientId;// 控制端客户端ID:java-controller-{timestamp}privateMqttClient mqttClient;// Spring初始化时连接MQTT Broker(随服务启动自动就绪)@PostConstructpublicvoidinit()throwsMqttException{MemoryPersistence persistence =newMemoryPersistence(); mqttClient =newMqttClient(broker, clientId, persistence);MqttConnectOptions connOpts =newMqttConnectOptions(); connOpts.setUserName(username); connOpts.setPassword(password.toCharArray()); connOpts.setAutomaticReconnect(true);// 网络中断自动重连 connOpts.setConnectionTimeout(10);// 连接超时10秒 connOpts.setKeepAliveInterval(30);// 心跳间隔30秒// 控制指令必须加密传输(隐私+防篡改)Properties sslProps =newProperties(); sslProps.put("ssl.trustStore","classpath:cert/emqx-truststore.jks"); sslProps.put("ssl.trustStorePassword","truststore123"); connOpts.setSSLProperties(sslProps); mqttClient.connect(connOpts); log.info("✅ MQTT发布器初始化完成,连接Broker:{}", broker);}/** * 发布消息(QoS=1确保至少送达一次,适合控制指令) * @param topic 主题 * @param payload 消息内容(JSON格式指令) */publicvoidpublish(String topic,String payload){try{if(!mqttClient.isConnected()){ mqttClient.reconnect();// 重连后继续发送 log.warn("⚠️ MQTT发布器已重连,继续发送指令");}MqttMessage message =newMqttMessage(payload.getBytes()); message.setQos(1);// 至少一次送达(控制指令不能丢) mqttClient.publish(topic, message);}catch(MqttException e){ log.error("❌ MQTT消息发布失败|主题:{}|内容:{}", topic, payload, e);// 失败后加入重试队列(生产环境用定时任务重试)}}// Spring销毁时关闭连接(服务停止前确保资源释放)@PreDestroypublicvoidclose()throwsMqttException{if(mqttClient !=null&& mqttClient.isConnected()){ mqttClient.disconnect(); mqttClient.close(); log.info("🔚 MQTT发布器已关闭");}}}3.3.7 核心实体类(RoomMetric.java & UserPreference.java)
packagecom.smarthome.entity;importlombok.AllArgsConstructor;importlombok.Builder;importlombok.Data;importlombok.NoArgsConstructor;/** * 房间环境指标实体类(Spark处理输出→决策引擎输入) * 字段与Spark计算结果一一对应,确保数据传输无丢失 */@Data@Builder@NoArgsConstructor@AllArgsConstructorpublicclassRoomMetric{privateString roomId;// 房间ID(如room1)privatedouble avgTemperature;// 平均温度(℃,保留1位小数)privatedouble avgHumidity;// 平均湿度(%,保留1位小数)privatedouble maxPm25;// PM2.5最大值(μg/m³,保留1位小数)privatedouble avgPowerConsumption;// 平均能耗(kWh)privateboolean tempDataMissing;// 温度数据是否缺失(true=无数据)privateboolean humDataMissing;// 湿度数据是否缺失privateboolean tempAnomaly;// 温度是否异常privateboolean humAnomaly;// 湿度是否异常privateboolean pm25Anomaly;// PM2.5是否异常privateString anomalyLevel;// 异常级别(low/medium/high)privatelong calculateTime;// 计算时间(毫秒时间戳)}UserPreference.java
packagecom.smarthome.entity;importjakarta.persistence.Entity;importjakarta.persistence.Id;importlombok.AllArgsConstructor;importlombok.Builder;importlombok.Data;importlombok.NoArgsConstructor;/** * 用户偏好实体类(从MySQL获取,个性化决策依据) * 与用户APP的偏好设置界面字段对应,支持CRUD操作 */@Data@Builder@NoArgsConstructor@AllArgsConstructor@Entity(name ="user_preference")// JPA实体注解,映射数据库表publicclassUserPreference{@IdprivateString userId;// 用户ID(主键)privatedouble tempPreference;// 日常温度偏好(℃,如24.0)privatedouble humPreference;// 日常湿度偏好(%,如50.0)privateboolean sleepModeEnabled;// 是否开启睡眠模式(默认true)privatedouble sleepTempPreference;// 睡眠温度偏好(℃,通常比日常高1-2℃)privateint curtainAutoCloseTime;// 窗帘自动关闭时间(如20表示20点,默认19)}四、实战案例:C 市高端小区智能家居项目效果复盘
我主导的第三个项目 ——C 市 “悦湖湾” 高端小区(12 栋楼,500 户)智能家居改造,2024 年 4 月上线至今稳定运行 6 个月,物业和业主满意度分别达 95% 和 92%(数据来自小区物业《2024 年 Q3 智能家居运行报告》)。这个项目最让我自豪的,是把技术方案落地成了业主能 “摸得着” 的体验改善。
4.1 项目背景与改造前痛点
该小区 2022 年交房时预装了某品牌传统智能家居系统,但业主投诉集中在 4 个方面(来自物业 2023 年 Q4 投诉记录):
- 设备 “各玩各的”:夏天常出现 “空调 24℃+ 湿度 70%” 的闷热场景(空调不管湿度,加湿器不管温度);
- 响应 “慢吞吞”:远程开空调平均等 8 秒,高峰期(晚 6-8 点)甚至 15 秒,业主戏称 “还没我自己跑回家开快”;
- 不懂 “看人下菜”:老人房和年轻人房用同一套阈值,老人觉得冷(24℃),年轻人觉得热(24℃);
- 故障 “藏猫猫”:设备坏了全靠业主投诉,维修平均要 2 小时(如空调传感器故障导致温度显示不准,3 天才发现)。
项目目标很明确:用 Java 大数据改造,实现 “实时响应、个性调节、主动预判”,解决上述所有痛点。
4.2 项目核心效果(数据对比,均来自物业报告)
| 评估指标 | 改造前(2023 年 Q4) | 改造后(2024 年 Q3) | 改善幅度 | Java 大数据的核心贡献 |
|---|---|---|---|---|
| 设备响应延迟 | 8-15 秒 | 1-2 秒 | -87.5% | Spark Streaming 微批(5 秒)+ MQTT 实时传输,缩短数据链路;Drools 规则匹配耗时 < 500ms |
| 用户手动调节频率 | 8.2 次 / 户 / 天 | 1.8 次 / 户 / 天 | -78.0% | 决策引擎结合用户偏好自动调节,如老人房默认 25℃、年轻人房 24℃;睡眠模式自动升温 1℃ |
| 环境舒适度达标率 | 72% | 95% | +31.9% | 多设备数据融合分析(温度 + 湿度 + PM2.5),避免 “单指标达标但体感差” 的情况 |
| 设备能耗 | 空调日均耗电 8.1 度 | 空调日均耗电 6.2 度 | -23.4% | 主动预判调节(如业主回家前 10 分钟开空调,提前 5 分钟关空调),减少无效运行 |
| 设备故障响应时间 | 2 小时 / 次 | 15 分钟 / 次 | -91.7% | 异常值检测提前预警(如空调温度传感器故障导致数据跳变),运维主动上门 |
数据可信度说明:上述数据由物业每月抽样 100 户统计,结合设备后台日志计算得出,2024 年 Q3 报告已在小区公告栏公示(可联系物业查阅)。其中 “舒适度达标率” 按 GB/T 50785-2012《民用建筑室内热湿环境评价标准》计算(温度 18-28℃+ 湿度 40-60%+PM2.5≤50μg/m³ 即为达标)。
4.3 典型场景:“老人房的无感智能”
2024 年 9 月 10 日,7 栋 201 室张阿姨(68 岁,独居)的使用场景,是项目 “个性化调节” 的最佳诠释 —— 技术不应该是复杂的操作,而应该是 “润物细无声” 的关怀。
4.3.1 场景触发:多维度数据融合
当天系统采集到的信息(来自设备日志):
- 实时环境:老人房传感器每 2 秒上报 “温度 22℃,湿度 38%”(连续 5 分钟稳定);
- 用户偏好:张阿姨在 APP 设置 “日常温度 25℃,湿度 45%,22:00 自动开启睡眠模式(23℃)”;
- 行为习惯:历史数据显示她每天 14:00-15:00 在卧室看电视(窗帘关闭,光照强度 < 300lux);
- 外部数据:气象局 API 返回 “当天下午降温,室外温度从 25℃降至 20℃,风力 3 级”。
4.3.2 决策执行:规则链联动
Spark Streaming 计算出 “老人房平均温度 22℃(低于偏好 3℃),湿度 38%(低于偏好 7%)”,异常级别 “medium”,决策引擎按优先级触发 2 条规则:
- 规则 2(温度调节):空调切换至制热模式,目标 25℃(考虑室外降温,提前预热);
- 规则 3(湿度调节):加湿器开启,目标 45%(避免升温后空气干燥)。
4.3.3 用户反馈与效果
张阿姨 14:00 走进卧室时,温度已升至 24.5℃,湿度 43%—— 她在 9 月业主满意度调查中写道:“现在不用找遥控器,房间永远暖暖的,湿度也刚好,比儿女还贴心。”
这个场景在改造前,张阿姨需要弯腰找空调遥控器(她有腰疾),调完温度还要开加湿器,经常记不清操作步骤;改造后完全 “无感”,这就是技术落地的 “温度”。
五、Java 大数据在智能家居中的应用拓展
做完三个项目,我越来越觉得:智能家居的终极形态是 “让人忘记技术的存在”。结合行业趋势和业主需求,Java 大数据还有三个值得深耕的方向,也是我下一个项目的重点。
5.1 融合 AI 的个性化学习(从 “规则驱动” 到 “数据驱动”)
目前的决策引擎依赖 “人工配置规则”,未来可以结合 Spark MLlib 让系统 “自己学用户习惯”。比如张阿姨冬天喜欢把温度调高 1℃,系统应该记住这个偏好,无需人工改规则。
5.1.1 数据采集与预处理(MySQL 表设计 + Spark SQL)
- 预处理 SQL(提取用户调节偏好特征):
用户调节记录表示例(user_adjust_record):
| 字段名 | 类型 | 说明 | 示例值 |
|---|---|---|---|
| id | BIGINT | 主键 ID | 10086 |
| user_id | VARCHAR(50) | 用户 ID | user1 |
| room_id | VARCHAR(20) | 房间 ID | room1 |
| adjust_time | DATETIME | 调节时间 | 2024-09-10 14:30:25 |
| temp_before | DOUBLE | 调节前温度 | 22.0 |
| temp_after | DOUBLE | 调节后温度 | 25.0 |
| hum_before | DOUBLE | 调节前湿度 | 38.0 |
| hum_after | DOUBLE | 调节后湿度 | 45.0 |
| weather | VARCHAR(50) | 当天天气 | 多云转晴 |
| is_sleep | TINYINT | 是否睡眠时段(0 = 否,1 = 是) | 0 |
-- 近3个月用户在不同场景下的平均调节幅度SELECT user_id, room_id, weather, is_sleep,AVG(temp_after - temp_before)AS temp_adjust_delta,-- 温度调节幅度(正数=调高,负数=调低)AVG(hum_after - hum_before)AS hum_adjust_delta -- 湿度调节幅度FROM user_adjust_record WHERE adjust_time >= DATE_SUB(CURDATE(),90)-- 取近3个月数据,避免过旧习惯影响GROUPBY user_id, room_id, weather, is_sleep;5.1.2 模型训练(Spark MLlib ALS 算法,附完整代码)
packagecom.smarthome.ai;importorg.apache.spark.ml.evaluation.RegressionEvaluator;importorg.apache.spark.ml.recommendation.ALS;importorg.apache.spark.ml.recommendation.ALSModel;importorg.apache.spark.sql.Dataset;importorg.apache.spark.sql.Row;importorg.apache.spark.sql.SparkSession;importorg.apache.spark.sql.functions;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;/** * 用户偏好推荐模型(基于ALS协同过滤算法,C市项目二期规划) * 功能:预测用户在不同场景(天气/时段/房间)下的温度/湿度偏好调节幅度 */publicclassUserPreferenceModel{privatestaticfinalLogger log =LoggerFactory.getLogger(UserPreferenceModel.class);publicstaticvoidmain(String[] args){// 1. 初始化SparkSession(YARN集群模式)SparkSession spark =SparkSession.builder().appName("UserPreferenceModel-Training").master("yarn").config("spark.executor.memory","8g")// 训练需要更多内存.config("spark.driver.memory","4g").getOrCreate();try{// 2. 加载预处理后的特征数据(来自HDFS)Dataset<Row> features = spark.read().option("header",true).option("inferSchema",true).csv("hdfs:///smarthome/ai/features/user_adjust_features.csv");// 3. 特征编码(将字符串ID转为整数索引,ALS算法要求)Dataset<Row> encodedFeatures = features // 用户ID哈希为整数(避免原ID过长).withColumn("user_idx", functions.hash(functions.col("user_id")).mod(10000))// 场景(房间+天气+时段)哈希为整数.withColumn("scene_idx", functions.hash( functions.concat("room_id","weather","is_sleep")).mod(1000))// 选择训练所需字段(用户索引、场景索引、温度调节幅度作为标签).select( functions.col("user_idx").cast("integer"), functions.col("scene_idx").cast("integer"), functions.col("temp_adjust_delta").cast("double").as("label")).na().drop();// 过滤空值// 4. 划分训练集和测试集(8:2)Dataset<Row>[] splits = encodedFeatures.randomSplit(newdouble[]{0.8,0.2},42);// 固定随机种子,结果可复现Dataset<Row> training = splits[0];Dataset<Row> test = splits[1];// 5. 训练ALS模型(协同过滤推荐算法,适合用户-场景偏好预测)ALS als =newALS().setMaxIter(10)// 迭代次数(10次平衡效果与性能).setRegParam(0.01)// 正则化参数(防止过拟合).setUserCol("user_idx")// 用户索引列.setItemCol("scene_idx")// 场景索引列.setRatingCol("label")// 标签列(温度调节幅度).setColdStartStrategy("drop");// 冷启动策略(新用户/场景直接丢弃)ALSModel model = als.fit(training);// 6. 模型评估(RMSE越小越好,目标<0.5℃)Dataset<Row> predictions = model.transform(test);RegressionEvaluator evaluator =newRegressionEvaluator().setMetricName("rmse")// 均方根误差.setLabelCol("label").setPredictionCol("prediction");double rmse = evaluator.evaluate(predictions); log.info("模型评估结果:RMSE = {}℃(越小越精准,目标<0.5℃)", rmse);// 7. 保存模型(供决策引擎实时调用)String modelPath ="hdfs:///smarthome/ai/models/user_preference_model_v1/"; model.save(modelPath); log.info("模型训练完成,已保存至:{}", modelPath);}catch(Exception e){ log.error("模型训练失败", e);}finally{ spark.stop();}}}5.1.3 模型与决策引擎集成(关键代码补充)
packagecom.smarthome.service.impl;importorg.apache.spark.ml.recommendation.ALSModel;importorg.apache.spark.sql.Dataset;importorg.apache.spark.sql.Row;importorg.apache.spark.sql.SparkSession;importorg.apache.spark.sql.types.DataTypes;importorg.apache.spark.sql.types.StructType;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.stereotype.Component;importcom.smarthome.entity.RoomMetric;importcom.smarthome.entity.UserPreference;importjava.util.Arrays;importjava.util.HashMap;importjava.util.Map;/** * AI偏好预测服务(集成ALS模型到决策引擎) */@ComponentpublicclassAiPreferenceService{privatefinalALSModel alsModel;privatefinalSparkSession spark;// 初始化:加载预训练模型publicAiPreferenceService(@Value("${ai.model.path}")String modelPath){this.spark =SparkSession.builder().appName("AiPreferenceService").master("local[2]")// 本地模式,用2核(决策引擎部署在同一服务器).getOrCreate();this.alsModel =ALSModel.load(modelPath); log.info("✅ AI偏好预测模型加载完成,路径:{}", modelPath);}/** * 预测用户在当前场景下的温度调节幅度 */publicdoublepredictTempAdjustDelta(RoomMetric metric,UserPreference pref,String weather){try{// 1. 构造当前场景特征String userId = pref.getUserId();String roomId = metric.getRoomId();int isSleep =isSleepTime()?1:0;// 判断是否睡眠时段// 2. 特征编码(与训练时一致)int userIdx =Math.abs(userId.hashCode()%10000);String sceneKey = roomId + weather + isSleep;int sceneIdx =Math.abs(sceneKey.hashCode()%1000);// 3. 构造Spark DataFrame(模型输入格式)Map<String,Object> data =newHashMap<>(); data.put("user_idx", userIdx); data.put("scene_idx", sceneIdx);Dataset<Row> sceneData = spark.createDataFrame(Arrays.asList(data),newStructType().add("user_idx",DataTypes.IntegerType).add("scene_idx",DataTypes.IntegerType));// 4. 调用模型预测调节幅度Dataset<Row> prediction = alsModel.transform(sceneData);if(prediction.count()==0){return0.0;// 无预测结果,用默认值}double delta = prediction.select("prediction").first().getDouble(0);// 5. 限制调节幅度(避免预测异常值,最大±2℃)returnMath.max(-2.0,Math.min(2.0, delta));}catch(Exception e){ log.error("温度调节幅度预测失败", e);return0.0;// 失败时用默认值,不影响主流程}}// 判断是否睡眠时段(22:00-6:00)privatebooleanisSleepTime(){int hour =java.time.LocalTime.now().getHour();return hour >=22|| hour <=6;}}落地效果预期:模型部署后,决策引擎会结合 “规则结果 + AI 预测” 调节设备。比如张阿姨在 “雨天 + 卧室 + 非睡眠” 场景,模型预测 “温度调节幅度 + 1.5℃”,系统会在规则目标 25℃的基础上加 1.5℃,更贴合她的习惯 —— 这一步能减少 30% 的用户手动调节(基于前期小范围测试)。
5.2 跨空间的智能联动(从 “单户” 到 “小区 - 家庭”)
未来可打通 “小区公共区域” 与 “家庭内部” 数据,实现更自然的联动。比如:
5.2.1 场景 1:小区车库联动(技术方案)
- 触发条件:业主车辆进入车库(通过车牌识别系统);
- 联动逻辑:
- 物业系统通过
Spring Cloud Stream将 “车牌 + 进入时间” 推送到 Kafka 主题community/garage/entry; - 决策引擎消费消息,查询 “车牌 - 业主 - 房间” 映射表,获取用户 ID 和预计到家时间(车库到家门约 2 分钟);
- 提前 1 分钟启动客厅空调(按用户偏好温度),打开玄关灯。
- 物业系统通过
- 核心规则(新增到
smarthome.drl):
rule "Garage Entry Trigger Home Preheat" salience 11 when $car: CarEntryEvent(status == "ENTER") // 车库进入事件 $pref: UserPreference(userId == $car.getUserId()) // 计算预计到家时间(当前时间+2分钟) $arriveTime: Long() from (System.currentTimeMillis() + 2*60*1000) then // 提前1分钟启动设备 schedulerService.schedule(() -> { deviceControlService.controlAircon("livingroom", "AUTO", $pref.getTempPreference()); deviceControlService.controlLight("entrance", "ON"); }, $arriveTime - 60*1000); // 提前1分钟执行 log.info("规则触发[车库联动]:用户{}车辆进入,将提前预热客厅", $car.getUserId()); end 5.2.2 场景 2:公共绿化联动(解决实际问题)
小区绿化灌溉时,室外湿度常达 80% 以上,容易导致室内返潮 —— 通过联动可自动关窗除湿:
- 数据来源:小区绿化系统的 “灌溉计划”(定时任务)+ 室外气象站的湿度数据;
- 联动逻辑:灌溉前 10 分钟,自动关闭一楼住户的窗户,开启除湿模式。
5.3 低碳节能与能源优化(响应 “双碳” 政策)
结合国家 “双碳” 目标,Java 大数据可帮助家庭降低能耗:
5.3.1 峰谷电价优化(经济 + 环保)
- 数据基础:电网峰谷电价表(如 00:00-08:00 为谷段,电价 0.3 元 / 度;18:00-22:00 为峰段,0.8 元 / 度);
- 优化逻辑:Spark 分析家庭用电习惯(如热水器日均用 3 度电),在谷段自动加热至 70℃,峰段不加热,每天可省 1.5 元(3 度 ×(0.8-0.3))。
5.3.2 能耗异常告警(帮用户省钱)
- 检测逻辑:实时监测设备功率,若空调连续 24 小时运行(非制热 / 制冷季)、热水器温度设 80℃以上,推送给用户:“您家空调已连续运行 24 小时,关闭可省 3.2 度电(约 2.5 元)”;
- 数据支撑:基于 C 市项目数据,此类优化可降低家庭总能耗 15%-20%。
结束语:技术的温度,藏在细节里
亲爱的 Java 和 大数据爱好者们,做第一个项目时,我总想着 “用最炫的技术”,把架构图画得花里胡哨;到第三个项目,我更关注 “张阿姨会不会觉得冷”“业主调温度是否方便”—— 技术的价值,最终要落到人的体验上。
有人问我:“Python 做 AI 更方便,为什么坚持用 Java?” 我的答案很简单:智能家居是 “7×24 小时不能停” 的系统,Java 的稳定性、生态成熟度,是凌晨 3 点空调不宕机的保障。当然,我们也会用 Python 做 AI 模型训练,但生产环境的决策和控制,Java 永远是 “定海神针”。
亲爱的 Java 和 大数据爱好者,想问问你:你家的智能家居最让你头疼的是什么?是 “设备不联动”“调节不精准”,还是 “能耗太高”?如果用 Java 大数据改造,你最想优先实现哪个功能?欢迎在评论区聊聊 —— 技术的迭代,往往从用户的一个 “不爽” 开始。
最后诚邀各位参与投票,你最期待的智能家居功能是?
本文章来源公众号:青云交