#!/usr/bin/env python3
"""
End-to-end test of the data processing flow.

Exercises the complete pipeline, from seeding raw rows in the database to
reading the final results back through the HTTP API, and writes a JSON report.
"""

import json
import os
import sys
import time
from pathlib import Path

import django
import pandas as pd
import requests

# Configure the Django environment before touching any Django module.
sys.path.append('/home/hzk/项目/moxun-1/信息抽取+数据检验/Django123/atc_extractor/backend')
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'backend.settings')
django.setup()

from django.db import connection

API_BASE = "http://127.0.0.1:8080/api"


class DataProcessingFlowTest:
    """Drive the processing API step by step and collect per-step results.

    Results are accumulated in ``self.test_results`` under four sections:
    ``setup``, ``processing_steps``, ``data_validation`` and ``api_tests``.
    Every step that is attempted stores a dict with a boolean ``success`` key,
    so the final report can distinguish failed steps from skipped ones.
    """

    # Headers sent with the JSON POST requests of the individual steps.
    _JSON_HEADERS = {'Content-Type': 'application/json'}

    def __init__(self):
        self.test_results = {
            'setup': {},
            'processing_steps': {},
            'data_validation': {},
            'api_tests': {},
        }

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _record_failure(self, section, key, error):
        """Store a failed outcome for ``key`` in the given result section."""
        self.test_results[section][key] = {
            'success': False,
            'error': str(error),
        }

    def _success_payload(self, response, section, key, *, http_label, status_label):
        """Return the decoded JSON body of a successful API response.

        A response counts as successful when its HTTP status is 200 and the
        decoded body has ``status == 'success'``. Otherwise the matching
        message is printed, the failure is recorded under
        ``self.test_results[section][key]`` and ``None`` is returned.

        Args:
            response: The ``requests`` response object to inspect.
            section: Result section name (e.g. ``'api_tests'``).
            key: Result key inside the section.
            http_label: Message prefix printed for a non-200 status code.
            status_label: Message prefix printed for a non-success payload.

        Raises:
            Whatever ``response.json()`` or the payload lookups raise; callers
            wrap this call in their own ``try`` block.
        """
        if response.status_code != 200:
            print(f"{http_label}: {response.status_code}")
            self._record_failure(section, key, f"HTTP {response.status_code}")
            return None

        data = response.json()
        if data['status'] != 'success':
            message = data.get('message', '未知错误')
            print(f"{status_label}: {message}")
            self._record_failure(section, key, message)
            return None
        return data

    @staticmethod
    def _print_outcomes(results, display_names):
        """Print one success / failure / skipped line per named entry."""
        for key, label in display_names.items():
            if key not in results:
                print(f" ⏭️ {label}: 跳过")
            elif results[key].get('success'):
                print(f" ✅ {label}: 成功")
            else:
                print(f" ❌ {label}: 失败")

    # ------------------------------------------------------------------
    # Setup and environment checks
    # ------------------------------------------------------------------
    def setup_test_data(self):
        """Seed ``prewashed_table`` with ten sample ATC utterances.

        The table is created if missing and all existing rows are deleted
        before the samples are inserted.

        Returns:
            True on success, False if any database operation raised.
        """
        print("🔧 准备测试数据...")

        # Sample ATC dialogue rows as (id, text) pairs.
        test_data = [
            ("test_001", "CCA123 climb to flight level 350"),
            ("test_002", "CSN456 descend to flight level 280"),
            ("test_003", "CHH789 maintain heading 090 degrees"),
            ("test_004", "Air China 001 turn left heading 270"),
            ("test_005", "China Southern 888 contact approach 121.9"),
            ("test_006", "Hainan Airlines 7777 reduce speed to 250 knots"),
            ("test_007", "Spring Airlines 9999 cleared for takeoff runway 36L"),
            ("test_008", "Juneyao Airlines 1234 taxi to gate A15"),
            ("test_009", "Shanghai Airlines 5678 hold short of runway 18R"),
            ("test_010", "Tibet Airlines 9876 report when ready for departure"),
        ]

        try:
            with connection.cursor() as cursor:
                # Make sure prewashed_table exists.
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS prewashed_table (
                        id VARCHAR(255) NOT NULL,
                        text TEXT,
                        PRIMARY KEY (id)
                    )
                """)

                # Remove whatever rows are currently in the table.
                cursor.execute("DELETE FROM prewashed_table")

                # Insert the sample rows.
                for id_val, text_val in test_data:
                    cursor.execute(
                        "INSERT INTO prewashed_table (id, text) VALUES (%s, %s)",
                        [id_val, text_val],
                    )

                # Verify the inserted rows.
                cursor.execute("SELECT COUNT(*) FROM prewashed_table")
                count = cursor.fetchone()[0]
                print(f" ✓ 成功插入 {count} 条测试数据")

                # Show a few of the inserted rows.
                cursor.execute("SELECT * FROM prewashed_table LIMIT 3")
                sample_data = cursor.fetchall()
                print(" 📋 数据样例:")
                for row in sample_data:
                    print(f" ID: {row[0]}, Text: {row[1]}")

            self.test_results['setup'] = {
                'success': True,
                'data_count': count,
                'sample_data': sample_data,
            }
            return True

        except Exception as e:
            print(f" ✗ 测试数据准备失败: {e}")
            self.test_results['setup'] = {
                'success': False,
                'error': str(e),
            }
            return False

    def check_server_status(self):
        """Return True if ``GET {API_BASE}/health/`` answers with HTTP 200."""
        print("\n🚀 检查Django服务器状态...")

        try:
            response = requests.get(f"{API_BASE}/health/", timeout=5)
        except requests.RequestException as e:
            print(f" ✗ 无法连接到Django服务器: {e}")
            print(" 💡 请确保Django服务器正在运行: python manage.py runserver")
            return False

        if response.status_code == 200:
            print(" ✓ Django服务器运行正常")
            return True

        print(f" ✗ 服务器响应异常: {response.status_code}")
        return False

    # ------------------------------------------------------------------
    # API and processing-step tests
    # ------------------------------------------------------------------
    def test_original_data_api(self):
        """Fetch the original data and record the outcome in ``api_tests``.

        Returns:
            True if the API answered successfully, False otherwise.
        """
        print("\n📊 测试原始数据获取API...")

        try:
            response = requests.get(f"{API_BASE}/original-data/", timeout=10)
            data = self._success_payload(
                response, 'api_tests', 'original_data',
                http_label=" ✗ API请求失败",
                status_label=" ✗ API返回错误",
            )
            if data is None:
                return False

            count = data['count']
            records = data['data']

            print(f" ✓ 成功获取 {count} 条原始数据")
            print(" 📋 原始数据样例:")
            for i, record in enumerate(records[:3]):
                print(f" {i+1}. ID: {record['id']}, Text: {record['text'][:50]}...")

            self.test_results['api_tests']['original_data'] = {
                'success': True,
                'count': count,
                'sample': records[:3],
            }
            return True

        except Exception as e:
            print(f" ✗ 原始数据API测试失败: {e}")
            self._record_failure('api_tests', 'original_data', e)
            return False

    def test_preprocessing_step(self):
        """Run the preprocessing step and record it in ``processing_steps``.

        Returns:
            True if the step reported success, False otherwise.
        """
        print("\n🔄 测试数据预处理步骤...")

        try:
            response = requests.post(
                f"{API_BASE}/preprocess/",
                json={},
                headers=self._JSON_HEADERS,
                timeout=10,
            )
            data = self._success_payload(
                response, 'processing_steps', 'preprocess',
                http_label=" ✗ 预处理请求失败",
                status_label=" ✗ 预处理失败",
            )
            if data is None:
                return False

            processed_count = data['data']['processed_count']
            cleaning_rate = data['data']['cleaning_rate']

            print(f" ✓ 预处理成功: 处理了 {processed_count} 条记录")
            print(f" 📊 数据清理率: {cleaning_rate}%")

            self.test_results['processing_steps']['preprocess'] = {
                'success': True,
                'processed_count': processed_count,
                'cleaning_rate': cleaning_rate,
            }
            return True

        except Exception as e:
            print(f" ✗ 预处理测试失败: {e}")
            self._record_failure('processing_steps', 'preprocess', e)
            return False

    def test_merge_step(self):
        """Run the format-merge step and record it in ``processing_steps``.

        Returns:
            True if the step reported success, False otherwise.
        """
        print("\n🔗 测试格式合并步骤...")

        try:
            response = requests.post(
                f"{API_BASE}/merge/",
                json={},
                headers=self._JSON_HEADERS,
                timeout=10,
            )
            data = self._success_payload(
                response, 'processing_steps', 'merge',
                http_label=" ✗ 格式合并请求失败",
                status_label=" ✗ 格式合并失败",
            )
            if data is None:
                return False

            merged_records = data['data']['merged_records']
            success_rate = data['data']['merge_success_rate']

            print(f" ✓ 格式合并成功: 合并了 {merged_records} 条记录")
            print(f" 📊 合并成功率: {success_rate}%")

            self.test_results['processing_steps']['merge'] = {
                'success': True,
                'merged_records': merged_records,
                'success_rate': success_rate,
            }
            return True

        except Exception as e:
            print(f" ✗ 格式合并测试失败: {e}")
            self._record_failure('processing_steps', 'merge', e)
            return False

    def test_correction_step(self):
        """Run the word-correction step and record it in ``processing_steps``.

        Returns:
            True if the step reported success, False otherwise.
        """
        print("\n📝 测试单词纠错步骤...")

        try:
            response = requests.post(
                f"{API_BASE}/correct/",
                json={},
                headers=self._JSON_HEADERS,
                timeout=10,
            )
            data = self._success_payload(
                response, 'processing_steps', 'correction',
                http_label=" ✗ 单词纠错请求失败",
                status_label=" ✗ 单词纠错失败",
            )
            if data is None:
                return False

            corrected_words = data['data']['corrected_words']
            correction_types = data['data']['correction_types']

            print(f" ✓ 单词纠错成功: 纠正了 {corrected_words} 个单词")
            print(f" 📋 纠错类型: {', '.join(correction_types)}")

            self.test_results['processing_steps']['correction'] = {
                'success': True,
                'corrected_words': corrected_words,
                'correction_types': correction_types,
            }
            return True

        except Exception as e:
            print(f" ✗ 单词纠错测试失败: {e}")
            self._record_failure('processing_steps', 'correction', e)
            return False

    def test_ai_analysis_step(self):
        """Run the AI analysis step and record it in ``processing_steps``.

        Returns:
            True if the step reported success, False otherwise.
        """
        print("\n🤖 测试AI分析步骤...")

        try:
            # AI processing may take longer, hence the larger timeout.
            response = requests.post(
                f"{API_BASE}/analyze/",
                json={},
                headers=self._JSON_HEADERS,
                timeout=30,
            )
            data = self._success_payload(
                response, 'processing_steps', 'ai_analysis',
                http_label=" ✗ AI分析请求失败",
                status_label=" ✗ AI分析失败",
            )
            if data is None:
                return False

            if 'analysis_summary' in data['data']:
                # Simulated analysis result.
                summary = data['data']['analysis_summary']
                print(" ✓ AI分析完成(模拟结果)")
                print(f" 📊 呼号提取: {summary.get('call_signs_extracted', 0)}")
                print(f" 📊 行为识别: {summary.get('behaviors_identified', 0)}")
                print(f" 📊 高度检测: {summary.get('flight_levels_detected', 0)}")
            else:
                # Result of an actual AI run.
                print(" ✓ AI分析完成(实际处理)")
                print(f" 📊 处理结果: {data['data']}")

            self.test_results['processing_steps']['ai_analysis'] = {
                'success': True,
                'result': data['data'],
            }
            return True

        except Exception as e:
            print(f" ✗ AI分析测试失败: {e}")
            self._record_failure('processing_steps', 'ai_analysis', e)
            return False

    def test_processed_data_api(self):
        """Fetch the processed data and record the outcome in ``api_tests``.

        Returns:
            True if the API answered successfully, False otherwise.
        """
        print("\n📋 测试处理后数据获取API...")

        try:
            response = requests.get(f"{API_BASE}/processed-data/", timeout=10)
            data = self._success_payload(
                response, 'api_tests', 'processed_data',
                http_label=" ✗ 处理后数据API请求失败",
                status_label=" ⚠️ 处理后数据为空或获取失败",
            )
            if data is None:
                return False

            count = data['count']
            records = data['data']

            print(f" ✓ 成功获取 {count} 条处理后数据")

            if records:
                print(" 📋 处理后数据样例:")
                for i, record in enumerate(records[:3]):
                    print(f" {i+1}. 呼号: {record.get('Call Sign', 'N/A')}")
                    print(f" 行为: {record.get('Behavior', 'N/A')}")
                    print(f" 高度: {record.get('Flight Level', 'N/A')}")
                    print(f" 位置: {record.get('Location', 'N/A')}")
                    print()

            self.test_results['api_tests']['processed_data'] = {
                'success': True,
                'count': count,
                'sample': records[:3] if records else [],
            }
            return True

        except Exception as e:
            print(f" ✗ 处理后数据API测试失败: {e}")
            self._record_failure('api_tests', 'processed_data', e)
            return False

    def test_statistics_api(self):
        """Fetch the statistics and record the outcome in ``api_tests``.

        Returns:
            True if the API answered successfully, False otherwise.
        """
        print("\n📊 测试统计信息API...")

        # (printed label, key in the statistics payload, unit suffix)
        stat_lines = (
            ('原始数据', 'original_count', ' 条'),
            ('处理数据', 'extracted_count', ' 条'),
            ('有效数据', 'valid_count', ' 条'),
            ('无效数据', 'invalid_count', ' 条'),
            ('提取率', 'extraction_rate', '%'),
            ('验证率', 'validation_rate', '%'),
        )

        try:
            response = requests.get(f"{API_BASE}/statistics/", timeout=10)
            data = self._success_payload(
                response, 'api_tests', 'statistics',
                http_label=" ✗ 统计信息API请求失败",
                status_label=" ✗ 统计信息获取失败",
            )
            if data is None:
                return False

            stats = data['statistics']

            print(" ✓ 统计信息获取成功")
            for label, key, unit in stat_lines:
                print(f" 📊 {label}: {stats.get(key, 0)}{unit}")

            self.test_results['api_tests']['statistics'] = {
                'success': True,
                'statistics': stats,
            }
            return True

        except Exception as e:
            print(f" ✗ 统计信息API测试失败: {e}")
            self._record_failure('api_tests', 'statistics', e)
            return False

    def test_complete_processing_api(self):
        """Run the one-shot processing API and record it in ``processing_steps``.

        Returns:
            True if the API answered successfully, False otherwise.
        """
        print("\n🚀 测试完整处理流程API...")

        # (printed label, key in the response payload); printed only if present.
        optional_counts = (
            ('处理数量', 'processed_count'),
            ('有效数量', 'valid_count'),
            ('无效数量', 'invalid_count'),
        )

        try:
            # The full pipeline may take a long time, hence the larger timeout.
            response = requests.post(f"{API_BASE}/process-data/", timeout=60)
            data = self._success_payload(
                response, 'processing_steps', 'complete_process',
                http_label=" ✗ 完整处理API请求失败",
                status_label=" ✗ 完整处理失败",
            )
            if data is None:
                return False

            print(" ✓ 完整处理流程成功")

            # Show whichever counters the response carries.
            for label, key in optional_counts:
                if key in data:
                    print(f" 📊 {label}: {data[key]}")

            self.test_results['processing_steps']['complete_process'] = {
                'success': True,
                'result': data,
            }
            return True

        except Exception as e:
            print(f" ✗ 完整处理API测试失败: {e}")
            self._record_failure('processing_steps', 'complete_process', e)
            return False

    # ------------------------------------------------------------------
    # Database inspection and reporting
    # ------------------------------------------------------------------
    def check_database_tables(self):
        """Count rows in the known pipeline tables.

        Stores ``{table: {'exists': bool, 'count': int}}`` under
        ``data_validation['table_status']``, or ``{'error': str}`` if the
        cursor itself could not be used.
        """
        print("\n🗄️ 检查数据库表状态...")

        tables_to_check = [
            'prewashed_table',
            'processed_table',
            'precessed_table',  # legacy, misspelled table name
            'final_table',
            'quarantine_table',
        ]

        table_status = {}

        try:
            with connection.cursor() as cursor:
                for table in tables_to_check:
                    try:
                        # Table names come from the fixed list above, never
                        # from external input, so interpolation is safe here.
                        cursor.execute(f"SELECT COUNT(*) FROM {table}")
                        count = cursor.fetchone()[0]
                    except Exception:
                        # NOTE(review): any query error is reported as a
                        # missing table; the actual cause is not inspected.
                        table_status[table] = {'exists': False, 'count': 0}
                        print(f" - {table}: 表不存在")
                    else:
                        table_status[table] = {'exists': True, 'count': count}
                        print(f" ✓ {table}: {count} 条记录")

            self.test_results['data_validation']['table_status'] = table_status

        except Exception as e:
            print(f" ✗ 数据库表检查失败: {e}")
            self.test_results['data_validation']['table_status'] = {
                'error': str(e),
            }

    def generate_test_report(self):
        """Print a summary of all recorded results and save them as JSON.

        The JSON report is written next to this script as
        ``data_processing_test_report.json``.
        """
        print("\n" + "="*80)
        print("📋 数据处理流程测试报告")
        print("="*80)

        # Test data preparation
        setup = self.test_results.get('setup', {})
        if setup.get('success'):
            print(f"\n✅ 测试数据准备: 成功 ({setup.get('data_count', 0)} 条)")
        else:
            print("\n❌ 测试数据准备: 失败")

        # Processing steps
        steps = self.test_results.get('processing_steps', {})
        print("\n🔄 处理步骤测试:")
        self._print_outcomes(steps, {
            'preprocess': '数据预处理',
            'merge': '格式合并',
            'correction': '单词纠错',
            'ai_analysis': 'AI分析',
            'complete_process': '完整处理流程',
        })

        # API tests
        api_tests = self.test_results.get('api_tests', {})
        print("\n🌐 API测试:")
        self._print_outcomes(api_tests, {
            'original_data': '原始数据获取',
            'processed_data': '处理后数据获取',
            'statistics': '统计信息获取',
        })

        # Database table validation
        validation = self.test_results.get('data_validation', {})
        if 'table_status' in validation:
            print("\n🗄️ 数据库表状态:")
            for table, status in validation['table_status'].items():
                if not isinstance(status, dict):
                    # check_database_tables stores {'error': <str>} when the
                    # whole check failed; report it instead of crashing.
                    print(f" ✗ 数据库表检查失败: {status}")
                elif status.get('exists'):
                    print(f" ✅ {table}: {status.get('count', 0)} 条记录")
                else:
                    print(f" ➖ {table}: 不存在")

        # Overall assessment: setup counts as one test, plus every recorded
        # processing step and API test.
        outcomes = [setup, *steps.values(), *api_tests.values()]
        total_tests = len(outcomes)
        passed_tests = sum(1 for outcome in outcomes if outcome.get('success'))

        success_rate = (passed_tests / total_tests * 100) if total_tests > 0 else 0

        print(f"\n🎯 总体测试结果: {passed_tests}/{total_tests} 通过 ({success_rate:.1f}%)")

        if success_rate >= 80:
            print("🏆 优秀!数据处理流程运行良好")
        elif success_rate >= 60:
            print("✅ 良好!大部分功能正常")
        elif success_rate >= 40:
            print("⚠️ 一般,部分功能需要检查")
        else:
            print("❌ 需要修复多个问题")

        # Save the detailed report.
        report_file = Path(__file__).parent / "data_processing_test_report.json"
        with open(report_file, 'w', encoding='utf-8') as f:
            json.dump(self.test_results, f, indent=2, ensure_ascii=False)

        print(f"\n📄 详细报告已保存: {report_file}")

    def run_complete_test(self):
        """Run every stage of the flow in order and produce the report.

        Returns:
            False if the test data could not be prepared or the server is
            unreachable; True once all stages have been attempted.
            NOTE(review): individual step failures do not affect the return
            value; they are only visible in the report.
        """
        print("🎯 开始数据处理流程完整测试")
        print("="*80)

        # 1. Prepare the test data.
        if not self.setup_test_data():
            print("❌ 测试数据准备失败,无法继续测试")
            return False

        # 2. Check that the server is reachable.
        if not self.check_server_status():
            print("❌ Django服务器未运行,无法进行API测试")
            print("💡 请先启动服务器: cd Django123/atc_extractor/backend && python manage.py runserver")
            self.check_database_tables()
            self.generate_test_report()
            return False

        # 3. Original data API.
        self.test_original_data_api()

        # 4. Individual processing steps, in order; the short sleeps keep
        #    requests from being fired too quickly.
        self.test_preprocessing_step()
        time.sleep(1)

        self.test_merge_step()
        time.sleep(1)

        self.test_correction_step()
        time.sleep(1)

        self.test_ai_analysis_step()
        time.sleep(2)

        # 5. One-shot complete processing API.
        self.test_complete_processing_api()
        time.sleep(2)

        # 6. Processed data API.
        self.test_processed_data_api()
        time.sleep(1)

        # 7. Statistics API.
        self.test_statistics_api()

        # 8. Database table status.
        self.check_database_tables()

        # 9. Final report.
        self.generate_test_report()

        return True


def main():
    """Run the complete test and return a process exit code (0 or 1)."""
    tester = DataProcessingFlowTest()
    success = tester.run_complete_test()

    print("\n" + "="*80)
    if success:
        print("✅ 数据处理流程测试完成!")
    else:
        print("⚠️ 测试完成,但可能存在问题需要解决")

    return 0 if success else 1


if __name__ == "__main__":
    sys.exit(main())