遥感时间序列趋势分析:Theil-Sen 与 Mann-Kendall
在气候变化、环境监测及生态演变研究中,准确识别长时间序列数据的趋势至关重要。Theil-Sen Median 趋势分析(Sen 分析)结合 Mann-Kendall 显著性检验(MK 检验)是处理此类问题的经典非参数方法组合。相比传统最小二乘法,该方法对异常值和缺失值具有更强的鲁棒性,且无需数据服从正态分布。
原理概述
1. Theil-Sen Median 趋势分析
Sen 分析通过计算所有点对斜率的中位数来评估趋势。对于时间序列 $ET_1, ET_2, ..., ET_n$,任意两点 $(i, j)$ 的斜率计算公式为:

其中 $ET_i$ 和 $ET_j$ 分别为不同时间点的数据值,$Q$ 为中位数斜率。若 $Q > 0$ 表示上升趋势,$Q < 0$ 表示下降趋势。
2. Mann-Kendall 显著性检验
MK 检验用于判断 Sen 分析得出的趋势是否具有统计学意义。它通过比较时间序列中每对数据点的符号来计算统计量 $S$:

其中 $sgn(ET_j - ET_i)$ 为符号函数。根据 $S$ 及其方差可进一步计算 $Z$ 值:

3. 结果判读
在 0.05 置信水平下,依据 $Z$ 值判断显著性:
- 显著上升:$Z > 1.96$
- 显著下降:$Z < -1.96$
- 不显著:$-1.96 \le Z \le 1.96$
代码实现
以下 Python 脚本实现了从数据加载、Sen 斜率计算、MK 检验到结果重分类的全流程。代码支持逐像元进度可视化,并针对空值进行了处理。
准备工作
确保已安装 rasterio, numpy, tqdm, matplotlib 等依赖库。数据文件夹结构建议如下:

核心代码
import os
import rasterio
numpy np
tqdm tqdm
matplotlib.pyplot plt
pathlib Path
base_path = os.getcwd()
data_path = os.path.join(base_path, )
result_path = os.path.join(base_path, )
os.makedirs(result_path, exist_ok=)
start_year =
end_year =
cd = end_year - start_year +
:
first_file = os.path.join(data_path, )
rasterio.(first_file) src:
a = src.read()
transform = src.transform
metadata = src.meta.copy()
metadata.update({
: ,
: np.nan
})
m, n = a.shape
()
all_data = np.full((m, n, cd), np.nan)
()
i, year (tqdm((start_year, end_year + ), desc=)):
filename = os.path.join(data_path, )
rasterio.(filename) src:
all_data[:, :, i] = src.read()
sen_result = np.full((m, n), np.nan)
valid_pixels =
valid_mask = np.(all_data > , axis=)
total_valid = np.(valid_mask)
()
i tqdm((m), desc=):
j (n):
valid_mask[i, j]:
data = all_data[i, j, :]
np.(~np.isnan(data)) np.(data) > :
slopes = []
k1 (, cd):
k2 (k1):
slope = (data[k1] - data[k2]) / (k1 - k2)
slopes.append(slope)
slopes:
sen_result[i, j] = np.median(slopes)
valid_pixels +=
()
sen_output_path = os.path.join(result_path, )
sen_result = sen_result.astype()
rasterio.(sen_output_path, , **metadata) dst:
dst.write(sen_result, )
(, sen_output_path)
()
mk_result = np.full((m, n), np.nan)
i tqdm((m), desc=):
j (n):
valid_mask[i, j]:
data = all_data[i, j, :]
np.(~np.isnan(data)) np.(data) > :
sgnsum = []
k1 (, cd):
k2 (k1):
sgn = np.sign(data[k1] - data[k2])
sgnsum.append(sgn)
mk_result[i, j] = np.(sgnsum)
()
vars_mk = cd * (cd - ) * ( * cd + ) /
z_scores = np.full((m, n), np.nan)
z_scores[~np.isnan(mk_result) & (mk_result == )] =
z_scores[~np.isnan(mk_result) & (mk_result > )] = (mk_result[~np.isnan(mk_result) & (mk_result > )] - ) / np.sqrt(vars_mk)
z_scores[~np.isnan(mk_result) & (mk_result < )] = (mk_result[~np.isnan(mk_result) & (mk_result < )] + ) / np.sqrt(vars_mk)
mk_output_path = os.path.join(result_path, )
z_scores = z_scores.astype()
rasterio.(mk_output_path, , **metadata) dst:
dst.write(z_scores, )
(, mk_output_path)
()
S2 = np.full((m, n), np.nan)
M2 = np.full((m, n), np.nan)
S2[np.isnan(sen_result)] = -
S2[~np.isnan(sen_result) & (sen_result <= -)] = -
S2[~np.isnan(sen_result) & (sen_result >= )] =
S2[~np.isnan(sen_result) & (sen_result > -) & (sen_result < )] =
M2[z_scores > ] =
M2[~np.isnan(z_scores) & (z_scores <= )] =
reclassify = (S2 * M2).astype(np.int16)
reclass_output_path = os.path.join(result_path, )
metadata.update({
: ,
: -
})
rasterio.(reclass_output_path, , **metadata) dst:
dst.write(reclassify, )
(, reclass_output_path)
Exception e:
()







