You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

401 lines
18 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
"""
AI processing logic optimization test.

Tests the effect of the improved AI processor.
"""
import os
import sys
import django
import requests
import json
import time
from pathlib import Path
# Set up the Django environment: make the backend package importable, point
# Django at its settings module (unless the variable is already set), and
# initialise Django before any Django-dependent import below.
sys.path.append('/home/hzk/项目/moxun-1/信息抽取+数据检验/Django123/atc_extractor/backend')
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'backend.settings')
django.setup()
# Imported only after django.setup() has run.
from django.db import connection
# Base URL of the HTTP API that the test methods below call.
API_BASE = "http://127.0.0.1:8080/api"
class AIOptimizationTest:
    """End-to-end test driver for the improved AI processing pipeline.

    The driver seeds ``prewashed_table`` with sample ATC phrases, snapshots
    the statistics endpoint, triggers the processing endpoint, inspects the
    processed / final / quarantine endpoints and finally prints and saves a
    report. Everything collected along the way is stored in
    ``self.test_results``.
    """

    # (id, text) rows inserted into prewashed_table: realistic ATC phrases.
    TEST_DATA = (
        ("opt_001", "CCA123 climb to flight level 350"),
        ("opt_002", "China Southern 456 descend to flight level 280"),
        ("opt_003", "Air China 789 turn left heading 270 degrees"),
        ("opt_004", "Hainan Airlines 888 contact Shanghai approach 121.9"),
        ("opt_005", "Spring Airlines 999 cleared for takeoff runway 36L"),
        ("opt_006", "United 997 maintain flight level 330"),
        ("opt_007", "Lufthansa 672 descending to flight level 250"),
        ("opt_008", "Singapore Airlines 106 approach runway 18R"),
        ("opt_009", "Japan Airlines 550 hold position"),
        ("opt_010", "KLM 695 taxi to gate A15"),
    )

    # (counter name in the quality stats, key of the field in a record).
    QUALITY_FIELDS = (
        ('non_empty_call_signs', 'Call Sign'),
        ('non_empty_behaviors', 'Behavior'),
        ('non_empty_flight_levels', 'Flight Level'),
        ('non_empty_locations', 'Location'),
        ('non_empty_times', 'Time'),
    )

    def __init__(self):
        """Create the empty result containers filled in by the test steps."""
        self.test_results = {
            'before_optimization': {},
            'after_optimization': {},
            'comparison': {}
        }

    @staticmethod
    def _field_text(record, key):
        """Return ``record[key]`` as a stripped string ('' if missing or None)."""
        value = record.get(key)
        if value is None:
            return ''
        # Non-string values (e.g. numbers) are stringified instead of crashing.
        return str(value).strip()

    @classmethod
    def _summarize_quality(cls, records):
        """Count non-empty fields over ALL records and derive percentages.

        A field counts as non-empty when its stripped text is neither ''
        nor the literal 'NULL'.

        Returns:
            A ``(quality_stats, quality_percentages)`` tuple of dicts keyed
            by the counter names in ``QUALITY_FIELDS``. Percentages are
            rounded to one decimal and are 0 when ``records`` is empty.
        """
        stats = {name: 0 for name, _ in cls.QUALITY_FIELDS}
        for record in records:
            for name, key in cls.QUALITY_FIELDS:
                text = cls._field_text(record, key)
                if text and text != 'NULL':
                    stats[name] += 1
        total = len(records)
        percentages = {
            name: round(count / total * 100, 1) if total > 0 else 0
            for name, count in stats.items()
        }
        return stats, percentages

    @staticmethod
    def _compute_improvements(before, after):
        """Return the relative change (in percent) of the key metrics.

        When the "before" value is not positive the relative change is
        undefined: ``float('inf')`` marks "rose from zero" and 0 marks
        "still zero".
        """
        improvements = {}
        for metric in ('extraction_rate', 'validation_rate', 'valid_count'):
            before_val = before.get(metric, 0)
            after_val = after.get(metric, 0)
            if before_val > 0:
                improvements[metric] = ((after_val - before_val) / before_val) * 100
            else:
                improvements[metric] = float('inf') if after_val > 0 else 0
        return improvements

    def prepare_test_data(self):
        """Replace the contents of ``prewashed_table`` with ``TEST_DATA``.

        Returns:
            True on success, False if any database operation raised.
        """
        print("🔧 准备AI优化测试数据...")
        try:
            with connection.cursor() as cursor:
                # Clear the table, then insert the fresh rows.
                cursor.execute("DELETE FROM prewashed_table")
                for id_val, text_val in self.TEST_DATA:
                    cursor.execute(
                        "INSERT INTO prewashed_table (id, text) VALUES (%s, %s)",
                        [id_val, text_val]
                    )
                cursor.execute("SELECT COUNT(*) FROM prewashed_table")
                count = cursor.fetchone()[0]
                print(f" ✓ 成功准备 {count} 条AI优化测试数据")
                # Show a few rows as a sample.
                cursor.execute("SELECT * FROM prewashed_table LIMIT 3")
                sample_data = cursor.fetchall()
                print(" 📋 测试数据样例:")
                for row in sample_data:
                    print(f" ID: {row[0]}, Text: {row[1]}")
            return True
        except Exception as e:
            print(f" ✗ 测试数据准备失败: {e}")
            return False

    def capture_before_state(self):
        """Snapshot the statistics endpoint into ``before_optimization``.

        Returns:
            True when the endpoint answered 200 with status 'success',
            False otherwise (including on any exception).
        """
        print("\n📊 捕获优化前状态...")
        try:
            response = requests.get(f"{API_BASE}/statistics/", timeout=10)
            if response.status_code == 200:
                data = response.json()
                if data['status'] == 'success':
                    stats = data['statistics']
                    snapshot = {
                        key: stats.get(key, 0)
                        for key in (
                            'original_count', 'extracted_count', 'valid_count',
                            'invalid_count', 'extraction_rate', 'validation_rate',
                        )
                    }
                    self.test_results['before_optimization'] = snapshot
                    print(f" 📊 原始数据: {snapshot['original_count']}")
                    print(f" 📊 提取数据: {snapshot['extracted_count']}")
                    print(f" 📊 有效数据: {snapshot['valid_count']}")
                    print(f" 📊 提取率: {snapshot['extraction_rate']}%")
                    print(f" 📊 验证率: {snapshot['validation_rate']}%")
                    return True
        except Exception as e:
            print(f" ✗ 捕获优化前状态失败: {e}")
            return False
        return False

    def run_improved_processing(self):
        """Trigger the full processing endpoint and record its counters.

        Returns:
            True when the endpoint answered 200 with status 'success',
            False otherwise (including on any exception).
        """
        print("\n🚀 运行改进版AI处理...")
        try:
            # Call the API that runs the complete processing flow.
            response = requests.post(f"{API_BASE}/process-data/", timeout=60)
            if response.status_code != 200:
                print(f" ✗ API调用失败: {response.status_code}")
                return False
            data = response.json()
            if data['status'] != 'success':
                print(f" ✗ 改进版AI处理失败: {data.get('message', '未知错误')}")
                return False
            outcome = {
                key: data.get(key, 0)
                for key in (
                    'original_count', 'extracted_count', 'processed_count',
                    'valid_count', 'invalid_count', 'extraction_rate',
                    'validation_rate',
                )
            }
            print(" ✓ 改进版AI处理成功")
            print(f" 📊 原始记录: {outcome['original_count']}")
            print(f" 📊 提取结果: {outcome['extracted_count']}")
            print(f" 📊 处理记录: {outcome['processed_count']}")
            print(f" 📊 有效记录: {outcome['valid_count']}")
            print(f" 📊 无效记录: {outcome['invalid_count']}")
            print(f" 📊 提取率: {outcome['extraction_rate']}%")
            print(f" 📊 验证率: {outcome['validation_rate']}%")
            self.test_results['after_optimization'] = outcome
            return True
        except Exception as e:
            print(f" ✗ 改进版AI处理异常: {e}")
            return False

    def verify_processed_data_quality(self):
        """Fetch the processed records and measure field completeness.

        Prints up to five sample records, then computes completeness over
        every returned record and stores the result under
        ``self.test_results['data_quality']``.

        Returns:
            True when records were fetched and analysed, False otherwise.
        """
        print("\n🔍 验证处理后数据质量...")
        try:
            response = requests.get(f"{API_BASE}/processed-data/", timeout=10)
            if response.status_code != 200:
                print(f" ✗ 数据质量检查失败: {response.status_code}")
                return False
            data = response.json()
            if not (data['status'] == 'success' and data['count'] > 0):
                print(" ⚠️ 处理后数据为空")
                return False
            records = data['data']
            print(f" ✓ 获取到 {data['count']} 条处理后数据")
            print(" 📋 数据质量样例:")
            # Only the first five records are shown as samples ...
            for i, record in enumerate(records[:5], 1):
                call_sign = self._field_text(record, 'Call Sign')
                behavior = self._field_text(record, 'Behavior')
                flight_level = self._field_text(record, 'Flight Level')
                location = self._field_text(record, 'Location')
                # Named time_text so the imported `time` module is not shadowed.
                time_text = self._field_text(record, 'Time')
                print(f" {i}. 呼号: {call_sign or 'N/A'}")
                print(f" 行为: {behavior or 'N/A'}")
                print(f" 高度: {flight_level or 'N/A'}")
                print(f" 位置: {location or 'N/A'}")
                print(f" 时间: {time_text or 'N/A'}")
                print()
            # ... but the statistics cover every record, so the percentages
            # (which divide by the total record count) are consistent.
            quality_stats, quality_percentages = self._summarize_quality(records)
            total_records = len(records)
            print(" 📊 数据完整性分析:")
            print(f" 呼号完整性: {quality_percentages['non_empty_call_signs']}%")
            print(f" 行为完整性: {quality_percentages['non_empty_behaviors']}%")
            print(f" 高度完整性: {quality_percentages['non_empty_flight_levels']}%")
            print(f" 位置完整性: {quality_percentages['non_empty_locations']}%")
            print(f" 时间完整性: {quality_percentages['non_empty_times']}%")
            self.test_results['data_quality'] = {
                'total_records': total_records,
                'quality_stats': quality_stats,
                'quality_percentages': quality_percentages
            }
            return True
        except Exception as e:
            print(f" ✗ 数据质量验证失败: {e}")
            return False

    def check_final_tables(self):
        """Print record counts from the final-data and quarantine-data endpoints.

        Returns:
            True unless an exception was raised; non-200 answers are only
            reported as warnings.
        """
        print("\n📋 检查最终数据表...")
        try:
            # final_table
            response = requests.get(f"{API_BASE}/final-data/", timeout=10)
            if response.status_code == 200:
                data = response.json()
                final_count = data.get('count', 0)
                print(f" ✓ final_table: {final_count} 条有效数据")
                if final_count > 0:
                    print(" 📋 有效数据样例:")
                    for i, record in enumerate(data['data'][:3], 1):
                        print(f" {i}. {record.get('Call Sign', 'N/A')} - {record.get('Behavior', 'N/A')}")
            else:
                print(f" ⚠️ final_table检查失败: {response.status_code}")
            # quarantine_table
            response = requests.get(f"{API_BASE}/quarantine-data/", timeout=10)
            if response.status_code == 200:
                data = response.json()
                quarantine_count = data.get('count', 0)
                print(f" ✓ quarantine_table: {quarantine_count} 条无效数据")
            else:
                print(f" ⚠️ quarantine_table检查失败: {response.status_code}")
            return True
        except Exception as e:
            print(f" ✗ 最终数据表检查失败: {e}")
            return False

    def generate_optimization_report(self):
        """Print the before/after comparison and save the results as JSON.

        The JSON file ``ai_optimization_report.json`` is written next to
        this script and contains ``self.test_results``.
        """
        print("\n" + "="*80)
        print("📊 AI处理逻辑优化报告")
        print("="*80)
        before = self.test_results.get('before_optimization', {})
        after = self.test_results.get('after_optimization', {})
        quality = self.test_results.get('data_quality', {})
        if before and after:
            print("\n📈 优化前后对比:")
            # The arrow keeps the two numbers apart (10 and 10 must not read as 1010).
            print(f" 原始数据: {before.get('original_count', 0)} → {after.get('original_count', 0)}")
            print(f" 提取数据: {before.get('extracted_count', 0)} → {after.get('extracted_count', 0)}")
            print(f" 有效数据: {before.get('valid_count', 0)} → {after.get('valid_count', 0)}")
            print(f" 提取率: {before.get('extraction_rate', 0)}% → {after.get('extraction_rate', 0)}%")
            print(f" 验证率: {before.get('validation_rate', 0)}% → {after.get('validation_rate', 0)}%")
            # Relative improvement per metric.
            improvements = self._compute_improvements(before, after)
            print("\n📊 改进幅度:")
            for metric, improvement in improvements.items():
                if improvement == float('inf'):
                    print(f" {metric}: 从0提升到{after.get(metric, 0)}")
                elif improvement > 0:
                    print(f" {metric}: +{improvement:.1f}%")
                elif improvement < 0:
                    print(f" {metric}: {improvement:.1f}%")
                else:
                    print(f" {metric}: 无变化")
        if quality:
            print("\n🔍 数据质量分析:")
            total = quality.get('total_records', 0)
            percentages = quality.get('quality_percentages', {})
            print(f" 总处理记录: {total}")
            print(f" 呼号完整性: {percentages.get('non_empty_call_signs', 0)}%")
            print(f" 行为完整性: {percentages.get('non_empty_behaviors', 0)}%")
            print(f" 高度完整性: {percentages.get('non_empty_flight_levels', 0)}%")
            # Weighted overall score; the weights sum to 1.0.
            quality_score = (
                percentages.get('non_empty_call_signs', 0) * 0.4 +
                percentages.get('non_empty_behaviors', 0) * 0.3 +
                percentages.get('non_empty_flight_levels', 0) * 0.2 +
                percentages.get('non_empty_locations', 0) * 0.05 +
                percentages.get('non_empty_times', 0) * 0.05
            )
            print(f" 整体质量评分: {quality_score:.1f}/100")
        # Verdict based on the validation rate after processing.
        print("\n🎯 优化总结:")
        validation_rate = after.get('validation_rate', 0)
        if validation_rate > 80:
            print(" 🏆 优秀AI处理质量显著提升")
        elif validation_rate > 60:
            print(" ✅ 良好AI处理质量有明显改善")
        elif validation_rate > 40:
            print(" ⚠️ 一般,还有进一步优化空间")
        else:
            print(" ❌ 需要继续优化AI处理逻辑")
        # Save the collected results.
        report_file = Path(__file__).parent / "ai_optimization_report.json"
        with open(report_file, 'w', encoding='utf-8') as f:
            json.dump(self.test_results, f, indent=2, ensure_ascii=False)
        print(f"\n📄 详细报告已保存: {report_file}")

    def run_optimization_test(self):
        """Run every step of the optimization test in order.

        Returns:
            False if preparing the data or running the processing failed,
            True otherwise. The snapshot, quality and table checks are
            best-effort: their return values do not abort the run.
        """
        print("🎯 开始AI处理逻辑优化测试")
        print("="*80)
        # 1. Prepare the test data.
        if not self.prepare_test_data():
            print("❌ 测试数据准备失败")
            return False
        # 2. Capture the state before processing.
        self.capture_before_state()
        # 3. Run the improved processing.
        if not self.run_improved_processing():
            print("❌ 改进版AI处理失败")
            return False
        # 4. Verify data quality.
        self.verify_processed_data_quality()
        # 5. Check the final tables.
        self.check_final_tables()
        # 6. Generate the report.
        self.generate_optimization_report()
        return True
def main():
    """Run the AI optimization test and return a process exit code.

    Returns:
        0 when the test run succeeded, 1 otherwise.
    """
    succeeded = AIOptimizationTest().run_optimization_test()
    print("\n" + "="*80)
    if not succeeded:
        print("⚠️ 优化测试遇到问题,请检查日志")
        return 1
    print("✅ AI处理逻辑优化测试完成")
    return 0
if __name__ == "__main__":
    # Hand main()'s status code to the interpreter as the exit status.
    exit_code = main()
    sys.exit(exit_code)