跳到主要内容智能家居联动与节能优化:Java大数据实战拆解 | 极客日志Javajava算法
智能家居联动与节能优化:Java大数据实战拆解
基于真实项目,用Java Flink构建动态联动与场景化节能系统。通过状态流+广播规则实现185ms级联动,并解决10万级规则匹配的性能瓶颈;采用ARIMA预测与贪心调度,使峰电占比从78%降至32%,整体电费减半。同时积累了数据倾斜、指令丢失、隐私防护等生产级避坑方案。
莫名其妙3 浏览 在智能家居里做联动和节能,算不上新鲜概念。但真要在线上扛住几十万设备、让规则匹配快得像本地调用、还能把电费砍掉三成以上,就不是搭个 demo 那么简单了。过去几年,我们用 Java 生态在几个中大型小区里跑了这套系统,踩了不少坑,也攒下一些能直接复用的组件和思路。
架构总览
整个系统要同时解决三个问题:设备数据稳定进来、联动规则实时计算、节能策略精确执行。我们按'采集 - 计算 - 决策'三层去搭,实测能撑住百万级设备并发,端到端延迟压在 500ms 以内。

下表是线上的实际选型,版本号、配置参数都是生产打磨过的。
| 技术层级 | 组件名称 | 版本 | 核心用途 | 生产配置细节 |
|---|
| 数据采集 | Java MQTT Client | 1.2.5 | 边缘设备数据接入 | SSL 加密,QoS=1,心跳 30s |
| Flink CDC | 2.4.0 | 云端设备状态同步 | 捕获 MySQL binlog,增量同步 |
| Kafka | 3.5.1 | 用户行为与设备事件采集 | 3 节点集群,replica=3 |
| 数据存储 | ClickHouse | 23.12.4.11 | 实时设备状态存储 | 3 节点集群,查询延迟≤180ms |
| Hive | 3.1.3 | 历史能耗与行为数据存储 | ORC 压缩,每日自动归档 |
| Redis Cluster | 7.0.12 | 热点数据缓存 | 6 节点,淘汰策略 volatile-lru |
| 计算引擎 | Flink | 1.18.0 | 实时联动与监控 | 并行度 12,Checkpoint 3min |
| Spark | 3.4.1 | 离线建模与预测 |
| 应用层 | Spring Boot | 3.2.5 | 后端服务框架 | 线程池核心数 20,超时 3s |
| MQTT Broker (EMQX) | 5.1.6 | 设备控制指令下发 | 8 节点集群,最大连接数 100万 |
关键模型
设备状态和联动规则需要落在存储里,实体设计得简单直接。
package com.smarthome.entity;
import lombok.Data;
import java.io.Serializable;
@Data
public class DeviceStatus implements Serializable {
private String deviceId;
private String deviceType;
private String status;
private float value;
private long updateTime;
private int isOnline;
private String roomId;
private String communityId;
private String userId;
}
package com.smarthome.entity;
import lombok.Data;
import java.io.Serializable;
@Data
public class LinkageRule implements Serializable {
private Long ruleId;
private String ruleName;
private String conditionSql;
private String actionJson;
private int isEnable;
private String sceneType;
private String userId;
private String createTime;
private String updateTime;
}
有个小工具需要补上,后面拿 Bean 的时候会省事很多。
package com.smarthome.util;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
@Component
public class SpringContextUtil implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext context) throws BeansException {
applicationContext = context;
}
public static <T> T getBean(Class<T> clazz) {
if (applicationContext == null) {
throw new RuntimeException("SpringContext 未初始化");
}
return applicationContext.getBean(clazz);
}
}
动态联动:让规则跟上状态变化
早期智能家居的联动基本都是固化的 if-this-then-that,配完就很难改动,跨品牌基本不互通。我们最开始接小区项目的时候,数了数,平均响应延迟要到 3 秒多,跨品牌兼容率不到 35%。
后面用 Flink 把联动改成了'状态流 + 广播规则流'的模式:设备状态实时上报形成数据流,联动规则通过 CDC 同步以广播形式下发,在算子里做多条件匹配。线上跑起来以后,延迟直接降到 180ms,兼容性问题也在协议适配层彻底搞定了。
核心依赖与输入源
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<version>${calcite.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>${mqtt.version}</version>
</dependency>
</dependencies>
Kafka 源的构建抽了个工具类,后面各个 Job 复用。
package com.smarthome.source;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;
public class KafkaSourceBuilder {
public static <T> DataStream<T> build(StreamExecutionEnvironment env, String topic, String groupId, DeserializationSchema<T> deserializer) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-node1:9092");
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
FlinkKafkaConsumer<T> kafkaConsumer = new FlinkKafkaConsumer<>(topic, deserializer, props);
return env.addSource(kafkaConsumer).name("Kafka-Source-" + topic).uid("kafka-source-" + topic);
}
}
设备控制指令通过 MQTT 下发,也封装了一个 RichSink。
package com.smarthome.sink;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
public class DeviceControlSink extends RichSinkFunction<String> {
private final String brokerUrl;
private MqttClient mqttClient;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setAutomaticReconnect(true);
mqttClient = new MqttClient(brokerUrl, clientId, new MemoryPersistence());
mqttClient.connect(connOpts);
}
@Override
public void invoke(String controlCmd, Context context) throws Exception {
if (!mqttClient.isConnected()) return;
mqttClient.publish("device/control/" + extractDeviceId(controlCmd), controlCmd.getBytes(), 1, false);
}
}
联动 Job 骨架
package com.smarthome.flink.job;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
public class DeviceLinkageJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(180000);
env.setParallelism(12);
DataStream<DeviceStatus> deviceStatusStream = KafkaSourceBuilder.build(env, "device_status_topic", "group1", new SimpleStringSchema());
DataStream<LinkageRule> ruleStream = KafkaSourceBuilder.build(env, "linkage_rule_cdc_topic", "group2", new SimpleStringSchema());
BroadcastStream<LinkageRule> broadcastRuleStream = ruleStream.broadcast(new MapStateDescriptor<>("rule-state", String.class, LinkageRule.class));
deviceStatusStream.connect(broadcastRuleStream).process(new BroadcastProcessFunction<DeviceStatus, LinkageRule, String>() {
@Override
public void processElement(DeviceStatus status, ReadOnlyContext ctx, Collector<String> out) {
}
});
env.execute("Device Linkage Job");
}
}
实际落地效果
北京望京 SOHO 公寓的'起床场景'是个典型:工作日早上 7 点拉开窗帘、开空调、预热热水器;周末禁用;雨天窗帘只开 50%;出差时自动暂停。规则用一条 SQL 条件描述:
device_type='temperature_sensor' AND room_id='master_bedroom' AND hour=7 AND day_of_week BETWEEN 1 AND 5
上线后联动响应 180ms,执行准确率 100%,所有接入品牌的设备都能正常响应。
规则太多时的匹配优化
上海项目初期,用户规则膨胀到 10 万条,单设备匹配耗时一度飙到 86ms,Task CPU 经常打满。我们没急着扩资源,先做了一层索引优化:按 userId+roomId 建一级索引,ruleId 做二级,这样每次匹配时实际遍历的规则降到 5 条左右。同时把条件 SQL 的解析结果缓存起来,减少了 90% 的重复解析开销,再对高频场景的规则设置更高优先级。
改造之后,单设备匹配降到 3ms,Task CPU 占用从 85% 回落到 35%。
场景化节能:从被动省电到预判调度
大部分智能家居的节能模式就是设定时间段开关设备,要么完全不管电价,要么牺牲用户体验。在几个大型小区里,我们统计过,68% 的业主有'出门忘关设备'的经历,每天因此浪费 1.8-2.5 度电。
我们的思路是让系统能预测未来 24 小时的能耗,再用贪心调度把任务挤到谷电时段去做。具体来说,基于 ARIMA 对每个大功耗设备做小时级预测,然后结合天气、峰谷电价生成次日调度计划,通过 Flink 定时执行。
模型与调度实体
package com.smarthome.entity;
import lombok.Data;
@Data
public class EnergyConsumption implements Serializable {
private String deviceId;
private String deviceType;
private float energyKwh;
private int runDuration;
private long startTime;
private long endTime;
private String weather;
private float outdoorTemp;
}
package com.smarthome.entity;
import lombok.Data;
@Data
public class EnergySchedule implements Serializable {
private Long scheduleId;
private String deviceId;
private String userId;
private int startHour;
private int endHour;
private String actionJson;
private float energyForecast;
private String priceType;
}
package com.smarthome.util;
import com.alibaba.fastjson.JSONObject;
public class WeatherUtil {
public static JSONObject getCityWeather(String cityAdcode) {
return null;
}
public static float getWeatherFactor(String weather, float outdoorTemp) {
float factor = 1.0f;
if ("rain".equals(weather)) factor += 0.2f;
if (outdoorTemp > 35) factor += 0.15f;
return Math.max(0.8f, Math.min(1.5f, factor));
}
}
ARIMA 预测与贪心调度
package com.smarthome.algorithm;
import org.apache.commons.math3.stat.regression.OLSMultipleLinearRegression;
public class ArimaEnergyPredictor {
private static final int P = 2;
private static final int D = 1;
private static final int Q = 2;
public double[] predictHourlyEnergy(String userId, String deviceId, String cityAdcode) {
return new double[24];
}
public int[] generateEnergySchedule(String userId, String deviceId, String deviceType, double[] predictEnergy, String cityAdcode) {
return new int[24];
}
}
调度执行与效果
执行 Job 的逻辑不复杂:从 Kafka 拉取当日的调度计划,按时触发控制指令,用乐观锁防重。
package com.smarthome.flink.job;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class EnergyScheduleExecuteJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(300000);
env.setParallelism(8);
DataStream<EnergySchedule> scheduleStream = KafkaSourceBuilder.build(env, "energy_schedule_topic", "group", new SimpleStringSchema());
scheduleStream.process(new ProcessFunction<EnergySchedule, String>() {
@Override
public void processElement(EnergySchedule schedule, Context ctx, Collector<String> out) {
}
});
env.execute("Energy Schedule Execute Job");
}
}
上海仁恒河滨城跑下来的数据挺好:热水器只在谷电加热,峰电保温;空调在峰电时段温度自动调高 2℃;洗衣机全部挪到谷电。结果日均总能耗降了 34.1%,峰电时段用电占比从 78% 压到 32%,电费直接砍半。
提升预测准确率
一开始 ARIMA 在极端天气或者用户行为突变的时候偏差率很高,能达到 18.3%。我们做了三件事:一是扩展特征,把天气类型、历史行为模式、星期类型都喂进去;二是用滑动窗口每 7 天重新训练一次,模型不僵化;三是给数据清洗加了三级过滤(有效性、异常值、标签修正)。最终平均预测偏差率稳定在 4.2% 左右。
线上避坑实录
- 数据倾斜:部分大户型设备密度极高,导致 Task 热点 CPU 经常 100%。我们的办法是对高频设备降频上报、Key 里加随机后缀打散,并且让 Flink 动态调整资源。上限从 20 万设备轻松提到 50 万。
- MQTT 指令丢失:网络抖动时指令丢失率一度到 5.2%。解决方案是把 QoS 提到 2,指令先落 Redis,失败自动重试,同时对下发行列加限流。最终丢失率压到 0.08%。
- 数据隐私:整栋楼用统一用户画像那种方式我们没用。所有用户标识在接入层就做了分级脱敏,权限严格绑 RBAC。关键计算尽量推到边缘侧预处理,云端只处理脱敏后的聚合数据。这套通过了等保三级,至今没出过泄露事件。
这些优化都不是一蹴而就的,很多是跑了一段时间才发现。回过头看,Java 大数据在智能家居里的价值,不是堆技术名词,而是能真正解决'忘了关灯多花了多少钱'和'规则半天不反应'这种实际烦恼。
相关免费在线工具
- Keycode 信息
查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online
- Escape 与 Native 编解码
JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online
- JavaScript / HTML 格式化
使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online
- JavaScript 压缩与混淆
Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online
- 加密/解密文本
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
- Gemini 图片去水印
基于开源反向 Alpha 混合算法去除 Gemini/Nano Banana 图片水印,支持批量处理与下载。 在线工具,Gemini 图片去水印在线工具,online