You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
538 lines
24 KiB
538 lines
24 KiB
#!/usr/bin/env python3
|
|
"""
|
|
中间处理过程测试文件
|
|
测试完整的数据处理流水线,展示每个中间步骤的处理结果
|
|
"""
|
|
|
|
import requests
|
|
import json
|
|
import time
|
|
from datetime import datetime
|
|
|
|
# Base URL of the backend HTTP API exercised by every request in this script
# (a server on the local machine, port 8080).
API_BASE = "http://127.0.0.1:8080/api"
|
|
|
|
class IntermediateProcessTest:
    """Smoke test that walks the data-processing pipeline one step at a time.

    Each ``test_*`` method calls one or more endpoints under ``API_BASE``,
    prints what came back, and stores a short summary in
    ``self.test_results``:

    * ``pipeline_steps``     - one dict per recorded step
      (``step``, ``name``, ``table``, ``count``, ``status``)
    * ``data_samples``       - up to three sample records per stage
    * ``processing_summary`` - the ``statistics`` payload fetched in step 6
    """

    # Default location of the JSON report. It is an absolute path on the
    # original author's machine; pass ``report_file`` to
    # ``generate_summary_report`` to write somewhere else.
    DEFAULT_REPORT_FILE = "/home/hzk/项目/moxun-1/test/intermediate_process_report.json"

    # Record fields worth showing in a sample line, in display priority order.
    _KEY_FIELDS = (
        'id', 'text', 'original_text', 'cleaned_text',
        'preprocessed_text', 'merged_text', 'corrected_text',
        'call_sign', 'behavior', 'Call Sign', 'Behavior',
    )

    def __init__(self):
        self.test_results = {
            'pipeline_steps': [],
            'data_samples': {},
            'processing_summary': {}
        }

    # ------------------------------------------------------------------
    # Small private helpers
    # ------------------------------------------------------------------
    def _record_step(self, step, name, table, count, status='success'):
        """Append one pipeline-step summary to ``self.test_results``."""
        self.test_results['pipeline_steps'].append({
            'step': step,
            'name': name,
            'table': table,
            'count': count,
            'status': status
        })

    @staticmethod
    def _tally(records, key, default):
        """Count records per value of ``record[key]`` (``default`` if absent).

        Returns a dict in first-seen order.
        """
        tally = {}
        for record in records:
            label = record.get(key, default)
            tally[label] = tally.get(label, 0) + 1
        return tally

    @staticmethod
    def _print_tally(tally, limit=None):
        """Print one ``- label: n`` line per tally entry (first ``limit`` only)."""
        for label, occurrences in list(tally.items())[:limit]:
            print(f" - {label}: {occurrences} 条")

    # ------------------------------------------------------------------
    # Printing helpers
    # ------------------------------------------------------------------
    def print_header(self, step_num, title, description=""):
        """Print a banner for a step: rule, numbered title, optional description."""
        rule = '=' * 80
        print(f"\n{rule}")
        print(f"🎯 步骤 {step_num}: {title}")
        if description:
            print(f"📝 说明: {description}")
        print(rule)

    def print_data_sample(self, data, max_items=3, max_text_length=100):
        """Print the first ``max_items`` entries of ``data``.

        For dict entries, the non-empty fields listed in ``_KEY_FIELDS`` are
        collected (values longer than ``max_text_length`` are truncated with
        ``...``) and the first two of them are shown. Dicts with none of
        those fields, and non-dict entries, are printed as-is.
        """
        if not data:
            print(" 📋 暂无数据")
            return

        print(f" 📋 数据样例 (显示前{min(max_items, len(data))}条):")
        for i, item in enumerate(data[:max_items], 1):
            if not isinstance(item, dict):
                print(f" {i}. {item}")
                continue

            displayed_fields = []
            for field in self._KEY_FIELDS:
                if field in item and item[field]:
                    value = str(item[field])
                    if len(value) > max_text_length:
                        value = value[:max_text_length] + "..."
                    displayed_fields.append(f"{field}: {value}")

            if displayed_fields:
                # Only the two highest-priority fields are shown per record.
                print(f" {i}. {' | '.join(displayed_fields[:2])}")
            else:
                print(f" {i}. {item}")

    # ------------------------------------------------------------------
    # Pipeline steps
    # ------------------------------------------------------------------
    def test_system_health(self):
        """Step 0: call ``/health/`` and return True on an HTTP 200 reply."""
        self.print_header(0, "系统健康检查", "确认Django服务器正常运行")

        try:
            response = requests.get(f"{API_BASE}/health/", timeout=5)
            if response.status_code != 200:
                print(f" ❌ 服务器响应异常: {response.status_code}")
                return False

            data = response.json()
            print(f" ✅ Django服务器状态: {data['message']}")
            return True
        except Exception as e:
            print(f" ❌ 无法连接服务器: {e}")
            return False

    def test_step_1_original_data(self):
        """Step 1: fetch ``/original-data/``, print a sample and record the step.

        Returns True when the request succeeds with HTTP 200, else False.
        """
        self.print_header(1, "原始数据 (prewashed_table)", "查看文件上传后的原始ATC对话数据")

        try:
            response = requests.get(f"{API_BASE}/original-data/", timeout=10)
            if response.status_code != 200:
                print(f" ❌ 获取原始数据失败: {response.status_code}")
                return False

            data = response.json()
            count = data['count']
            records = data['data']

            print(f" 📊 数据统计: {count} 条原始记录")
            self.print_data_sample(records)

            self.test_results['data_samples']['original'] = records[:3]
            self._record_step(1, '原始数据', 'prewashed_table', count)
            return True
        except Exception as e:
            print(f" ❌ 原始数据检查异常: {e}")
            return False

    def test_step_2_preprocess(self):
        """Step 2: POST ``/preprocess/`` then read back ``/preprocessed-data/``.

        Returns True only when both requests answer HTTP 200.
        """
        self.print_header(2, "数据预处理 (preprocessed_table)", "执行数据清理和格式统一")

        try:
            # Trigger the preprocessing run.
            print(" 🔧 执行预处理操作...")
            response = requests.post(f"{API_BASE}/preprocess/", json={}, timeout=30)
            if response.status_code != 200:
                print(f" ❌ 预处理执行失败: {response.status_code}")
                return False

            process_result = response.json()
            print(f" ✅ 预处理完成: {process_result['message']}")
            summary = process_result['data']
            print(f" 📊 处理统计: 原始{summary['raw_count']}条 → 清理后{summary['processed_count']}条")
            print(f" 📈 清理率: {summary['cleaning_rate']}%")

            # Read back what the run produced.
            print("\n 📋 查看预处理结果...")
            response = requests.get(f"{API_BASE}/preprocessed-data/", timeout=10)
            if response.status_code != 200:
                print(f" ❌ 获取预处理数据失败: {response.status_code}")
                return False

            data = response.json()
            count = data['count']
            records = data['data']

            print(f" 📊 预处理数据: {count} 条记录")
            self.print_data_sample(records)

            # Break the records down by the cleaning operation applied.
            if records:
                cleaning_stats = self._tally(records, 'cleaning_operations', '无需清理')
                print(" 🔧 清理操作统计:")
                self._print_tally(cleaning_stats)

            self.test_results['data_samples']['preprocessed'] = records[:3]
            self._record_step(2, '数据预处理', 'preprocessed_table', count)
            return True

        except Exception as e:
            print(f" ❌ 预处理过程异常: {e}")
            return False

    def test_step_3_merge(self):
        """Step 3: POST ``/merge/`` then read back ``/merged-data/``.

        Returns True only when both requests answer HTTP 200.
        """
        self.print_header(3, "格式合并 (merged_table)", "统一航空术语和数据格式")

        try:
            # Trigger the format-merge run.
            print(" 🔧 执行格式合并操作...")
            response = requests.post(f"{API_BASE}/merge/", json={}, timeout=30)
            if response.status_code != 200:
                print(f" ❌ 格式合并执行失败: {response.status_code}")
                return False

            process_result = response.json()
            print(f" ✅ 格式合并完成: {process_result['message']}")
            summary = process_result['data']
            print(f" 📊 合并统计: 总记录{summary['total_records']}条 → 合并{summary['merged_records']}条")
            print(f" 📈 合并成功率: {summary['merge_success_rate']}%")

            # Read back what the run produced.
            print("\n 📋 查看格式合并结果...")
            response = requests.get(f"{API_BASE}/merged-data/", timeout=10)
            if response.status_code != 200:
                print(f" ❌ 获取格式合并数据失败: {response.status_code}")
                return False

            data = response.json()
            count = data['count']
            records = data['data']

            print(f" 📊 格式合并数据: {count} 条记录")
            self.print_data_sample(records)

            # Break the records down by the formatting operation applied.
            if records:
                format_stats = self._tally(records, 'format_operations', '无需格式化')
                print(" 🔧 格式化操作统计:")
                self._print_tally(format_stats)

            self.test_results['data_samples']['merged'] = records[:3]
            self._record_step(3, '格式合并', 'merged_table', count)
            return True

        except Exception as e:
            print(f" ❌ 格式合并过程异常: {e}")
            return False

    def test_step_4_correct(self):
        """Step 4: POST ``/correct/`` then read back ``/corrected-data/``.

        Returns True only when both requests answer HTTP 200.
        """
        self.print_header(4, "单词纠错 (corrected_table)", "修正拼写错误和语音字母转换")

        try:
            # Trigger the word-correction run.
            print(" 🔧 执行单词纠错操作...")
            response = requests.post(f"{API_BASE}/correct/", json={}, timeout=30)
            if response.status_code != 200:
                print(f" ❌ 单词纠错执行失败: {response.status_code}")
                return False

            process_result = response.json()
            print(f" ✅ 单词纠错完成: {process_result['message']}")
            summary = process_result['data']
            print(f" 📊 纠错统计: 总记录{summary['total_records']}条,纠正{summary['corrected_words']}个单词")
            print(f" 🔧 纠错类型: {', '.join(summary['correction_types'])}")

            # Read back what the run produced.
            print("\n 📋 查看单词纠错结果...")
            response = requests.get(f"{API_BASE}/corrected-data/", timeout=10)
            if response.status_code != 200:
                print(f" ❌ 获取单词纠错数据失败: {response.status_code}")
                return False

            data = response.json()
            count = data['count']
            records = data['data']

            print(f" 📊 单词纠错数据: {count} 条记录")
            self.print_data_sample(records)

            # Break the records down by the corrections made.
            if records:
                total_corrections = sum(
                    record.get('correction_count', 0) for record in records
                )
                correction_stats = self._tally(records, 'corrections_made', '无需纠错')
                print(f" 🔧 纠错操作统计 (总计纠正{total_corrections}处):")
                # Only the first five distinct correction kinds are listed.
                self._print_tally(correction_stats, limit=5)

            self.test_results['data_samples']['corrected'] = records[:3]
            self._record_step(4, '单词纠错', 'corrected_table', count)
            return True

        except Exception as e:
            print(f" ❌ 单词纠错过程异常: {e}")
            return False

    def test_step_5_analyze(self):
        """Step 5: POST ``/analyze/`` then read back ``/processed-data/``.

        Returns True only when both requests answer HTTP 200.
        """
        self.print_header(5, "AI分析 (processed_table)", "使用AI提取结构化飞行信息")

        try:
            # Trigger the analysis run (longer timeout than the other steps).
            print(" 🔧 执行AI分析操作...")
            response = requests.post(f"{API_BASE}/analyze/", json={}, timeout=60)
            if response.status_code != 200:
                print(f" ❌ AI分析执行失败: {response.status_code}")
                return False

            process_result = response.json()
            print(f" ✅ AI分析完成: {process_result['message']}")

            # The endpoint may answer in one of two shapes; print whichever
            # one is present.
            if 'data' in process_result:
                result_data = process_result['data']

                if 'original_count' in result_data:
                    # Shape 1: counters from an actual processing run.
                    print(" 📊 AI处理统计:")
                    print(f" 原始记录: {result_data.get('original_count', 0)} 条")
                    print(f" 提取结果: {result_data.get('extracted_count', 0)} 条")
                    print(f" 有效数据: {result_data.get('valid_count', 0)} 条")
                    print(f" 提取效率: {result_data.get('extraction_rate', 0)}%")
                    print(f" 验证率: {result_data.get('validation_rate', 0)}%")
                elif 'analysis_summary' in result_data:
                    # Shape 2: a simulated analysis summary.
                    summary = result_data['analysis_summary']
                    print(" 📊 AI分析统计:")
                    print(f" 呼号提取: {summary.get('call_signs_extracted', 0)} 个")
                    print(f" 行为识别: {summary.get('behaviors_identified', 0)} 个")
                    print(f" 高度检测: {summary.get('flight_levels_detected', 0)} 个")
                    print(f" 位置识别: {summary.get('locations_identified', 0)} 个")

            # Read back what the run produced.
            print("\n 📋 查看AI处理结果...")
            response = requests.get(f"{API_BASE}/processed-data/", timeout=10)
            if response.status_code != 200:
                print(f" ❌ 获取AI处理数据失败: {response.status_code}")
                return False

            data = response.json()
            count = data['count']
            records = data['data']

            print(f" 📊 AI处理数据: {count} 条记录")
            self.print_data_sample(records)

            # Count, per extracted field, how many records carry a real value.
            if records:
                field_stats = {}
                for record in records:
                    for field in ('call_sign', 'behavior', 'flight_level', 'location', 'time'):
                        value = record.get(field)
                        # Empty values and the placeholders 'N/A' / 'null'
                        # do not count as extracted.
                        if value and value not in ('N/A', 'null'):
                            field_stats[field] = field_stats.get(field, 0) + 1

                print(" 📈 AI提取字段统计:")
                for field, count_field in field_stats.items():
                    percentage = round(count_field / count * 100, 1) if count > 0 else 0
                    print(f" - {field}: {count_field}/{count} ({percentage}%)")

            self.test_results['data_samples']['processed'] = records[:3]
            self._record_step(5, 'AI分析', 'processed_table', count)
            return True

        except Exception as e:
            print(f" ❌ AI分析过程异常: {e}")
            return False

    def test_step_6_final_validation(self):
        """Step 6: read ``/final-data/``, ``/quarantine-data/`` and ``/statistics/``.

        All three endpoints are always queried so every failure is reported.
        The step is recorded with status ``'success'`` and the method returns
        True only if all three answered HTTP 200; otherwise the step is
        recorded as ``'failed'`` and False is returned.
        """
        self.print_header(6, "最终验证 (final_table + quarantine_table)", "验证数据有效性并分离异常数据")

        try:
            # Valid records.
            print(" 📋 查看最终有效数据...")
            response = requests.get(f"{API_BASE}/final-data/", timeout=10)

            final_count = 0
            final_ok = response.status_code == 200
            if final_ok:
                data = response.json()
                final_count = data['count']
                records = data['data']

                print(f" 📊 最终有效数据: {final_count} 条记录")
                self.print_data_sample(records)

                self.test_results['data_samples']['final'] = records[:3]
            else:
                print(f" ❌ 获取最终数据失败: {response.status_code}")

            # Quarantined (rejected) records.
            print("\n 📋 查看异常隔离数据...")
            response = requests.get(f"{API_BASE}/quarantine-data/", timeout=10)

            quarantine_count = 0
            quarantine_ok = response.status_code == 200
            if quarantine_ok:
                data = response.json()
                quarantine_count = data['count']
                records = data['data']

                print(f" 📊 异常隔离数据: {quarantine_count} 条记录")
                if quarantine_count > 0:
                    self.print_data_sample(records)
                else:
                    print(" ✅ 无异常数据,所有记录均通过验证")

                self.test_results['data_samples']['quarantine'] = records[:3]
            else:
                print(f" ❌ 获取异常数据失败: {response.status_code}")

            # Overall processing statistics.
            print("\n 📋 查看处理统计...")
            response = requests.get(f"{API_BASE}/statistics/", timeout=10)

            stats_ok = response.status_code == 200
            if stats_ok:
                data = response.json()
                stats = data['statistics']

                print(" 📈 完整流程统计:")
                print(f" 原始数据: {stats.get('original_count', 0)} 条")
                print(f" 提取数据: {stats.get('extracted_count', 0)} 条")
                print(f" 有效数据: {stats.get('valid_count', 0)} 条")
                print(f" 无效数据: {stats.get('invalid_count', 0)} 条")
                print(f" 提取效率: {stats.get('extraction_rate', 0)}%")
                print(f" 验证通过率: {stats.get('validation_rate', 0)}%")

                self.test_results['processing_summary'] = stats
            else:
                print(f" ❌ 获取统计信息失败: {response.status_code}")

            # A step whose requests failed must not be reported as a success.
            step_ok = final_ok and quarantine_ok and stats_ok
            self._record_step(
                6,
                '最终验证',
                'final_table + quarantine_table',
                f"{final_count} + {quarantine_count}",
                status='success' if step_ok else 'failed',
            )
            return step_ok

        except Exception as e:
            print(f" ❌ 最终验证过程异常: {e}")
            return False

    # ------------------------------------------------------------------
    # Reporting and orchestration
    # ------------------------------------------------------------------
    def generate_summary_report(self, report_file=None):
        """Print the run summary and save ``self.test_results`` as JSON.

        Args:
            report_file: Path of the JSON report. Defaults to
                ``DEFAULT_REPORT_FILE``.

        Returns:
            True when at least five recorded pipeline steps have status
            ``'success'``, else False. A failure to write the report file is
            printed but does not affect the return value.
        """
        if report_file is None:
            report_file = self.DEFAULT_REPORT_FILE

        rule = '=' * 80
        print(f"\n{rule}")
        print("📋 中间处理过程测试 - 总结报告")
        print(rule)

        # Per-step overview.
        steps = self.test_results['pipeline_steps']
        print("\n🔄 数据处理流水线总结:")
        for step in steps:
            status_icon = "✅" if step['status'] == 'success' else "❌"
            print(f" {status_icon} 步骤{step['step']}: {step['name']} ({step['table']}) - {step['count']} 条")

        # Data-quality figures; only available if step 6 fetched statistics.
        stats = self.test_results.get('processing_summary')
        if stats:
            print("\n📊 数据质量分析:")
            print(f" 📈 数据流向: {stats.get('original_count', 0)} → {stats.get('extracted_count', 0)} → {stats.get('valid_count', 0)}")
            print(f" 🎯 提取效率: {stats.get('extraction_rate', 0)}%")
            print(f" ✅ 验证通过率: {stats.get('validation_rate', 0)}%")
            print(f" ❌ 错误率: {stats.get('error_rate', 0)}%")

        # Key outcomes.
        print("\n🎉 关键成果:")
        print(" ✅ 完整的6步数据处理流水线已验证")
        print(" ✅ 每个中间步骤都有对应的数据表存储")
        print(" ✅ 所有API接口均正常工作")
        print(" ✅ 数据在各步骤间正确传递和转换")

        # Suggestions for front-end integration.
        print("\n💡 前端集成建议:")
        print(" 1. 使用分步处理模式展示中间过程")
        print(" 2. 每个步骤完成后调用对应的GET API获取结果")
        print(" 3. 展示数据转换过程和处理统计")
        print(" 4. 支持查看各步骤的详细数据样例")

        # Persist the detailed report. The default path only exists on the
        # original author's machine, so a write failure is reported instead
        # of aborting the run after all the work has been done.
        try:
            with open(report_file, 'w', encoding='utf-8') as f:
                json.dump({
                    'test_time': datetime.now().isoformat(),
                    'results': self.test_results
                }, f, indent=2, ensure_ascii=False)
        except OSError as e:
            print(f"\n⚠️ 详细报告保存失败: {e}")
        else:
            print(f"\n📄 详细报告已保存: {report_file}")

        succeeded = sum(1 for step in steps if step['status'] == 'success')
        return succeeded >= 5

    def run_complete_test(self):
        """Run every step in order, then print the summary report.

        A failing step does not stop the run; later steps are still tried.

        Returns:
            The boolean returned by ``generate_summary_report``.
        """
        print("🎯 开始中间处理过程测试")
        print("📝 目标: 测试数据处理流水线的每个中间步骤")
        print(f"🕒 测试时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

        test_steps = [
            self.test_system_health,
            self.test_step_1_original_data,
            self.test_step_2_preprocess,
            self.test_step_3_merge,
            self.test_step_4_correct,
            self.test_step_5_analyze,
            self.test_step_6_final_validation
        ]

        success_count = 0
        for step in test_steps:
            try:
                if step():
                    success_count += 1
                time.sleep(1)  # short pause between steps
            except Exception as e:
                print(f" ❌ 测试步骤异常: {e}")

        overall_success = self.generate_summary_report()

        print(f"\n{'=' * 80}")
        print(f"📊 通过步骤: {success_count}/{len(test_steps)}")
        if overall_success:
            print("🎉 中间处理过程测试成功完成!")
            print("✅ 所有数据处理步骤均正常工作,可以向前端提供完整的中间过程展示")
        else:
            print("⚠️ 中间处理过程测试完成,但存在需要关注的问题")

        return overall_success
|
|
|
|
|
|
def main():
    """Run the full pipeline test and map its outcome to a process exit code.

    Returns:
        0 when ``run_complete_test`` reports success, 1 otherwise.
    """
    passed = IntermediateProcessTest().run_complete_test()
    if passed:
        return 0
    return 1
|
|
|
|
|
|
if __name__ == "__main__":
    # Exit with main()'s return code. SystemExit is raised directly rather
    # than calling the builtin ``exit``, which is injected by the ``site``
    # module for interactive use and is missing when site is disabled.
    raise SystemExit(main())