|
|
|
|
@ -1,132 +0,0 @@
|
|
|
|
|
import pandas as pd
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
import sys
|
|
|
|
|
import logging
|
|
|
|
|
import numpy as np
|
|
|
|
|
import statsmodels.api as sm
|
|
|
|
|
|
|
|
|
|
# 配置日志
|
|
|
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
|
|
|
|
|
|
|
|
def apply_lowess_and_clipping_scaling(input_csv_path, output_csv_path, lowess_frac, target_range, clipping_percentile):
    """Smooth three metric columns with LOWESS and rescale them to a common range.

    The input CSV must contain a ``step`` column plus the metric columns
    ``X_Feature_L2_Norm``, ``Y_Feature_Variance`` and ``Z_LDM_Loss``. Each
    metric is replaced by its LOWESS trend over ``step`` and then linearly
    rescaled so that the ``clipping_percentile`` and
    ``1 - clipping_percentile`` quantiles of the trend map onto the two ends
    of ``[0, target_range]``. X and Y are inverted (upper quantile -> 0,
    lower quantile -> ``target_range``); Z is scaled directly. Values outside
    the quantile window are NOT clipped, so the output may fall outside
    ``[0, target_range]``.

    The result (``step`` plus the three rescaled metrics, three decimals) is
    written to ``output_csv_path``; parent directories are created as needed.

    Args:
        input_csv_path: Path of the CSV file to read.
        output_csv_path: Path of the CSV file to write.
        lowess_frac: ``frac`` argument passed to statsmodels' ``lowess``.
        target_range: Width of the output range the quantile window maps to.
        clipping_percentile: Quantile ``p`` defining the scaling window
            ``[quantile(p), quantile(1 - p)]``.

    Returns:
        None. A missing input file or missing required columns are logged as
        errors and the function returns without writing anything.
    """
    input_path = Path(input_csv_path)
    output_path = Path(output_csv_path)

    if not input_path.exists():
        logging.error(f"错误:未找到输入文件 {input_csv_path}")
        return

    logging.info(f"读取原始数据: {input_csv_path}")
    df = pd.read_csv(input_path)
    # Keep only the first occurrence of any repeated column label.
    df = df.loc[:, ~df.columns.duplicated()].copy()

    step_col = 'step'
    # Metrics whose direction is flipped during scaling.
    inverted_cols = ('X_Feature_L2_Norm', 'Y_Feature_Variance')
    metric_cols = inverted_cols + ('Z_LDM_Loss',)

    # Fail the same way as a missing file instead of raising a bare KeyError
    # halfway through processing.
    missing_cols = [col for col in (step_col, *metric_cols) if col not in df.columns]
    if missing_cols:
        logging.error(f"错误:输入文件缺少必需的列: {missing_cols}")
        return

    # ------------------ 1. LOWESS smoothing (extract the overall trend) ------------------
    logging.info(f"应用 Lowess 局部加权回归,平滑因子 frac={lowess_frac}。")

    x_coords = df[step_col].values
    for col in metric_cols:
        # return_sorted=False yields fitted values in the ORIGINAL row order
        # (NaN where the input is missing). The default sorted output would be
        # misaligned with the frame for unsorted steps and would have a
        # different length when rows contain NaN.
        df[f'{col}_LOWESS'] = sm.nonparametric.lowess(
            endog=df[col].values,
            exog=x_coords,
            frac=lowess_frac,
            it=0,
            return_sorted=False,
        )

    # ------------------ 2. Percentile-window scaling and direction unification ------------------
    p = clipping_percentile
    logging.info(f"应用百分位数边界 (p={p}) 进行线性缩放,目标范围 [0, {target_range:.2f}]")

    for final_col in metric_cols:
        trend = df[f'{final_col}_LOWESS']

        # The quantiles define the scaling window; the data itself is not clipped.
        min_val = trend.quantile(p)
        max_val = trend.quantile(1.0 - p)
        data_range = max_val - min_val

        # NaN never compares equal to anything (including itself), so a
        # degenerate window has to be detected with isfinite, not `== nan`.
        if not np.isfinite(data_range) or data_range <= 0:
            df[final_col] = 0.0
            logging.warning(f"列 {final_col} 裁剪后的范围为 {data_range:.4f},跳过缩放。")
            continue

        normalized = (trend - min_val) / data_range

        if final_col in inverted_cols:
            # X/Y are inverted: window max -> 0, window min -> target_range.
            df[final_col] = (1.0 - normalized) * target_range
        else:
            # Z is scaled directly: window min -> 0, window max -> target_range.
            df[final_col] = normalized * target_range

        logging.info(f" - 列 {final_col}:裁剪边界: [{min_val:.4f}, {max_val:.4f}]。缩放后范围不再严格约束 [0, {target_range:.2f}],以保留趋势。")

    # ------------------ 3. Save ------------------
    output_path.parent.mkdir(parents=True, exist_ok=True)
    df[[step_col, *metric_cols]].to_csv(
        output_path,
        index=False,
        float_format='%.3f',
    )

    logging.info(f"Lowess平滑和缩放后的数据已保存到: {output_csv_path}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    # Command-line entry point. Expects exactly five positional arguments:
    # input CSV, output CSV (or output directory), LOWESS frac, target range
    # and clipping percentile. Exits with status 1 on usage or parameter
    # errors so that calling scripts can detect the failure.
    if len(sys.argv) != 6:
        logging.error("使用方法: python smooth_coords.py <输入CSV路径> <输出CSV路径> <Lowess 平滑因子 frac (例如 0.4)> <目标视觉范围 (例如 30)> <离散点裁剪百分比 (例如 0.15)>")
        sys.exit(1)

    input_csv = sys.argv[1]
    output_csv = sys.argv[2]

    # Only argument parsing/validation is guarded: a ValueError raised later
    # during processing is not a parameter error and must not be reported
    # (and swallowed) as one.
    try:
        lowess_frac = float(sys.argv[3])
        target_range = float(sys.argv[4])
        clipping_p = float(sys.argv[5])

        if not (0.0 < lowess_frac <= 1.0):
            raise ValueError("Lowess 平滑因子 frac 必须在 (0.0, 1.0] 之间。")
        # isfinite also rejects "nan"/"inf", which a plain `<= 0` lets through.
        if not (np.isfinite(target_range) and target_range > 0):
            raise ValueError("目标视觉范围必须大于 0。")
        if not (0 <= clipping_p < 0.5):
            raise ValueError("裁剪百分比必须在 [0, 0.5) 之间。")
    except ValueError as e:
        logging.error(f"参数错误: {e}")
        sys.exit(1)

    # An output path without a file extension is treated as a directory.
    if not Path(output_csv).suffix:
        output_csv = str(Path(output_csv) / "scaled_coords.csv")

    apply_lowess_and_clipping_scaling(input_csv, output_csv, lowess_frac, target_range, clipping_p)
|