You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
moxun-1/test/data_processing_flow_test.py

669 lines
26 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
"""
数据处理流程完整测试
测试从原始数据到最终结果的完整流程
"""
import os
import sys
import django
import requests
import json
import time
import pandas as pd
from pathlib import Path
# Set up the Django environment.
# The backend directory defaults to the original developer checkout; set the
# ATC_BACKEND_DIR environment variable to run this script from another machine.
BACKEND_DIR = os.environ.get(
    'ATC_BACKEND_DIR',
    '/home/hzk/项目/moxun-1/信息抽取+数据检验/Django123/atc_extractor/backend',
)
sys.path.append(BACKEND_DIR)
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'backend.settings')
django.setup()
# Imported after django.setup(), in the same order as the original script.
from django.db import connection  # noqa: E402

# Base URL of the API under test; override with ATC_API_BASE if the server
# listens on a different host or port.
API_BASE = os.environ.get('ATC_API_BASE', "http://127.0.0.1:8080/api")
class DataProcessingFlowTest:
    """End-to-end check of the data-processing flow.

    The test seeds ``prewashed_table`` through the Django DB connection, calls
    the HTTP endpoints under ``API_BASE`` one by one, inspects a fixed list of
    database tables and finally prints and saves a report.

    Every outcome is stored in ``self.test_results``, a dict with four
    sections: ``setup``, ``processing_steps``, ``data_validation`` and
    ``api_tests``.
    """

    def __init__(self):
        self.test_results = {
            'setup': {},
            'processing_steps': {},
            'data_validation': {},
            'api_tests': {},
        }

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _record_failure(self, category, key, error):
        """Store a failed check under ``test_results[category][key]``.

        Recording failures (instead of only returning False) makes sure the
        final report counts them instead of listing the check as skipped.
        """
        self.test_results[category][key] = {
            'success': False,
            'error': str(error),
        }

    def _all_checks_passed(self):
        """Return True when setup and every recorded step/API check succeeded."""
        recorded = [self.test_results.get('setup', {})]
        recorded.extend(self.test_results.get('processing_steps', {}).values())
        recorded.extend(self.test_results.get('api_tests', {}).values())
        return all(result.get('success') for result in recorded)

    @staticmethod
    def _print_section(results, display_names):
        """Print one success / failure / skipped line per expected check."""
        for key, label in display_names.items():
            if key not in results:
                print(f" ⏭️ {label}: 跳过")
            elif results[key].get('success'):
                print(f"{label}: 成功")
            else:
                print(f"{label}: 失败")

    # ------------------------------------------------------------------
    # Setup
    # ------------------------------------------------------------------
    def setup_test_data(self):
        """Create ``prewashed_table`` if needed and fill it with sample rows.

        WARNING: all existing rows in ``prewashed_table`` are deleted first.

        Returns:
            bool: True when the rows were inserted, False on any exception
            (the error text is stored in ``test_results['setup']``).
        """
        print("🔧 准备测试数据...")
        # Sample ATC dialogue rows as (id, text) pairs.
        test_data = [
            ("test_001", "CCA123 climb to flight level 350"),
            ("test_002", "CSN456 descend to flight level 280"),
            ("test_003", "CHH789 maintain heading 090 degrees"),
            ("test_004", "Air China 001 turn left heading 270"),
            ("test_005", "China Southern 888 contact approach 121.9"),
            ("test_006", "Hainan Airlines 7777 reduce speed to 250 knots"),
            ("test_007", "Spring Airlines 9999 cleared for takeoff runway 36L"),
            ("test_008", "Juneyao Airlines 1234 taxi to gate A15"),
            ("test_009", "Shanghai Airlines 5678 hold short of runway 18R"),
            ("test_010", "Tibet Airlines 9876 report when ready for departure"),
        ]
        try:
            with connection.cursor() as cursor:
                # Make sure prewashed_table exists.
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS prewashed_table (
                        id VARCHAR(255) NOT NULL,
                        text TEXT,
                        PRIMARY KEY (id)
                    )
                """)
                # Remove whatever rows are currently there.
                cursor.execute("DELETE FROM prewashed_table")
                # Insert the sample rows with a parameterized statement.
                for id_val, text_val in test_data:
                    cursor.execute(
                        "INSERT INTO prewashed_table (id, text) VALUES (%s, %s)",
                        [id_val, text_val],
                    )
                # Verify the insert by counting the rows.
                cursor.execute("SELECT COUNT(*) FROM prewashed_table")
                count = cursor.fetchone()[0]
                print(f" ✓ 成功插入 {count} 条测试数据")
                # Show a few of the inserted rows.
                cursor.execute("SELECT * FROM prewashed_table LIMIT 3")
                sample_data = cursor.fetchall()
                print(" 📋 数据样例:")
                for row in sample_data:
                    print(f" ID: {row[0]}, Text: {row[1]}")
                self.test_results['setup'] = {
                    'success': True,
                    'data_count': count,
                    'sample_data': sample_data,
                }
                return True
        except Exception as e:
            print(f" ✗ 测试数据准备失败: {e}")
            self.test_results['setup'] = {
                'success': False,
                'error': str(e),
            }
            return False

    def check_server_status(self):
        """Return True when ``GET {API_BASE}/health/`` answers with HTTP 200."""
        print("\n🚀 检查Django服务器状态...")
        try:
            response = requests.get(f"{API_BASE}/health/", timeout=5)
        except requests.RequestException as e:
            print(f" ✗ 无法连接到Django服务器: {e}")
            print(" 💡 请确保Django服务器正在运行: python manage.py runserver")
            return False
        if response.status_code == 200:
            print(" ✓ Django服务器运行正常")
            return True
        print(f" ✗ 服务器响应异常: {response.status_code}")
        return False

    # ------------------------------------------------------------------
    # API / processing-step checks
    # ------------------------------------------------------------------
    def test_original_data_api(self):
        """Check ``GET /original-data/`` and record it as ``api_tests.original_data``.

        Returns:
            bool: True when the endpoint answers 200 with ``status == 'success'``.
        """
        print("\n📊 测试原始数据获取API...")
        try:
            response = requests.get(f"{API_BASE}/original-data/", timeout=10)
            if response.status_code != 200:
                print(f" ✗ API请求失败: {response.status_code}")
                self._record_failure('api_tests', 'original_data',
                                     f"HTTP {response.status_code}")
                return False
            data = response.json()
            if data['status'] != 'success':
                message = data.get('message', '未知错误')
                print(f" ✗ API返回错误: {message}")
                self._record_failure('api_tests', 'original_data', message)
                return False
            count = data['count']
            records = data['data']
            print(f" ✓ 成功获取 {count} 条原始数据")
            print(" 📋 原始数据样例:")
            for position, record in enumerate(records[:3], start=1):
                print(f" {position}. ID: {record['id']}, Text: {record['text'][:50]}...")
            self.test_results['api_tests']['original_data'] = {
                'success': True,
                'count': count,
                'sample': records[:3],
            }
            return True
        except Exception as e:
            print(f" ✗ 原始数据API测试失败: {e}")
            self._record_failure('api_tests', 'original_data', e)
            return False

    def test_preprocessing_step(self):
        """Check ``POST /preprocess/`` and record it as ``processing_steps.preprocess``.

        Returns:
            bool: True when the endpoint answers 200 with ``status == 'success'``.
        """
        print("\n🔄 测试数据预处理步骤...")
        try:
            response = requests.post(f"{API_BASE}/preprocess/",
                                     json={},
                                     headers={'Content-Type': 'application/json'},
                                     timeout=10)
            if response.status_code != 200:
                print(f" ✗ 预处理请求失败: {response.status_code}")
                self._record_failure('processing_steps', 'preprocess',
                                     f"HTTP {response.status_code}")
                return False
            data = response.json()
            if data['status'] != 'success':
                message = data.get('message', '未知错误')
                print(f" ✗ 预处理失败: {message}")
                self._record_failure('processing_steps', 'preprocess', message)
                return False
            processed_count = data['data']['processed_count']
            cleaning_rate = data['data']['cleaning_rate']
            print(f" ✓ 预处理成功: 处理了 {processed_count} 条记录")
            print(f" 📊 数据清理率: {cleaning_rate}%")
            self.test_results['processing_steps']['preprocess'] = {
                'success': True,
                'processed_count': processed_count,
                'cleaning_rate': cleaning_rate,
            }
            return True
        except Exception as e:
            print(f" ✗ 预处理测试失败: {e}")
            self._record_failure('processing_steps', 'preprocess', e)
            return False

    def test_merge_step(self):
        """Check ``POST /merge/`` and record it as ``processing_steps.merge``.

        Returns:
            bool: True when the endpoint answers 200 with ``status == 'success'``.
        """
        print("\n🔗 测试格式合并步骤...")
        try:
            response = requests.post(f"{API_BASE}/merge/",
                                     json={},
                                     headers={'Content-Type': 'application/json'},
                                     timeout=10)
            if response.status_code != 200:
                print(f" ✗ 格式合并请求失败: {response.status_code}")
                self._record_failure('processing_steps', 'merge',
                                     f"HTTP {response.status_code}")
                return False
            data = response.json()
            if data['status'] != 'success':
                message = data.get('message', '未知错误')
                print(f" ✗ 格式合并失败: {message}")
                self._record_failure('processing_steps', 'merge', message)
                return False
            merged_records = data['data']['merged_records']
            success_rate = data['data']['merge_success_rate']
            print(f" ✓ 格式合并成功: 合并了 {merged_records} 条记录")
            print(f" 📊 合并成功率: {success_rate}%")
            self.test_results['processing_steps']['merge'] = {
                'success': True,
                'merged_records': merged_records,
                'success_rate': success_rate,
            }
            return True
        except Exception as e:
            print(f" ✗ 格式合并测试失败: {e}")
            self._record_failure('processing_steps', 'merge', e)
            return False

    def test_correction_step(self):
        """Check ``POST /correct/`` and record it as ``processing_steps.correction``.

        Returns:
            bool: True when the endpoint answers 200 with ``status == 'success'``.
        """
        print("\n📝 测试单词纠错步骤...")
        try:
            response = requests.post(f"{API_BASE}/correct/",
                                     json={},
                                     headers={'Content-Type': 'application/json'},
                                     timeout=10)
            if response.status_code != 200:
                print(f" ✗ 单词纠错请求失败: {response.status_code}")
                self._record_failure('processing_steps', 'correction',
                                     f"HTTP {response.status_code}")
                return False
            data = response.json()
            if data['status'] != 'success':
                message = data.get('message', '未知错误')
                print(f" ✗ 单词纠错失败: {message}")
                self._record_failure('processing_steps', 'correction', message)
                return False
            corrected_words = data['data']['corrected_words']
            correction_types = data['data']['correction_types']
            print(f" ✓ 单词纠错成功: 纠正了 {corrected_words} 个单词")
            print(f" 📋 纠错类型: {', '.join(correction_types)}")
            self.test_results['processing_steps']['correction'] = {
                'success': True,
                'corrected_words': corrected_words,
                'correction_types': correction_types,
            }
            return True
        except Exception as e:
            print(f" ✗ 单词纠错测试失败: {e}")
            self._record_failure('processing_steps', 'correction', e)
            return False

    def test_ai_analysis_step(self):
        """Check ``POST /analyze/`` and record it as ``processing_steps.ai_analysis``.

        A payload containing ``analysis_summary`` is reported as a simulated
        result; any other payload is printed as the actual processing result.

        Returns:
            bool: True when the endpoint answers 200 with ``status == 'success'``.
        """
        print("\n🤖 测试AI分析步骤...")
        try:
            response = requests.post(f"{API_BASE}/analyze/",
                                     json={},
                                     headers={'Content-Type': 'application/json'},
                                     timeout=30)  # AI processing may take longer
            if response.status_code != 200:
                print(f" ✗ AI分析请求失败: {response.status_code}")
                self._record_failure('processing_steps', 'ai_analysis',
                                     f"HTTP {response.status_code}")
                return False
            data = response.json()
            if data['status'] != 'success':
                message = data.get('message', '未知错误')
                print(f" ✗ AI分析失败: {message}")
                self._record_failure('processing_steps', 'ai_analysis', message)
                return False
            if 'analysis_summary' in data['data']:
                # Simulated analysis result.
                summary = data['data']['analysis_summary']
                print(" ✓ AI分析完成(模拟结果)")
                print(f" 📊 呼号提取: {summary.get('call_signs_extracted', 0)}")
                print(f" 📊 行为识别: {summary.get('behaviors_identified', 0)}")
                print(f" 📊 高度检测: {summary.get('flight_levels_detected', 0)}")
            else:
                # Actual AI processing result.
                print(" ✓ AI分析完成(实际处理)")
                print(f" 📊 处理结果: {data['data']}")
            self.test_results['processing_steps']['ai_analysis'] = {
                'success': True,
                'result': data['data'],
            }
            return True
        except Exception as e:
            print(f" ✗ AI分析测试失败: {e}")
            self._record_failure('processing_steps', 'ai_analysis', e)
            return False

    def test_processed_data_api(self):
        """Check ``GET /processed-data/`` and record it as ``api_tests.processed_data``.

        Returns:
            bool: True when the endpoint answers 200 with ``status == 'success'``.
        """
        print("\n📋 测试处理后数据获取API...")
        try:
            response = requests.get(f"{API_BASE}/processed-data/", timeout=10)
            if response.status_code != 200:
                print(f" ✗ 处理后数据API请求失败: {response.status_code}")
                self._record_failure('api_tests', 'processed_data',
                                     f"HTTP {response.status_code}")
                return False
            data = response.json()
            if data['status'] != 'success':
                message = data.get('message', '未知错误')
                print(f" ⚠️ 处理后数据为空或获取失败: {message}")
                self._record_failure('api_tests', 'processed_data', message)
                return False
            count = data['count']
            records = data['data']
            print(f" ✓ 成功获取 {count} 条处理后数据")
            if records:
                print(" 📋 处理后数据样例:")
                for position, record in enumerate(records[:3], start=1):
                    print(f" {position}. 呼号: {record.get('Call Sign', 'N/A')}")
                    print(f" 行为: {record.get('Behavior', 'N/A')}")
                    print(f" 高度: {record.get('Flight Level', 'N/A')}")
                    print(f" 位置: {record.get('Location', 'N/A')}")
                    print()
            self.test_results['api_tests']['processed_data'] = {
                'success': True,
                'count': count,
                'sample': records[:3] if records else [],
            }
            return True
        except Exception as e:
            print(f" ✗ 处理后数据API测试失败: {e}")
            self._record_failure('api_tests', 'processed_data', e)
            return False

    def test_statistics_api(self):
        """Check ``GET /statistics/`` and record it as ``api_tests.statistics``.

        Returns:
            bool: True when the endpoint answers 200 with ``status == 'success'``.
        """
        print("\n📊 测试统计信息API...")
        try:
            response = requests.get(f"{API_BASE}/statistics/", timeout=10)
            if response.status_code != 200:
                print(f" ✗ 统计信息API请求失败: {response.status_code}")
                self._record_failure('api_tests', 'statistics',
                                     f"HTTP {response.status_code}")
                return False
            data = response.json()
            if data['status'] != 'success':
                message = data.get('message', '未知错误')
                print(f" ✗ 统计信息获取失败: {message}")
                self._record_failure('api_tests', 'statistics', message)
                return False
            stats = data['statistics']
            print(" ✓ 统计信息获取成功")
            print(f" 📊 原始数据: {stats.get('original_count', 0)}")
            print(f" 📊 处理数据: {stats.get('extracted_count', 0)}")
            print(f" 📊 有效数据: {stats.get('valid_count', 0)}")
            print(f" 📊 无效数据: {stats.get('invalid_count', 0)}")
            print(f" 📊 提取率: {stats.get('extraction_rate', 0)}%")
            print(f" 📊 验证率: {stats.get('validation_rate', 0)}%")
            self.test_results['api_tests']['statistics'] = {
                'success': True,
                'statistics': stats,
            }
            return True
        except Exception as e:
            print(f" ✗ 统计信息API测试失败: {e}")
            self._record_failure('api_tests', 'statistics', e)
            return False

    def test_complete_processing_api(self):
        """Check ``POST /process-data/`` and record it as ``processing_steps.complete_process``.

        Returns:
            bool: True when the endpoint answers 200 with ``status == 'success'``.
        """
        print("\n🚀 测试完整处理流程API...")
        try:
            response = requests.post(f"{API_BASE}/process-data/",
                                     timeout=60)  # the full run may take a while
            if response.status_code != 200:
                print(f" ✗ 完整处理API请求失败: {response.status_code}")
                self._record_failure('processing_steps', 'complete_process',
                                     f"HTTP {response.status_code}")
                return False
            data = response.json()
            if data['status'] != 'success':
                message = data.get('message', '未知错误')
                print(f" ✗ 完整处理失败: {message}")
                self._record_failure('processing_steps', 'complete_process', message)
                return False
            print(" ✓ 完整处理流程成功")
            # Show whichever counters the response contains.
            if 'processed_count' in data:
                print(f" 📊 处理数量: {data['processed_count']}")
            if 'valid_count' in data:
                print(f" 📊 有效数量: {data['valid_count']}")
            if 'invalid_count' in data:
                print(f" 📊 无效数量: {data['invalid_count']}")
            self.test_results['processing_steps']['complete_process'] = {
                'success': True,
                'result': data,
            }
            return True
        except Exception as e:
            print(f" ✗ 完整处理API测试失败: {e}")
            self._record_failure('processing_steps', 'complete_process', e)
            return False

    # ------------------------------------------------------------------
    # Database inspection
    # ------------------------------------------------------------------
    def check_database_tables(self):
        """Count the rows of a fixed list of tables.

        The result is stored in ``test_results['data_validation']['table_status']``
        as ``{table: {'exists': bool, 'count': int}}``; when the cursor itself
        cannot be obtained the value is ``{'error': <message>}`` instead.
        """
        print("\n🗄️ 检查数据库表状态...")
        tables_to_check = [
            'prewashed_table',
            'processed_table',
            'precessed_table',  # legacy, misspelled table name
            'final_table',
            'quarantine_table',
        ]
        table_status = {}
        try:
            with connection.cursor() as cursor:
                for table in tables_to_check:
                    try:
                        # Table names come from the fixed list above, never
                        # from user input, so interpolation is safe here.
                        cursor.execute(f"SELECT COUNT(*) FROM {table}")
                        count = cursor.fetchone()[0]
                        table_status[table] = {'exists': True, 'count': count}
                        print(f"{table}: {count} 条记录")
                    except Exception:
                        # NOTE(review): any query error is reported as a
                        # missing table, as in the original implementation.
                        table_status[table] = {'exists': False, 'count': 0}
                        print(f" - {table}: 表不存在")
            self.test_results['data_validation']['table_status'] = table_status
        except Exception as e:
            print(f" ✗ 数据库表检查失败: {e}")
            self.test_results['data_validation']['table_status'] = {
                'error': str(e)
            }

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------
    def generate_test_report(self):
        """Print a summary of ``test_results`` and save it as JSON.

        The JSON file ``data_processing_test_report.json`` is written next to
        this source file.
        """
        print("\n" + "=" * 80)
        print("📋 数据处理流程测试报告")
        print("=" * 80)

        # Test-data preparation.
        setup = self.test_results.get('setup', {})
        if setup.get('success'):
            print(f"\n✅ 测试数据准备: 成功 ({setup.get('data_count', 0)} 条)")
        else:
            print("\n❌ 测试数据准备: 失败")

        # Processing steps.
        steps = self.test_results.get('processing_steps', {})
        print("\n🔄 处理步骤测试:")
        self._print_section(steps, {
            'preprocess': '数据预处理',
            'merge': '格式合并',
            'correction': '单词纠错',
            'ai_analysis': 'AI分析',
            'complete_process': '完整处理流程',
        })

        # API checks.
        api_tests = self.test_results.get('api_tests', {})
        print("\n🌐 API测试:")
        self._print_section(api_tests, {
            'original_data': '原始数据获取',
            'processed_data': '处理后数据获取',
            'statistics': '统计信息获取',
        })

        # Database tables.
        validation = self.test_results.get('data_validation', {})
        if 'table_status' in validation:
            print("\n🗄️ 数据库表状态:")
            for table, status in validation['table_status'].items():
                if not isinstance(status, dict):
                    # check_database_tables() stores {'error': <message>} when
                    # the whole check failed; report it instead of crashing.
                    print(f" ✗ 数据库表检查失败: {status}")
                elif status.get('exists'):
                    print(f"{table}: {status.get('count', 0)} 条记录")
                else:
                    print(f" {table}: 不存在")

        # Overall score: setup always counts as one test, plus every recorded
        # processing step and API check.
        recorded = [setup, *steps.values(), *api_tests.values()]
        total_tests = len(recorded)
        passed_tests = sum(1 for result in recorded if result.get('success'))
        success_rate = (passed_tests / total_tests * 100) if total_tests > 0 else 0
        print(f"\n🎯 总体测试结果: {passed_tests}/{total_tests} 通过 ({success_rate:.1f}%)")
        if success_rate >= 80:
            print("🏆 优秀!数据处理流程运行良好")
        elif success_rate >= 60:
            print("✅ 良好!大部分功能正常")
        elif success_rate >= 40:
            print("⚠️ 一般,部分功能需要检查")
        else:
            print("❌ 需要修复多个问题")

        # Save the detailed report.
        report_file = Path(__file__).parent / "data_processing_test_report.json"
        with open(report_file, 'w', encoding='utf-8') as f:
            json.dump(self.test_results, f, indent=2, ensure_ascii=False)
        print(f"\n📄 详细报告已保存: {report_file}")

    # ------------------------------------------------------------------
    # Orchestration
    # ------------------------------------------------------------------
    def run_complete_test(self):
        """Run every check in order and produce the report.

        Returns:
            bool: False when the test data cannot be prepared or the server is
            unreachable. Otherwise True only if every recorded processing step
            and API check succeeded, so callers can derive a meaningful exit
            code from the result.
        """
        print("🎯 开始数据处理流程完整测试")
        print("=" * 80)

        # 1. Prepare the test data.
        if not self.setup_test_data():
            print("❌ 测试数据准备失败,无法继续测试")
            return False

        # 2. Make sure the server is reachable.
        if not self.check_server_status():
            print("❌ Django服务器未运行,无法进行API测试")
            print("💡 请先启动服务器: cd Django123/atc_extractor/backend && python manage.py runserver")
            self.check_database_tables()
            self.generate_test_report()
            return False

        # 3. Original-data API.
        self.test_original_data_api()

        # 4. Individual processing steps, in order.
        self.test_preprocessing_step()
        time.sleep(1)  # avoid sending requests too quickly
        self.test_merge_step()
        time.sleep(1)
        self.test_correction_step()
        time.sleep(1)
        self.test_ai_analysis_step()
        time.sleep(2)

        # 5. Complete processing flow.
        self.test_complete_processing_api()
        time.sleep(2)

        # 6. Processed-data API.
        self.test_processed_data_api()
        time.sleep(1)

        # 7. Statistics API.
        self.test_statistics_api()

        # 8. Database tables.
        self.check_database_tables()

        # 9. Report.
        self.generate_test_report()
        return self._all_checks_passed()
def main():
    """Run the complete flow test and return a process exit code.

    Returns:
        int: 0 when ``run_complete_test()`` reports success, otherwise 1.
    """
    outcome = DataProcessingFlowTest().run_complete_test()
    print("\n" + "="*80)
    summary = (
        "✅ 数据处理流程测试完成!"
        if outcome
        else "⚠️ 测试完成,但可能存在问题需要解决"
    )
    print(summary)
    return int(not outcome)
# Script entry point: exit with the status code returned by main().
if __name__ == "__main__":
    raise SystemExit(main())