|
|
"""
|
|
|
mmWave_RA_RE.py - 深度学习特征生成基线脚本
|
|
|
|
|
|
功能: 从雷达数据立方体生成 Range-Doppler, Range-Azimuth, Range-Elevation 特征图
|
|
|
输入: Cube_data 目录下的 *_cube.npy 文件 [Frames, VirtRx, Chirps, Samples]
|
|
|
输出:
|
|
|
- *_features.npz: 包含 rd, ra, re 三个数组
|
|
|
- *_merged.npy: 拼接后的 [Frames, Range, Channels] 张量
|
|
|
"""
|
|
|
|
|
|
import os
|
|
|
import sys
|
|
|
import glob
|
|
|
import json
|
|
|
import numpy as np
|
|
|
from scipy.signal.windows import taylor, hamming
|
|
|
from tqdm import tqdm
|
|
|
|
|
|
# 导入配置解析器
|
|
|
try:
|
|
|
from process_data import parse_radar_cfg, parse_tx_order
|
|
|
except ImportError:
|
|
|
try:
|
|
|
from src.process_data import parse_radar_cfg, parse_tx_order
|
|
|
except ImportError:
|
|
|
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
|
|
from process_data import parse_radar_cfg, parse_tx_order
|
|
|
|
|
|
# ===================================================================
|
|
|
# 配置区域
|
|
|
# ===================================================================
|
|
|
INPUT_CUBE_DIR = r"F:\test\Data_bin\lab\Cube_data"
|
|
|
OUTPUT_FEATURE_DIR = r"F:\Data_bin\dormitory\Heatmap_Features_25"
|
|
|
CFG_PATH = r"F:\test\cfg\Radar.cfg"
|
|
|
|
|
|
# 算法参数
|
|
|
DOWNSAMPLE_FACTOR = 4 # 帧降采样因子 (1=不降采样)
|
|
|
ANGLE_FFT_SIZE = 64 # 角度 FFT 点数
|
|
|
RANGE_WINDOW = "hamming" # Range FFT 窗函数
|
|
|
DOPPLER_WINDOW = "hamming" # Doppler FFT 窗函数
|
|
|
|
|
|
# IWR6843ISK 天线拓扑定义
|
|
|
IDXS_AZIMUTH = np.arange(0, 8) # 水平阵列: TX0(0-3) + TX2(4-7)
|
|
|
# 垂直配对 (水平位置对齐): (2,8), (3,9), (4,10), (5,11) - 见 elevation_phase_diff()
|
|
|
|
|
|
# ===================================================================
|
|
|
# 窗函数缓存 (避免重复计算)
|
|
|
# ===================================================================
|
|
|
_WINDOW_CACHE = {}
|
|
|
|
|
|
def get_window(name: str, length: int) -> np.ndarray:
|
|
|
"""获取或创建窗函数"""
|
|
|
key = (name, length)
|
|
|
if key not in _WINDOW_CACHE:
|
|
|
if name == 'hamming':
|
|
|
_WINDOW_CACHE[key] = hamming(length)
|
|
|
elif name == 'taylor':
|
|
|
try:
|
|
|
_WINDOW_CACHE[key] = taylor(length, nbar=3, sll=40)
|
|
|
except:
|
|
|
_WINDOW_CACHE[key] = hamming(length)
|
|
|
else:
|
|
|
_WINDOW_CACHE[key] = np.ones(length)
|
|
|
return _WINDOW_CACHE[key]
|
|
|
|
|
|
# ===================================================================
|
|
|
# 核心处理函数
|
|
|
# ===================================================================
|
|
|
|
|
|
def normalize_complex_global(data, global_max=None):
|
|
|
"""保留相位的归一化"""
|
|
|
magnitude = np.abs(data)
|
|
|
phase = np.angle(data)
|
|
|
denom = global_max if global_max else 1e4
|
|
|
return (magnitude / (denom + 1e-12)) * np.exp(1j * phase)
|
|
|
|
|
|
|
|
|
def range_fft(cube: np.ndarray, window_type: str = 'hamming') -> np.ndarray:
|
|
|
"""
|
|
|
Range FFT
|
|
|
Input: [Frames, VirtRx, Chirps, Samples]
|
|
|
Output: [Frames, VirtRx, Chirps, RangeBins]
|
|
|
"""
|
|
|
n_samples = cube.shape[-1]
|
|
|
win = get_window(window_type, n_samples)
|
|
|
cube_win = cube * win.reshape(1, 1, 1, -1)
|
|
|
return np.fft.fft(cube_win, axis=-1)
|
|
|
|
|
|
def doppler_fft(range_cube: np.ndarray, window_type: str = 'hamming') -> np.ndarray:
|
|
|
"""
|
|
|
Doppler FFT
|
|
|
Input: [Frames, VirtRx, Chirps, RangeBins]
|
|
|
Output: [Frames, VirtRx, Doppler, RangeBins] (fftshift)
|
|
|
"""
|
|
|
n_chirps = range_cube.shape[2]
|
|
|
win = get_window(window_type, n_chirps)
|
|
|
cube_win = range_cube * win.reshape(1, 1, -1, 1)
|
|
|
dop_fft = np.fft.fft(cube_win, axis=2)
|
|
|
return np.fft.fftshift(dop_fft, axes=2)
|
|
|
|
|
|
def clutter_removal(data: np.ndarray, axis: int = 2) -> np.ndarray:
|
|
|
"""静态杂波去除 (减去均值)"""
|
|
|
mean_val = np.mean(data, axis=axis, keepdims=True)
|
|
|
return data - mean_val
|
|
|
|
|
|
def azimuth_fft(range_cube: np.ndarray, num_angle_bins: int = 64) -> np.ndarray:
|
|
|
"""
|
|
|
Azimuth FFT (生成 Range-Azimuth Map)
|
|
|
Input: [Frames, VirtRx, Chirps, Range]
|
|
|
Output: [Frames, Angle, Range]
|
|
|
"""
|
|
|
# 选取水平阵列
|
|
|
az_data = range_cube[:, IDXS_AZIMUTH, :, :]
|
|
|
n_ant = az_data.shape[1]
|
|
|
|
|
|
# 加窗 (Taylor 窗获得更好的旁瓣抑制)
|
|
|
win = get_window('taylor', n_ant)
|
|
|
az_data = az_data * win.reshape(1, -1, 1, 1)
|
|
|
|
|
|
# Angle FFT
|
|
|
angle_out = np.fft.fft(az_data, n=num_angle_bins, axis=1)
|
|
|
angle_out = np.fft.fftshift(angle_out, axes=1)
|
|
|
|
|
|
# 计算功率谱并对 Chirp 维求平均 -> [Frames, Angle, Range]
|
|
|
ra_map = np.mean(np.abs(angle_out), axis=2)
|
|
|
return ra_map
|
|
|
|
|
|
def elevation_phase_diff(range_cube, num_angle_bins=64):
|
|
|
"""
|
|
|
[修正与优化] 使用4对垂直配对计算俯仰角
|
|
|
|
|
|
IWR6843ISK 物理对齐分析:
|
|
|
- Base Array (Z=0): Index 0,1,2,3 (TX0) 和 4,5,6,7 (TX2)
|
|
|
- Elev Array (Z=1): Index 8,9,10,11 (TX1, 物理右移2单位)
|
|
|
|
|
|
水平对齐配对 (X坐标相同):
|
|
|
- X=2: Index 2 (TX0_RX2) & Index 8 (TX1_RX0)
|
|
|
- X=3: Index 3 (TX0_RX3) & Index 9 (TX1_RX1)
|
|
|
- X=4: Index 4 (TX2_RX0) & Index 10 (TX1_RX2)
|
|
|
- X=5: Index 5 (TX2_RX1) & Index 11 (TX1_RX3)
|
|
|
"""
|
|
|
# [关键修正] 正确的配对列表
|
|
|
elevation_pairs = [(2, 8), (3, 9), (4, 10), (5, 11)]
|
|
|
|
|
|
n_frames = range_cube.shape[0]
|
|
|
|
|
|
# 存储每对的FFT结果
|
|
|
all_re_maps = []
|
|
|
|
|
|
for idx1, idx2 in elevation_pairs:
|
|
|
# 取出这一对天线 [Frames, 2, Chirps, Range]
|
|
|
el_data = range_cube[:, [idx1, idx2], :, :]
|
|
|
|
|
|
n_ant = 2 # 每对2根天线
|
|
|
win = get_window('hamming', n_ant)
|
|
|
el_data = el_data * win.reshape(1, -1, 1, 1)
|
|
|
|
|
|
# 2点FFT (在 axis=1, 天线维)
|
|
|
# 注意: 2点FFT补零到64点,利用插值获得平滑的相位响应
|
|
|
el_out = np.fft.fft(el_data, n=num_angle_bins, axis=1)
|
|
|
el_out = np.fft.fftshift(el_out, axes=1)
|
|
|
|
|
|
# 计算模值 (不单独归一化,避免弱信号噪声被放大)
|
|
|
el_mag = np.abs(el_out)
|
|
|
|
|
|
# 对Chirp维求平均 (非相干积累) -> [Frames, Angle, Range]
|
|
|
re_map = np.mean(el_mag, axis=2)
|
|
|
all_re_maps.append(re_map)
|
|
|
|
|
|
# 对4对结果取平均, 提升信噪比
|
|
|
re_map_avg = np.mean(np.stack(all_re_maps, axis=0), axis=0)
|
|
|
|
|
|
return re_map_avg
|
|
|
|
|
|
|
|
|
# ===================================================================
|
|
|
# 主处理流程
|
|
|
# ===================================================================
|
|
|
|
|
|
def process_single_file(cube_path: str, output_root: str) -> None:
|
|
|
"""处理单个数据文件"""
|
|
|
filename = os.path.basename(cube_path).replace('_cube.npy', '')
|
|
|
|
|
|
# [Revert] 不再创建子目录,直接使用根目录
|
|
|
out_dir = output_root
|
|
|
# os.makedirs(out_dir, exist_ok=True) # 根目录在main中已创建
|
|
|
|
|
|
# 加载数据
|
|
|
try:
|
|
|
cube = np.load(cube_path)
|
|
|
except Exception as e:
|
|
|
tqdm.write(f" [Error] 加载 {filename} 失败: {e}")
|
|
|
return
|
|
|
|
|
|
# 数据校验
|
|
|
if cube.ndim != 4:
|
|
|
tqdm.write(f" [Error] {filename} 维度错误: {cube.shape}")
|
|
|
return
|
|
|
|
|
|
n_frames, n_rx, n_chirps, n_samples = cube.shape
|
|
|
|
|
|
if n_rx < 12:
|
|
|
tqdm.write(f" [Error] {filename} 通道数不足: {n_rx} < 12")
|
|
|
return
|
|
|
|
|
|
if np.any(np.isnan(cube)) or np.any(np.isinf(cube)):
|
|
|
tqdm.write(f" [Error] {filename} 包含 NaN 或 Inf")
|
|
|
return
|
|
|
|
|
|
# 1. Range FFT
|
|
|
range_res = range_fft(cube, window_type=RANGE_WINDOW)
|
|
|
|
|
|
# 2. Clutter Removal (MTI)
|
|
|
range_res = clutter_removal(range_res, axis=2)
|
|
|
|
|
|
# 3. Doppler FFT -> Range-Doppler Map
|
|
|
dop_res = doppler_fft(range_res, window_type=DOPPLER_WINDOW)
|
|
|
rd_map = np.mean(np.abs(dop_res), axis=1) # [Frames, Doppler, Range]
|
|
|
|
|
|
# 4. Azimuth FFT -> Range-Azimuth Map
|
|
|
ra_map = azimuth_fft(range_res, ANGLE_FFT_SIZE) # [Frames, Angle, Range]
|
|
|
|
|
|
# 5. Elevation FFT -> Range-Elevation Map
|
|
|
re_map = elevation_phase_diff(range_res, ANGLE_FFT_SIZE) # [Frames, Angle, Range]
|
|
|
|
|
|
# 6. 降采样
|
|
|
if DOWNSAMPLE_FACTOR > 1:
|
|
|
pick_indices = np.arange(0, n_frames, DOWNSAMPLE_FACTOR)
|
|
|
rd_map = rd_map[pick_indices, :, :]
|
|
|
ra_map = ra_map[pick_indices, :, :]
|
|
|
re_map = re_map[pick_indices, :, :]
|
|
|
|
|
|
# [Revert] 恢复旧的扁平化文件名逻辑: SampleName_features.npz / SampleName_merged.npy
|
|
|
# 7. 保存独立特征 (NPZ)
|
|
|
npz_path = os.path.join(out_dir, f"{filename}_features.npz")
|
|
|
np.savez_compressed(npz_path, rd=rd_map, ra=ra_map, re=re_map)
|
|
|
|
|
|
# 8. 保存合并张量 (NPY) - 适配 Transformer/3D-CNN 输入
|
|
|
# 转置为 [Frames, Range, Feature]
|
|
|
rd_T = np.transpose(rd_map, (0, 2, 1)) # [F, R, D]
|
|
|
ra_T = np.transpose(ra_map, (0, 2, 1)) # [F, R, A]
|
|
|
re_T = np.transpose(re_map, (0, 2, 1)) # [F, R, E]
|
|
|
|
|
|
# 拼接: [Frames, Range, D + A + E]
|
|
|
merged = np.concatenate([rd_T, ra_T, re_T], axis=-1)
|
|
|
|
|
|
merge_path = os.path.join(out_dir, f"{filename}_merged.npy")
|
|
|
np.save(merge_path, merged.astype(np.float32))
|
|
|
|
|
|
# [Revert] 不再保存 metadata.json
|
|
|
|
|
|
|
|
|
def main():
|
|
|
# 创建输出目录
|
|
|
os.makedirs(OUTPUT_FEATURE_DIR, exist_ok=True)
|
|
|
|
|
|
# 获取文件列表
|
|
|
cube_files = sorted(glob.glob(os.path.join(INPUT_CUBE_DIR, "*_cube.npy")))
|
|
|
|
|
|
if not cube_files:
|
|
|
print(f"未找到数据文件: {INPUT_CUBE_DIR}")
|
|
|
return
|
|
|
|
|
|
print(f"发现 {len(cube_files)} 个数据文件")
|
|
|
print(f"输出目录: {OUTPUT_FEATURE_DIR}")
|
|
|
print(f"降采样因子: {DOWNSAMPLE_FACTOR}, 角度FFT: {ANGLE_FFT_SIZE}")
|
|
|
print("-" * 50)
|
|
|
|
|
|
# 使用多进程并行处理
|
|
|
from multiprocessing import Pool, cpu_count
|
|
|
from functools import partial
|
|
|
|
|
|
num_workers = min(4, cpu_count()) # 最多4个进程
|
|
|
print(f"使用 {num_workers} 个并行进程")
|
|
|
|
|
|
# 创建带固定参数的处理函数
|
|
|
process_func = partial(process_single_file, output_root=OUTPUT_FEATURE_DIR)
|
|
|
|
|
|
try:
|
|
|
with Pool(num_workers) as pool:
|
|
|
# 使用imap显示进度
|
|
|
list(tqdm(
|
|
|
pool.imap(process_func, cube_files),
|
|
|
total=len(cube_files),
|
|
|
desc="Processing"
|
|
|
))
|
|
|
except Exception as e:
|
|
|
print(f"并行处理出错: {e}")
|
|
|
print("回退到串行处理...")
|
|
|
for f in tqdm(cube_files, desc="Processing"):
|
|
|
try:
|
|
|
process_single_file(f, OUTPUT_FEATURE_DIR)
|
|
|
except Exception as e:
|
|
|
tqdm.write(f"[Error] 处理 {os.path.basename(f)} 失败: {e}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
main() |