首先,利用 Wiener 去噪的振动信号,通过自定义的 bTSTFT 变换提取每个超帧的 256×256 幅值和相位矩阵,构建双通道时频表示。然后,采用卷积自编码器在健康数据上无监督学习正常振动模式的重构;在测试阶段,计算每个超帧的重构误差(MSE)作为异常指标。为提升检测鲁棒性,对 MSE 序列应用 CUSUM 累积和控制图,以及基于滚动方差的早期评分,两者均能有效捕捉退化初期信号的细微变化。实验结果表明,该方法在超帧 460470 附近即触发持续报警,比传统峭度、RMS 等指标提前约 6080 帧(约 1~1.5 分钟)。算法将物理先验(BPFO 谐波抑制、Wiener 去噪)与数据驱动(bTSTFT+ 自编码器)有机结合,在保持计算可行性的前提下(每超帧约 1 秒,可部署于边缘硬件),实现了极高的检测灵敏度和提前量,为旋转机械的预测性维护提供了可靠的技术方案。
算法步骤
预计算 bTSTFT 特征加载
从磁盘加载预计算的 bTSTFT 幅值和相位矩阵,每个超帧(由 5 个连续 1 秒帧构成)生成 256×256 的二维时频表示。 对幅值和相位分别进行 Z-score 标准化,使不同量级的特征统一尺度,避免网络训练偏向幅值或相位。 将标准化后的幅值和相位堆叠为双通道输入,形状为 (980, 256, 256, 2)。
数据集划分
根据轴承全寿命周期,将前 300 个超帧(对应原帧 50300)标记为健康状态,用作训练集。
将中间 300800 超帧(对应原帧 300~800)作为测试集,包含退化初期至失效阶段。
从训练集中随机抽取 20% 作为验证集,用于早停和模型选择。
卷积自编码器构建
编码器:由两层卷积 + 最大池化组成,逐步压缩空间维度,提取高层次抽象特征(通道数 32→64→128)。 解码器:通过上采样和卷积恢复原始尺寸,输出层采用线性激活以保留幅值和相位的真实动态范围。 损失函数为均方误差(MSE),优化器为 Adam,学习率 0.001。
无监督训练
仅使用健康数据训练自编码器,使其学习重构正常振动模式的 bTSTFT 表示。 设置早停(patience=8)、学习率衰减(factor=0.5)和模型检查点,防止过拟合并保存最佳模型。
重构误差计算
对测试集每个超帧,用训练好的自编码器重构,计算像素级 MSE 作为异常分数。 同时计算训练集上的 MSE,确定健康状态的百分位数带(如 P15~P85)作为正常波动范围参考。
CUSUM 累积和控制图
以健康 MSE 的均值为目标值,容许偏差 k=0.5×健康标准差,决策阈值 h=5×健康标准差。 对测试集 MSE 序列逐点计算正向累积和,一旦累积和超过 h,判定为持续异常,记录首次报警帧号。 CUSUM 能有效平滑单点噪声,检测到持续的偏离趋势。
基于 MSE 方差的早期评分
对测试集 MSE 序列计算滚动方差(窗口 10),突出变异增强现象(退化初期信号波动加剧)。 计算健康段滚动方差的均值和标准差,设定阈值=均值 +4×标准差。 检测首次超过阈值的超帧,作为另一种互补的早期报警信号。
结果可视化与报警确认
绘制 MSE 曲线、CUSUM 曲线和方差评分曲线,标注健康带和报警线。
对比传统时域指标(峭度、RMS)的报警时间(约 530540 帧),验证本方法提前至 460470 帧。
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers Adam
tensorflow.keras.callbacks EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
scipy.fft fft, fftfreq
data_array_btstft_mag = np.load()
data_array_btstft_phas = np.load()
mag_flat = data_array_btstft_mag.flatten().reshape(-, )
phas_flat = data_array_btstft_phas.flatten().reshape(-, )
scaler_mag = StandardScaler()
scaler_phas = StandardScaler()
mag_norm_flat = scaler_mag.fit_transform(mag_flat)
phas_norm_flat = scaler_phas.fit_transform(phas_flat)
mag_norm = mag_norm_flat.reshape(data_array_btstft_mag.shape)
phas_norm = phas_norm_flat.reshape(data_array_btstft_phas.shape)
input_data = np.stack([mag_norm, phas_norm], axis=-)
healthy_start, healthy_end = ,
test_start, test_end = ,
X_train = input_data[healthy_start:healthy_end]
X_test = input_data[test_start:test_end]
sklearn.model_selection train_test_split
X_train, X_val, _, _ = train_test_split(X_train, X_train, test_size=, random_state=)
input_layer = Input(shape=(, , ))
x = Conv2D(, (,), activation=)(input_layer)
x = MaxPooling2D(,)(x)
x = Conv2D(, (,), activation=)(x)
x = MaxPooling2D(,)(x)
encoded = Conv2D(, (,), activation=)(x)
x = Conv2D(, (,), activation=)(encoded)
x = UpSampling2D()(x)
x = Conv2D(, (,), activation=)(x)
x = UpSampling2D()(x)
decoded = Conv2D(, (,), activation=)(x)
autoencoder = Model(input_layer, decoded)
autoencoder.(optimizer=Adam(learning_rate=), loss=)
early_stop = EarlyStopping(monitor=, patience=, restore_best_weights=)
reduce_lr = ReduceLROnPlateau(monitor=, factor=, patience=, min_lr=)
checkpoint = ModelCheckpoint(, monitor=, save_best_only=)
history = autoencoder.fit(
X_train, X_train, epochs=, batch_size=,
validation_data=(X_val, X_val), shuffle=,
callbacks=[early_stop, reduce_lr, checkpoint], verbose=
)
recon_test = autoencoder.predict(X_test)
mse_per_surframe = np.mean((X_test - recon_test)**, axis=(,,))
recon_train = autoencoder.predict(X_train)
train_mse = np.mean((X_train - recon_train)**, axis=(,,))
p15_threshold = np.percentile(train_mse, )
p85_threshold = np.percentile(train_mse, )
indices = np.arange(test_start, test_start + (mse_per_surframe))
plt.figure()
plt.plot(indices, mse_per_surframe, label=)
plt.axhspan(p15_threshold, p85_threshold, color=, alpha=, label=)
plt.axvline(, color=, linestyle=, label=)
plt.title()
plt.legend()
plt.show()
mse_ref = np.mean(train_mse)
k = * np.std(train_mse)
h = * np.std(train_mse)
cusum_pos =
cusum = []
m mse_per_surframe:
cusum_pos = (, cusum_pos + (m - mse_ref) - k)
cusum.append(cusum_pos)
cusum = cusum[:(mse_per_surframe)]
crossing = np.where(np.array(cusum) > h)[]
(crossing) > :
first_alarm = crossing[] + test_start
()
rolling_var = pd.Series(mse_per_surframe).rolling(window=, min_periods=).var()
healthy_var = rolling_var[:healthy_end - test_start]
thresh_var = np.mean(healthy_var) + * np.std(healthy_var)
alarm_var = np.where(rolling_var > thresh_var)[]
(alarm_var) > :
first_alarm_var = alarm_var[] + test_start
()



