|
|
#!/usr/bin/env python3
|
|
|
"""
|
|
|
完整端到端流程测试
|
|
|
验证从数据上传到最终结果的完整数据处理链路
|
|
|
"""
|
|
|
|
|
|
import os
|
|
|
import sys
|
|
|
import django
|
|
|
import requests
|
|
|
import json
|
|
|
import time
|
|
|
import io
|
|
|
from pathlib import Path
|
|
|
|
|
|
# 设置Django环境
|
|
|
sys.path.append('/home/hzk/项目/moxun-1/信息抽取+数据检验/Django123/atc_extractor/backend')
|
|
|
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'backend.settings')
|
|
|
django.setup()
|
|
|
|
|
|
from django.db import connection
|
|
|
|
|
|
API_BASE = "http://127.0.0.1:8080/api"
|
|
|
|
|
|
class CompleteEndToEndTest:
|
|
|
def __init__(self):
|
|
|
self.test_results = {
|
|
|
'system_health': {},
|
|
|
'data_upload': {},
|
|
|
'processing_pipeline': {},
|
|
|
'data_retrieval': {},
|
|
|
'final_verification': {}
|
|
|
}
|
|
|
self.test_data_file = None
|
|
|
|
|
|
def step_1_system_health_check(self):
|
|
|
"""步骤1: 系统健康检查"""
|
|
|
print("🚀 步骤1: 系统健康检查")
|
|
|
print("-" * 50)
|
|
|
|
|
|
try:
|
|
|
# 检查Django服务器
|
|
|
response = requests.get(f"{API_BASE}/health/", timeout=5)
|
|
|
if response.status_code == 200:
|
|
|
data = response.json()
|
|
|
print(f" ✓ Django服务器: {data['message']}")
|
|
|
|
|
|
# 检查数据库连接
|
|
|
with connection.cursor() as cursor:
|
|
|
cursor.execute("SELECT 1")
|
|
|
cursor.fetchone()
|
|
|
print(" ✓ 数据库连接: 正常")
|
|
|
|
|
|
self.test_results['system_health'] = {
|
|
|
'django_server': True,
|
|
|
'database': True,
|
|
|
'status': 'healthy'
|
|
|
}
|
|
|
return True
|
|
|
else:
|
|
|
print(f" ✗ Django服务器响应异常: {response.status_code}")
|
|
|
return False
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f" ✗ 系统健康检查失败: {e}")
|
|
|
self.test_results['system_health'] = {
|
|
|
'status': 'failed',
|
|
|
'error': str(e)
|
|
|
}
|
|
|
return False
|
|
|
|
|
|
def step_2_prepare_test_data(self):
|
|
|
"""步骤2: 准备测试数据文件"""
|
|
|
print("\n📁 步骤2: 准备测试数据文件")
|
|
|
print("-" * 50)
|
|
|
|
|
|
# 创建真实的ATC对话CSV测试文件
|
|
|
test_data_content = """id,text
|
|
|
e2e_001,"CCA123 climb to flight level 350"
|
|
|
e2e_002,"China Southern 456 descend to flight level 280"
|
|
|
e2e_003,"Air China 789 turn left heading 270 degrees"
|
|
|
e2e_004,"Hainan Airlines 888 contact Shanghai approach 121.9"
|
|
|
e2e_005,"Spring Airlines 999 cleared for takeoff runway 36L"
|
|
|
e2e_006,"United 997 maintain flight level 330"
|
|
|
e2e_007,"Lufthansa 672 descending to flight level 250"
|
|
|
e2e_008,"Singapore Airlines 106 approach runway 18R"
|
|
|
e2e_009,"Japan Airlines 550 hold position"
|
|
|
e2e_010,"KLM 695 taxi to gate A15"
|
|
|
e2e_011,"Emirates 203 climb and maintain flight level 380"
|
|
|
e2e_012,"Cathay Pacific 747 contact tower 118.1"
|
|
|
e2e_013,"Turkish Airlines 1 turn right heading 090"
|
|
|
e2e_014,"Delta 88 cleared to land runway 24L"
|
|
|
e2e_015,"British Airways 15 hold short of runway 06R"
|
|
|
"""
|
|
|
|
|
|
try:
|
|
|
# 保存测试文件
|
|
|
self.test_data_file = Path(__file__).parent / "e2e_test_data.csv"
|
|
|
with open(self.test_data_file, 'w', encoding='utf-8') as f:
|
|
|
f.write(test_data_content)
|
|
|
|
|
|
print(f" ✓ 测试文件创建: {self.test_data_file}")
|
|
|
print(f" 📊 测试数据: 15条ATC对话记录")
|
|
|
print(" 📋 数据样例:")
|
|
|
lines = test_data_content.strip().split('\n')
|
|
|
for i, line in enumerate(lines[1:4], 1): # 显示前3行数据
|
|
|
parts = line.split(',', 1)
|
|
|
if len(parts) >= 2:
|
|
|
text = parts[1].strip('"')
|
|
|
print(f" {i}. {text}")
|
|
|
|
|
|
self.test_results['data_upload'] = {
|
|
|
'file_created': True,
|
|
|
'record_count': 15,
|
|
|
'file_path': str(self.test_data_file)
|
|
|
}
|
|
|
|
|
|
return True
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f" ✗ 测试文件创建失败: {e}")
|
|
|
self.test_results['data_upload'] = {
|
|
|
'file_created': False,
|
|
|
'error': str(e)
|
|
|
}
|
|
|
return False
|
|
|
|
|
|
def step_3_file_upload(self):
|
|
|
"""步骤3: 文件上传测试"""
|
|
|
print("\n📤 步骤3: 文件上传测试")
|
|
|
print("-" * 50)
|
|
|
|
|
|
try:
|
|
|
# 上传文件到API
|
|
|
with open(self.test_data_file, 'rb') as f:
|
|
|
files = {'file': ('e2e_test_data.csv', f, 'text/csv')}
|
|
|
response = requests.post(f"{API_BASE}/upload/", files=files, timeout=30)
|
|
|
|
|
|
if response.status_code == 200:
|
|
|
data = response.json()
|
|
|
if data['status'] == 'success':
|
|
|
print(f" ✓ 文件上传成功")
|
|
|
print(f" 📊 处理记录数: {data['data']['inserted_rows']}")
|
|
|
print(f" 📁 文件名: {data['data']['filename']}")
|
|
|
|
|
|
# 验证数据是否正确插入数据库
|
|
|
with connection.cursor() as cursor:
|
|
|
cursor.execute("SELECT COUNT(*) FROM prewashed_table")
|
|
|
db_count = cursor.fetchone()[0]
|
|
|
print(f" 📋 数据库记录数: {db_count}")
|
|
|
|
|
|
# 显示插入的数据样例
|
|
|
cursor.execute("SELECT id, text FROM prewashed_table LIMIT 3")
|
|
|
samples = cursor.fetchall()
|
|
|
print(" 📋 数据库样例:")
|
|
|
for id_val, text_val in samples:
|
|
|
print(f" {id_val}: {text_val}")
|
|
|
|
|
|
self.test_results['data_upload'].update({
|
|
|
'upload_success': True,
|
|
|
'inserted_count': data['data']['inserted_rows'],
|
|
|
'db_verified_count': db_count
|
|
|
})
|
|
|
|
|
|
return True
|
|
|
else:
|
|
|
print(f" ✗ 文件上传失败: {data.get('message', '未知错误')}")
|
|
|
return False
|
|
|
else:
|
|
|
print(f" ✗ 上传请求失败: {response.status_code}")
|
|
|
return False
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f" ✗ 文件上传过程失败: {e}")
|
|
|
self.test_results['data_upload'].update({
|
|
|
'upload_success': False,
|
|
|
'error': str(e)
|
|
|
})
|
|
|
return False
|
|
|
|
|
|
def step_4_processing_pipeline(self):
|
|
|
"""步骤4: 数据处理流水线测试"""
|
|
|
print("\n🔄 步骤4: 数据处理流水线测试")
|
|
|
print("-" * 50)
|
|
|
|
|
|
pipeline_steps = [
|
|
|
('预处理', 'preprocess'),
|
|
|
('格式合并', 'merge'),
|
|
|
('单词纠错', 'correct'),
|
|
|
('AI分析', 'analyze')
|
|
|
]
|
|
|
|
|
|
pipeline_results = {}
|
|
|
|
|
|
for step_name, endpoint in pipeline_steps:
|
|
|
try:
|
|
|
print(f" 🔧 执行{step_name}...")
|
|
|
response = requests.post(f"{API_BASE}/{endpoint}/",
|
|
|
json={},
|
|
|
headers={'Content-Type': 'application/json'},
|
|
|
timeout=30)
|
|
|
|
|
|
if response.status_code == 200:
|
|
|
data = response.json()
|
|
|
if data['status'] == 'success':
|
|
|
print(f" ✓ {step_name}成功")
|
|
|
|
|
|
# 根据不同步骤显示不同的结果信息
|
|
|
if endpoint == 'preprocess':
|
|
|
processed_count = data['data']['processed_count']
|
|
|
cleaning_rate = data['data']['cleaning_rate']
|
|
|
print(f" 📊 处理记录: {processed_count}, 清理率: {cleaning_rate}%")
|
|
|
|
|
|
elif endpoint == 'merge':
|
|
|
merged_records = data['data']['merged_records']
|
|
|
success_rate = data['data']['merge_success_rate']
|
|
|
print(f" 📊 合并记录: {merged_records}, 成功率: {success_rate}%")
|
|
|
|
|
|
elif endpoint == 'correct':
|
|
|
corrected_words = data['data']['corrected_words']
|
|
|
print(f" 📊 纠正单词: {corrected_words}个")
|
|
|
|
|
|
elif endpoint == 'analyze':
|
|
|
if 'analysis_summary' in data['data']:
|
|
|
# 模拟分析结果
|
|
|
summary = data['data']['analysis_summary']
|
|
|
print(f" 📊 呼号提取: {summary.get('call_signs_extracted', 0)}")
|
|
|
print(f" 📊 行为识别: {summary.get('behaviors_identified', 0)}")
|
|
|
else:
|
|
|
# 实际AI分析结果
|
|
|
print(f" 📊 AI分析完成")
|
|
|
|
|
|
pipeline_results[endpoint] = {
|
|
|
'success': True,
|
|
|
'data': data['data']
|
|
|
}
|
|
|
else:
|
|
|
print(f" ✗ {step_name}失败: {data.get('message', '未知错误')}")
|
|
|
pipeline_results[endpoint] = {
|
|
|
'success': False,
|
|
|
'error': data.get('message', '未知错误')
|
|
|
}
|
|
|
else:
|
|
|
print(f" ✗ {step_name}请求失败: {response.status_code}")
|
|
|
pipeline_results[endpoint] = {
|
|
|
'success': False,
|
|
|
'error': f"HTTP {response.status_code}"
|
|
|
}
|
|
|
|
|
|
# 步骤间暂停
|
|
|
time.sleep(1)
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f" ✗ {step_name}异常: {e}")
|
|
|
pipeline_results[endpoint] = {
|
|
|
'success': False,
|
|
|
'error': str(e)
|
|
|
}
|
|
|
|
|
|
self.test_results['processing_pipeline'] = pipeline_results
|
|
|
|
|
|
# 检查是否所有步骤都成功
|
|
|
all_success = all(result.get('success', False) for result in pipeline_results.values())
|
|
|
|
|
|
if all_success:
|
|
|
print(f"\n 🎉 数据处理流水线全部完成!")
|
|
|
return True
|
|
|
else:
|
|
|
failed_steps = [step for step, result in pipeline_results.items() if not result.get('success', False)]
|
|
|
print(f"\n ⚠️ 部分步骤失败: {', '.join(failed_steps)}")
|
|
|
return False
|
|
|
|
|
|
def step_5_complete_processing(self):
|
|
|
"""步骤5: 完整处理流程测试"""
|
|
|
print("\n🚀 步骤5: 完整处理流程测试")
|
|
|
print("-" * 50)
|
|
|
|
|
|
try:
|
|
|
print(" 🔧 执行完整处理流程...")
|
|
|
response = requests.post(f"{API_BASE}/process-data/", timeout=60)
|
|
|
|
|
|
if response.status_code == 200:
|
|
|
data = response.json()
|
|
|
|
|
|
if data['status'] == 'success':
|
|
|
print(" ✓ 完整处理流程成功")
|
|
|
print(f" 📊 原始记录: {data.get('original_count', 0)}")
|
|
|
print(f" 📊 提取结果: {data.get('extracted_count', 0)}")
|
|
|
print(f" 📊 处理记录: {data.get('processed_count', 0)}")
|
|
|
print(f" 📊 有效记录: {data.get('valid_count', 0)}")
|
|
|
print(f" 📊 无效记录: {data.get('invalid_count', 0)}")
|
|
|
print(f" 📊 提取率: {data.get('extraction_rate', 0)}%")
|
|
|
print(f" 📊 验证率: {data.get('validation_rate', 0)}%")
|
|
|
|
|
|
self.test_results['processing_pipeline']['complete_process'] = {
|
|
|
'success': True,
|
|
|
'metrics': {
|
|
|
'original_count': data.get('original_count', 0),
|
|
|
'extracted_count': data.get('extracted_count', 0),
|
|
|
'processed_count': data.get('processed_count', 0),
|
|
|
'valid_count': data.get('valid_count', 0),
|
|
|
'invalid_count': data.get('invalid_count', 0),
|
|
|
'extraction_rate': data.get('extraction_rate', 0),
|
|
|
'validation_rate': data.get('validation_rate', 0)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
return True
|
|
|
else:
|
|
|
print(f" ✗ 完整处理失败: {data.get('message', '未知错误')}")
|
|
|
return False
|
|
|
else:
|
|
|
print(f" ✗ 完整处理请求失败: {response.status_code}")
|
|
|
return False
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f" ✗ 完整处理异常: {e}")
|
|
|
return False
|
|
|
|
|
|
def step_6_data_retrieval_test(self):
|
|
|
"""步骤6: 数据获取测试"""
|
|
|
print("\n📋 步骤6: 数据获取API测试")
|
|
|
print("-" * 50)
|
|
|
|
|
|
data_apis = [
|
|
|
('原始数据', 'original-data'),
|
|
|
('处理数据', 'processed-data'),
|
|
|
('最终数据', 'final-data'),
|
|
|
('隔离数据', 'quarantine-data'),
|
|
|
('统计信息', 'statistics')
|
|
|
]
|
|
|
|
|
|
retrieval_results = {}
|
|
|
|
|
|
for api_name, endpoint in data_apis:
|
|
|
try:
|
|
|
print(f" 📊 测试{api_name}API...")
|
|
|
response = requests.get(f"{API_BASE}/{endpoint}/", timeout=10)
|
|
|
|
|
|
if response.status_code == 200:
|
|
|
data = response.json()
|
|
|
|
|
|
if data['status'] == 'success':
|
|
|
count = data.get('count', 0)
|
|
|
print(f" ✓ {api_name}: {count} 条记录")
|
|
|
|
|
|
# 显示数据样例
|
|
|
if endpoint == 'statistics':
|
|
|
stats = data.get('statistics', {})
|
|
|
print(f" 📈 提取率: {stats.get('extraction_rate', 0)}%")
|
|
|
print(f" 📈 验证率: {stats.get('validation_rate', 0)}%")
|
|
|
elif count > 0 and 'data' in data:
|
|
|
sample_count = min(3, len(data['data']))
|
|
|
print(f" 📋 样例数据 (显示{sample_count}条):")
|
|
|
for i, record in enumerate(data['data'][:sample_count], 1):
|
|
|
if endpoint in ['processed-data', 'final-data']:
|
|
|
call_sign = record.get('call_sign', 'N/A')
|
|
|
behavior = record.get('behavior', 'N/A')
|
|
|
print(f" {i}. 呼号: {call_sign} | 行为: {behavior}")
|
|
|
elif endpoint == 'original-data':
|
|
|
text = record.get('text', '')[:50] + '...' if len(record.get('text', '')) > 50 else record.get('text', '')
|
|
|
print(f" {i}. {record.get('id', 'N/A')}: {text}")
|
|
|
|
|
|
retrieval_results[endpoint] = {
|
|
|
'success': True,
|
|
|
'count': count,
|
|
|
'data_sample': data.get('data', [])[:3] if 'data' in data else []
|
|
|
}
|
|
|
else:
|
|
|
print(f" ⚠️ {api_name}: {data.get('message', '无数据')}")
|
|
|
retrieval_results[endpoint] = {
|
|
|
'success': False,
|
|
|
'message': data.get('message', '无数据')
|
|
|
}
|
|
|
else:
|
|
|
print(f" ✗ {api_name}请求失败: {response.status_code}")
|
|
|
retrieval_results[endpoint] = {
|
|
|
'success': False,
|
|
|
'error': f"HTTP {response.status_code}"
|
|
|
}
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f" ✗ {api_name}异常: {e}")
|
|
|
retrieval_results[endpoint] = {
|
|
|
'success': False,
|
|
|
'error': str(e)
|
|
|
}
|
|
|
|
|
|
self.test_results['data_retrieval'] = retrieval_results
|
|
|
|
|
|
# 检查关键API是否成功
|
|
|
critical_apis = ['original-data', 'processed-data', 'statistics']
|
|
|
critical_success = all(retrieval_results.get(api, {}).get('success', False) for api in critical_apis)
|
|
|
|
|
|
if critical_success:
|
|
|
print(f"\n 🎉 数据获取API测试完成!")
|
|
|
return True
|
|
|
else:
|
|
|
print(f"\n ⚠️ 部分关键API测试失败")
|
|
|
return False
|
|
|
|
|
|
def step_7_final_verification(self):
|
|
|
"""步骤7: 最终验证"""
|
|
|
print("\n✅ 步骤7: 最终系统验证")
|
|
|
print("-" * 50)
|
|
|
|
|
|
try:
|
|
|
# 获取最终统计信息
|
|
|
response = requests.get(f"{API_BASE}/statistics/", timeout=10)
|
|
|
|
|
|
if response.status_code == 200:
|
|
|
data = response.json()
|
|
|
|
|
|
if data['status'] == 'success':
|
|
|
stats = data['statistics']
|
|
|
|
|
|
print(" 📊 最终系统状态:")
|
|
|
print(f" 原始数据: {stats.get('original_count', 0)} 条")
|
|
|
print(f" 提取数据: {stats.get('extracted_count', 0)} 条")
|
|
|
print(f" 有效数据: {stats.get('valid_count', 0)} 条")
|
|
|
print(f" 无效数据: {stats.get('invalid_count', 0)} 条")
|
|
|
print(f" 提取效率: {stats.get('extraction_rate', 0)}%")
|
|
|
print(f" 验证通过率: {stats.get('validation_rate', 0)}%")
|
|
|
|
|
|
# 计算系统健康度
|
|
|
extraction_rate = stats.get('extraction_rate', 0)
|
|
|
validation_rate = stats.get('validation_rate', 0)
|
|
|
data_quality = (extraction_rate + validation_rate) / 2
|
|
|
|
|
|
print(f" 系统健康度: {data_quality:.1f}%")
|
|
|
|
|
|
# 验证数据库表状态
|
|
|
with connection.cursor() as cursor:
|
|
|
tables_status = {}
|
|
|
|
|
|
for table in ['prewashed_table', 'processed_table', 'final_table', 'quarantine_table']:
|
|
|
try:
|
|
|
cursor.execute(f"SELECT COUNT(*) FROM {table}")
|
|
|
count = cursor.fetchone()[0]
|
|
|
tables_status[table] = count
|
|
|
print(f" {table}: {count} 条")
|
|
|
except Exception:
|
|
|
tables_status[table] = 0
|
|
|
print(f" {table}: 不存在或为空")
|
|
|
|
|
|
self.test_results['final_verification'] = {
|
|
|
'success': True,
|
|
|
'statistics': stats,
|
|
|
'tables_status': tables_status,
|
|
|
'system_health': data_quality
|
|
|
}
|
|
|
|
|
|
return True
|
|
|
else:
|
|
|
print(f" ✗ 统计信息获取失败: {data.get('message', '未知错误')}")
|
|
|
return False
|
|
|
else:
|
|
|
print(f" ✗ 最终验证请求失败: {response.status_code}")
|
|
|
return False
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f" ✗ 最终验证异常: {e}")
|
|
|
self.test_results['final_verification'] = {
|
|
|
'success': False,
|
|
|
'error': str(e)
|
|
|
}
|
|
|
return False
|
|
|
|
|
|
def generate_comprehensive_report(self):
|
|
|
"""生成综合测试报告"""
|
|
|
print("\n" + "="*80)
|
|
|
print("📋 完整端到端测试 - 综合报告")
|
|
|
print("="*80)
|
|
|
|
|
|
# 测试步骤总结
|
|
|
test_steps = [
|
|
|
('系统健康检查', self.test_results.get('system_health', {})),
|
|
|
('数据文件上传', self.test_results.get('data_upload', {})),
|
|
|
('处理流水线', self.test_results.get('processing_pipeline', {})),
|
|
|
('数据获取API', self.test_results.get('data_retrieval', {})),
|
|
|
('最终验证', self.test_results.get('final_verification', {}))
|
|
|
]
|
|
|
|
|
|
passed_steps = 0
|
|
|
total_steps = len(test_steps)
|
|
|
|
|
|
print(f"\n🔍 测试步骤总结:")
|
|
|
for step_name, step_result in test_steps:
|
|
|
if step_result.get('success', False) or step_result.get('status') == 'healthy':
|
|
|
print(f" ✅ {step_name}: 通过")
|
|
|
passed_steps += 1
|
|
|
elif step_result:
|
|
|
print(f" ❌ {step_name}: 失败")
|
|
|
else:
|
|
|
print(f" ⏭️ {step_name}: 跳过")
|
|
|
|
|
|
success_rate = (passed_steps / total_steps * 100) if total_steps > 0 else 0
|
|
|
print(f"\n📊 总体成功率: {passed_steps}/{total_steps} ({success_rate:.1f}%)")
|
|
|
|
|
|
# 性能指标总结
|
|
|
final_verification = self.test_results.get('final_verification', {})
|
|
|
if final_verification.get('success'):
|
|
|
stats = final_verification.get('statistics', {})
|
|
|
|
|
|
print(f"\n📈 性能指标总结:")
|
|
|
print(f" 数据处理效率: {stats.get('extraction_rate', 0)}%")
|
|
|
print(f" 数据验证率: {stats.get('validation_rate', 0)}%")
|
|
|
print(f" 系统健康度: {final_verification.get('system_health', 0):.1f}%")
|
|
|
|
|
|
# 数据流总结
|
|
|
print(f"\n🔄 数据流总结:")
|
|
|
print(f" 原始数据 → 提取数据 → 有效数据")
|
|
|
print(f" {stats.get('original_count', 0)} → {stats.get('extracted_count', 0)} → {stats.get('valid_count', 0)}")
|
|
|
|
|
|
# 系统评级
|
|
|
print(f"\n🏆 系统评级:")
|
|
|
if success_rate >= 90:
|
|
|
grade = "A+"
|
|
|
status = "🏆 优秀!系统已达到生产级别"
|
|
|
elif success_rate >= 80:
|
|
|
grade = "A"
|
|
|
status = "✅ 良好!系统运行稳定"
|
|
|
elif success_rate >= 70:
|
|
|
grade = "B+"
|
|
|
status = "📈 可用!需要少量优化"
|
|
|
elif success_rate >= 60:
|
|
|
grade = "B"
|
|
|
status = "⚠️ 基本可用,需要改进"
|
|
|
else:
|
|
|
grade = "C"
|
|
|
status = "❌ 需要重大修复"
|
|
|
|
|
|
print(f" 等级: {grade}")
|
|
|
print(f" 状态: {status}")
|
|
|
|
|
|
# 保存详细报告
|
|
|
report_file = Path(__file__).parent / "complete_e2e_test_report.json"
|
|
|
with open(report_file, 'w', encoding='utf-8') as f:
|
|
|
json.dump(self.test_results, f, indent=2, ensure_ascii=False)
|
|
|
print(f"\n📄 详细报告已保存: {report_file}")
|
|
|
|
|
|
return success_rate >= 80
|
|
|
|
|
|
def cleanup(self):
|
|
|
"""清理测试文件"""
|
|
|
try:
|
|
|
if self.test_data_file and self.test_data_file.exists():
|
|
|
self.test_data_file.unlink()
|
|
|
print(f"🗑️ 清理测试文件: {self.test_data_file}")
|
|
|
except Exception as e:
|
|
|
print(f"⚠️ 清理文件失败: {e}")
|
|
|
|
|
|
def run_complete_test(self):
|
|
|
"""运行完整的端到端测试"""
|
|
|
print("🎯 开始完整端到端流程测试")
|
|
|
print("="*80)
|
|
|
print("📝 测试范围: 数据上传 → 处理流水线 → AI分析 → 数据获取 → 验证")
|
|
|
print("="*80)
|
|
|
|
|
|
try:
|
|
|
# 执行所有测试步骤
|
|
|
steps = [
|
|
|
self.step_1_system_health_check,
|
|
|
self.step_2_prepare_test_data,
|
|
|
self.step_3_file_upload,
|
|
|
self.step_4_processing_pipeline,
|
|
|
self.step_5_complete_processing,
|
|
|
self.step_6_data_retrieval_test,
|
|
|
self.step_7_final_verification
|
|
|
]
|
|
|
|
|
|
for step in steps:
|
|
|
if not step():
|
|
|
print(f"\n⚠️ 测试步骤失败,但继续执行后续步骤...")
|
|
|
time.sleep(1) # 步骤间暂停
|
|
|
|
|
|
# 生成综合报告
|
|
|
success = self.generate_comprehensive_report()
|
|
|
|
|
|
return success
|
|
|
|
|
|
except KeyboardInterrupt:
|
|
|
print(f"\n⏹️ 测试被用户中断")
|
|
|
return False
|
|
|
except Exception as e:
|
|
|
print(f"\n❌ 测试过程发生异常: {e}")
|
|
|
return False
|
|
|
finally:
|
|
|
self.cleanup()
|
|
|
|
|
|
|
|
|
def main():
|
|
|
"""主函数"""
|
|
|
tester = CompleteEndToEndTest()
|
|
|
success = tester.run_complete_test()
|
|
|
|
|
|
print("\n" + "="*80)
|
|
|
if success:
|
|
|
print("🎉 完整端到端测试成功完成!")
|
|
|
print("✅ 系统已准备好投入生产使用")
|
|
|
else:
|
|
|
print("⚠️ 端到端测试完成,但存在需要关注的问题")
|
|
|
print("💡 请查看详细报告了解具体问题")
|
|
|
|
|
|
return 0 if success else 1
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
sys.exit(main()) |