#!/usr/bin/env python3
"""
Complete end-to-end flow test.

Verifies the full data-processing chain, from data upload through to the
final results, against a running backend (HTTP API plus database).
"""

import os
import sys
import django
import requests
import json
import time
import io
from pathlib import Path

# Set up the Django environment
sys.path.append('/home/hzk/项目/moxun-1/信息抽取+数据检验/Django123/atc_extractor/backend')
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'backend.settings')
django.setup()

from django.db import connection

API_BASE = "http://127.0.0.1:8080/api"


class CompleteEndToEndTest:
    """Runs the end-to-end steps in order and collects their results.

    Each ``step_*`` method prints its progress, stores what it observed in
    ``self.test_results`` and returns True on success / False on failure.
    """

    # Retrieval endpoints that must succeed for the retrieval step to pass.
    CRITICAL_APIS = ('original-data', 'processed-data', 'statistics')

    def __init__(self):
        # One entry per reported stage; filled in by the step methods.
        self.test_results = {
            'system_health': {},
            'data_upload': {},
            'processing_pipeline': {},
            'data_retrieval': {},
            'final_verification': {}
        }
        # Path of the temporary CSV fixture (set by step 2, removed by cleanup).
        self.test_data_file = None

    def step_1_system_health_check(self):
        """Step 1: check that the API server and the database respond.

        Returns:
            bool: True when ``/health/`` answers 200 and ``SELECT 1`` runs.
        """
        print("🚀 步骤1: 系统健康检查")
        print("-" * 50)

        try:
            # Check the Django server
            response = requests.get(f"{API_BASE}/health/", timeout=5)
            if response.status_code == 200:
                data = response.json()
                print(f" ✓ Django服务器: {data['message']}")

                # Check the database connection
                with connection.cursor() as cursor:
                    cursor.execute("SELECT 1")
                    cursor.fetchone()
                print(" ✓ 数据库连接: 正常")

                self.test_results['system_health'] = {
                    'django_server': True,
                    'database': True,
                    'status': 'healthy'
                }
                return True
            else:
                print(f" ✗ Django服务器响应异常: {response.status_code}")
                # Record the failure so the report shows "failed", not "skipped".
                self.test_results['system_health'] = {
                    'status': 'failed',
                    'error': f"HTTP {response.status_code}"
                }
                return False

        except Exception as e:
            print(f" ✗ 系统健康检查失败: {e}")
            self.test_results['system_health'] = {
                'status': 'failed',
                'error': str(e)
            }
            return False

    def step_2_prepare_test_data(self):
        """Step 2: write the CSV fixture that later steps upload.

        Returns:
            bool: True when the file was written next to this script.
        """
        print("\n📁 步骤2: 准备测试数据文件")
        print("-" * 50)

        # ATC dialogue CSV fixture: a header row followed by one record per line.
        test_rows = (
            'id,text',
            'e2e_001,"CCA123 climb to flight level 350"',
            'e2e_002,"China Southern 456 descend to flight level 280"',
            'e2e_003,"Air China 789 turn left heading 270 degrees"',
            'e2e_004,"Hainan Airlines 888 contact Shanghai approach 121.9"',
            'e2e_005,"Spring Airlines 999 cleared for takeoff runway 36L"',
            'e2e_006,"United 997 maintain flight level 330"',
            'e2e_007,"Lufthansa 672 descending to flight level 250"',
            'e2e_008,"Singapore Airlines 106 approach runway 18R"',
            'e2e_009,"Japan Airlines 550 hold position"',
            'e2e_010,"KLM 695 taxi to gate A15"',
            'e2e_011,"Emirates 203 climb and maintain flight level 380"',
            'e2e_012,"Cathay Pacific 747 contact tower 118.1"',
            'e2e_013,"Turkish Airlines 1 turn right heading 090"',
            'e2e_014,"Delta 88 cleared to land runway 24L"',
            'e2e_015,"British Airways 15 hold short of runway 06R"',
        )
        test_data_content = "\n".join(test_rows) + "\n"
        # Derived from the fixture so the reported count cannot drift from it.
        record_count = len(test_rows) - 1

        try:
            # Save the test file
            self.test_data_file = Path(__file__).parent / "e2e_test_data.csv"
            with open(self.test_data_file, 'w', encoding='utf-8') as f:
                f.write(test_data_content)

            print(f" ✓ 测试文件创建: {self.test_data_file}")
            print(f" 📊 测试数据: {record_count}条ATC对话记录")
            print(" 📋 数据样例:")

            lines = test_data_content.strip().split('\n')
            for i, line in enumerate(lines[1:4], 1):  # show the first 3 records
                parts = line.split(',', 1)
                if len(parts) >= 2:
                    text = parts[1].strip('"')
                    print(f" {i}. {text}")

            self.test_results['data_upload'] = {
                'file_created': True,
                'record_count': record_count,
                'file_path': str(self.test_data_file)
            }
            return True

        except Exception as e:
            print(f" ✗ 测试文件创建失败: {e}")
            self.test_results['data_upload'] = {
                'file_created': False,
                'error': str(e)
            }
            return False

    def step_3_file_upload(self):
        """Step 3: upload the fixture and check the rows in ``prewashed_table``.

        Returns:
            bool: True when the upload endpoint reports ``status == 'success'``.
        """
        print("\n📤 步骤3: 文件上传测试")
        print("-" * 50)

        try:
            # Upload the file to the API
            with open(self.test_data_file, 'rb') as f:
                files = {'file': ('e2e_test_data.csv', f, 'text/csv')}
                response = requests.post(f"{API_BASE}/upload/", files=files, timeout=30)

            if response.status_code == 200:
                data = response.json()
                if data['status'] == 'success':
                    print(f" ✓ 文件上传成功")
                    print(f" 📊 处理记录数: {data['data']['inserted_rows']}")
                    print(f" 📁 文件名: {data['data']['filename']}")

                    # Verify that the data reached the database
                    with connection.cursor() as cursor:
                        cursor.execute("SELECT COUNT(*) FROM prewashed_table")
                        db_count = cursor.fetchone()[0]
                        print(f" 📋 数据库记录数: {db_count}")

                        # Show a few of the inserted rows
                        cursor.execute("SELECT id, text FROM prewashed_table LIMIT 3")
                        samples = cursor.fetchall()
                        print(" 📋 数据库样例:")
                        for id_val, text_val in samples:
                            print(f" {id_val}: {text_val}")

                    self.test_results['data_upload'].update({
                        'upload_success': True,
                        'inserted_count': data['data']['inserted_rows'],
                        'db_verified_count': db_count
                    })
                    return True
                else:
                    print(f" ✗ 文件上传失败: {data.get('message', '未知错误')}")
                    return False
            else:
                print(f" ✗ 上传请求失败: {response.status_code}")
                return False

        except Exception as e:
            print(f" ✗ 文件上传过程失败: {e}")
            self.test_results['data_upload'].update({
                'upload_success': False,
                'error': str(e)
            })
            return False

    def step_4_processing_pipeline(self):
        """Step 4: call each pipeline endpoint in order and record the outcome.

        Returns:
            bool: True only when every pipeline endpoint reported success.
        """
        print("\n🔄 步骤4: 数据处理流水线测试")
        print("-" * 50)

        # (display name, endpoint) pairs, executed in this order.
        pipeline_steps = [
            ('预处理', 'preprocess'),
            ('格式合并', 'merge'),
            ('单词纠错', 'correct'),
            ('AI分析', 'analyze')
        ]

        pipeline_results = {}

        for step_name, endpoint in pipeline_steps:
            try:
                print(f" 🔧 执行{step_name}...")

                response = requests.post(f"{API_BASE}/{endpoint}/",
                                         json={},
                                         headers={'Content-Type': 'application/json'},
                                         timeout=30)

                if response.status_code == 200:
                    data = response.json()
                    if data['status'] == 'success':
                        print(f" ✓ {step_name}成功")

                        # Each endpoint reports different result fields
                        if endpoint == 'preprocess':
                            processed_count = data['data']['processed_count']
                            cleaning_rate = data['data']['cleaning_rate']
                            print(f" 📊 处理记录: {processed_count}, 清理率: {cleaning_rate}%")
                        elif endpoint == 'merge':
                            merged_records = data['data']['merged_records']
                            success_rate = data['data']['merge_success_rate']
                            print(f" 📊 合并记录: {merged_records}, 成功率: {success_rate}%")
                        elif endpoint == 'correct':
                            corrected_words = data['data']['corrected_words']
                            print(f" 📊 纠正单词: {corrected_words}个")
                        elif endpoint == 'analyze':
                            if 'analysis_summary' in data['data']:
                                # Simulated analysis result
                                summary = data['data']['analysis_summary']
                                print(f" 📊 呼号提取: {summary.get('call_signs_extracted', 0)}")
                                print(f" 📊 行为识别: {summary.get('behaviors_identified', 0)}")
                            else:
                                # Real AI analysis result
                                print(f" 📊 AI分析完成")

                        pipeline_results[endpoint] = {
                            'success': True,
                            'data': data['data']
                        }
                    else:
                        print(f" ✗ {step_name}失败: {data.get('message', '未知错误')}")
                        pipeline_results[endpoint] = {
                            'success': False,
                            'error': data.get('message', '未知错误')
                        }
                else:
                    print(f" ✗ {step_name}请求失败: {response.status_code}")
                    pipeline_results[endpoint] = {
                        'success': False,
                        'error': f"HTTP {response.status_code}"
                    }

                # Pause between pipeline steps
                time.sleep(1)

            except Exception as e:
                print(f" ✗ {step_name}异常: {e}")
                pipeline_results[endpoint] = {
                    'success': False,
                    'error': str(e)
                }

        self.test_results['processing_pipeline'] = pipeline_results

        # Check whether every step succeeded
        failed_steps = [step for step, result in pipeline_results.items()
                        if not result.get('success', False)]

        if not failed_steps:
            print(f"\n 🎉 数据处理流水线全部完成!")
            return True
        else:
            print(f"\n ⚠️ 部分步骤失败: {', '.join(failed_steps)}")
            return False

    def step_5_complete_processing(self):
        """Step 5: call the single ``process-data`` endpoint and record its metrics.

        The outcome is stored under
        ``test_results['processing_pipeline']['complete_process']`` for both
        success and failure.

        Returns:
            bool: True when the endpoint reports ``status == 'success'``.
        """
        print("\n🚀 步骤5: 完整处理流程测试")
        print("-" * 50)

        def record_failure(error):
            # Keep the failure visible in the saved report.
            self.test_results['processing_pipeline']['complete_process'] = {
                'success': False,
                'error': error
            }

        try:
            print(" 🔧 执行完整处理流程...")
            response = requests.post(f"{API_BASE}/process-data/", timeout=60)

            if response.status_code == 200:
                data = response.json()
                if data['status'] == 'success':
                    metric_keys = (
                        'original_count', 'extracted_count', 'processed_count',
                        'valid_count', 'invalid_count',
                        'extraction_rate', 'validation_rate',
                    )
                    metrics = {key: data.get(key, 0) for key in metric_keys}

                    print(" ✓ 完整处理流程成功")
                    print(f" 📊 原始记录: {metrics['original_count']}")
                    print(f" 📊 提取结果: {metrics['extracted_count']}")
                    print(f" 📊 处理记录: {metrics['processed_count']}")
                    print(f" 📊 有效记录: {metrics['valid_count']}")
                    print(f" 📊 无效记录: {metrics['invalid_count']}")
                    print(f" 📊 提取率: {metrics['extraction_rate']}%")
                    print(f" 📊 验证率: {metrics['validation_rate']}%")

                    self.test_results['processing_pipeline']['complete_process'] = {
                        'success': True,
                        'metrics': metrics
                    }
                    return True
                else:
                    print(f" ✗ 完整处理失败: {data.get('message', '未知错误')}")
                    record_failure(data.get('message', '未知错误'))
                    return False
            else:
                print(f" ✗ 完整处理请求失败: {response.status_code}")
                record_failure(f"HTTP {response.status_code}")
                return False

        except Exception as e:
            print(f" ✗ 完整处理异常: {e}")
            record_failure(str(e))
            return False

    def step_6_data_retrieval_test(self):
        """Step 6: query each data-retrieval endpoint and print a short sample.

        Returns:
            bool: True when every endpoint in ``CRITICAL_APIS`` succeeded.
        """
        print("\n📋 步骤6: 数据获取API测试")
        print("-" * 50)

        # (display name, endpoint) pairs.
        data_apis = [
            ('原始数据', 'original-data'),
            ('处理数据', 'processed-data'),
            ('最终数据', 'final-data'),
            ('隔离数据', 'quarantine-data'),
            ('统计信息', 'statistics')
        ]

        retrieval_results = {}

        for api_name, endpoint in data_apis:
            try:
                print(f" 📊 测试{api_name}API...")
                response = requests.get(f"{API_BASE}/{endpoint}/", timeout=10)

                if response.status_code == 200:
                    data = response.json()
                    if data['status'] == 'success':
                        count = data.get('count', 0)
                        print(f" ✓ {api_name}: {count} 条记录")

                        # Show sample data
                        if endpoint == 'statistics':
                            stats = data.get('statistics', {})
                            print(f" 📈 提取率: {stats.get('extraction_rate', 0)}%")
                            print(f" 📈 验证率: {stats.get('validation_rate', 0)}%")
                        elif count > 0 and 'data' in data:
                            sample_count = min(3, len(data['data']))
                            print(f" 📋 样例数据 (显示{sample_count}条):")
                            for i, record in enumerate(data['data'][:sample_count], 1):
                                if endpoint in ['processed-data', 'final-data']:
                                    call_sign = record.get('call_sign', 'N/A')
                                    behavior = record.get('behavior', 'N/A')
                                    print(f" {i}. 呼号: {call_sign} | 行为: {behavior}")
                                elif endpoint == 'original-data':
                                    # Truncate long texts to 50 characters.
                                    full_text = record.get('text', '')
                                    if len(full_text) > 50:
                                        text = full_text[:50] + '...'
                                    else:
                                        text = full_text
                                    print(f" {i}. {record.get('id', 'N/A')}: {text}")

                        retrieval_results[endpoint] = {
                            'success': True,
                            'count': count,
                            'data_sample': data.get('data', [])[:3] if 'data' in data else []
                        }
                    else:
                        print(f" ⚠️ {api_name}: {data.get('message', '无数据')}")
                        retrieval_results[endpoint] = {
                            'success': False,
                            'message': data.get('message', '无数据')
                        }
                else:
                    print(f" ✗ {api_name}请求失败: {response.status_code}")
                    retrieval_results[endpoint] = {
                        'success': False,
                        'error': f"HTTP {response.status_code}"
                    }

            except Exception as e:
                print(f" ✗ {api_name}异常: {e}")
                retrieval_results[endpoint] = {
                    'success': False,
                    'error': str(e)
                }

        self.test_results['data_retrieval'] = retrieval_results

        # Only the critical APIs decide the verdict of this step
        critical_success = all(
            retrieval_results.get(api, {}).get('success', False)
            for api in self.CRITICAL_APIS
        )

        if critical_success:
            print(f"\n 🎉 数据获取API测试完成!")
            return True
        else:
            print(f"\n ⚠️ 部分关键API测试失败")
            return False

    def step_7_final_verification(self):
        """Step 7: read the final statistics and count rows in the data tables.

        Returns:
            bool: True when the statistics endpoint reports success.
        """
        print("\n✅ 步骤7: 最终系统验证")
        print("-" * 50)

        try:
            # Fetch the final statistics
            response = requests.get(f"{API_BASE}/statistics/", timeout=10)

            if response.status_code == 200:
                data = response.json()
                if data['status'] == 'success':
                    stats = data['statistics']

                    print(" 📊 最终系统状态:")
                    print(f" 原始数据: {stats.get('original_count', 0)} 条")
                    print(f" 提取数据: {stats.get('extracted_count', 0)} 条")
                    print(f" 有效数据: {stats.get('valid_count', 0)} 条")
                    print(f" 无效数据: {stats.get('invalid_count', 0)} 条")
                    print(f" 提取效率: {stats.get('extraction_rate', 0)}%")
                    print(f" 验证通过率: {stats.get('validation_rate', 0)}%")

                    # System health score: mean of extraction and validation rates
                    extraction_rate = stats.get('extraction_rate', 0)
                    validation_rate = stats.get('validation_rate', 0)
                    data_quality = (extraction_rate + validation_rate) / 2

                    print(f" 系统健康度: {data_quality:.1f}%")

                    # Check the state of the database tables
                    with connection.cursor() as cursor:
                        tables_status = {}
                        # Table names come from this fixed list only, so
                        # interpolating them into the SQL text is safe.
                        for table in ['prewashed_table', 'processed_table',
                                      'final_table', 'quarantine_table']:
                            try:
                                cursor.execute(f"SELECT COUNT(*) FROM {table}")
                                count = cursor.fetchone()[0]
                                tables_status[table] = count
                                print(f" {table}: {count} 条")
                            except Exception:
                                # Best effort: a failed count is reported as 0.
                                tables_status[table] = 0
                                print(f" {table}: 不存在或为空")

                    self.test_results['final_verification'] = {
                        'success': True,
                        'statistics': stats,
                        'tables_status': tables_status,
                        'system_health': data_quality
                    }
                    return True
                else:
                    print(f" ✗ 统计信息获取失败: {data.get('message', '未知错误')}")
                    self.test_results['final_verification'] = {
                        'success': False,
                        'error': data.get('message', '未知错误')
                    }
                    return False
            else:
                print(f" ✗ 最终验证请求失败: {response.status_code}")
                self.test_results['final_verification'] = {
                    'success': False,
                    'error': f"HTTP {response.status_code}"
                }
                return False

        except Exception as e:
            print(f" ✗ 最终验证异常: {e}")
            self.test_results['final_verification'] = {
                'success': False,
                'error': str(e)
            }
            return False

    @classmethod
    def _step_passed(cls, step_key, step_result):
        """Decide whether a reported stage passed, from what its step stored.

        The stages store differently shaped results (a status string, upload
        flags, or a dict of per-endpoint results), so each is judged by the
        keys its step method actually writes.
        """
        if not step_result:
            return False
        if step_key == 'system_health':
            return step_result.get('status') == 'healthy'
        if step_key == 'data_upload':
            return bool(step_result.get('file_created', False)
                        and step_result.get('upload_success', False))
        if step_key == 'processing_pipeline':
            # Every recorded sub-step (including complete_process) must pass.
            return all(isinstance(result, dict) and result.get('success', False)
                       for result in step_result.values())
        if step_key == 'data_retrieval':
            return all(step_result.get(api, {}).get('success', False)
                       for api in cls.CRITICAL_APIS)
        return bool(step_result.get('success', False))

    def generate_comprehensive_report(self):
        """Print the summary report and save the raw results as JSON.

        Returns:
            bool: True when at least 80% of the reported stages passed.
        """
        print("\n" + "="*80)
        print("📋 完整端到端测试 - 综合报告")
        print("="*80)

        # Summary of the test stages: (display name, key in test_results)
        test_steps = [
            ('系统健康检查', 'system_health'),
            ('数据文件上传', 'data_upload'),
            ('处理流水线', 'processing_pipeline'),
            ('数据获取API', 'data_retrieval'),
            ('最终验证', 'final_verification')
        ]

        passed_steps = 0
        total_steps = len(test_steps)

        print(f"\n🔍 测试步骤总结:")
        for step_name, step_key in test_steps:
            step_result = self.test_results.get(step_key, {})
            if self._step_passed(step_key, step_result):
                print(f" ✅ {step_name}: 通过")
                passed_steps += 1
            elif step_result:
                print(f" ❌ {step_name}: 失败")
            else:
                # Nothing was recorded, so the step never ran.
                print(f" ⏭️ {step_name}: 跳过")

        success_rate = (passed_steps / total_steps * 100) if total_steps > 0 else 0
        print(f"\n📊 总体成功率: {passed_steps}/{total_steps} ({success_rate:.1f}%)")

        # Performance metrics summary
        final_verification = self.test_results.get('final_verification', {})
        if final_verification.get('success'):
            stats = final_verification.get('statistics', {})
            print(f"\n📈 性能指标总结:")
            print(f" 数据处理效率: {stats.get('extraction_rate', 0)}%")
            print(f" 数据验证率: {stats.get('validation_rate', 0)}%")
            print(f" 系统健康度: {final_verification.get('system_health', 0):.1f}%")

            # Data flow summary
            print(f"\n🔄 数据流总结:")
            print(f" 原始数据 → 提取数据 → 有效数据")
            print(f" {stats.get('original_count', 0)} → {stats.get('extracted_count', 0)} → {stats.get('valid_count', 0)}")

        # System rating
        print(f"\n🏆 系统评级:")
        if success_rate >= 90:
            grade = "A+"
            status = "🏆 优秀!系统已达到生产级别"
        elif success_rate >= 80:
            grade = "A"
            status = "✅ 良好!系统运行稳定"
        elif success_rate >= 70:
            grade = "B+"
            status = "📈 可用!需要少量优化"
        elif success_rate >= 60:
            grade = "B"
            status = "⚠️ 基本可用,需要改进"
        else:
            grade = "C"
            status = "❌ 需要重大修复"

        print(f" 等级: {grade}")
        print(f" 状态: {status}")

        # Save the detailed report
        report_file = Path(__file__).parent / "complete_e2e_test_report.json"
        with open(report_file, 'w', encoding='utf-8') as f:
            json.dump(self.test_results, f, indent=2, ensure_ascii=False)

        print(f"\n📄 详细报告已保存: {report_file}")

        return success_rate >= 80

    def cleanup(self):
        """Remove the temporary CSV fixture, if it was created."""
        try:
            if self.test_data_file and self.test_data_file.exists():
                self.test_data_file.unlink()
                print(f"🗑️ 清理测试文件: {self.test_data_file}")
        except Exception as e:
            print(f"⚠️ 清理文件失败: {e}")

    def run_complete_test(self):
        """Run every step in order, then print and save the report.

        A failing step does not stop the run; later steps still execute.
        The fixture file is always cleaned up.

        Returns:
            bool: the report verdict, or False on interruption or error.
        """
        print("🎯 开始完整端到端流程测试")
        print("="*80)
        print("📝 测试范围: 数据上传 → 处理流水线 → AI分析 → 数据获取 → 验证")
        print("="*80)

        try:
            # Execute all test steps
            steps = [
                self.step_1_system_health_check,
                self.step_2_prepare_test_data,
                self.step_3_file_upload,
                self.step_4_processing_pipeline,
                self.step_5_complete_processing,
                self.step_6_data_retrieval_test,
                self.step_7_final_verification
            ]

            for step in steps:
                if not step():
                    print(f"\n⚠️ 测试步骤失败,但继续执行后续步骤...")
                time.sleep(1)  # pause between steps

            # Generate the comprehensive report
            success = self.generate_comprehensive_report()

            return success

        except KeyboardInterrupt:
            print(f"\n⏹️ 测试被用户中断")
            return False
        except Exception as e:
            print(f"\n❌ 测试过程发生异常: {e}")
            return False
        finally:
            self.cleanup()


def main():
    """Run the end-to-end test and return a process exit code (0 = success)."""
    tester = CompleteEndToEndTest()
    success = tester.run_complete_test()

    print("\n" + "="*80)
    if success:
        print("🎉 完整端到端测试成功完成!")
        print("✅ 系统已准备好投入生产使用")
    else:
        print("⚠️ 端到端测试完成,但存在需要关注的问题")
        print("💡 请查看详细报告了解具体问题")

    return 0 if success else 1


if __name__ == "__main__":
    sys.exit(main())