跳到主要内容
Java 大数据在智能家居设备联动与场景化节能中的应用 | 极客日志
Java AI java 算法
Java 大数据在智能家居设备联动与场景化节能中的应用 介绍基于 Java 大数据技术的智能家居解决方案。架构采用 Flink、Kafka、ClickHouse 实现百万级设备接入与实时计算。核心场景包括动态联动引擎,通过 Flink SQL 和 Calcite 解决规则匹配延迟问题;场景化节能优化,利用 ARIMA 模型预测能耗并结合峰谷电价调度。生产实践涵盖数据倾斜处理、MQTT 指令可靠性保障及隐私合规方案。实测显示联动延迟降至 180ms,节能率达 34.1%。
小熊软糖 发布于 2026/4/6 更新于 2026/5/22 26 浏览一、技术基石:Java 大数据赋能智能家居的'三位一体'架构
要实现'设备联动 + 场景节能',必须先解决三个核心问题:设备数据怎么稳定收?联动规则怎么快速算?节能策略怎么精准优? 基于 Java 生态构建的'采集 - 计算 - 决策'三位一体架构,经多项目压测验证,可支撑百万级设备并发接入,实时计算延迟≤500ms。
1.1 架构全景图
1.2 核心技术栈选型与生产配置
技术层级 组件名称 版本 核心用途 生产配置细节 数据采集 Java MQTT Client 1.2.5 边缘设备数据接入 SSL 加密,QoS=1,心跳 30 秒,连接池大小 50 Flink CDC 2.4.0 云端设备状态同步 捕获 MySQL binlog(ROW 格式),增量同步 Kafka 3.5.1 用户行为与设备事件采集 3 节点集群,replica=3,分区数 32 数据存储 ClickHouse 23.12.4.11 实时设备状态存储 3 节点集群,单表分区 100+,查询延迟≤180ms Hive 3.1.3 历史能耗与行为数据存储 ORC 压缩,分区字段 dt+device_type Redis Cluster 7.0.12 热点数据缓存 6 节点(3 主 3 从),淘汰策略 volatile-lru 计算引擎 Flink 1.18.0 实时联动与监控 并行度 12,Checkpoint 3 分钟 / 次,RocksDB 状态后端 Spark 3.4.1 离线建模与预测 executor.cores=4,executor.memory=8g,动态资源分配 应用层 Spring Boot 3.2.5 后端服务框架 线程池核心数 20,最大 40,超时时间 3 秒 MQTT Broker(EMQX) 5.1.6 设备控制指令下发 8 节点集群,最大连接数 100 万
1.3 核心数据模型(POJO 类,附表结构与业务含义)
1.3.1 设备状态实体类(对应 ClickHouse 实时表)
com.smarthome.entity;
lombok.Data;
java.io.Serializable;
{
String deviceId;
String deviceType;
String status;
value;
updateTime;
isOnline;
String roomId;
String communityId;
String userId;
}
package
import
import
@Data
public
class
DeviceStatus
implements
Serializable
private
private
private
private
float
private
long
private
int
private
private
private
1.3.2 联动规则实体类(对应 MySQL 配置表) package com.smarthome.entity;
import lombok.Data;
import java.io.Serializable;
@Data
public class LinkageRule implements Serializable {
private Long ruleId;
private String ruleName;
private String conditionSql;
private String actionJson;
private int isEnable;
private String sceneType;
private String userId;
private String createTime;
private String updateTime;
}
1.3.3 缺失工具类补充:SpringContextUtil(生产必用) package com.smarthome.util;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
@Component
public class SpringContextUtil implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext (ApplicationContext context) throws BeansException {
applicationContext = context;
}
public static <T> T getBean (Class<T> clazz) {
if (applicationContext == null ) {
throw new RuntimeException ("SpringContext 未初始化" );
}
return applicationContext.getBean(clazz);
}
public static <T> T getBean (String beanName, Class<T> clazz) {
if (applicationContext == null ) {
throw new RuntimeException ("SpringContext 未初始化" );
}
return applicationContext.getBean(beanName, clazz);
}
}
二、核心场景 1:动态联动引擎 —— 从'固定规则'到'数据驱动'
2.1 行业痛点:传统联动的'三大死穴'
规则刚性,不会'变通' :定时关窗帘在出差时仍执行,无法结合上下文动态调整。
无上下文感知,响应滞后 :依赖'定时轮询'触发规则,平均延迟 3.2 秒。
跨品牌兼容差,联而不动 :多品牌设备仅 35% 实现跨品牌联动。
2.2 解决方案:Flink SQL 驱动的动态联动引擎 基于 Flink 构建'状态流 + 广播规则流'的联动引擎,核心逻辑是'设备状态实时感知 + 联动规则动态更新 + 多条件智能匹配'。
2.2.1 核心依赖(pom.xml 关键配置) <dependencies >
<dependency >
<groupId > org.apache.flink</groupId >
<artifactId > flink-streaming-java</artifactId >
<version > ${flink.version}</version >
<scope > provided</scope >
</dependency >
<dependency >
<groupId > org.apache.flink</groupId >
<artifactId > flink-connector-kafka</artifactId >
<version > ${flink.version}</version >
</dependency >
<dependency >
<groupId > org.apache.calcite</groupId >
<artifactId > calcite-core</artifactId >
<version > ${calcite.version}</version >
</dependency >
<dependency >
<groupId > org.eclipse.paho</groupId >
<artifactId > org.eclipse.paho.client.mqttv3</artifactId >
<version > ${mqtt.version}</version >
</dependency >
</dependencies >
2.2.2 关键工具类:KafkaSourceBuilder package com.smarthome.source;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KafkaSourceBuilder {
private static final Logger log = LoggerFactory.getLogger(KafkaSourceBuilder.class);
public static <T> DataStream<T> build (StreamExecutionEnvironment env, String topic, String groupId, DeserializationSchema<T> deserializer) {
if (env == null || topic == null || groupId == null || deserializer == null ) {
throw new IllegalArgumentException ("参数不可为空" );
}
Properties props = new Properties ();
props.setProperty(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-node1:9092,kafka-node2:9092,kafka-node3:9092" );
props.setProperty(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.setProperty(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest" );
FlinkKafkaConsumer<T> kafkaConsumer = new FlinkKafkaConsumer <>(topic, deserializer, props);
return env.addSource(kafkaConsumer).name("Kafka-Source-" + topic).uid("kafka-source-" + topic);
}
}
2.2.3 关键工具类:DeviceControlSink(MQTT 设备控制) package com.smarthome.sink;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DeviceControlSink extends RichSinkFunction <String> {
private static final Logger log = LoggerFactory.getLogger(DeviceControlSink.class);
private final String brokerUrl;
private final String clientId;
private final String username;
private final String password;
private final int qos;
private MqttClient mqttClient;
private static final int MAX_RETRY = 3 ;
private static final long [] RETRY_INTERVALS = {1000 , 2000 , 4000 };
public DeviceControlSink (String brokerUrl) {
this .brokerUrl = brokerUrl;
this .clientId = "device-control-" + System.currentTimeMillis();
this .username = "device-control" ;
this .password = "control@2024_Smarthome" ;
this .qos = 1 ;
}
@Override
public void open (Configuration parameters) throws Exception {
super .open(parameters);
MqttConnectOptions connOpts = new MqttConnectOptions ();
connOpts.setUserName(username);
connOpts.setPassword(password.toCharArray());
connOpts.setAutomaticReconnect(true );
connOpts.setConnectionTimeout(30 );
connOpts.setKeepAliveInterval(60 );
connOpts.setCleanSession(true );
mqttClient = new MqttClient (brokerUrl, clientId, new MemoryPersistence ());
mqttClient.setCallback(new MqttCallback () {
@Override
public void connectionLost (Throwable cause) {
log.error("MQTT 连接断开" , cause);
}
@Override
public void messageArrived (String topic, MqttMessage message) {}
@Override
public void deliveryComplete (IMqttDeliveryToken token) {
if (!token.isComplete()) {
log.error("设备控制指令投递失败" );
}
}
});
int connectRetry = 0 ;
while (connectRetry < MAX_RETRY) {
try {
if (!mqttClient.isConnected()) {
mqttClient.connect(connOpts);
break ;
}
} catch (MqttException e) {
connectRetry++;
Thread.sleep(RETRY_INTERVALS[connectRetry - 1 ]);
}
}
}
@Override
public void invoke (String controlCmd, Context context) throws Exception {
if (controlCmd == null || !mqttClient.isConnected()) {
throw new RuntimeException ("MQTT 连接已断开" );
}
JSONObject cmdJson = JSONObject.parseObject(controlCmd);
String deviceId = cmdJson.getString("deviceId" );
String topic = "device/control/" + deviceId;
MqttMessage message = new MqttMessage (controlCmd.getBytes("UTF-8" ));
message.setQos(qos);
message.setRetained(false );
int retryCount = 0 ;
while (retryCount < MAX_RETRY) {
try {
mqttClient.publish(topic, message);
break ;
} catch (MqttException e) {
retryCount++;
Thread.sleep(RETRY_INTERVALS[retryCount - 1 ]);
}
}
}
@Override
public void close () throws Exception {
super .close();
if (mqttClient != null && mqttClient.isConnected()) {
mqttClient.disconnect();
mqttClient.close();
}
}
}
2.2.4 动态联动核心 Job(Flink 1.18.0 生产版) package com.smarthome.flink.job;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.smarthome.entity.DeviceStatus;
import com.smarthome.entity.LinkageRule;
import com.smarthome.source.KafkaSourceBuilder;
import com.smarthome.sink.DeviceControlSink;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.validate.SqlValidatorUtil;
import org.apache.calcite.tools.Frameworks;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.state.MapStateDescriptor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
public class DeviceLinkageJob {
private static final Logger log = LoggerFactory.getLogger(DeviceLinkageJob.class);
private static final MapStateDescriptor<String, LinkageRule> RULE_STATE_DESC = new MapStateDescriptor <>("linkage-rule-state" , String.class, LinkageRule.class);
private static final SqlParser.Config SQL_PARSER_CONFIG = SqlParser.config().withCaseSensitive(false );
private static final FrameworkConfig FRAMEWORK_CONFIG = Frameworks.newConfigBuilder().build();
private static final org.apache.calcite.sql.validate.SqlValidator SQL_VALIDATOR = SqlValidatorUtil.newValidator(null , null , FRAMEWORK_CONFIG.getTypeFactory(), org.apache.calcite.sql.validate.SqlValidator.Config.DEFAULT);
public static void main (String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(180000 );
env.getCheckpointConfig().setCheckpointingMode(org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE);
env.setParallelism(12 );
DataStream<DeviceStatus> deviceStatusStream = KafkaSourceBuilder.build(env, "device_status_topic" , "device-linkage-status-group" , new SimpleStringSchema ())
.filter(jsonStr -> jsonStr != null && !jsonStr.isEmpty())
.map(new MapFunction <String, DeviceStatus>() {
@Override
public DeviceStatus map (String jsonStr) throws Exception {
return JSONObject.parseObject(jsonStr, DeviceStatus.class);
}
})
.filter(status -> status != null )
.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1 )).withTimestampAssigner((status, ts) -> status.getUpdateTime()));
DataStream<LinkageRule> ruleStream = KafkaSourceBuilder.build(env, "linkage_rule_cdc_topic" , "device-linkage-rule-group" , new SimpleStringSchema ())
.filter(jsonStr -> jsonStr != null && !jsonStr.isEmpty())
.map(new MapFunction <String, LinkageRule>() {
@Override
public LinkageRule map (String jsonStr) throws Exception {
LinkageRule rule = JSONObject.parseObject(jsonStr, LinkageRule.class);
if (rule.getConditionSql() == null || rule.getActionJson() == null ) return null ;
if (!validateSql(rule.getConditionSql())) return null ;
return rule;
}
}).filter(rule -> rule != null );
BroadcastStream<LinkageRule> broadcastRuleStream = ruleStream.broadcast(RULE_STATE_DESC);
DataStream<String> controlStream = deviceStatusStream
.connect(broadcastRuleStream)
.process(new BroadcastProcessFunction <DeviceStatus, LinkageRule, String>() {
@Override
public void processElement (DeviceStatus status, ReadOnlyContext ctx, Collector<String> out) throws Exception {
if (status.getIsOnline() != 1 ) return ;
for (LinkageRule rule : ctx.getBroadcastState(RULE_STATE_DESC).values()) {
if (rule.getIsEnable() != 1 ) continue ;
if (rule.getUserId() != null && !rule.getUserId().equals(status.getUserId())) continue ;
String conditionSql = buildConditionSql(rule.getConditionSql(), status);
boolean isTrigger = evaluateCondition(conditionSql);
if (isTrigger) {
generateControlCmds(rule, status, out);
}
}
}
@Override
public void processBroadcastElement (LinkageRule rule, Context ctx, Collector<String> out) throws Exception {
if (rule.getIsEnable() == 1 ) {
ctx.getBroadcastState(RULE_STATE_DESC).put(rule.getRuleId().toString(), rule);
} else {
ctx.getBroadcastState(RULE_STATE_DESC).remove(rule.getRuleId().toString());
}
}
});
controlStream.addSink(new DeviceControlSink ("ssl://mqtt-broker:8883" )).name("Device-Control-Sink" ).uid("device-control-sink" );
env.execute("Device Linkage Job V3.0" );
}
private static String buildConditionSql (String templateSql, DeviceStatus status) {
Map<String, String> varMap = new HashMap <>();
varMap.put("device_id" , "'" + status.getDeviceId() + "'" );
varMap.put("device_type" , "'" + status.getDeviceType() + "'" );
varMap.put("value" , String.valueOf(status.getValue()));
LocalDateTime now = LocalDateTime.now();
varMap.put("hour" , String.valueOf(now.getHour()));
varMap.put("minute" , String.valueOf(now.getMinute()));
varMap.put("day_of_week" , String.valueOf(now.getDayOfWeek().getValue()));
String executableSql = templateSql;
for (Map.Entry<String, String> entry : varMap.entrySet()) {
executableSql = executableSql.replace(entry.getKey(), entry.getValue());
}
return executableSql;
}
private static boolean evaluateCondition (String conditionSql) {
try {
SqlParser parser = SqlParser.create(conditionSql, SQL_PARSER_CONFIG);
SqlNode sqlNode = parser.parseQuery();
SqlNode validatedNode = SQL_VALIDATOR.validate(sqlNode);
if (!(validatedNode instanceof org.apache.calcite.sql.SqlLiteral)) return false ;
org.apache.calcite.sql.SqlLiteral literal = (org.apache.calcite.sql.SqlLiteral) validatedNode;
return literal.getValueAs(Boolean.class);
} catch (Exception e) {
return false ;
}
}
private static boolean validateSql (String sql) {
try {
SqlParser parser = SqlParser.create(sql, SQL_PARSER_CONFIG);
parser.parseQuery();
return true ;
} catch (Exception e) {
return false ;
}
}
private static void generateControlCmds (LinkageRule rule, DeviceStatus triggerStatus, Collector<String> out) {
try {
JSONArray actions = JSONArray.parseArray(rule.getActionJson());
for (Object actionObj : actions) {
JSONObject action = (JSONObject) actionObj;
JSONObject controlCmd = new JSONObject ();
controlCmd.put("deviceId" , action.getString("deviceId" ));
controlCmd.put("action" , action.getString("action" ));
controlCmd.put("param" , action.getJSONObject("param" ));
controlCmd.put("triggerRuleId" , rule.getRuleId());
controlCmd.put("triggerTime" , System.currentTimeMillis());
out.collect(controlCmd.toString());
}
} catch (Exception e) {
log.error("生成控制指令失败" , e);
}
}
}
2.3 真实案例:北京望京 SOHO 公寓'起床场景'动态联动
2.3.1 需求背景
每天早上 7 点起床,窗帘 10 分钟内开到 100%;
空调从睡眠模式自动切换到舒适模式(26℃);
热水器提前预热到 50℃;
周末自动禁用规则;
雨天时窗帘只开 50%。
2.3.2 规则配置与执行流程 "device_type='temperature_sensor' AND room_id='master_bedroom' AND hour=7 AND minute BETWEEN 0 AND 10 AND day_of_week BETWEEN 1 AND 5 AND (SELECT status FROM dws_device_real_time WHERE device_id='WIFI-1001' AND update_time>UNIX_TIMESTAMP()-86400*1000)='connected'"
[
{ "deviceId" : "DUYA-DT82-1001" , "action" : "set_open" , "param" : { "speed" : 10 , "target" : 100 } } ,
{ "deviceId" : "GREE-KFR-35-1001" , "action" : "set_mode" , "param" : { "mode" : "comfort" , "temp" : 26 } }
]
2.3.3 落地效果 指标 实测结果 联动响应延迟 180ms 规则执行准确率 100% 跨品牌兼容性 100%
2.4 生产级优化:解决'规则匹配延迟飙升'问题
遍历效率低下:每条设备状态需遍历所有 10 万条规则。
SQL 重复解析:相同规则的条件 SQL 被不同设备状态重复解析。
状态存储无序:广播状态中的规则以 ruleId 为 key 无序存储。
规则二级索引优化 :一级 key=userId+roomId,二级 key=ruleId。遍历量从 10 万条降至平均 5 条/次。
SQL 预解析缓存 :新增 ConcurrentHashMap 缓存 AST,解析次数减少 90%。
规则优先级排序 :高频场景优先匹配。
指标 优化前 优化后 单设备匹配耗时 86ms 3ms Task CPU 占用 85% 35%
三、核心场景 2:场景化节能优化 —— 从'被动节能'到'预判调度'
3.1 行业痛点:传统节能的'伪命题'
预判缺失,被动节能 :出门忘关设备,空转浪费。
体验牺牲,用户抵触 :强制降低温度导致体验差。
政策脱节,成本不降 :不知道峰谷电价差异。
3.2 解决方案:'预测 - 调度 - 反馈'节能闭环 通过 ARIMA 模型预测未来 24 小时能耗需求,再用贪心算法生成错峰用电调度计划。
3.2.1 节能架构核心流程
3.2.2 核心数据模型
3.2.2.1 能耗数据实体类(EnergyConsumption) package com.smarthome.entity;
import lombok.Data;
import java.io.Serializable;
@Data
public class EnergyConsumption implements Serializable {
private String deviceId;
private String deviceType;
private float energyKwh;
private int runDuration;
private long startTime;
private long endTime;
private String roomId;
private String userId;
private String communityId;
private String weather;
private float outdoorTemp;
}
3.2.2.2 节能调度计划实体类(EnergySchedule) package com.smarthome.entity;
import lombok.Data;
import java.io.Serializable;
@Data
public class EnergySchedule implements Serializable {
private Long scheduleId;
private String deviceId;
private String userId;
private int startHour;
private int endHour;
private String actionJson;
private float energyForecast;
private String priceType;
private int isExecuted;
private String executeTime;
private String createTime;
}
3.2.3 关键工具类:WeatherUtil(高德天气 API 调用) package com.smarthome.util;
import com.alibaba.fastjson.JSONObject;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.CloseableHttpResponse;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WeatherUtil {
private static final Logger log = LoggerFactory.getLogger(WeatherUtil.class);
private static final String AMAP_WEATHER_URL = "https://restapi.amap.com/v3/weather/weatherInfo" ;
private static final String AMAP_API_KEY = "${amap.api.key}" ;
private static final int HTTP_TIMEOUT = 3000 ;
private static final RedisUtil REDIS_UTIL = SpringContextUtil.getBean(RedisUtil.class);
private static final String WEATHER_CACHE_KEY_PREFIX = "weather:city:" ;
private static final int CACHE_EXPIRE_SECONDS = 2 * 3600 ;
public static JSONObject getCityWeather (String cityAdcode) {
if (cityAdcode == null || cityAdcode.isEmpty()) return null ;
String cacheKey = WEATHER_CACHE_KEY_PREFIX + cityAdcode;
String cacheValue = REDIS_UTIL.get(cacheKey);
if (cacheValue != null && !cacheValue.isEmpty()) {
return JSONObject.parseObject(cacheValue);
}
CloseableHttpClient httpClient = null ;
CloseableHttpResponse response = null ;
try {
String requestUrl = String.format("%s?key=%s&city=%s&extensions=base" , AMAP_WEATHER_URL, AMAP_API_KEY, cityAdcode);
HttpGet httpGet = new HttpGet (requestUrl);
httpClient = HttpClients.createDefault();
response = httpClient.execute(httpGet);
if (response.getStatusLine().getStatusCode() == 200 ) {
String responseStr = EntityUtils.toString(response.getEntity(), "UTF-8" );
JSONObject resultJson = JSONObject.parseObject(responseStr);
if ("10000" .equals(resultJson.getString("status" ))) {
JSONObject weatherJson = resultJson.getJSONArray("lives" ).getJSONObject(0 );
REDIS_UTIL.set(cacheKey, weatherJson.toString(), CACHE_EXPIRE_SECONDS);
return weatherJson;
}
}
} catch (Exception e) {
log.error("获取天气信息异常" , e);
} finally {
try {
if (response != null ) response.close();
if (httpClient != null ) httpClient.close();
} catch (Exception e) {}
}
return null ;
}
public static float getWeatherFactor (String weather, float outdoorTemp) {
float factor = 1.0f ;
if ("rain" .equals(weather) || "snow" .equals(weather)) factor += 0.2f ;
else if ("cloudy" .equals(weather)) factor += 0.1f ;
if (outdoorTemp > 35 ) factor += 0.15f ;
else if (outdoorTemp < 5 ) factor += 0.15f ;
return Math.max(0.8f , Math.min(1.5f , factor));
}
}
3.2.4 核心算法实现:ARIMA 能耗预测 package com.smarthome.algorithm;
import com.smarthome.entity.EnergyConsumption;
import com.smarthome.mapper.EnergyMapper;
import com.smarthome.util.RedisUtil;
import com.smarthome.util.WeatherUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.math3.linear.Array2DRowRealMatrix;
import org.apache.commons.math3.optim.nonlinear.scalar.GoalType;
import org.apache.commons.math3.optim.nonlinear.scalar.ObjectiveFunction;
import org.apache.commons.math3.optim.nonlinear.scalar.noderiv.NelderMeadSimplex;
import org.apache.commons.math3.optim.nonlinear.scalar.noderiv.SimplexOptimizer;
import org.apache.commons.math3.stat.regression.OLSMultipleLinearRegression;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
@Slf4j
@Component
@RequiredArgsConstructor
public class ArimaEnergyPredictor {
private final EnergyMapper energyMapper;
private final RedisUtil redisUtil;
private static final int P = 2 ;
private static final int D = 1 ;
private static final int Q = 2 ;
private static final int PREDICT_HOURS = 24 ;
private static final int HISTORY_DAYS = 180 ;
public double [] predictHourlyEnergy(String userId, String deviceId, String cityAdcode) {
log.info("开始能耗预测|userId={}|deviceId={}" , userId, deviceId);
String cacheKey = "energy:predict:" + userId + "_" + deviceId;
String cacheValue = redisUtil.get(cacheKey);
if (cacheValue != null && !cacheValue.isEmpty()) {
String[] strArray = cacheValue.split("," );
double [] result = new double [strArray.length];
for (int i = 0 ; i < strArray.length; i++) result[i] = Double.parseDouble(strArray[i]);
return result;
}
List<EnergyConsumption> historyData = energyMapper.selectHourlyEnergy(userId, deviceId, HISTORY_DAYS);
if (historyData.size() < 30 * 24 ) return getDefaultPrediction(deviceId);
double [] rawEnergy = new double [historyData.size()];
double [] weatherFactors = new double [historyData.size()];
for (int i = 0 ; i < historyData.size(); i++) {
EnergyConsumption data = historyData.get(i);
rawEnergy[i] = data.getEnergyKwh();
weatherFactors[i] = data.getWeather() != null ? WeatherUtil.getWeatherFactor(data.getWeather(), data.getOutdoorTemp()) : 1.0f ;
}
double [] filteredEnergy = filterOutliers(rawEnergy);
double [] diffEnergy = differencing(filteredEnergy, D);
double [] arCoefficients = trainARModel(diffEnergy, P);
double [] residuals = calculateARResiduals(diffEnergy, arCoefficients, P);
double [] maCoefficients = trainMAModelWithMLE(residuals, Q);
double [] predictDiff = predictDiffSequence(diffEnergy, residuals, arCoefficients, maCoefficients);
double [] predictRaw = inverseDifferencing(filteredEnergy, predictDiff, D);
double [] finalPredict = adjustWithFutureWeather(predictRaw, cityAdcode);
StringBuilder cacheBuilder = new StringBuilder ();
for (double v : finalPredict) cacheBuilder.append(v).append("," );
redisUtil.set(cacheKey, cacheBuilder.toString().substring(0 , cacheBuilder.length() - 1 ), 3600 );
return finalPredict;
}
private double [] filterOutliers(double [] data) {
double mean = calculateAverage(data);
double std = calculateStandardDeviation(data, mean);
List<Double> filteredList = new ArrayList <>();
for (double v : data) {
if (v >= mean - 3 * std && v <= mean + 3 * std) filteredList.add(v);
else filteredList.add(mean);
}
double [] result = new double [filteredList.size()];
for (int i = 0 ; i < filteredList.size(); i++) result[i] = filteredList.get(i);
return result;
}
private double [] differencing(double [] data, int d) {
double [] result = data.clone();
for (int i = 0 ; i < d; i++) {
double [] temp = new double [result.length - 1 ];
for (int j = 0 ; j < temp.length; j++) temp[j] = result[j + 1 ] - result[j];
result = temp;
}
return result;
}
private double [] trainARModel(double [] diffData, int p) {
int n = diffData.length - p;
if (n <= 0 ) return new double [p + 1 ];
double [][] x = new double [n][p + 1 ];
double [] y = new double [n];
for (int i = 0 ; i < n; i++) {
x[i][0 ] = 1 ;
for (int j = 0 ; j < p; j++) x[i][j + 1 ] = diffData[i + p - 1 - j];
y[i] = diffData[i + p];
}
OLSMultipleLinearRegression regression = new OLSMultipleLinearRegression ();
regression.newSampleData(y, x);
return regression.estimateRegressionParameters();
}
private double [] calculateARResiduals(double [] diffData, double [] arCoeffs, int p) {
int n = diffData.length - p;
double [] residuals = new double [n];
for (int i = 0 ; i < n; i++) {
double arPredict = arCoeffs[0 ];
for (int j = 0 ; j < p; j++) arPredict += arCoeffs[j + 1 ] * diffData[i + p - 1 - j];
residuals[i] = diffData[i + p] - arPredict;
}
return residuals;
}
private double [] trainMAModelWithMLE(double [] residuals, int q) {
double [] initialGuess = new double [q];
for (int i = 0 ; i < q; i++) initialGuess[i] = 0.1 ;
ObjectiveFunction objectiveFunction = new ObjectiveFunction (params -> {
int n = residuals.length;
double sigmaSquared = 0.0 ;
double [] epsilon = new double [n];
for (int i = q; i < n; i++) {
double maPredict = 0.0 ;
for (int j = 0 ; j < q; j++) maPredict += params[j] * residuals[i - 1 - j];
epsilon[i] = residuals[i] - maPredict;
sigmaSquared += Math.pow(epsilon[i], 2 );
}
sigmaSquared /= (n - q);
double logLikelihood = -0.5 * (n - q) * Math.log(2 * Math.PI * sigmaSquared) - 0.5 * (n - q);
return -logLikelihood;
});
SimplexOptimizer optimizer = new SimplexOptimizer (1e-6 , 1e-8 );
NelderMeadSimplex simplex = new NelderMeadSimplex (initialGuess.length, 1.0 );
return optimizer.optimize(new org .apache.commons.math3.optim.MaxEval(1000 ), objectiveFunction, GoalType.MINIMIZE, new org .apache.commons.math3.optim.InitialGuess(initialGuess), simplex).getPoint();
}
private double [] predictDiffSequence(double [] diffData, double [] residuals, double [] arCoeffs, double [] maCoeffs) {
double [] predictDiff = new double [PREDICT_HOURS];
int p = arCoeffs.length - 1 ;
int q = maCoeffs.length;
double [] lastPDiff = new double [p];
System.arraycopy(diffData, diffData.length - p, lastPDiff, 0 , p);
double [] lastQResiduals = new double [q];
System.arraycopy(residuals, residuals.length - q, lastQResiduals, 0 , q);
for (int i = 0 ; i < PREDICT_HOURS; i++) {
double arPredict = arCoeffs[0 ];
for (int j = 0 ; j < p; j++) arPredict += arCoeffs[j + 1 ] * lastPDiff[p - 1 - j];
double maCorrect = 0.0 ;
for (int j = 0 ; j < q; j++) maCorrect += maCoeffs[j] * lastQResiduals[q - 1 - j];
predictDiff[i] = arPredict + maCorrect;
System.arraycopy(lastPDiff, 1 , lastPDiff, 0 , p - 1 );
lastPDiff[p - 1 ] = predictDiff[i];
System.arraycopy(lastQResiduals, 1 , lastQResiduals, 0 , q - 1 );
lastQResiduals[q - 1 ] = predictDiff[i] - arPredict;
}
return predictDiff;
}
private double [] inverseDifferencing(double [] originalData, double [] predictDiff, int d) {
double [] result = predictDiff.clone();
for (int i = 0 ; i < d; i++) {
double [] temp = new double [result.length + 1 ];
temp[0 ] = originalData[originalData.length - 1 - (d - 1 - i)];
for (int j = 0 ; j < result.length; j++) temp[j + 1 ] = temp[j] + result[j];
result = temp;
}
double [] finalResult = new double [PREDICT_HOURS];
System.arraycopy(result, result.length - PREDICT_HOURS, finalResult, 0 , PREDICT_HOURS);
for (int i = 0 ; i < finalResult.length; i++) {
finalResult[i] = Math.max(0.0 , finalResult[i]);
finalResult[i] = Math.round(finalResult[i] * 100 ) / 100.0 ;
}
return finalResult;
}
private double [] adjustWithFutureWeather(double [] predictEnergy, String cityAdcode) {
JSONObject weatherJson = WeatherUtil.getCityWeather(cityAdcode);
if (weatherJson == null ) return predictEnergy;
String weather = weatherJson.getString("weather" );
float outdoorTemp = weatherJson.getFloatValue("temperature" );
float weatherFactor = WeatherUtil.getWeatherFactor(weather, outdoorTemp);
double [] adjustedEnergy = new double [predictEnergy.length];
for (int i = 0 ; i < predictEnergy.length; i++) {
adjustedEnergy[i] = Math.round(predictEnergy[i] * weatherFactor * 100 ) / 100.0 ;
}
return adjustedEnergy;
}
private double [] getDefaultPrediction(String deviceId) {
double [] defaultPred = new double [PREDICT_HOURS];
if (deviceId.contains("GREE" ) || deviceId.contains("MIDEA" ) && deviceId.contains("AC" )) {
for (int i = 0 ; i < 24 ; i++) defaultPred[i] = (i >= 6 && i < 22 ) ? 1.2 : 1.1 ;
} else if (deviceId.contains("HAIER" ) && deviceId.contains("EC" )) {
for (int i = 0 ; i < 24 ; i++) defaultPred[i] = (i >= 22 || i < 6 ) ? 0.8 : 0.1 ;
} else {
for (int i = 0 ; i < 24 ; i++) defaultPred[i] = 0.1 ;
}
return defaultPred;
}
private double calculateAverage (double [] data) {
if (data == null || data.length == 0 ) return 0.0 ;
double sum = 0.0 ;
for (double v : data) sum += v;
return sum / data.length;
}
private double calculateStandardDeviation (double [] data, double mean) {
if (data == null || data.length <= 1 ) return 0.0 ;
double sum = 0.0 ;
for (double v : data) sum += Math.pow(v - mean, 2 );
return Math.sqrt(sum / (data.length - 1 ));
}
}
3.2.5 节能调度执行 Job(Flink 实时执行) package com.smarthome.flink.job;
import com.alibaba.fastjson.JSONObject;
import com.smarthome.entity.EnergySchedule;
import com.smarthome.source.KafkaSourceBuilder;
import com.smarthome.sink.DeviceControlSink;
import com.smarthome.util.SpringContextUtil;
import com.smarthome.mapper.EnergyScheduleMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
@Slf4j
@Component
public class EnergyScheduleExecuteJob {
private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss" );
private transient EnergyScheduleMapper scheduleMapper;
public static void main (String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(300000 );
env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints/energy-schedule" );
env.getCheckpointConfig().setCheckpointingMode(org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE);
env.setParallelism(8 );
DataStream<EnergySchedule> scheduleStream = KafkaSourceBuilder.build(env, "energy_schedule_topic" , "energy-schedule-execute-group" , new SimpleStringSchema ())
.filter(jsonStr -> jsonStr != null && !jsonStr.isEmpty())
.map(new MapFunction <String, EnergySchedule>() {
@Override
public EnergySchedule map (String jsonStr) throws Exception {
return JSONObject.parseObject(jsonStr, EnergySchedule.class);
}
})
.filter(schedule -> schedule != null && schedule.getIsExecuted() == 0 && schedule.getDeviceId() != null && schedule.getActionJson() != null )
.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5 )).withTimestampAssigner((schedule, ts) -> System.currentTimeMillis()));
DataStream<String> controlStream = scheduleStream.process(new ProcessFunction <EnergySchedule, String>() {
@Override
public void open (Configuration parameters) throws Exception {
super .open(parameters);
scheduleMapper = SpringContextUtil.getBean(EnergyScheduleMapper.class);
}
@Override
public void processElement (EnergySchedule schedule, Context ctx, Collector<String> out) throws Exception {
LocalDateTime now = LocalDateTime.now();
int currentHour = now.getHour();
boolean isInTimeSlot;
if (schedule.getStartHour() < schedule.getEndHour()) {
isInTimeSlot = currentHour >= schedule.getStartHour() && currentHour < schedule.getEndHour();
} else {
isInTimeSlot = currentHour >= schedule.getStartHour() || currentHour < schedule.getEndHour();
}
if (!isInTimeSlot) return ;
int updateCount = scheduleMapper.updateExecutedStatus(schedule.getScheduleId(), now.format(DATE_TIME_FORMATTER));
if (updateCount == 0 ) return ;
String controlCmd = buildControlCmd(schedule, now.format(DATE_TIME_FORMATTER));
if (controlCmd != null ) out.collect(controlCmd);
}
});
controlStream.addSink(new DeviceControlSink ("ssl://mqtt-broker:8883" )).name("Energy-Schedule-Control-Sink" ).uid("energy-schedule-control-sink" );
env.execute("Energy Schedule Execute Job V2.0" );
}
private static String buildControlCmd (EnergySchedule schedule, String executeTime) {
try {
JSONObject actionJson = JSONObject.parseObject(schedule.getActionJson());
JSONObject controlCmd = new JSONObject ();
controlCmd.put("deviceId" , schedule.getDeviceId());
controlCmd.put("action" , actionJson.getString("action" ));
controlCmd.put("param" , actionJson.getJSONObject("param" ));
controlCmd.put("triggerType" , "energy_schedule" );
controlCmd.put("triggerScheduleId" , schedule.getScheduleId());
controlCmd.put("triggerTime" , System.currentTimeMillis());
controlCmd.put("executeTime" , executeTime);
return controlCmd.toString();
} catch (Exception e) {
log.error("构建节能控制指令失败" , e);
return null ;
}
}
}
3.3 真实案例:上海仁恒河滨城'全屋家电错峰调度'
3.3.1 需求背景
热水器能在谷电时段加热,早晚有热水;
空调在峰电时段别太费电,但温度不低于 26℃;
洗衣机自动在便宜时段洗衣服;
每天查看省了多少电。
3.3.2 调度计划生成 设备类型 调度时段 电价类型 执行动作 预计电费(元) 海尔热水器 22:00-23:00 谷电 加热至 50℃ 0.55 格力空调 6:30-8:30 峰电 温度 27℃ 1.48 西门子洗衣机 0:00-1:00 谷电 标准洗程序 0.15
3.3.3 落地效果 指标 优化前 优化后 提升幅度 日均总能耗 12.6 kWh 8.3 kWh -34.1% 日均电费 7.77 元 3.82 元 -50.8% 设备运行效率 随机运行 按需启停 -33.3%
3.4 生产级优化:解决'ARIMA 模型预测准确率低'问题
特征维度单一:仅输入历史能耗,未考虑天气、用户行为。
模型静态固化:未随季节变化更新。
异常数据污染:设备故障数据未过滤。
特征工程升级 :新增环境特征(天气)、行为特征(在家/出差)、时间特征。
模型动态迭代 :滑动窗口训练,每 7 天更新。
数据清洗强化 :三级过滤流程(有效性、异常值、标签修正)。
指标 优化前 优化后 平均预测偏差率 18.3% 4.2% 极端天气偏差率 24.1% 6.8%
四、技术挑战与生产级避坑指南
4.1 挑战 1:设备数据倾斜 问题场景: 10% 的高频设备集中在 Flink Task 3,CPU 100%,延迟飙升。
数据降频分级 :按设备活跃度动态降频。
Key 打散与重分区 :原始 Key 打散后聚合。
资源动态调整 :启用动态扩缩容,热点 Task 单独配置。
4.2 挑战 2:MQTT 指令丢失
协议与 Broker 优化 :QoS=1,Broker 集群扩容,SSL 加密。
指令持久化与重试机制 :先落库,三级重试策略。
流量削峰与限流 :Redis 缓存,指令合并。
4.3 挑战 3:数据安全与隐私保护 问题场景: 日志明文打印用户地址,存在隐私泄露风险。
数据脱敏分级 :高敏感数据 AES 加密,中敏感数据部分脱敏。
权限严格管控 :基于 RBAC 模型,操作审计。
边缘侧预处理 :边缘网关完成数据预处理,仅上传状态。
本文介绍了基于 Java 大数据技术的智能家居解决方案。架构采用 Flink、Kafka、ClickHouse 实现百万级设备接入与实时计算。核心场景包括动态联动引擎,通过 Flink SQL 和 Calcite 解决规则匹配延迟问题;场景化节能优化,利用 ARIMA 模型预测能耗并结合峰谷电价调度。生产实践涵盖数据倾斜处理、MQTT 指令可靠性保障及隐私合规方案。实测显示联动延迟降至 180ms,节能率达 34.1%。
相关免费在线工具 Keycode 信息 查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online
Escape 与 Native 编解码 JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online
JavaScript / HTML 格式化 使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online
JavaScript 压缩与混淆 Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online
加密/解密文本 使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
RSA密钥对生成器 生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online