-- Per-device power/energy readings, one row per collection timestamp.
-- Four STRING columns are declared with the TAG role and three value columns
-- with the FIELD role; collect_time carries the sample timestamp.
-- NOTE(review): the TAG/FIELD column roles together with ENGINE=InfluxDB and
-- DEFAULT CHARSET do not correspond to a single engine's DDL as far as can be
-- told from this file; this looks like an illustrative schema. Confirm the
-- target time-series store before trying to execute it.
CREATE TABLE device_energy_consumption (
-- device ID
device_id STRING TAG COMMENT '设备 ID',
-- device type
device_type STRING TAG COMMENT '设备类型',
-- user ID
user_id STRING TAG COMMENT '用户 ID',
-- area code
area_code STRING TAG COMMENT '区域编码',
-- real-time power, in watts
power DOUBLE FIELD COMMENT '实时功率(W)',
-- cumulative energy consumption, in kWh
energy DOUBLE FIELD COMMENT '累计能耗(kWh)',
-- running status
run_status BOOLEAN FIELD COMMENT '运行状态',
-- collection time
collect_time TIMESTAMP COMMENT '采集时间'
) ENGINE=InfluxDB DEFAULT CHARSET=utf8mb4;
-- Weather readings per area and forecast time (MySQL / InnoDB).
-- Looked up by (area_code, forecast_time) through idx_area_forecast.
CREATE TABLE weather_data (
    id            BIGINT      AUTO_INCREMENT PRIMARY KEY,
    -- MySQL has no STRING type, and an indexed text column needs a bounded
    -- type, so the text columns are VARCHAR.
    -- NOTE(review): the lengths are assumptions; size them to the real values.
    area_code     VARCHAR(64) NOT NULL COMMENT '区域编码',
    -- temperature in degrees Celsius
    temperature   DOUBLE      NOT NULL COMMENT '温度(℃)',
    -- relative humidity in percent
    humidity      DOUBLE      NOT NULL COMMENT '湿度(%)',
    weather_type  VARCHAR(64) NOT NULL COMMENT '天气类型',
    forecast_time TIMESTAMP   NOT NULL COMMENT '预报时间',
    -- Row bookkeeping: set on insert / refreshed by MySQL on every update.
    create_time   TIMESTAMP   DEFAULT CURRENT_TIMESTAMP,
    update_time   TIMESTAMP   DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_area_forecast (area_code, forecast_time)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Electricity tariff per area and hour of day (MySQL / InnoDB).
-- uk_area_hour_date allows one price row per (area, hour, effective date).
CREATE TABLE electricity_price (
    id             BIGINT         AUTO_INCREMENT PRIMARY KEY,
    -- MySQL has no STRING type, and a column in a unique key needs a bounded
    -- type, so area_code is VARCHAR.
    -- NOTE(review): the length is an assumption; size it to the real codes.
    area_code      VARCHAR(64)    NOT NULL COMMENT '区域编码',
    -- hour of day, 0-23
    hour           INT            NOT NULL COMMENT '小时(0-23)',
    -- tariff type code; the meaning of each value is not defined in this file
    price_type     TINYINT        NOT NULL COMMENT '电价类型',
    -- Monetary value (yuan per kWh): exact DECIMAL instead of binary DOUBLE so
    -- that cost calculations do not accumulate floating-point error.
    -- NOTE(review): precision/scale (10, 4) is an assumption; confirm it.
    price          DECIMAL(10, 4) NOT NULL COMMENT '电价(元/kWh)',
    effective_date DATE           NOT NULL COMMENT '生效日期',
    -- expiry date; nullable in this schema
    expire_date    DATE           COMMENT '失效日期',
    create_time    TIMESTAMP      DEFAULT CURRENT_TIMESTAMP,
    UNIQUE KEY uk_area_hour_date (area_code, hour, effective_date)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Stored energy-forecast output per user, date and hour (MySQL / InnoDB).
-- Looked up by (user_id, forecast_date) through idx_user_date.
CREATE TABLE energy_forecast_result (
    id            BIGINT      AUTO_INCREMENT PRIMARY KEY,
    -- MySQL has no STRING type, and an indexed text column needs a bounded
    -- type, so user_id is VARCHAR.
    -- NOTE(review): the length is an assumption; size it to the real IDs.
    user_id       VARCHAR(64) NOT NULL COMMENT '用户 ID',
    forecast_date DATE        NOT NULL COMMENT '预测日期',
    -- hour the forecast applies to
    forecast_hour INT         NOT NULL COMMENT '预测小时',
    -- predicted total energy consumption
    total_energy  DOUBLE      NOT NULL COMMENT '预测总能耗',
    -- prediction accuracy
    accuracy      DOUBLE      NOT NULL COMMENT '预测精度',
    create_time   TIMESTAMP   DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_user_date (user_id, forecast_date)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
package com.qingyunjiao.smarthome.energy.forecast;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.ml.PipelineModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * Spring service that runs a Spark-based energy-consumption forecast for one user.
 *
 * <p>NOTE(review): {@code forecastModel}, {@code fusePredictResult},
 * {@code maskUserId} and {@code roundToTwoDecimal} are referenced below but are not
 * declared in the visible part of this class; they have to be supplied elsewhere for
 * the class to compile.
 */
@Service
public class EnergyForecastService {
    private static final Logger log = LoggerFactory.getLogger(EnergyForecastService.class);

    @Autowired
    private SparkSession sparkSession;

    /**
     * Loads the user's feature data, runs it through the forecast model, fuses the
     * predictions and converts them into value objects.
     *
     * @param userId ID of the user to forecast for; only its masked form is logged
     * @return one {@code EnergyForecastVO} per row of the fused prediction result
     * @throws RuntimeException wrapping any failure raised while loading, predicting
     *                          or converting
     */
    public List<EnergyForecastVO> forecast24HourEnergy(String userId) {
        log.info("开始预测用户{}未来 24 小时能耗", maskUserId(userId));
        Dataset<Row> featureData = null;
        try {
            featureData = loadFeatureData(userId);
            Dataset<Row> predictResult = forecastModel.transform(featureData);
            Dataset<Row> fusedResult = fusePredictResult(predictResult);
            // processPredictResult materialises the result eagerly, so the cached
            // features are no longer needed once it returns.
            return processPredictResult(fusedResult, userId);
        } catch (Exception e) {
            log.error("用户{}未来 24 小时能耗预测失败", maskUserId(userId), e);
            throw new RuntimeException("能耗预测失败", e);
        } finally {
            // Release the cache taken in loadFeatureData; without this every call
            // leaves another cached Dataset behind in the shared SparkSession.
            if (featureData != null) {
                featureData.unpersist();
            }
        }
    }

    /**
     * Computes the user's average power per hour of day from
     * {@code hive_db.device_energy_consumption}.
     *
     * <p>The query is built with the DataFrame API so that {@code userId} is bound as
     * a literal value and is never spliced into SQL text.
     *
     * @param userId ID of the user whose readings are aggregated
     * @return cached Dataset with columns {@code hour} and {@code avg_power}; the
     *         caller is responsible for calling {@code unpersist()} on it
     */
    private Dataset<Row> loadFeatureData(String userId) {
        return sparkSession.table("hive_db.device_energy_consumption")
                .filter(functions.col("user_id").equalTo(userId))
                .groupBy(functions.hour(functions.col("collect_time")).alias("hour"))
                .agg(functions.avg("power").alias("avg_power"))
                .cache();
    }

    /**
     * Converts the fused prediction rows into value objects.
     *
     * @param fusedResult Dataset that must contain a double column {@code hourly_energy}
     * @param userId      user ID stamped onto every returned object
     * @return one value object per row, in the order the rows were collected
     */
    private List<EnergyForecastVO> processPredictResult(Dataset<Row> fusedResult, String userId) {
        // Collect first and map on the driver: the mapping calls roundToTwoDecimal, and
        // running it inside a Spark closure would require serialising this bean (and its
        // SparkSession) if that helper is an instance method.
        List<Row> rows = fusedResult.collectAsList();
        List<EnergyForecastVO> result = new ArrayList<>(rows.size());
        for (Row row : rows) {
            EnergyForecastVO vo = new EnergyForecastVO();
            vo.setUserId(userId);
            vo.setHourlyEnergy(roundToTwoDecimal(row.getDouble(row.fieldIndex("hourly_energy"))));
            result.add(vo);
        }
        return result;
    }
}