#!/usr/bin/env python3
"""
AI processing-logic optimization test.

Seeds a small set of ATC phrases into ``prewashed_table``, triggers the
processing pipeline through the local HTTP API, inspects the resulting
data and writes a JSON report next to this script.
"""

import os
import sys
import django
import requests
import json
import time
from pathlib import Path

# Set up the Django environment before any Django component is imported.
sys.path.append('/home/hzk/项目/moxun-1/信息抽取+数据检验/Django123/atc_extractor/backend')
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'backend.settings')
django.setup()

from django.db import connection

API_BASE = "http://127.0.0.1:8080/api"


class AIOptimizationTest:
    """Drive the optimization test and collect its results in ``test_results``."""

    # (key in an API record, key in ``quality_stats``) for every field whose
    # completeness is measured.
    _QUALITY_FIELDS = (
        ('Call Sign', 'non_empty_call_signs'),
        ('Behavior', 'non_empty_behaviors'),
        ('Flight Level', 'non_empty_flight_levels'),
        ('Location', 'non_empty_locations'),
        ('Time', 'non_empty_times'),
    )

    def __init__(self):
        self.test_results = {
            'before_optimization': {},
            'after_optimization': {},
            'comparison': {}
        }

    @staticmethod
    def _field_text(record, key):
        """Return ``record[key]`` as a stripped string; missing/None becomes ''.

        Non-string values (e.g. a numeric flight level) are converted with
        ``str`` so that they do not break ``.strip()``.
        """
        value = record.get(key)
        return '' if value is None else str(value).strip()

    def prepare_test_data(self):
        """Replace the contents of ``prewashed_table`` with the test phrases.

        Returns:
            True when the rows were inserted, False if any database
            operation raised.
        """
        print("🔧 准备AI优化测试数据...")

        # Realistic-looking ATC transmissions
        test_data = [
            ("opt_001", "CCA123 climb to flight level 350"),
            ("opt_002", "China Southern 456 descend to flight level 280"),
            ("opt_003", "Air China 789 turn left heading 270 degrees"),
            ("opt_004", "Hainan Airlines 888 contact Shanghai approach 121.9"),
            ("opt_005", "Spring Airlines 999 cleared for takeoff runway 36L"),
            ("opt_006", "United 997 maintain flight level 330"),
            ("opt_007", "Lufthansa 672 descending to flight level 250"),
            ("opt_008", "Singapore Airlines 106 approach runway 18R"),
            ("opt_009", "Japan Airlines 550 hold position"),
            ("opt_010", "KLM 695 taxi to gate A15"),
        ]

        try:
            with connection.cursor() as cursor:
                # Wipe the table, then insert the fresh rows
                cursor.execute("DELETE FROM prewashed_table")

                for id_val, text_val in test_data:
                    cursor.execute(
                        "INSERT INTO prewashed_table (id, text) VALUES (%s, %s)",
                        [id_val, text_val]
                    )

                cursor.execute("SELECT COUNT(*) FROM prewashed_table")
                count = cursor.fetchone()[0]
                print(f" ✓ 成功准备 {count} 条AI优化测试数据")

                # Show a few of the stored rows
                cursor.execute("SELECT * FROM prewashed_table LIMIT 3")
                sample_data = cursor.fetchall()
                print(" 📋 测试数据样例:")
                for row in sample_data:
                    print(f" ID: {row[0]}, Text: {row[1]}")

            return True
        except Exception as e:
            print(f" ✗ 测试数据准备失败: {e}")
            return False

    def capture_before_state(self):
        """Record the statistics reported by the API before processing.

        Returns:
            True when the statistics endpoint answered with status
            ``success``; False on any other response or on an exception.
        """
        print("\n📊 捕获优化前状态...")

        try:
            # Fetch the statistics
            response = requests.get(f"{API_BASE}/statistics/", timeout=10)
            if response.status_code == 200:
                data = response.json()
                if data['status'] == 'success':
                    stats = data['statistics']
                    self.test_results['before_optimization'] = {
                        'original_count': stats.get('original_count', 0),
                        'extracted_count': stats.get('extracted_count', 0),
                        'valid_count': stats.get('valid_count', 0),
                        'invalid_count': stats.get('invalid_count', 0),
                        'extraction_rate': stats.get('extraction_rate', 0),
                        'validation_rate': stats.get('validation_rate', 0)
                    }

                    print(f" 📊 原始数据: {stats.get('original_count', 0)} 条")
                    print(f" 📊 提取数据: {stats.get('extracted_count', 0)} 条")
                    print(f" 📊 有效数据: {stats.get('valid_count', 0)} 条")
                    print(f" 📊 提取率: {stats.get('extraction_rate', 0)}%")
                    print(f" 📊 验证率: {stats.get('validation_rate', 0)}%")

                    return True
        except Exception as e:
            print(f" ✗ 捕获优化前状态失败: {e}")
            return False

        # Non-200 response or a non-success status
        return False

    def run_improved_processing(self):
        """Trigger the full processing pipeline and store its summary.

        Returns:
            True when the API reports ``success``; False otherwise.
        """
        print("\n🚀 运行改进版AI处理...")

        try:
            # Call the end-to-end processing endpoint
            response = requests.post(f"{API_BASE}/process-data/", timeout=60)

            if response.status_code == 200:
                data = response.json()
                if data['status'] == 'success':
                    print(" ✓ 改进版AI处理成功")
                    print(f" 📊 原始记录: {data.get('original_count', 0)}")
                    print(f" 📊 提取结果: {data.get('extracted_count', 0)}")
                    print(f" 📊 处理记录: {data.get('processed_count', 0)}")
                    print(f" 📊 有效记录: {data.get('valid_count', 0)}")
                    print(f" 📊 无效记录: {data.get('invalid_count', 0)}")
                    print(f" 📊 提取率: {data.get('extraction_rate', 0)}%")
                    print(f" 📊 验证率: {data.get('validation_rate', 0)}%")

                    self.test_results['after_optimization'] = {
                        'original_count': data.get('original_count', 0),
                        'extracted_count': data.get('extracted_count', 0),
                        'processed_count': data.get('processed_count', 0),
                        'valid_count': data.get('valid_count', 0),
                        'invalid_count': data.get('invalid_count', 0),
                        'extraction_rate': data.get('extraction_rate', 0),
                        'validation_rate': data.get('validation_rate', 0)
                    }

                    return True
                else:
                    print(f" ✗ 改进版AI处理失败: {data.get('message', '未知错误')}")
                    return False
            else:
                print(f" ✗ API调用失败: {response.status_code}")
                return False
        except Exception as e:
            print(f" ✗ 改进版AI处理异常: {e}")
            return False

    def verify_processed_data_quality(self):
        """Measure field completeness of the processed records.

        Prints the first five records as a sample, then counts, over ALL
        returned records, how many have a non-empty, non-``NULL`` value
        for each measured field. The result is stored under
        ``test_results['data_quality']``.

        Returns:
            True when records were returned and analysed; False when the
            response is empty, not successful, or an exception occurred.
        """
        print("\n🔍 验证处理后数据质量...")

        try:
            # Inspect processed_table (the new table)
            response = requests.get(f"{API_BASE}/processed-data/", timeout=10)
            if response.status_code == 200:
                data = response.json()
                if data['status'] == 'success' and data['count'] > 0:
                    records = data['data']
                    print(f" ✓ 获取到 {data['count']} 条处理后数据")

                    print(" 📋 数据质量样例:")
                    for i, record in enumerate(records[:5], 1):
                        call_sign = self._field_text(record, 'Call Sign')
                        behavior = self._field_text(record, 'Behavior')
                        flight_level = self._field_text(record, 'Flight Level')
                        location = self._field_text(record, 'Location')
                        time_text = self._field_text(record, 'Time')

                        print(f" {i}. 呼号: {call_sign or 'N/A'}")
                        print(f" 行为: {behavior or 'N/A'}")
                        print(f" 高度: {flight_level or 'N/A'}")
                        print(f" 位置: {location or 'N/A'}")
                        print(f" 时间: {time_text or 'N/A'}")
                        print()

                    # Count filled fields over every record, not only the
                    # printed sample, because the percentages below are
                    # relative to the full record count.
                    quality_stats = {
                        stat_key: 0 for _, stat_key in self._QUALITY_FIELDS
                    }
                    for record in records:
                        for source_key, stat_key in self._QUALITY_FIELDS:
                            value = self._field_text(record, source_key)
                            if value and value != 'NULL':
                                quality_stats[stat_key] += 1

                    # Completeness percentage per field (based on all records)
                    total_records = len(records)
                    quality_percentages = {
                        field: round(count / total_records * 100, 1) if total_records > 0 else 0
                        for field, count in quality_stats.items()
                    }

                    print(" 📊 数据完整性分析:")
                    print(f" 呼号完整性: {quality_percentages['non_empty_call_signs']}%")
                    print(f" 行为完整性: {quality_percentages['non_empty_behaviors']}%")
                    print(f" 高度完整性: {quality_percentages['non_empty_flight_levels']}%")
                    print(f" 位置完整性: {quality_percentages['non_empty_locations']}%")
                    print(f" 时间完整性: {quality_percentages['non_empty_times']}%")

                    self.test_results['data_quality'] = {
                        'total_records': total_records,
                        'quality_stats': quality_stats,
                        'quality_percentages': quality_percentages
                    }

                    return True
                else:
                    print(" ⚠️ 处理后数据为空")
                    return False
            else:
                print(f" ✗ 数据质量检查失败: {response.status_code}")
                return False
        except Exception as e:
            print(f" ✗ 数据质量验证失败: {e}")
            return False

    def check_final_tables(self):
        """Print the row counts of the final and quarantine tables.

        Returns:
            True when both requests completed (whatever their status
            code); False if an exception was raised.
        """
        print("\n📋 检查最终数据表...")

        try:
            # Check final_table
            response = requests.get(f"{API_BASE}/final-data/", timeout=10)
            if response.status_code == 200:
                data = response.json()
                final_count = data.get('count', 0)
                print(f" ✓ final_table: {final_count} 条有效数据")

                if final_count > 0:
                    print(" 📋 有效数据样例:")
                    for i, record in enumerate(data['data'][:3], 1):
                        print(f" {i}. {record.get('Call Sign', 'N/A')} - {record.get('Behavior', 'N/A')}")
            else:
                print(f" ⚠️ final_table检查失败: {response.status_code}")

            # Check quarantine_table
            response = requests.get(f"{API_BASE}/quarantine-data/", timeout=10)
            if response.status_code == 200:
                data = response.json()
                quarantine_count = data.get('count', 0)
                print(f" ✓ quarantine_table: {quarantine_count} 条无效数据")
            else:
                print(f" ⚠️ quarantine_table检查失败: {response.status_code}")

            return True
        except Exception as e:
            print(f" ✗ 最终数据表检查失败: {e}")
            return False

    def generate_optimization_report(self):
        """Print the before/after comparison and save ``test_results`` as JSON.

        The report file ``ai_optimization_report.json`` is written into
        the directory that contains this script.
        """
        print("\n" + "="*80)
        print("📊 AI处理逻辑优化报告")
        print("="*80)

        before = self.test_results.get('before_optimization', {})
        after = self.test_results.get('after_optimization', {})
        quality = self.test_results.get('data_quality', {})

        if before and after:
            print("\n📈 优化前后对比:")
            print(f" 原始数据: {before.get('original_count', 0)} → {after.get('original_count', 0)}")
            print(f" 提取数据: {before.get('extracted_count', 0)} → {after.get('extracted_count', 0)}")
            print(f" 有效数据: {before.get('valid_count', 0)} → {after.get('valid_count', 0)}")
            print(f" 提取率: {before.get('extraction_rate', 0)}% → {after.get('extraction_rate', 0)}%")
            print(f" 验证率: {before.get('validation_rate', 0)}% → {after.get('validation_rate', 0)}%")

            # Relative improvement per metric; infinity marks "rose from zero"
            improvements = {}
            for metric in ['extraction_rate', 'validation_rate', 'valid_count']:
                before_val = before.get(metric, 0)
                after_val = after.get(metric, 0)

                if before_val > 0:
                    improvement = ((after_val - before_val) / before_val) * 100
                    improvements[metric] = improvement
                else:
                    improvements[metric] = float('inf') if after_val > 0 else 0

            print("\n📊 改进幅度:")
            for metric, improvement in improvements.items():
                if improvement == float('inf'):
                    print(f" {metric}: 从0提升到{after.get(metric, 0)}")
                elif improvement > 0:
                    print(f" {metric}: +{improvement:.1f}%")
                elif improvement < 0:
                    print(f" {metric}: {improvement:.1f}%")
                else:
                    print(f" {metric}: 无变化")

        if quality:
            print("\n🔍 数据质量分析:")
            total = quality.get('total_records', 0)
            percentages = quality.get('quality_percentages', {})

            print(f" 总处理记录: {total}")
            print(f" 呼号完整性: {percentages.get('non_empty_call_signs', 0)}%")
            print(f" 行为完整性: {percentages.get('non_empty_behaviors', 0)}%")
            print(f" 高度完整性: {percentages.get('non_empty_flight_levels', 0)}%")

            # Weighted overall quality score (weights sum to 1.0)
            quality_score = (
                percentages.get('non_empty_call_signs', 0) * 0.4 +
                percentages.get('non_empty_behaviors', 0) * 0.3 +
                percentages.get('non_empty_flight_levels', 0) * 0.2 +
                percentages.get('non_empty_locations', 0) * 0.05 +
                percentages.get('non_empty_times', 0) * 0.05
            )
            print(f" 整体质量评分: {quality_score:.1f}/100")

        # Summary verdict based on the post-processing validation rate
        print("\n🎯 优化总结:")
        validation_rate = after.get('validation_rate', 0)

        if validation_rate > 80:
            print(" 🏆 优秀!AI处理质量显著提升")
        elif validation_rate > 60:
            print(" ✅ 良好!AI处理质量有明显改善")
        elif validation_rate > 40:
            print(" ⚠️ 一般,还有进一步优化空间")
        else:
            print(" ❌ 需要继续优化AI处理逻辑")

        # Persist the collected results
        report_file = Path(__file__).parent / "ai_optimization_report.json"
        with open(report_file, 'w', encoding='utf-8') as f:
            json.dump(self.test_results, f, indent=2, ensure_ascii=False)

        print(f"\n📄 详细报告已保存: {report_file}")

    def run_optimization_test(self):
        """Run every stage of the optimization test in order.

        Returns:
            False when data preparation or processing fails; True once
            the report has been generated.
        """
        print("🎯 开始AI处理逻辑优化测试")
        print("="*80)

        # 1. Prepare the test data
        if not self.prepare_test_data():
            print("❌ 测试数据准备失败")
            return False

        # 2. Capture the pre-processing state (best effort)
        self.capture_before_state()

        # 3. Run the improved processing
        if not self.run_improved_processing():
            print("❌ 改进版AI处理失败")
            return False

        # 4. Verify data quality
        self.verify_processed_data_quality()

        # 5. Check the final tables
        self.check_final_tables()

        # 6. Generate the optimization report
        self.generate_optimization_report()

        return True


def main():
    """Run the test and return a process exit code (0 on success, 1 otherwise)."""
    tester = AIOptimizationTest()
    success = tester.run_optimization_test()

    print("\n" + "="*80)
    if success:
        print("✅ AI处理逻辑优化测试完成!")
    else:
        print("⚠️ 优化测试遇到问题,请检查日志")

    return 0 if success else 1


if __name__ == "__main__":
    sys.exit(main())