diff --git a/src/backend/app/algorithms/processors/coords_processor.py b/src/backend/app/algorithms/processors/coords_processor.py deleted file mode 100644 index ae2a908..0000000 --- a/src/backend/app/algorithms/processors/coords_processor.py +++ /dev/null @@ -1,132 +0,0 @@ -import pandas as pd -from pathlib import Path -import sys -import logging -import numpy as np -import statsmodels.api as sm - -# 配置日志 -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') - -def apply_lowess_and_clipping_scaling(input_csv_path, output_csv_path, lowess_frac, target_range, clipping_percentile): - """ - 应用 Lowess 局部加权回归进行平滑(提取总体趋势),然后使用百分位数裁剪后的 Min-Max 边界来缩放。 - 目标:生成最平滑、最接近单调下降的客观趋势。 - """ - input_path = Path(input_csv_path) - output_path = Path(output_csv_path) - - if not input_path.exists(): - logging.error(f"错误:未找到输入文件 {input_csv_path}") - return - - logging.info(f"读取原始数据: {input_csv_path}") - df = pd.read_csv(input_path) - - df = df.loc[:,~df.columns.duplicated()].copy() - - # 定义原始数据列名 - raw_x_col = 'X_Feature_L2_Norm' - raw_y_col = 'Y_Feature_Variance' - raw_z_col = 'Z_LDM_Loss' - - # --------------------------- 1. Lowess 局部加权回归平滑 (提取总体趋势) --------------------------- - logging.info(f"应用 Lowess 局部加权回归,平滑因子 frac={lowess_frac}。") - - x_coords = df['step'].values - - for raw_col in [raw_x_col, raw_y_col, raw_z_col]: - y_coords = df[raw_col].values - - smoothed_data = sm.nonparametric.lowess( - endog=y_coords, - exog=x_coords, - frac=lowess_frac, - it=0 - ) - df[f'{raw_col}_LOWESS'] = smoothed_data[:, 1] - - - # --------------------------- 2. 百分位数边界缩放与方向统一 --------------------------- - p = clipping_percentile - logging.info(f"应用百分位数边界 (p={p}) 进行线性缩放,目标范围 [0, {target_range:.2f}]") - - scale_cols_map = { - 'X_Feature_L2_Norm': f'{raw_x_col}_LOWESS', - 'Y_Feature_Variance': f'{raw_y_col}_LOWESS', - 'Z_LDM_Loss': f'{raw_z_col}_LOWESS' - } - - for final_col, lowess_col in scale_cols_map.items(): - - data = df[lowess_col] - - # 裁剪:计算裁剪后的 min/max (定义缩放窗口) - lower_bound = data.quantile(p) - upper_bound = data.quantile(1.0 - p) - - min_val = lower_bound - max_val = upper_bound - data_range = max_val - min_val - - if data_range <= 0 or data_range == np.nan: - df[final_col] = 0.0 - logging.warning(f"列 {final_col} 裁剪后的范围为 {data_range:.4f},跳过缩放。") - continue - - # 归一化: (data - Min_window) / Range_window - normalized_data = (data - min_val) / data_range - - # **优化方向统一逻辑 (所有指标都应是越小越好):** - if final_col in ['X_Feature_L2_Norm', 'Y_Feature_Variance']: - # X/Y 反转:将 Max 映射到 0,Min 映射到 TargetRange - final_scaled_data = (1.0 - normalized_data) * target_range - else: # Z_LDM_Loss - # Z 标准缩放:Min 映射到 0,Max 映射到 TargetRange - final_scaled_data = normalized_data * target_range - - # 保留负值,以确保平滑过渡 - df[final_col] = final_scaled_data - - logging.info(f" - 列 {final_col}:裁剪边界: [{min_val:.4f}, {max_val:.4f}]。缩放后范围不再严格约束 [0, {target_range:.2f}],以保留趋势。") - - - # --------------------------- 3. 最终保存 --------------------------- - output_path.parent.mkdir(parents=True, exist_ok=True) - - final_cols = ['step', 'X_Feature_L2_Norm', 'Y_Feature_Variance', 'Z_LDM_Loss'] - - - df[final_cols].to_csv( - output_path, - index=False, - float_format='%.3f' - ) - - logging.info(f"Lowess平滑和缩放后的数据已保存到: {output_csv_path}") - - -if __name__ == '__main__': - if len(sys.argv) != 6: - logging.error("使用方法: python smooth_coords.py <输入CSV路径> <输出CSV路径> <目标视觉范围 (例如 30)> <离散点裁剪百分比 (例如 0.15)>") - else: - input_csv = sys.argv[1] - output_csv = sys.argv[2] - try: - lowess_frac = float(sys.argv[3]) - target_range = float(sys.argv[4]) - clipping_p = float(sys.argv[5]) - - if not (0.0 < lowess_frac <= 1.0): - raise ValueError("Lowess 平滑因子 frac 必须在 (0.0, 1.0] 之间。") - if target_range <= 0: - raise ValueError("目标视觉范围必须大于 0。") - if not (0 <= clipping_p < 0.5): - raise ValueError("裁剪百分比必须在 [0, 0.5) 之间。") - - if not Path(output_csv).suffix: - output_csv = str(Path(output_csv) / "scaled_coords.csv") - - apply_lowess_and_clipping_scaling(input_csv, output_csv, lowess_frac, target_range, clipping_p) - except ValueError as e: - logging.error(f"参数错误: {e}") \ No newline at end of file diff --git a/src/backend/app/algorithms/processors/image_processor.py b/src/backend/app/algorithms/processors/image_processor.py deleted file mode 100644 index 287a197..0000000 --- a/src/backend/app/algorithms/processors/image_processor.py +++ /dev/null @@ -1,149 +0,0 @@ -""" -图片处理功能,用于把原始图片剪裁为中心正方形,指定分辨率,并保存为指定格式,还可以选择是否序列化改名。 -""" - -import argparse -import os -from pathlib import Path -from PIL import Image - -# --- 1. 参数解析 --- -def parse_args(input_args=None): - """ - 解析命令行参数。 - """ - parser = argparse.ArgumentParser(description="Image Processor for Centering, Resizing, and Format Conversion.") - - # 路径和分辨率参数 - parser.add_argument( - "--input_dir", - type=str, - required=True, - help="A folder containing the original images to be processed and overwritten.", - ) - parser.add_argument( - "--resolution", - type=int, - default=512, - help="The target resolution (width and height) for the output images (e.g., 512 for 512x512).", - ) - # 格式参数 - parser.add_argument( - "--target_format", - type=str, - default="png", - choices=["jpeg", "png", "webp", "jpg"], - help="The target format for the saved images (e.g., 'png', 'jpg', 'webp'). The original file will be overwritten, potentially changing the file extension.", - ) - - # 序列化数字重命名参数 - parser.add_argument( - "--rename_sequential", - action="store_true", # 当这个参数存在时,其值为 True - help="If set, images will be sequentially renamed (e.g., 001.jpg, 002.jpg...) instead of preserving the original filename. WARNING: This WILL delete the originals.", - ) - - if input_args is not None: - args = parser.parse_args(input_args) - else: - args = parser.parse_args() - - return args - -# --- 2. 核心图像处理逻辑 --- -def process_image(image_path: Path, output_path: Path, resolution: int, target_format: str, delete_original: bool): - """ - 加载图像,居中取最大正方形,升降分辨率,并保存为目标格式。 - - Args: - image_path: 原始图片路径。 - output_path: 最终保存路径。 - resolution: 目标分辨率。 - target_format: 目标文件格式。 - delete_original: 是否删除原始文件。 - """ - try: - # 加载图像并统一转换为 RGB 模式 - img = Image.open(image_path).convert("RGB") - - # 居中取最大正方形 - width, height = img.size - min_dim = min(width, height) - - # 计算裁剪框 (以最短边为尺寸的中心正方形) - left = (width - min_dim) // 2 - top = (height - min_dim) // 2 - right = left + min_dim - bottom = top + min_dim - - # 裁剪中心正方形 - img = img.crop((left, top, right, bottom)) - - # 升降分辨率到指定 resolution - # 使用 LANCZOS 高质量重采样方法 - img = img.resize((resolution, resolution), resample=Image.Resampling.LANCZOS) - - # 准备输出格式 - save_format = target_format.upper().replace('JPEG', 'JPG') - - # 保存图片 - # 对于 JPEG/JPG,设置 quality 参数 - if save_format == 'JPG': - img.save(output_path, format='JPEG', quality=95) - else: - img.save(output_path, format=save_format) - - # 根据标记决定是否删除原始文件 - if delete_original and image_path.resolve() != output_path.resolve(): - os.remove(image_path) - - print(f"Processed: {image_path.name} -> {output_path.name} ({resolution}x{resolution} {save_format})") - - except Exception as e: - print(f"Error processing {image_path.name}: {e}") - -# --- 3. 主函数 --- -def main(args): - # 路径准备 - input_dir = Path(args.input_dir) - - if not input_dir.is_dir(): - print(f"Error: Input directory not found at {input_dir}") - return - - # 查找所有图片文件 (支持 jpg, jpeg, png, webp) - valid_suffixes = ['.jpg', '.jpeg', '.png', '.webp'] - image_paths = sorted([p for p in input_dir.iterdir() if p.suffix.lower() in valid_suffixes]) # 排序以确保重命名顺序一致 - - if not image_paths: - print(f"No image files found in {input_dir}") - return - - print(f"Found {len(image_paths)} images in {input_dir}. Starting processing...") - - # 准备目标格式的扩展名 - extension = args.target_format.lower().replace('jpeg', 'jpg') - - # 迭代处理图片 - for i, image_path in enumerate(image_paths): - - # 决定输出路径 - if args.rename_sequential: - # 顺序重命名逻辑:001, 002, 003... (至少三位数字) - new_name = f"{i + 1:03d}.{extension}" - output_path = input_dir / new_name - # 如果原始文件与新文件名称不同,则需要删除原始文件 - delete_original = True - else: - # 保持原始文件名,但修改后缀 - output_path = image_path.with_suffix(f'.{extension}') - # 只有当原始后缀与目标后缀不同时,才需要删除原始文件(防止遗留旧格式) - delete_original = (image_path.suffix.lower() != f'.{extension}') - - process_image(image_path, output_path, args.resolution, args.target_format, delete_original) - - print("Processing complete.") - -if __name__ == "__main__": - args = parse_args() - main(args) \ No newline at end of file