1. Technical Foundation: The 'Trinity' Architecture of Java Big Data for Smart Homes
To deliver 'device linkage + scene-based energy saving', three core problems must be solved first: how to ingest device data reliably, how to evaluate linkage rules quickly, and how to optimize energy-saving strategies precisely. The 'collect - compute - decide' trinity architecture built on the Java ecosystem has been load-tested across multiple projects; it supports concurrent access from one million devices with real-time computation latency ≤ 500 ms.
1.1 Architecture Overview

1.2 Core Technology Stack and Production Configuration
| Layer | Component | Version | Core Purpose | Production Configuration |
|---|---|---|---|---|
| Data collection | Java MQTT Client | 1.2.5 | Edge device data ingestion | SSL encryption, QoS=1, 30 s keep-alive, connection pool of 50 |
| Data collection | Flink CDC | 2.4.0 | Cloud-side device state sync | Captures MySQL binlog (ROW format), incremental sync |
| Data collection | Kafka | 3.5.1 | User behavior and device event collection | 3-node cluster, replica=3, 32 partitions |
| Data storage | ClickHouse | 23.12.4.11 | Real-time device state storage | 3-node cluster, 100+ partitions per table, query latency ≤ 180 ms |
| Data storage | Hive | 3.1.3 | Historical energy and behavior data | ORC compression, partitioned by dt + device_type |
| Data storage | Redis Cluster | 7.0.12 | Hot data cache | 6 nodes (3 masters, 3 replicas), eviction policy volatile-lru |
| Compute engine | Flink | 1.18.0 | Real-time linkage and monitoring | Parallelism 12, checkpoint every 3 min, RocksDB state backend |
| Compute engine | Spark | 3.4.1 | Offline modeling and prediction | executor.cores=4, executor.memory=8g, dynamic resource allocation |
| Compute engine | TensorFlow Java API | 2.15.0 | AI scene prediction | Lightweight model (ONNX format), inference latency ≤ 90 ms |
| Application | Spring Boot | 3.2.5 | Backend service framework | Thread pool core 20 / max 40, 3 s timeout |
| Application | MQTT Broker (EMQX) | 5.1.6 | Device control command delivery | 8-node cluster, 1,000,000 max connections, QoS=1 delivery success rate 99.99% |
1.3 Core Data Models (POJOs, with table mapping and business meaning)
1.3.1 Device status entity (maps to the ClickHouse real-time table)
package com.smarthome.entity;
import lombok.Data;
import java.io.Serializable;
@Data
public class DeviceStatus implements Serializable {
    private String deviceId;      // unique device ID
    private String deviceType;    // e.g. air_conditioner, curtain, temperature_sensor
    private String status;        // current state, e.g. on / off / connected
    private float value;          // numeric reading: temperature, open percentage, ...
    private long updateTime;      // last report time (epoch millis)
    private int isOnline;         // 1 = online, 0 = offline
    private String roomId;        // room the device belongs to
    private String communityId;   // residential community ID
    private String userId;        // owner user ID
}
1.3.2 Linkage rule entity (maps to the MySQL configuration table)
package com.smarthome.entity;
import lombok.Data;
import java.io.Serializable;
@Data
public class LinkageRule implements Serializable {
    private Long ruleId;          // primary key
    private String ruleName;      // human-readable rule name
    private String conditionSql;  // SQL-style trigger condition template
    private String actionJson;    // JSON array of device actions to execute
    private int isEnable;         // 1 = enabled, 0 = disabled
    private String sceneType;     // scene category, e.g. wake-up, away
    private String userId;        // owning user
    private String createTime;
    private String updateTime;
}
1.3.3 Supporting utility class: SpringContextUtil (required in production)
package com.smarthome.util;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
@Component
public class SpringContextUtil implements ApplicationContextAware {

    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext context) throws BeansException {
        applicationContext = context;
    }

    public static <T> T getBean(Class<T> clazz) {
        if (applicationContext == null) {
            throw new RuntimeException("SpringContext is not initialized yet");
        }
        try {
            return applicationContext.getBean(clazz);
        } catch (Exception e) {
            throw new RuntimeException("Failed to get bean of type " + clazz.getName(), e);
        }
    }

    public static <T> T getBean(String beanName, Class<T> clazz) {
        if (applicationContext == null) {
            throw new RuntimeException("SpringContext is not initialized yet");
        }
        try {
            return applicationContext.getBean(beanName, clazz);
        } catch (Exception e) {
            throw new RuntimeException("Failed to get bean: " + beanName, e);
        }
    }
}
2. Core Scenario 1: Dynamic Linkage Engine — from 'fixed rules' to 'data-driven'
2.1 Industry pain points: the three fatal flaws of traditional linkage
- Rigid rules that cannot adapt: a scheduled curtain-close still fires while the owner is away on a trip, and opening a window on a rainy day conflicts with a running air conditioner.
- No context awareness, slow response: rule triggering relies on periodic polling, averaging 3.2 s of latency, and cannot adjust dynamically based on whether the user is at home.
- Poor cross-brand compatibility: with multi-brand setups (e.g. a Gree air conditioner plus Xiaomi curtains), only 35% of installations achieve cross-brand linkage.
2.2 Solution: a Flink SQL-driven dynamic linkage engine
The engine is built on Flink's 'status stream + broadcast rule stream' pattern; its core logic is real-time device state perception, dynamic rule updates, and multi-condition matching.
2.2.1 Key dependencies (pom.xml)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.5</version>
    </parent>
    <groupId>com.smarthome</groupId>
    <artifactId>smart-home-bigdata</artifactId>
    <version>1.0.0</version>
    <properties>
        <java.version>17</java.version>
        <flink.version>1.18.0</flink.version>
        <kafka.version>3.5.1</kafka.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.calcite</groupId>
            <artifactId>calcite-core</artifactId>
            <version>1.34.0</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.5</version>
        </dependency>
    </dependencies>
</project>
2.2.2 Utility class: KafkaSourceBuilder
package com.smarthome.source;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
public class KafkaSourceBuilder {

    private static final Logger log = LoggerFactory.getLogger(KafkaSourceBuilder.class);

    public static <T> DataStream<T> build(StreamExecutionEnvironment env, String topic, String groupId,
                                          DeserializationSchema<T> deserializer) {
        if (env == null || topic == null || groupId == null || deserializer == null) {
            throw new IllegalArgumentException("env, topic, groupId and deserializer must not be null");
        }
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-node1:9092,kafka-node2:9092");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        FlinkKafkaConsumer<T> kafkaConsumer = new FlinkKafkaConsumer<>(topic, deserializer, props);
        // The operator name/uid prefix below is illustrative; the original literal was lost.
        return env.addSource(kafkaConsumer)
                .name("kafka-source-" + topic)
                .uid("kafka-source-" + topic);
    }
}
2.2.3 Utility class: DeviceControlSink
package com.smarthome.sink;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DeviceControlSink extends RichSinkFunction<String> {

    private static final Logger log = LoggerFactory.getLogger(DeviceControlSink.class);

    private final String brokerUrl;
    private final String clientId;
    private final String username;
    private final String password;
    private final int qos;
    private MqttClient mqttClient;
    private static final int MAX_RETRY = 3;
    private static final long[] RETRY_INTERVALS = {1000, 2000, 4000};

    // Note: credentials, timeouts and the topic prefix were lost during extraction;
    // the values below are representative reconstructions.
    public DeviceControlSink(String brokerUrl) {
        this.brokerUrl = brokerUrl;
        this.clientId = "device-control-sink-" + System.currentTimeMillis();
        this.username = "smarthome";   // inject real credentials from configuration
        this.password = "change-me";
        this.qos = 1;
    }

    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
        super.open(parameters);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setUserName(username);
        connOpts.setPassword(password.toCharArray());
        connOpts.setAutomaticReconnect(true);
        connOpts.setConnectionTimeout(10);
        connOpts.setKeepAliveInterval(30);   // matches the 30 s heartbeat in the stack table
        connOpts.setCleanSession(false);
        mqttClient = new MqttClient(brokerUrl, clientId, new MemoryPersistence());
        mqttClient.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable cause) {
                log.error("MQTT connection lost", cause);
            }
            @Override
            public void messageArrived(String topic, MqttMessage message) {}
            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                if (!token.isComplete()) {
                    log.error("MQTT delivery did not complete");
                }
            }
        });
        int connectRetry = 0;
        while (connectRetry < MAX_RETRY) {
            try {
                if (!mqttClient.isConnected()) {
                    mqttClient.connect(connOpts);
                }
                break;
            } catch (MqttException e) {
                connectRetry++;
                Thread.sleep(RETRY_INTERVALS[connectRetry - 1]);
            }
        }
    }

    @Override
    public void invoke(String controlCmd, Context context) throws Exception {
        if (controlCmd == null || !mqttClient.isConnected()) {
            return;
        }
        JSONObject cmdJson = JSONObject.parseObject(controlCmd);
        String deviceId = cmdJson.getString("deviceId");
        String topic = "device/control/" + deviceId;   // topic naming scheme is illustrative
        MqttMessage message = new MqttMessage(controlCmd.getBytes());
        message.setQos(qos);
        message.setRetained(false);
        mqttClient.publish(topic, message);
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (mqttClient != null && mqttClient.isConnected()) {
            mqttClient.disconnect();
            mqttClient.close();
        }
    }
}
2.2.4 Core dynamic linkage Job (Flink 1.18.0, production version)
package com.smarthome.flink.job;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.smarthome.entity.DeviceStatus;
import com.smarthome.entity.LinkageRule;
import com.smarthome.source.KafkaSourceBuilder;
import com.smarthome.sink.DeviceControlSink;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Note: string literals (topic names, watermark bounds, operator names) were lost
// during extraction; the values below are representative reconstructions.
public class DeviceLinkageJob {

    private static final Logger log = LoggerFactory.getLogger(DeviceLinkageJob.class);

    private static final MapStateDescriptor<String, LinkageRule> RULE_STATE_DESC =
            new MapStateDescriptor<>("linkage-rule-state", String.class, LinkageRule.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(180_000L); // 3-minute checkpoints, matching the stack table
        env.getCheckpointConfig().setCheckpointingMode(org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE);
        env.setParallelism(12);

        // 1. Device status stream: parse, validate, assign event-time watermarks
        DataStream<DeviceStatus> deviceStatusStream = KafkaSourceBuilder.build(
                env, "device-status-topic", "linkage-status-group", new SimpleStringSchema())
            .filter(jsonStr -> jsonStr != null && !jsonStr.isEmpty())
            .map(new MapFunction<String, DeviceStatus>() {
                @Override
                public DeviceStatus map(String jsonStr) {
                    try {
                        return JSONObject.parseObject(jsonStr, DeviceStatus.class);
                    } catch (Exception e) {
                        return null; // drop malformed records
                    }
                }
            })
            .filter(status -> status != null)
            .assignTimestampsAndWatermarks(WatermarkStrategy.<DeviceStatus>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((status, ts) -> status.getUpdateTime()));

        // 2. Rule stream: parse and validate the condition SQL before broadcasting
        DataStream<LinkageRule> ruleStream = KafkaSourceBuilder.build(
                env, "linkage-rule-topic", "linkage-rule-group", new SimpleStringSchema())
            .filter(jsonStr -> jsonStr != null && !jsonStr.isEmpty())
            .map(new MapFunction<String, LinkageRule>() {
                @Override
                public LinkageRule map(String jsonStr) {
                    try {
                        LinkageRule rule = JSONObject.parseObject(jsonStr, LinkageRule.class);
                        if (rule.getConditionSql() == null || rule.getActionJson() == null) return null;
                        if (!validateSql(rule.getConditionSql())) return null;
                        return rule;
                    } catch (Exception e) {
                        return null;
                    }
                }
            })
            .filter(rule -> rule != null);

        // 3. Broadcast rules and match them against each status event
        BroadcastStream<LinkageRule> broadcastRuleStream = ruleStream.broadcast(RULE_STATE_DESC);
        DataStream<String> controlStream = deviceStatusStream
            .connect(broadcastRuleStream)
            .process(new BroadcastProcessFunction<DeviceStatus, LinkageRule, String>() {
                @Override
                public void processElement(DeviceStatus status, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                    if (status.getIsOnline() != 1) return;
                    for (Map.Entry<String, LinkageRule> entry : ctx.getBroadcastState(RULE_STATE_DESC).immutableEntries()) {
                        LinkageRule rule = entry.getValue();
                        if (rule.getIsEnable() != 1) continue;
                        if (rule.getUserId() != null && !rule.getUserId().equals(status.getUserId())) continue;
                        String conditionSql = buildConditionSql(rule.getConditionSql(), status);
                        boolean isTrigger = evaluateCondition(conditionSql);
                        if (isTrigger) {
                            log.info("Rule {} triggered by device {}", rule.getRuleId(), status.getDeviceId());
                            generateControlCmds(rule, status, out);
                        }
                    }
                }
                @Override
                public void processBroadcastElement(LinkageRule rule, Context ctx, Collector<String> out) throws Exception {
                    if (rule.getIsEnable() == 1) {
                        ctx.getBroadcastState(RULE_STATE_DESC).put(rule.getRuleId().toString(), rule);
                    } else {
                        ctx.getBroadcastState(RULE_STATE_DESC).remove(rule.getRuleId().toString());
                    }
                }
            });

        controlStream.addSink(new DeviceControlSink("tcp://emqx-cluster:1883"))
                .name("device-control-sink").uid("device-control-sink");
        env.execute("DeviceLinkageJob");
    }

    // Substitute device fields into the condition SQL template.
    // The ${...} placeholder syntax is an assumption; the original literals were lost.
    private static String buildConditionSql(String templateSql, DeviceStatus status) {
        Map<String, String> varMap = new HashMap<>();
        varMap.put("${deviceId}", "'" + status.getDeviceId() + "'");
        varMap.put("${deviceType}", "'" + status.getDeviceType() + "'");
        varMap.put("${value}", String.valueOf(status.getValue()));
        String executableSql = templateSql;
        for (Map.Entry<String, String> entry : varMap.entrySet()) {
            executableSql = executableSql.replace(entry.getKey(), entry.getValue());
        }
        return executableSql;
    }

    // Evaluate the substituted condition. The original used a fully configured
    // Calcite validator; here the logic is reduced to parsing and checking for a
    // boolean literal result.
    private static boolean evaluateCondition(String conditionSql) {
        try {
            SqlParser parser = SqlParser.create(conditionSql, SqlParser.config().withCaseSensitive(false));
            SqlNode sqlNode = parser.parseExpression();
            if (sqlNode instanceof SqlLiteral) {
                return ((SqlLiteral) sqlNode).getValueAs(Boolean.class);
            }
            return false;
        } catch (Exception e) {
            return false;
        }
    }

    // Reject rules whose condition SQL does not even parse.
    private static boolean validateSql(String sql) {
        try {
            SqlParser parser = SqlParser.create(sql, SqlParser.config().withCaseSensitive(false));
            parser.parseExpression();
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    // Expand the rule's action JSON into one control command per target device.
    private static void generateControlCmds(LinkageRule rule, DeviceStatus status, Collector<String> out) {
        try {
            JSONArray actions = JSONArray.parseArray(rule.getActionJson());
            for (Object actionObj : actions) {
                JSONObject action = (JSONObject) actionObj;
                JSONObject controlCmd = new JSONObject();
                controlCmd.put("deviceId", action.getString("deviceId"));
                controlCmd.put("action", action.getString("action"));
                controlCmd.put("param", action.getJSONObject("param"));
                controlCmd.put("ruleId", rule.getRuleId());
                controlCmd.put("timestamp", System.currentTimeMillis());
                out.collect(controlCmd.toString());
            }
        } catch (Exception e) {
            log.error("Failed to generate control commands", e);
        }
    }
}
2.3 Real-world case: 'wake-up scene' dynamic linkage at Wangjing SOHO apartments, Beijing
2.3.1 Requirements
- The curtain opens evenly starting at 7:00, reaching 100% within 10 minutes.
- The air conditioner switches automatically from sleep mode (20℃) to comfort mode (26℃).
- The water heater preheats to 50℃.
- The rule disables itself on weekends.
- When the owner is away (phone off the home WiFi for 24 hours), all linkage is paused.
- On rainy days the curtain opens only 50%.
2.3.2 Rule configuration and execution flow
| Configuration item | Content | Rationale |
|---|---|---|
| Rule name | Wake-up scene linkage (master bedroom) | Named by room + scene |
| Trigger conditions | 1. Time: Mon-Fri 7:00-7:10 2. Device: master-bedroom temperature/humidity sensor reporting data 3. Device: WiFi sensor detects the owner's phone | Triple check: time + device state + user presence |
| Actions | 1. Curtain: open to 100% 2. AC: comfort mode, 26℃ 3. Water heater: 50℃ | Action parameters matched to device models |
| Exceptions | 1. Disabled on weekends 2. Disabled after 24 h off the home WiFi 3. Curtain opens only 50% on rainy days | Covers special scenarios |
2.3.3 Underlying condition SQL and action JSON
-- trigger condition SQL
"device_type='temperature_sensor' AND room_id='master_bedroom' AND hour=7 AND minute BETWEEN 0 AND 10 AND day_of_week BETWEEN 1 AND 5 AND (SELECT status FROM dws_device_real_time WHERE device_id='WIFI-1001' AND update_time>UNIX_TIMESTAMP()-86400*1000)='connected' AND NOT (SELECT status FROM dws_device_real_time WHERE device_id='WEATHER-1001' AND update_time>UNIX_TIMESTAMP()-3600*1000)='rain'"
-- action JSON (the water-heater entry was truncated in the source; its param is
-- completed to temp 50 per the 50℃ requirement above)
[ {"deviceId":"DUYA-DT82-1001","action":"set_open","param":{"speed":10,"target":100,"duration":600}}, {"deviceId":"GREE-KFR-35-1001","action":"set_mode","param":{"mode":"comfort","temp":26}}, {"deviceId":"HAIER-EC60-1001","action":"set_temp","param":{"temp":50}} ]
2.3.4 Results
| Metric | Measured result |
|---|---|
| Linkage response latency | 180 ms |
| Rule execution accuracy | 100% |
| Cross-brand compatibility | 100% |
| Exception-scenario coverage | 100% |
2.4 Production-grade optimization: fixing the rule-matching latency spike
2.4.1 When the problem hit
Once the number of user rules in a community passed 100,000, per-rule matching time in the Flink task climbed from 12 ms to 86 ms, pushing linkage latency beyond 300 ms.
2.4.2 Root causes
- Inefficient traversal: every device status event scanned all 100,000 rules.
- Repeated SQL parsing: the same condition SQL was re-parsed into an AST for every incoming status event.
- Unordered state: rules sat in broadcast state keyed only by ruleId, with no useful ordering.
2.4.3 Fixes
- Two-level rule index: replaced the flat map with Map<String, Map<String, LinkageRule>> (first-level key = userId+roomId, second-level key = ruleId), so each event scans only its own user/room bucket.
- Pre-parsed SQL cache: added an sqlAstCache that caches each condition SQL's parsed AST.
- Rule priority ordering: added a priority field so high-frequency scenes are matched first.
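The two-level rule index above can be sketched as follows. This is a minimal illustration, not the production class: rules are reduced to strings, and the `userId + ":" + roomId` bucket-key format is an assumption.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of the two-level rule index: first-level key = userId + roomId,
// second-level key = ruleId. Lookup touches only one user/room bucket instead of
// scanning every rule in broadcast state.
public class RuleIndex {
    private final Map<String, Map<Long, String>> index = new HashMap<>();

    private static String bucketKey(String userId, String roomId) {
        return userId + ":" + roomId; // key format is illustrative
    }

    public void put(String userId, String roomId, long ruleId, String rule) {
        index.computeIfAbsent(bucketKey(userId, roomId), k -> new HashMap<>())
             .put(ruleId, rule);
    }

    /** Rules that could possibly match an event from this user's room. */
    public List<String> candidates(String userId, String roomId) {
        Map<Long, String> bucket = index.get(bucketKey(userId, roomId));
        return bucket == null ? new ArrayList<>() : new ArrayList<>(bucket.values());
    }
}
```

With 100,000 rules spread over thousands of user/room buckets, a lookup returns a handful of candidates, which is consistent with the "5 rules per event" figure in the table below.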
2.4.4 Before and after
| Metric | Before | After |
|---|---|---|
| Matching time per device event | 86 ms | 3 ms |
| Task CPU usage | 85% | 35% |
| Rules traversed per event | 100,000 | 5 |
3. Core Scenario 2: Scene-Based Energy Optimization — from 'passive saving' to 'predictive scheduling'
3.1 Industry pain points: why traditional 'energy saving' is a false promise
- No prediction, only reaction: 68% of owners report having left devices running after leaving home.
- Comfort sacrificed, users resist: 72% of 'energy-saving modes' are one-size-fits-all cutoffs.
- Disconnected from tariffs: 85% of users do not know their local peak/valley electricity prices.
3.2 Solution: a 'predict - schedule - feedback' energy-saving loop
Using 180 days of historical data, an ARIMA model forecasts the next 24 hours of energy demand; a greedy algorithm then produces an 'off-peak + on-demand' schedule.
3.2.1 Core energy-saving flow

3.2.2 Core data models
3.2.2.1 Energy consumption entity (EnergyConsumption)
package com.smarthome.entity;
import lombok.Data;
import java.io.Serializable;
@Data
public class EnergyConsumption implements Serializable {
    private String deviceId;
    private String deviceType;
    private float energyKwh;      // energy consumed in this interval (kWh)
    private int runDuration;      // running duration in this interval
    private long startTime;       // interval start (epoch millis)
    private long endTime;         // interval end (epoch millis)
    private String roomId;
    private String userId;
    private String communityId;
    private String weather;       // weather during the interval
    private float outdoorTemp;    // outdoor temperature (℃)
}
3.2.2.2 Energy schedule entity (EnergySchedule)
package com.smarthome.entity;
import lombok.Data;
import java.io.Serializable;
@Data
public class EnergySchedule implements Serializable {
    private Long scheduleId;      // primary key
    private String deviceId;
    private String userId;
    private int startHour;        // scheduled start hour (0-23)
    private int endHour;          // scheduled end hour (0-23)
    private String actionJson;    // control action to send when the slot starts
    private float energyForecast; // predicted energy for the slot (kWh)
    private String priceType;     // tariff of the slot: peak / flat / valley
    private int isExecuted;       // 1 = executed, 0 = pending
    private String executeTime;
    private String createTime;
}
3.2.3 Utility class: WeatherUtil
package com.smarthome.util;
import com.alibaba.fastjson.JSONObject;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Note: several literals (cache key prefix, cache expiry, weather-factor
// thresholds) were lost during extraction and are reconstructed here with
// representative values.
public class WeatherUtil {

    private static final Logger log = LoggerFactory.getLogger(WeatherUtil.class);
    private static final String AMAP_WEATHER_URL = "https://restapi.amap.com/v3/weather/weatherInfo";
    private static final String AMAP_API_KEY = "${amap.api.key}";
    private static final int HTTP_TIMEOUT = 3000;
    private static final RedisUtil REDIS_UTIL = SpringContextUtil.getBean(RedisUtil.class);
    private static final String WEATHER_CACHE_KEY_PREFIX = "weather:city:";
    private static final int CACHE_EXPIRE_SECONDS = 30 * 60; // 30-minute cache

    /** Query live weather for a city adcode, with a Redis cache in front of the AMap API. */
    public static JSONObject getCityWeather(String cityAdcode) {
        if (cityAdcode == null || cityAdcode.isEmpty()) return null;
        String cacheKey = WEATHER_CACHE_KEY_PREFIX + cityAdcode;
        String cacheValue = REDIS_UTIL.get(cacheKey);
        if (cacheValue != null && !cacheValue.isEmpty()) {
            return JSONObject.parseObject(cacheValue);
        }
        CloseableHttpClient httpClient = null;
        CloseableHttpResponse response = null;
        try {
            String requestUrl = String.format("%s?key=%s&city=%s&extensions=base",
                    AMAP_WEATHER_URL, AMAP_API_KEY, cityAdcode);
            HttpGet httpGet = new HttpGet(requestUrl);
            httpClient = HttpClients.createDefault();
            response = httpClient.execute(httpGet);
            if (response.getStatusLine().getStatusCode() == 200) {
                String responseStr = EntityUtils.toString(response.getEntity(), "UTF-8");
                JSONObject resultJson = JSONObject.parseObject(responseStr);
                if ("1".equals(resultJson.getString("status"))) {
                    JSONObject weatherJson = resultJson.getJSONArray("lives").getJSONObject(0);
                    REDIS_UTIL.set(cacheKey, weatherJson.toString(), CACHE_EXPIRE_SECONDS);
                    return weatherJson;
                }
            }
        } catch (Exception e) {
            log.error("Failed to query weather", e);
        } finally {
            try {
                if (response != null) response.close();
                if (httpClient != null) httpClient.close();
            } catch (Exception e) {
                log.error("Failed to close HTTP resources", e);
            }
        }
        return null;
    }

    /**
     * Map weather and outdoor temperature to an energy adjustment factor.
     * Weather strings follow the AMap API (Chinese); thresholds and increments
     * are representative reconstructions.
     */
    public static float getWeatherFactor(String weather, float outdoorTemp) {
        float factor = 1.0f;
        if ("雨".equals(weather) || "雪".equals(weather)) factor += 0.1f;
        if ("高温".equals(weather)) factor += 0.2f;
        if (outdoorTemp > 32) factor += 0.2f;
        if (outdoorTemp < 5) factor += 0.2f;
        return Math.max(0.8f, Math.min(1.5f, factor));
    }
}
3.2.4 Core algorithm: ARIMA energy prediction
package com.smarthome.algorithm;
import com.smarthome.entity.EnergyConsumption;
import com.smarthome.mapper.EnergyMapper;
import com.smarthome.util.RedisUtil;
import com.smarthome.util.WeatherUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.math3.optim.InitialGuess;
import org.apache.commons.math3.optim.MaxEval;
import org.apache.commons.math3.optim.PointValuePair;
import org.apache.commons.math3.optim.nonlinear.scalar.GoalType;
import org.apache.commons.math3.optim.nonlinear.scalar.ObjectiveFunction;
import org.apache.commons.math3.optim.nonlinear.scalar.noderiv.NelderMeadSimplex;
import org.apache.commons.math3.optim.nonlinear.scalar.noderiv.SimplexOptimizer;
import org.apache.commons.math3.stat.regression.OLSMultipleLinearRegression;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.List;

// Note: many literals in this class (ARIMA orders D/Q, cache keys, thresholds,
// city adcodes, tariff hours, default profiles) were lost during extraction; the
// values below are representative reconstructions, not the original constants.
@Slf4j
@Component
@RequiredArgsConstructor
public class ArimaEnergyPredictor {

    private final EnergyMapper energyMapper;
    private final RedisUtil redisUtil;

    private static final int P = 2;                 // AR order (from the source)
    private static final int D = 1;                 // differencing order (reconstructed)
    private static final int Q = 1;                 // MA order (reconstructed)
    private static final int HISTORY_DAYS = 180;    // per Section 3.2
    private static final int PREDICT_HOURS = 24;
    private static final int CACHE_EXPIRE_SECONDS = 60 * 60; // illustrative

    /** Predict the next 24 hourly energy values (kWh) for one device. */
    public double[] predictHourlyEnergy(String userId, String deviceId, String cityAdcode) {
        log.info("Predicting hourly energy for user={}, device={}", userId, deviceId);
        String cacheKey = "energy:predict:" + userId + ":" + deviceId;
        String cacheValue = redisUtil.get(cacheKey);
        if (cacheValue != null && !cacheValue.isEmpty()) {
            String[] strArray = cacheValue.split(",");
            double[] result = new double[strArray.length];
            for (int i = 0; i < strArray.length; i++) result[i] = Double.parseDouble(strArray[i]);
            return result;
        }
        List<EnergyConsumption> historyData = energyMapper.selectHourlyEnergy(userId, deviceId, HISTORY_DAYS);
        if (historyData.size() < 7 * 24) return getDefaultPrediction(deviceId); // need at least a week of hourly points
        double[] rawEnergy = new double[historyData.size()];
        double[] weatherFactors = new double[historyData.size()];
        for (int i = 0; i < historyData.size(); i++) {
            EnergyConsumption data = historyData.get(i);
            rawEnergy[i] = data.getEnergyKwh();
            weatherFactors[i] = data.getWeather() != null
                    ? WeatherUtil.getWeatherFactor(data.getWeather(), data.getOutdoorTemp()) : 1.0;
        }
        double[] filteredEnergy = filterOutliers(rawEnergy);
        double[] diffEnergy = differencing(filteredEnergy, D);
        double[] arCoefficients = trainARModel(diffEnergy, P);
        double[] residuals = calculateARResiduals(diffEnergy, arCoefficients, P);
        double[] maCoefficients = trainMAModelWithMLE(residuals, Q);
        double[] predictDiff = predictDiffSequence(diffEnergy, residuals, arCoefficients, maCoefficients);
        double[] predictRaw = inverseDifferencing(filteredEnergy, predictDiff, D);
        double[] finalPredict = adjustWithFutureWeather(predictRaw, cityAdcode);
        StringBuilder cacheBuilder = new StringBuilder();
        for (double v : finalPredict) cacheBuilder.append(v).append(",");
        redisUtil.set(cacheKey, cacheBuilder.substring(0, cacheBuilder.length() - 1), CACHE_EXPIRE_SECONDS);
        return finalPredict;
    }

    // Replace 3-sigma outliers with the mean instead of dropping them.
    private double[] filterOutliers(double[] data) {
        double mean = calculateAverage(data);
        double std = calculateStandardDeviation(data, mean);
        List<Double> filteredList = new ArrayList<>();
        for (double v : data) {
            if (v >= mean - 3 * std && v <= mean + 3 * std) filteredList.add(v);
            else filteredList.add(mean);
        }
        double[] result = new double[filteredList.size()];
        for (int i = 0; i < filteredList.size(); i++) result[i] = filteredList.get(i);
        return result;
    }

    private double[] differencing(double[] data, int d) {
        double[] result = data.clone();
        for (int i = 0; i < d; i++) {
            double[] temp = new double[result.length - 1];
            for (int j = 0; j < temp.length; j++) temp[j] = result[j + 1] - result[j];
            result = temp;
        }
        return result;
    }

    // Fit the AR part with OLS; an explicit intercept column is added manually.
    private double[] trainARModel(double[] diffData, int p) {
        int n = diffData.length - p;
        if (n <= 0) return new double[p + 1];
        double[][] x = new double[n][p + 1];
        double[] y = new double[n];
        for (int i = 0; i < n; i++) {
            x[i][0] = 1; // intercept
            for (int j = 0; j < p; j++) x[i][j + 1] = diffData[i + p - 1 - j];
            y[i] = diffData[i + p];
        }
        OLSMultipleLinearRegression regression = new OLSMultipleLinearRegression();
        regression.setNoIntercept(true); // intercept already in the design matrix
        regression.newSampleData(y, x);
        return regression.estimateRegressionParameters();
    }

    private double[] calculateARResiduals(double[] diffData, double[] arCoeffs, int p) {
        int n = diffData.length - p;
        double[] residuals = new double[n];
        for (int i = 0; i < n; i++) {
            double arPredict = arCoeffs[0];
            for (int j = 0; j < p; j++) arPredict += arCoeffs[j + 1] * diffData[i + p - 1 - j];
            residuals[i] = diffData[i + p] - arPredict;
        }
        return residuals;
    }

    // Fit MA coefficients by maximizing the Gaussian log-likelihood with
    // Nelder-Mead; optimizer tolerances and MaxEval are illustrative.
    private double[] trainMAModelWithMLE(double[] residuals, int q) {
        double[] initialGuess = new double[q];
        for (int i = 0; i < q; i++) initialGuess[i] = 0.1;
        ObjectiveFunction objectiveFunction = new ObjectiveFunction(params -> {
            int n = residuals.length;
            double sigmaSquared = 0;
            double[] epsilon = new double[n];
            for (int i = q; i < n; i++) {
                double maPredict = 0;
                for (int j = 0; j < q; j++) maPredict += params[j] * residuals[i - 1 - j];
                epsilon[i] = residuals[i] - maPredict;
                sigmaSquared += Math.pow(epsilon[i], 2);
            }
            sigmaSquared /= (n - q);
            double logLikelihood = -0.5 * (n - q) * Math.log(2 * Math.PI * sigmaSquared) - 0.5 * (n - q);
            return -logLikelihood; // minimize the negative log-likelihood
        });
        SimplexOptimizer optimizer = new SimplexOptimizer(1e-6, 1e-6);
        NelderMeadSimplex simplex = new NelderMeadSimplex(initialGuess.length, 0.1);
        PointValuePair result = optimizer.optimize(new MaxEval(1000), objectiveFunction,
                GoalType.MINIMIZE, new InitialGuess(initialGuess), simplex);
        return result.getPoint();
    }

    // Roll the fitted ARMA model forward over the differenced series.
    private double[] predictDiffSequence(double[] diffData, double[] residuals,
                                         double[] arCoeffs, double[] maCoeffs) {
        double[] predictDiff = new double[PREDICT_HOURS];
        int p = arCoeffs.length - 1;
        int q = maCoeffs.length;
        double[] lastPDiff = new double[p];
        System.arraycopy(diffData, diffData.length - p, lastPDiff, 0, p);
        double[] lastQResiduals = new double[q];
        System.arraycopy(residuals, residuals.length - q, lastQResiduals, 0, q);
        for (int i = 0; i < PREDICT_HOURS; i++) {
            double arPredict = arCoeffs[0];
            for (int j = 0; j < p; j++) arPredict += arCoeffs[j + 1] * lastPDiff[p - 1 - j];
            double maCorrect = 0;
            for (int j = 0; j < q; j++) maCorrect += maCoeffs[j] * lastQResiduals[q - 1 - j];
            predictDiff[i] = arPredict + maCorrect;
            System.arraycopy(lastPDiff, 1, lastPDiff, 0, p - 1);
            lastPDiff[p - 1] = predictDiff[i];
            System.arraycopy(lastQResiduals, 1, lastQResiduals, 0, q - 1);
            lastQResiduals[q - 1] = predictDiff[i] - arPredict;
        }
        return predictDiff;
    }

    // Undo the differencing, clamp to non-negative, round to 2 decimals.
    private double[] inverseDifferencing(double[] originalData, double[] predictDiff, int d) {
        double[] result = predictDiff.clone();
        for (int i = 0; i < d; i++) {
            double[] temp = new double[result.length + 1];
            temp[0] = originalData[originalData.length - 1 - (d - 1 - i)];
            for (int j = 0; j < result.length; j++) temp[j + 1] = temp[j] + result[j];
            result = temp;
        }
        double[] finalResult = new double[PREDICT_HOURS];
        System.arraycopy(result, result.length - PREDICT_HOURS, finalResult, 0, PREDICT_HOURS);
        for (int i = 0; i < finalResult.length; i++) {
            finalResult[i] = Math.max(0, finalResult[i]);
            finalResult[i] = Math.round(finalResult[i] * 100) / 100.0;
        }
        return finalResult;
    }

    // Scale the forecast by tomorrow's weather factor.
    private double[] adjustWithFutureWeather(double[] predictEnergy, String cityAdcode) {
        JSONObject weatherJson = WeatherUtil.getCityWeather(cityAdcode);
        if (weatherJson == null) return predictEnergy;
        String weather = weatherJson.getString("weather");
        float outdoorTemp = weatherJson.getFloatValue("temperature");
        float weatherFactor = WeatherUtil.getWeatherFactor(weather, outdoorTemp);
        double[] adjustedEnergy = new double[predictEnergy.length];
        for (int i = 0; i < predictEnergy.length; i++) {
            adjustedEnergy[i] = Math.round(predictEnergy[i] * weatherFactor * 100) / 100.0;
        }
        return adjustedEnergy;
    }

    /**
     * Peak/valley time slots per city. Adcodes and hour ranges below are
     * representative; real tariffs must come from the local grid operator.
     */
    public String[] getCityPriceTimeSlots(String cityAdcode) {
        String[] priceTypes = new String[24];
        if ("110000".equals(cityAdcode)) {            // Beijing
            for (int i = 0; i < 24; i++) priceTypes[i] = (i >= 8 && i < 22) ? "peak" : "valley";
        } else if ("310000".equals(cityAdcode)) {     // Shanghai
            for (int i = 0; i < 24; i++) {
                if ((i >= 8 && i < 11) || (i >= 18 && i < 21)) priceTypes[i] = "peak";
                else if ((i >= 11 && i < 18) || (i >= 21 && i < 22)) priceTypes[i] = "flat";
                else priceTypes[i] = "valley";
            }
        } else if ("440100".equals(cityAdcode)) {     // Guangzhou
            for (int i = 0; i < 24; i++) {
                if ((i >= 10 && i < 12) || (i >= 14 && i < 19)) priceTypes[i] = "peak";
                else if ((i >= 8 && i < 10) || (i >= 12 && i < 14) || (i >= 19 && i < 24)) priceTypes[i] = "flat";
                else priceTypes[i] = "valley";
            }
        } else {
            for (int i = 0; i < 24; i++) priceTypes[i] = (i >= 8 && i < 22) ? "peak" : "valley";
        }
        return priceTypes;
    }

    // Heat in valley hours; outside them, run only around hot-water usage hours.
    public int[] generateWaterHeaterSchedule(String userId, double[] predictEnergy, String[] priceTypes) {
        int[] schedule = new int[24];
        int[] waterUsageHours = getUserWaterUsageHours(userId); // 24-hour flags
        for (int i = 0; i < 24; i++) {
            if ("valley".equals(priceTypes[i])) schedule[i] = 1;
            else schedule[i] = (waterUsageHours[i] == 1) ? 1 : 0;
        }
        return schedule;
    }

    // Run the air conditioner only while someone is home.
    public int[] generateAirConditionerSchedule(String userId, double[] predictEnergy, String[] priceTypes) {
        int[] schedule = new int[24];
        int[] homeHours = getUserHomeHours(userId);
        for (int i = 0; i < 24; i++) {
            schedule[i] = (homeHours[i] == 1) ? 1 : 0;
        }
        return schedule;
    }

    // Pick one valley-tariff hour for the wash cycle.
    public int[] generateWashingMachineSchedule(double[] predictEnergy, String[] priceTypes) {
        int[] schedule = new int[24];
        int targetHour = -1;
        double maxEnergy = 0;
        for (int i = 0; i < 24; i++) {
            if ("valley".equals(priceTypes[i]) && predictEnergy[i] > maxEnergy) {
                maxEnergy = predictEnergy[i];
                targetHour = i;
            }
        }
        if (targetHour != -1) schedule[targetHour] = 1;
        return schedule;
    }

    // Lights only at night, while someone is home, when usage is expected.
    public int[] generateLightSchedule(String userId, double[] predictEnergy) {
        int[] schedule = new int[24];
        int[] homeHours = getUserHomeHours(userId);
        for (int i = 0; i < 24; i++) {
            boolean isNight = (i >= 18 || i < 6);
            schedule[i] = (isNight && homeHours[i] == 1 && predictEnergy[i] > 0) ? 1 : 0;
        }
        return schedule;
    }

    // In production these profiles come from the user's historical behavior;
    // fixed defaults are used here.
    private int[] getUserWaterUsageHours(String userId) {
        int[] hours = new int[24];
        for (int i = 6; i < 8; i++) hours[i] = 1;    // morning hot water
        for (int i = 18; i < 22; i++) hours[i] = 1;  // evening hot water
        return hours;
    }

    private int[] getUserHomeHours(String userId) {
        int[] hours = new int[24];
        for (int i = 0; i < 9; i++) hours[i] = 1;    // overnight and morning
        for (int i = 18; i < 24; i++) hours[i] = 1;  // evening
        return hours;
    }

    private double calculateAverage(double[] data) {
        if (data == null || data.length == 0) return 0;
        double sum = 0;
        for (double v : data) sum += v;
        return sum / data.length;
    }

    private double calculateStandardDeviation(double[] data, double mean) {
        if (data == null || data.length <= 1) return 0;
        double sum = 0;
        for (double v : data) sum += Math.pow(v - mean, 2);
        return Math.sqrt(sum / (data.length - 1));
    }

    private double calculateMax(double[] data) {
        if (data == null || data.length == 0) return 0;
        double max = data[0];
        for (double v : data) if (v > max) max = v;
        return max;
    }

    private double calculateMin(double[] data) {
        if (data == null || data.length == 0) return 0;
        double min = data[0];
        for (double v : data) if (v < min) min = v;
        return min;
    }

    // Fallback profiles when history is insufficient; device-ID markers and
    // hourly values are illustrative.
    private double[] getDefaultPrediction(String deviceId) {
        double[] defaultPred = new double[PREDICT_HOURS];
        if (deviceId.contains("KFR")) {                      // air conditioner
            for (int i = 0; i < 24; i++) defaultPred[i] = (i >= 18 && i < 23) ? 1.0 : 0.2;
        } else if (deviceId.contains("EC")) {                // water heater
            for (int i = 0; i < 24; i++) defaultPred[i] = (i >= 22 || i < 6) ? 0.8 : 0.1;
        } else {
            for (int i = 0; i < 24; i++) defaultPred[i] = 0.1;
        }
        return defaultPred;
    }
}
3.2.5 Energy schedule execution Job
package com.smarthome.flink.job;
import com.alibaba.fastjson.JSONObject;
import com.smarthome.entity.EnergySchedule;
import com.smarthome.source.KafkaSourceBuilder;
import com.smarthome.sink.DeviceControlSink;
import com.smarthome.util.SpringContextUtil;
import com.smarthome.mapper.EnergyScheduleMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
@Slf4j
@Component
public class EnergyScheduleExecuteJob {

    private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Note: stripped literals (topic names, checkpoint storage path, watermark
    // bounds) are reconstructed below with representative values.
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(180_000L);
        env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints/energy-schedule");
        env.getCheckpointConfig().setCheckpointingMode(org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE);
        env.setParallelism(6);

        DataStream<EnergySchedule> scheduleStream = KafkaSourceBuilder.build(
                env, "energy-schedule-topic", "energy-schedule-group", new SimpleStringSchema())
            .filter(jsonStr -> jsonStr != null && !jsonStr.isEmpty())
            .map(new MapFunction<String, EnergySchedule>() {
                @Override
                public EnergySchedule map(String jsonStr) {
                    try {
                        return JSONObject.parseObject(jsonStr, EnergySchedule.class);
                    } catch (Exception e) {
                        return null;
                    }
                }
            })
            .filter(schedule -> schedule != null && schedule.getIsExecuted() == 0
                    && schedule.getDeviceId() != null && schedule.getActionJson() != null)
            .assignTimestampsAndWatermarks(WatermarkStrategy.<EnergySchedule>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((schedule, ts) -> System.currentTimeMillis()));

        DataStream<String> controlStream = scheduleStream
            .process(new ProcessFunction<EnergySchedule, String>() {
                private transient EnergyScheduleMapper scheduleMapper;

                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    scheduleMapper = SpringContextUtil.getBean(EnergyScheduleMapper.class);
                }

                @Override
                public void processElement(EnergySchedule schedule, Context ctx, Collector<String> out) throws Exception {
                    LocalDateTime now = LocalDateTime.now();
                    int currentHour = now.getHour();
                    String currentTime = now.format(DATE_TIME_FORMATTER);
                    boolean isInTimeSlot;
                    if (schedule.getStartHour() < schedule.getEndHour()) {
                        isInTimeSlot = currentHour >= schedule.getStartHour() && currentHour < schedule.getEndHour();
                    } else { // slot crosses midnight
                        isInTimeSlot = currentHour >= schedule.getStartHour() || currentHour < schedule.getEndHour();
                    }
                    if (!isInTimeSlot) return;
                    // Mark as executed first; skip if another task already claimed it
                    int updateCount = scheduleMapper.updateExecutedStatus(schedule.getScheduleId(), currentTime);
                    if (updateCount == 0) return;
                    String controlCmd = buildControlCmd(schedule, currentTime);
                    if (controlCmd != null) out.collect(controlCmd);
                }
            });

        controlStream.addSink(new DeviceControlSink("tcp://emqx-cluster:1883"))
                .name("energy-control-sink").uid("energy-control-sink");
        env.execute("EnergyScheduleExecuteJob");
    }

    private static String buildControlCmd(EnergySchedule schedule, String executeTime) {
        try {
            JSONObject actionJson = JSONObject.parseObject(schedule.getActionJson());
            JSONObject controlCmd = new JSONObject();
            controlCmd.put("deviceId", schedule.getDeviceId());
            controlCmd.put("action", actionJson.getString("action"));
            controlCmd.put("param", actionJson.getJSONObject("param"));
            controlCmd.put("source", "energy-schedule");
            controlCmd.put("scheduleId", schedule.getScheduleId());
            controlCmd.put("timestamp", System.currentTimeMillis());
            controlCmd.put("executeTime", executeTime);
            return controlCmd.toString();
        } catch (Exception e) {
            log.error("Failed to build control command", e);
            return null;
        }
    }
}
3.3 Real-world case: whole-home off-peak appliance scheduling at Renheng Riverside City (仁恒河滨城), Shanghai
3.3.1 Requirements
- The water heater heats during valley-tariff hours, with hot water ready 6:00-8:00 and 18:00-22:00.
- The air conditioner limits consumption during peak hours, but the setpoint never drops below 26℃.
- The washing machine runs unattended in the cheapest slot.
- A daily report shows how much energy and money were saved.
3.3.2 Schedule and execution details
| Device | Slot | Tariff | Action | Predicted energy (kWh) | Estimated cost (CNY) |
|---|---|---|---|---|---|
| Haier water heater | 22:00-23:00 | valley | Heat to 50℃, keep warm until 9:00 next morning | 1.8 | 0.55 |
| Gree air conditioner | 6:30-8:30 | peak | 27℃, medium fan speed | 2.4 | 1.48 |
| Gree air conditioner | 18:00-22:00 | peak + valley | 18:00-20:00 (peak) 26℃; 20:00-22:00 (valley) 24℃ | 4.6 | 1.848 |
| Siemens washing machine | 0:00-1:00 | valley | Standard wash cycle | 0.5 | 0.15 |
3.3.3 Results
| Metric | Before | After | Change |
|---|---|---|---|
| Daily total energy | 12.6 kWh | 8.3 kWh | -34.1% |
| Peak-hour share of energy | 78% | 32% | -59.0% |
| Daily electricity cost | 7.77 CNY | 3.82 CNY | -50.8% |
| Device runtime | random operation | on-demand start/stop | -33.3% |
3.3.4 Sample daily report
[June 30 Energy Report]
🏠 Home: Renheng Riverside City 12-302, Shanghai
🔋 Energy today: 8.1 kWh (-2.4% day-over-day, -35.7% year-over-year)
💰 Cost today: 3.76 CNY (-1.6% day-over-day, -51.2% year-over-year)
🤑 Saved this month: 118.5 CNY
🌱 Carbon reduction: ~6.3 kg
📊 Energy by device:
Air conditioner: 4.2 kWh (51.9%) → -24.5% YoY
Water heater: 2.1 kWh (25.9%) → -32.3% YoY
Washing machine: 0.5 kWh (6.2%) → 0% YoY
💡 Suggestions for tomorrow:
Light rain expected (25-29℃); set the AC to 27℃ to save an estimated extra 0.3 kWh
Move the wash cycle to 23:00 to avoid the early-morning load peak
3.4 Production-grade optimization: fixing low ARIMA prediction accuracy
3.4.1 When the problem hit
- Extreme weather: on June 15 Shanghai hit 38℃; the model predicted 4.2 kWh/day of AC energy against an actual 5.2 kWh, a 24% error.
- Sudden behavior change: while the owner traveled for 3 days, the model still predicted 3.8 kWh/day based on normal routines against an actual 0.8 kWh, a 78.9% error.
3.4.2 Root causes
- Single feature: only historical energy was used as model input.
- Static model: trained once on a fixed 180-day window and never refreshed.
- Polluted data: outliers from device faults and collection errors were not filtered.
3.4.3 Fixes
- Feature engineering: added environment (weather), behavior (home/away), and calendar (season/weekend) features.
- Rolling model updates: a model update is triggered every 7 days, adding the most recent day of data and evicting the oldest.
- Stronger data cleaning: a three-stage filter (validity check, outlier removal, label correction).
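The rolling-update policy described above can be sketched as a fixed-size sliding window over daily observations. This is a minimal illustration under stated assumptions: the retrain-trigger counter and the window mechanics are mine, not the production implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Minimal sketch of a rolling training window: keep the most recent `capacity`
// daily observations, dropping the oldest as new days arrive, and signal a
// retrain after every `retrainEveryDays` additions.
public class RollingTrainingWindow {
    private final Deque<Double> window = new ArrayDeque<>();
    private final int capacity;
    private final int retrainEveryDays;
    private int daysSinceRetrain = 0;

    public RollingTrainingWindow(int capacity, int retrainEveryDays) {
        this.capacity = capacity;
        this.retrainEveryDays = retrainEveryDays;
    }

    /** Add one day of data; returns true when a retrain should be triggered. */
    public boolean addDay(double dailyEnergyKwh) {
        if (window.size() == capacity) window.pollFirst(); // evict the oldest day
        window.addLast(dailyEnergyKwh);
        daysSinceRetrain++;
        if (daysSinceRetrain >= retrainEveryDays) {
            daysSinceRetrain = 0;
            return true;
        }
        return false;
    }

    public int size() {
        return window.size();
    }
}
```

With `capacity = 180` and `retrainEveryDays = 7`, this matches the 180-day history and 7-day update cycle stated in the text.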
3.4.4 Before and after
| Metric | Before (V1.0) | After (V3.0) | Change |
|---|---|---|---|
| Mean prediction error | 18.3% | 4.2% | -77.0% |
| Error under extreme weather | 24.1% | 6.8% | -71.8% |
| Error after behavior shifts | 21.7% | 5.3% | -75.6% |
| Model training time | 12 min | 4.5 min | -62.5% |
4. Technical Challenges and a Production Survival Guide
4.1 Challenge 1: device data skew
4.1.1 Symptoms
The 10% of high-frequency devices all landed on Flink Task 3, driving that task's CPU to a sustained 100% and pushing linkage latency from 180 ms to 3.2 s.
4.1.2 Root causes
- Uneven key distribution: the status stream is partitioned by deviceId, so hot devices map to the same task.
- Large volume gap: hot devices report about 86,000 records/day; low-frequency devices only 2,880.
- Static resource allocation: every task gets the same 2 CPU cores regardless of load.
4.1.3 Fixes
- Adaptive reporting frequency: after 30 minutes of inactivity a device drops from 1 report/second to 1 report/30 seconds.
- Key salting and repartitioning: the partition key changed from deviceId to deviceId + "_" + (updateTime % 8).
- Dynamic resources: Flink's ResourceManager enabled for elastic scale-out and scale-in.
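The key-salting fix above can be sketched in a few lines. This is only an illustration of how one hot `deviceId` fans out across 8 sub-keys; in the real job the salted key feeds a `keyBy`, with a second aggregation step merging the partial results.

```java
// Minimal sketch of key salting: a hot deviceId is split across 8 sub-keys so
// downstream tasks share its load; the salt is derived from the event timestamp.
public class KeySalter {
    private static final int SALT_BUCKETS = 8;

    public static String saltedKey(String deviceId, long updateTime) {
        return deviceId + "_" + (updateTime % SALT_BUCKETS);
    }
}
```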
4.1.4 Results
| Metric | Before | After | Change |
|---|---|---|---|
| Hot task CPU usage | 100% | 45% | -55% |
| Linkage latency (p99) | 3200 ms | 150 ms | -95.3% |
| Max records per task | 86,000/day | 12,000/day | -86% |
| Devices supported per cluster | 50,000 | 500,000 | +900% |
4.2 Challenge 2: lost MQTT commands
4.2.1 Symptoms
Control-command loss reached 5.2%: curtains received commands but did not execute them, and the EMQX broker rejected new deliveries when its connection count was overloaded.
4.2.2 Fixes
- Protocol and broker tuning: upgrade delivery to QoS=1, scale out the broker cluster, enable SSL.
- Command persistence and retry: a new MySQL table stores each command's content and state, backed by a three-level retry policy (immediate retry, delayed retry, offline re-push).
- Throttling and peak shaving: Redis buffers commands, with a 5,000 commands/second limit during peaks.
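The three-level retry policy above can be sketched as a small delay schedule. The interval values are assumptions (the source does not state them), and persistence is reduced to a return code; in production the state lives in the MySQL command table.

```java
// Minimal sketch of the three-level retry policy: immediate retry, then delayed
// retries with growing intervals, then handoff to the offline re-push queue.
public class CommandRetryPolicy {
    private static final long[] DELAY_MS = {0, 5_000, 30_000}; // illustrative intervals
    public static final long OFFLINE_REPUSH = -1;

    /** Delay before the next attempt, or OFFLINE_REPUSH once retries are exhausted. */
    public static long nextDelayMs(int attemptsSoFar) {
        if (attemptsSoFar < DELAY_MS.length) return DELAY_MS[attemptsSoFar];
        return OFFLINE_REPUSH; // hand the command to the offline re-push queue
    }
}
```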
4.2.3 Results
| Metric | Before | After | Change |
|---|---|---|---|
| Command loss rate | 5.2% | 0.08% | -98.5% |
| Broker connection success rate | 88% | 99.99% | +13.6% |
| Peak-hour command latency | 1200 ms | 150 ms | -87.5% |
| User complaint rate | 15% | 0% | -100% |
4.3 Challenge 3: data security and privacy
4.3.1 Symptoms
- Logs printed home addresses and WiFi passwords in plain text.
- Operations staff could query any user's behavior data.
- Raw device data was uploaded to the cloud unprocessed.
4.3.2 Fixes
- Tiered masking: highly sensitive data is stored with AES-256 encryption; medium-sensitivity data is partially masked.
- Strict access control: RBAC-based permissions; ordinary users can query only their own home's data, and operation audit logs are retained for 3 years.
- Edge-side preprocessing: data is preprocessed on the edge MQTT gateway, and only device states, never raw logs, are uploaded.
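The partial masking of medium-sensitivity fields mentioned above can be sketched as follows. The 'keep head and tail, star out the middle' rule is an assumption about what partial masking means here, not the production policy.

```java
// Minimal sketch of partial masking for medium-sensitivity fields: keep the
// first and last characters, replace everything in between with '*'.
public class DataMasker {
    public static String mask(String value) {
        if (value == null || value.length() <= 2) return "**";
        StringBuilder sb = new StringBuilder();
        sb.append(value.charAt(0));
        for (int i = 1; i < value.length() - 1; i++) sb.append('*');
        sb.append(value.charAt(value.length() - 1));
        return sb.toString();
    }
}
```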
4.3.3 Results
- Compliance: passed China's Multi-Level Protection Scheme (MLPS) Level 3 certification.
- Security incidents: no data leaks or privilege violations between February and July 2024.
- Trust: user satisfaction with privacy protection rose from 78% to 96%.
Summary
With the optimized architecture, the dynamic linkage engine, and predictive energy scheduling, this Java big data approach cuts both energy use and latency substantially. From architecture design to code deployment, from scenario delivery to compliance pitfalls, the value of the technology lies in solving users' real pain points: users never notice the technology itself, yet enjoy its convenience every day.