You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
moxun-1/test/complete_end_to_end_test.py

609 lines
26 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
"""
完整端到端流程测试
验证从数据上传到最终结果的完整数据处理链路
"""
import os
import sys
import django
import requests
import json
import time
import io
from pathlib import Path
# Set up the Django environment.
# The backend checkout location defaults to the original developer path but
# can be overridden with the ATC_BACKEND_DIR environment variable so the
# script also runs on other machines.
BACKEND_DIR = os.environ.get(
    'ATC_BACKEND_DIR',
    '/home/hzk/项目/moxun-1/信息抽取+数据检验/Django123/atc_extractor/backend',
)
if BACKEND_DIR not in sys.path:
    sys.path.append(BACKEND_DIR)
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'backend.settings')
django.setup()
# Imported only after the settings module has been configured above.
from django.db import connection
# Base URL of the backend HTTP API; override with ATC_API_BASE when the
# server is not listening on the default local address.
API_BASE = os.environ.get('ATC_API_BASE', "http://127.0.0.1:8080/api")
class CompleteEndToEndTest:
    """Drive an end-to-end check of the backend, one numbered step at a time.

    Every ``step_N_*`` method talks to the HTTP API at ``API_BASE`` (some
    also use the Django database ``connection``), prints its progress,
    stores what it observed in ``self.test_results`` and returns a bool.
    ``run_complete_test`` runs all steps in order and finishes with
    ``generate_comprehensive_report``, which summarises ``self.test_results``
    and writes them to a JSON file next to this script.
    """

    # Retrieval endpoints that must succeed for the data-retrieval stage
    # (step 6 and the matching report section) to count as passed.
    CRITICAL_APIS = ('original-data', 'processed-data', 'statistics')

    def __init__(self):
        """Create one empty result bucket per reported stage."""
        self.test_results = {
            'system_health': {},
            'data_upload': {},
            'processing_pipeline': {},
            'data_retrieval': {},
            'final_verification': {}
        }
        # Path of the temporary CSV written by step 2; None until then.
        self.test_data_file = None

    @staticmethod
    def _section_passed(section, required_keys=None):
        """Return True when one ``test_results`` section represents a pass.

        The sections do not share a single schema, so the verdict is derived
        from whichever marker the section carries:

        * ``status == 'healthy'`` (system health),
        * a top-level ``success`` or ``upload_success`` flag,
        * otherwise nested per-endpoint dicts that each hold ``success``.

        Args:
            section: One value of ``self.test_results``.
            required_keys: For nested sections, the only entries that must
                have succeeded. ``None`` means every nested entry must pass.

        Returns:
            bool: True for a pass, False for a failure or an empty section.
        """
        if not section:
            return False
        if section.get('status') == 'healthy':
            return True
        for flag in ('success', 'upload_success'):
            if flag in section:
                return bool(section[flag])
        nested = {
            key: value for key, value in section.items()
            if isinstance(value, dict) and 'success' in value
        }
        if required_keys is not None:
            return all(
                bool(nested.get(key, {}).get('success', False))
                for key in required_keys
            )
        return bool(nested) and all(bool(value['success']) for value in nested.values())

    def step_1_system_health_check(self):
        """Step 1: check the API health endpoint and the database connection.

        Returns:
            bool: True when ``/health/`` answers HTTP 200 and ``SELECT 1``
            runs on the Django connection; False otherwise.
        """
        print("🚀 步骤1: 系统健康检查")
        print("-" * 50)
        try:
            # Check the Django server.
            response = requests.get(f"{API_BASE}/health/", timeout=5)
            if response.status_code != 200:
                print(f" ✗ Django服务器响应异常: {response.status_code}")
                # Record the failure so the report shows it as failed
                # instead of skipped.
                self.test_results['system_health'] = {
                    'status': 'failed',
                    'error': f"HTTP {response.status_code}"
                }
                return False
            data = response.json()
            print(f" ✓ Django服务器: {data.get('message', '')}")
            # Check the database connection.
            with connection.cursor() as cursor:
                cursor.execute("SELECT 1")
                cursor.fetchone()
            print(" ✓ 数据库连接: 正常")
            self.test_results['system_health'] = {
                'django_server': True,
                'database': True,
                'status': 'healthy'
            }
            return True
        except Exception as e:
            print(f" ✗ 系统健康检查失败: {e}")
            self.test_results['system_health'] = {
                'status': 'failed',
                'error': str(e)
            }
            return False

    def step_2_prepare_test_data(self):
        """Step 2: write the sample CSV that step 3 uploads.

        The file is created next to this script and its path is kept in
        ``self.test_data_file`` so ``cleanup`` can remove it.

        Returns:
            bool: True when the file was written, False on any error.
        """
        print("\n📁 步骤2: 准备测试数据文件")
        print("-" * 50)
        # Sample ATC dialogue rows; the first line is the CSV header.
        test_data_content = """id,text
e2e_001,"CCA123 climb to flight level 350"
e2e_002,"China Southern 456 descend to flight level 280"
e2e_003,"Air China 789 turn left heading 270 degrees"
e2e_004,"Hainan Airlines 888 contact Shanghai approach 121.9"
e2e_005,"Spring Airlines 999 cleared for takeoff runway 36L"
e2e_006,"United 997 maintain flight level 330"
e2e_007,"Lufthansa 672 descending to flight level 250"
e2e_008,"Singapore Airlines 106 approach runway 18R"
e2e_009,"Japan Airlines 550 hold position"
e2e_010,"KLM 695 taxi to gate A15"
e2e_011,"Emirates 203 climb and maintain flight level 380"
e2e_012,"Cathay Pacific 747 contact tower 118.1"
e2e_013,"Turkish Airlines 1 turn right heading 090"
e2e_014,"Delta 88 cleared to land runway 24L"
e2e_015,"British Airways 15 hold short of runway 06R"
"""
        try:
            # Save the test file.
            self.test_data_file = Path(__file__).parent / "e2e_test_data.csv"
            with open(self.test_data_file, 'w', encoding='utf-8') as f:
                f.write(test_data_content)
            lines = test_data_content.strip().split('\n')
            # Derive the count from the data instead of hard-coding it.
            record_count = len(lines) - 1
            print(f" ✓ 测试文件创建: {self.test_data_file}")
            print(f" 📊 测试数据: {record_count}条ATC对话记录")
            print(" 📋 数据样例:")
            # Show the first three data rows (display only, so a simple
            # split on the first comma is enough).
            for i, line in enumerate(lines[1:4], 1):
                parts = line.split(',', 1)
                if len(parts) >= 2:
                    text = parts[1].strip('"')
                    print(f" {i}. {text}")
            self.test_results['data_upload'] = {
                'file_created': True,
                'record_count': record_count,
                'file_path': str(self.test_data_file)
            }
            return True
        except Exception as e:
            print(f" ✗ 测试文件创建失败: {e}")
            self.test_results['data_upload'] = {
                'file_created': False,
                'error': str(e)
            }
            return False

    def step_3_file_upload(self):
        """Step 3: upload the CSV from step 2 and inspect ``prewashed_table``.

        Returns:
            bool: True when the upload endpoint reports success; False on an
            HTTP error, a non-success payload or any exception. Every
            failure path records ``upload_success: False``.
        """
        print("\n📤 步骤3: 文件上传测试")
        print("-" * 50)
        try:
            # Upload the file to the API; it must stay open during the POST.
            with open(self.test_data_file, 'rb') as f:
                files = {'file': ('e2e_test_data.csv', f, 'text/csv')}
                response = requests.post(f"{API_BASE}/upload/", files=files, timeout=30)
            if response.status_code != 200:
                print(f" ✗ 上传请求失败: {response.status_code}")
                self.test_results['data_upload'].update({
                    'upload_success': False,
                    'error': f"HTTP {response.status_code}"
                })
                return False
            data = response.json()
            if data.get('status') != 'success':
                message = data.get('message', '未知错误')
                print(f" ✗ 文件上传失败: {message}")
                self.test_results['data_upload'].update({
                    'upload_success': False,
                    'error': message
                })
                return False
            print(" ✓ 文件上传成功")
            print(f" 📊 处理记录数: {data['data']['inserted_rows']}")
            print(f" 📁 文件名: {data['data']['filename']}")
            # Verify that the rows actually reached the database.
            with connection.cursor() as cursor:
                cursor.execute("SELECT COUNT(*) FROM prewashed_table")
                db_count = cursor.fetchone()[0]
                print(f" 📋 数据库记录数: {db_count}")
                # Show a few of the stored rows.
                cursor.execute("SELECT id, text FROM prewashed_table LIMIT 3")
                samples = cursor.fetchall()
                print(" 📋 数据库样例:")
                for id_val, text_val in samples:
                    print(f" {id_val}: {text_val}")
            self.test_results['data_upload'].update({
                'upload_success': True,
                'inserted_count': data['data']['inserted_rows'],
                'db_verified_count': db_count
            })
            return True
        except Exception as e:
            print(f" ✗ 文件上传过程失败: {e}")
            self.test_results['data_upload'].update({
                'upload_success': False,
                'error': str(e)
            })
            return False

    @staticmethod
    def _print_pipeline_details(endpoint, payload):
        """Print the per-endpoint summary of a successful pipeline response.

        Args:
            endpoint: Pipeline endpoint name (``preprocess``, ``merge``,
                ``correct`` or ``analyze``).
            payload: The ``data`` object of the JSON response.

        Raises:
            KeyError: When an expected field is missing from ``payload``.
        """
        if endpoint == 'preprocess':
            processed_count = payload['processed_count']
            cleaning_rate = payload['cleaning_rate']
            print(f" 📊 处理记录: {processed_count}, 清理率: {cleaning_rate}%")
        elif endpoint == 'merge':
            merged_records = payload['merged_records']
            success_rate = payload['merge_success_rate']
            print(f" 📊 合并记录: {merged_records}, 成功率: {success_rate}%")
        elif endpoint == 'correct':
            corrected_words = payload['corrected_words']
            print(f" 📊 纠正单词: {corrected_words}")
        elif endpoint == 'analyze':
            if 'analysis_summary' in payload:
                # Simulated analysis result.
                summary = payload['analysis_summary']
                print(f" 📊 呼号提取: {summary.get('call_signs_extracted', 0)}")
                print(f" 📊 行为识别: {summary.get('behaviors_identified', 0)}")
            else:
                # Real AI analysis result.
                print(" 📊 AI分析完成")

    def step_4_processing_pipeline(self):
        """Step 4: call the four pipeline endpoints one after another.

        A failing endpoint does not stop the loop; each outcome is stored
        under its endpoint name in ``test_results['processing_pipeline']``.

        Returns:
            bool: True only when every endpoint reported success.
        """
        print("\n🔄 步骤4: 数据处理流水线测试")
        print("-" * 50)
        pipeline_steps = [
            ('预处理', 'preprocess'),
            ('格式合并', 'merge'),
            ('单词纠错', 'correct'),
            ('AI分析', 'analyze')
        ]
        pipeline_results = {}
        for step_name, endpoint in pipeline_steps:
            try:
                print(f" 🔧 执行{step_name}...")
                response = requests.post(f"{API_BASE}/{endpoint}/",
                                         json={},
                                         headers={'Content-Type': 'application/json'},
                                         timeout=30)
                if response.status_code == 200:
                    data = response.json()
                    if data.get('status') == 'success':
                        print(f"{step_name}成功")
                        # Show step-specific result details.
                        self._print_pipeline_details(endpoint, data['data'])
                        pipeline_results[endpoint] = {
                            'success': True,
                            'data': data['data']
                        }
                    else:
                        print(f"{step_name}失败: {data.get('message', '未知错误')}")
                        pipeline_results[endpoint] = {
                            'success': False,
                            'error': data.get('message', '未知错误')
                        }
                else:
                    print(f"{step_name}请求失败: {response.status_code}")
                    pipeline_results[endpoint] = {
                        'success': False,
                        'error': f"HTTP {response.status_code}"
                    }
                # Pause between steps.
                time.sleep(1)
            except Exception as e:
                print(f"{step_name}异常: {e}")
                pipeline_results[endpoint] = {
                    'success': False,
                    'error': str(e)
                }
        self.test_results['processing_pipeline'] = pipeline_results
        # Check whether every step succeeded.
        failed_steps = [
            step for step, result in pipeline_results.items()
            if not result.get('success', False)
        ]
        if not failed_steps:
            print("\n 🎉 数据处理流水线全部完成!")
            return True
        print(f"\n ⚠️ 部分步骤失败: {', '.join(failed_steps)}")
        return False

    def step_5_complete_processing(self):
        """Step 5: call the single ``process-data`` endpoint.

        The outcome is stored as
        ``test_results['processing_pipeline']['complete_process']``; failures
        are recorded too, so the report does not count this stage as passed
        on the strength of step 4 alone.

        Returns:
            bool: True when the endpoint reports success, False otherwise.
        """
        print("\n🚀 步骤5: 完整处理流程测试")
        print("-" * 50)
        pipeline_section = self.test_results['processing_pipeline']
        try:
            print(" 🔧 执行完整处理流程...")
            response = requests.post(f"{API_BASE}/process-data/", timeout=60)
            if response.status_code != 200:
                print(f" ✗ 完整处理请求失败: {response.status_code}")
                pipeline_section['complete_process'] = {
                    'success': False,
                    'error': f"HTTP {response.status_code}"
                }
                return False
            data = response.json()
            if data.get('status') != 'success':
                message = data.get('message', '未知错误')
                print(f" ✗ 完整处理失败: {message}")
                pipeline_section['complete_process'] = {
                    'success': False,
                    'error': message
                }
                return False
            metric_names = (
                'original_count', 'extracted_count', 'processed_count',
                'valid_count', 'invalid_count', 'extraction_rate',
                'validation_rate',
            )
            metrics = {name: data.get(name, 0) for name in metric_names}
            print(" ✓ 完整处理流程成功")
            print(f" 📊 原始记录: {metrics['original_count']}")
            print(f" 📊 提取结果: {metrics['extracted_count']}")
            print(f" 📊 处理记录: {metrics['processed_count']}")
            print(f" 📊 有效记录: {metrics['valid_count']}")
            print(f" 📊 无效记录: {metrics['invalid_count']}")
            print(f" 📊 提取率: {metrics['extraction_rate']}%")
            print(f" 📊 验证率: {metrics['validation_rate']}%")
            pipeline_section['complete_process'] = {
                'success': True,
                'metrics': metrics
            }
            return True
        except Exception as e:
            print(f" ✗ 完整处理异常: {e}")
            pipeline_section['complete_process'] = {
                'success': False,
                'error': str(e)
            }
            return False

    @staticmethod
    def _print_retrieval_samples(endpoint, data, count):
        """Print statistics or up to three sample records of a response.

        Args:
            endpoint: Name of the retrieval endpoint that was queried.
            data: Decoded JSON body of the response.
            count: Value of the response's ``count`` field.
        """
        if endpoint == 'statistics':
            stats = data.get('statistics', {})
            print(f" 📈 提取率: {stats.get('extraction_rate', 0)}%")
            print(f" 📈 验证率: {stats.get('validation_rate', 0)}%")
            return
        if not (count > 0 and 'data' in data):
            return
        sample_count = min(3, len(data['data']))
        print(f" 📋 样例数据 (显示{sample_count}条):")
        for i, record in enumerate(data['data'][:sample_count], 1):
            if endpoint in ['processed-data', 'final-data']:
                call_sign = record.get('call_sign', 'N/A')
                behavior = record.get('behavior', 'N/A')
                print(f" {i}. 呼号: {call_sign} | 行为: {behavior}")
            elif endpoint == 'original-data':
                # Guard against a null text and truncate long ones.
                text = record.get('text', '') or ''
                if len(text) > 50:
                    text = text[:50] + '...'
                print(f" {i}. {record.get('id', 'N/A')}: {text}")

    def step_6_data_retrieval_test(self):
        """Step 6: query every data-retrieval endpoint.

        Each outcome is stored under its endpoint name in
        ``test_results['data_retrieval']``.

        Returns:
            bool: True when all endpoints in ``CRITICAL_APIS`` succeeded.
        """
        print("\n📋 步骤6: 数据获取API测试")
        print("-" * 50)
        data_apis = [
            ('原始数据', 'original-data'),
            ('处理数据', 'processed-data'),
            ('最终数据', 'final-data'),
            ('隔离数据', 'quarantine-data'),
            ('统计信息', 'statistics')
        ]
        retrieval_results = {}
        for api_name, endpoint in data_apis:
            try:
                print(f" 📊 测试{api_name}API...")
                response = requests.get(f"{API_BASE}/{endpoint}/", timeout=10)
                if response.status_code == 200:
                    data = response.json()
                    if data.get('status') == 'success':
                        count = data.get('count', 0)
                        print(f"{api_name}: {count} 条记录")
                        # Show sample data.
                        self._print_retrieval_samples(endpoint, data, count)
                        retrieval_results[endpoint] = {
                            'success': True,
                            'count': count,
                            'data_sample': data.get('data', [])[:3] if 'data' in data else []
                        }
                    else:
                        print(f" ⚠️ {api_name}: {data.get('message', '无数据')}")
                        retrieval_results[endpoint] = {
                            'success': False,
                            'message': data.get('message', '无数据')
                        }
                else:
                    print(f"{api_name}请求失败: {response.status_code}")
                    retrieval_results[endpoint] = {
                        'success': False,
                        'error': f"HTTP {response.status_code}"
                    }
            except Exception as e:
                print(f"{api_name}异常: {e}")
                retrieval_results[endpoint] = {
                    'success': False,
                    'error': str(e)
                }
        self.test_results['data_retrieval'] = retrieval_results
        # Only the critical endpoints decide the verdict.
        critical_success = all(
            retrieval_results.get(api, {}).get('success', False)
            for api in self.CRITICAL_APIS
        )
        if critical_success:
            print("\n 🎉 数据获取API测试完成")
            return True
        print("\n ⚠️ 部分关键API测试失败")
        return False

    def step_7_final_verification(self):
        """Step 7: read the final statistics and count rows in each table.

        Returns:
            bool: True when ``/statistics/`` reports success; False on an
            HTTP error, a non-success payload or any exception. Failures
            are recorded in ``test_results['final_verification']``.
        """
        print("\n✅ 步骤7: 最终系统验证")
        print("-" * 50)
        try:
            # Fetch the final statistics.
            response = requests.get(f"{API_BASE}/statistics/", timeout=10)
            if response.status_code != 200:
                print(f" ✗ 最终验证请求失败: {response.status_code}")
                self.test_results['final_verification'] = {
                    'success': False,
                    'error': f"HTTP {response.status_code}"
                }
                return False
            data = response.json()
            if data.get('status') != 'success':
                message = data.get('message', '未知错误')
                print(f" ✗ 统计信息获取失败: {message}")
                self.test_results['final_verification'] = {
                    'success': False,
                    'error': message
                }
                return False
            stats = data['statistics']
            print(" 📊 最终系统状态:")
            print(f" 原始数据: {stats.get('original_count', 0)}")
            print(f" 提取数据: {stats.get('extracted_count', 0)}")
            print(f" 有效数据: {stats.get('valid_count', 0)}")
            print(f" 无效数据: {stats.get('invalid_count', 0)}")
            print(f" 提取效率: {stats.get('extraction_rate', 0)}%")
            print(f" 验证通过率: {stats.get('validation_rate', 0)}%")
            # System health is the mean of the two rates.
            extraction_rate = stats.get('extraction_rate', 0)
            validation_rate = stats.get('validation_rate', 0)
            data_quality = (extraction_rate + validation_rate) / 2
            print(f" 系统健康度: {data_quality:.1f}%")
            # Check the state of the database tables. The names come from
            # this fixed list, so formatting them into the SQL is safe.
            tables_status = {}
            with connection.cursor() as cursor:
                for table in ['prewashed_table', 'processed_table', 'final_table', 'quarantine_table']:
                    try:
                        cursor.execute(f"SELECT COUNT(*) FROM {table}")
                        count = cursor.fetchone()[0]
                        tables_status[table] = count
                        print(f" {table}: {count}")
                    except Exception:
                        # Best effort: a table that cannot be queried is
                        # reported with a count of 0.
                        tables_status[table] = 0
                        print(f" {table}: 不存在或为空")
            self.test_results['final_verification'] = {
                'success': True,
                'statistics': stats,
                'tables_status': tables_status,
                'system_health': data_quality
            }
            return True
        except Exception as e:
            print(f" ✗ 最终验证异常: {e}")
            self.test_results['final_verification'] = {
                'success': False,
                'error': str(e)
            }
            return False

    def generate_comprehensive_report(self):
        """Print the summary report and save ``test_results`` as JSON.

        Each of the five sections is judged with ``_section_passed`` because
        the sections use different result layouts; the data-retrieval
        section only requires the ``CRITICAL_APIS`` endpoints.

        Returns:
            bool: True when at least 80% of the sections passed.
        """
        print("\n" + "="*80)
        print("📋 完整端到端测试 - 综合报告")
        print("="*80)
        # (label, section result, nested keys required for a pass)
        test_steps = [
            ('系统健康检查', self.test_results.get('system_health', {}), None),
            ('数据文件上传', self.test_results.get('data_upload', {}), None),
            ('处理流水线', self.test_results.get('processing_pipeline', {}), None),
            ('数据获取API', self.test_results.get('data_retrieval', {}), self.CRITICAL_APIS),
            ('最终验证', self.test_results.get('final_verification', {}), None)
        ]
        passed_steps = 0
        total_steps = len(test_steps)
        print("\n🔍 测试步骤总结:")
        for step_name, step_result, required_keys in test_steps:
            if self._section_passed(step_result, required_keys):
                print(f"{step_name}: 通过")
                passed_steps += 1
            elif step_result:
                print(f"{step_name}: 失败")
            else:
                # An empty section means the step never recorded anything.
                print(f" ⏭️ {step_name}: 跳过")
        success_rate = (passed_steps / total_steps * 100) if total_steps > 0 else 0
        print(f"\n📊 总体成功率: {passed_steps}/{total_steps} ({success_rate:.1f}%)")
        # Performance metrics summary.
        final_verification = self.test_results.get('final_verification', {})
        if final_verification.get('success'):
            stats = final_verification.get('statistics', {})
            print("\n📈 性能指标总结:")
            print(f" 数据处理效率: {stats.get('extraction_rate', 0)}%")
            print(f" 数据验证率: {stats.get('validation_rate', 0)}%")
            print(f" 系统健康度: {final_verification.get('system_health', 0):.1f}%")
            # Data flow summary; the counts are separated so they cannot
            # run together into one ambiguous number.
            print("\n🔄 数据流总结:")
            print(" 原始数据 → 提取数据 → 有效数据")
            print(f" {stats.get('original_count', 0)} → {stats.get('extracted_count', 0)} → {stats.get('valid_count', 0)}")
        # System grade.
        print("\n🏆 系统评级:")
        if success_rate >= 90:
            grade = "A+"
            status = "🏆 优秀!系统已达到生产级别"
        elif success_rate >= 80:
            grade = "A"
            status = "✅ 良好!系统运行稳定"
        elif success_rate >= 70:
            grade = "B+"
            status = "📈 可用!需要少量优化"
        elif success_rate >= 60:
            grade = "B"
            status = "⚠️ 基本可用,需要改进"
        else:
            grade = "C"
            status = "❌ 需要重大修复"
        print(f" 等级: {grade}")
        print(f" 状态: {status}")
        # Save the detailed report; a write failure must not discard the
        # verdict that was just computed.
        report_file = Path(__file__).parent / "complete_e2e_test_report.json"
        try:
            with open(report_file, 'w', encoding='utf-8') as f:
                json.dump(self.test_results, f, indent=2, ensure_ascii=False)
            print(f"\n📄 详细报告已保存: {report_file}")
        except OSError as e:
            print(f"\n⚠️ 报告保存失败: {e}")
        return success_rate >= 80

    def cleanup(self):
        """Delete the temporary CSV created by step 2, if it exists."""
        try:
            if self.test_data_file and self.test_data_file.exists():
                self.test_data_file.unlink()
                print(f"🗑️ 清理测试文件: {self.test_data_file}")
        except Exception as e:
            # Cleanup is best effort; report the problem and carry on.
            print(f"⚠️ 清理文件失败: {e}")

    def run_complete_test(self):
        """Run all seven steps, then the report, and always clean up.

        A failing step does not stop the run; later steps still execute.

        Returns:
            bool: The verdict of ``generate_comprehensive_report``, or False
            when the run was interrupted or raised.
        """
        print("🎯 开始完整端到端流程测试")
        print("="*80)
        print("📝 测试范围: 数据上传 → 处理流水线 → AI分析 → 数据获取 → 验证")
        print("="*80)
        try:
            # Execute every test step in order.
            steps = [
                self.step_1_system_health_check,
                self.step_2_prepare_test_data,
                self.step_3_file_upload,
                self.step_4_processing_pipeline,
                self.step_5_complete_processing,
                self.step_6_data_retrieval_test,
                self.step_7_final_verification
            ]
            for step in steps:
                if not step():
                    print("\n⚠️ 测试步骤失败,但继续执行后续步骤...")
                time.sleep(1)  # pause between steps
            # Produce the summary report.
            return self.generate_comprehensive_report()
        except KeyboardInterrupt:
            print("\n⏹️ 测试被用户中断")
            return False
        except Exception as e:
            print(f"\n❌ 测试过程发生异常: {e}")
            return False
        finally:
            self.cleanup()
def main():
    """Run the complete end-to-end test and map its verdict to an exit code.

    Returns:
        int: 0 when ``run_complete_test`` reports success, 1 otherwise.
    """
    passed = CompleteEndToEndTest().run_complete_test()
    print("\n" + "="*80)
    if not passed:
        print("⚠️ 端到端测试完成,但存在需要关注的问题")
        print("💡 请查看详细报告了解具体问题")
        return 1
    print("🎉 完整端到端测试成功完成!")
    print("✅ 系统已准备好投入生产使用")
    return 0
# Script entry point: exit the process with the status returned by main().
if __name__ == "__main__":
    exit_code = main()
    sys.exit(exit_code)