|
|
#!/usr/bin/env python3
|
|
|
"""
|
|
|
数据处理流程完整测试
|
|
|
测试从原始数据到最终结果的完整流程
|
|
|
"""
|
|
|
|
|
|
import os
|
|
|
import sys
|
|
|
import django
|
|
|
import requests
|
|
|
import json
|
|
|
import time
|
|
|
import pandas as pd
|
|
|
from pathlib import Path
|
|
|
|
|
|
# 设置Django环境
|
|
|
sys.path.append('/home/hzk/项目/moxun-1/信息抽取+数据检验/Django123/atc_extractor/backend')
|
|
|
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'backend.settings')
|
|
|
django.setup()
|
|
|
|
|
|
from django.db import connection
|
|
|
|
|
|
API_BASE = "http://127.0.0.1:8080/api"
|
|
|
|
|
|
class DataProcessingFlowTest:
|
|
|
def __init__(self):
|
|
|
self.test_results = {
|
|
|
'setup': {},
|
|
|
'processing_steps': {},
|
|
|
'data_validation': {},
|
|
|
'api_tests': {}
|
|
|
}
|
|
|
|
|
|
def setup_test_data(self):
|
|
|
"""准备测试数据"""
|
|
|
print("🔧 准备测试数据...")
|
|
|
|
|
|
# 创建测试用的ATC对话数据
|
|
|
test_data = [
|
|
|
("test_001", "CCA123 climb to flight level 350"),
|
|
|
("test_002", "CSN456 descend to flight level 280"),
|
|
|
("test_003", "CHH789 maintain heading 090 degrees"),
|
|
|
("test_004", "Air China 001 turn left heading 270"),
|
|
|
("test_005", "China Southern 888 contact approach 121.9"),
|
|
|
("test_006", "Hainan Airlines 7777 reduce speed to 250 knots"),
|
|
|
("test_007", "Spring Airlines 9999 cleared for takeoff runway 36L"),
|
|
|
("test_008", "Juneyao Airlines 1234 taxi to gate A15"),
|
|
|
("test_009", "Shanghai Airlines 5678 hold short of runway 18R"),
|
|
|
("test_010", "Tibet Airlines 9876 report when ready for departure")
|
|
|
]
|
|
|
|
|
|
try:
|
|
|
with connection.cursor() as cursor:
|
|
|
# 确保prewashed_table存在
|
|
|
cursor.execute("""
|
|
|
CREATE TABLE IF NOT EXISTS prewashed_table (
|
|
|
id VARCHAR(255) NOT NULL,
|
|
|
text TEXT,
|
|
|
PRIMARY KEY (id)
|
|
|
)
|
|
|
""")
|
|
|
|
|
|
# 清空现有数据
|
|
|
cursor.execute("DELETE FROM prewashed_table")
|
|
|
|
|
|
# 插入测试数据
|
|
|
for id_val, text_val in test_data:
|
|
|
cursor.execute(
|
|
|
"INSERT INTO prewashed_table (id, text) VALUES (%s, %s)",
|
|
|
[id_val, text_val]
|
|
|
)
|
|
|
|
|
|
# 验证插入的数据
|
|
|
cursor.execute("SELECT COUNT(*) FROM prewashed_table")
|
|
|
count = cursor.fetchone()[0]
|
|
|
|
|
|
print(f" ✓ 成功插入 {count} 条测试数据")
|
|
|
|
|
|
# 显示插入的数据样例
|
|
|
cursor.execute("SELECT * FROM prewashed_table LIMIT 3")
|
|
|
sample_data = cursor.fetchall()
|
|
|
print(" 📋 数据样例:")
|
|
|
for row in sample_data:
|
|
|
print(f" ID: {row[0]}, Text: {row[1]}")
|
|
|
|
|
|
self.test_results['setup'] = {
|
|
|
'success': True,
|
|
|
'data_count': count,
|
|
|
'sample_data': sample_data
|
|
|
}
|
|
|
|
|
|
return True
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f" ✗ 测试数据准备失败: {e}")
|
|
|
self.test_results['setup'] = {
|
|
|
'success': False,
|
|
|
'error': str(e)
|
|
|
}
|
|
|
return False
|
|
|
|
|
|
def check_server_status(self):
|
|
|
"""检查Django服务器状态"""
|
|
|
print("\n🚀 检查Django服务器状态...")
|
|
|
|
|
|
try:
|
|
|
response = requests.get(f"{API_BASE}/health/", timeout=5)
|
|
|
if response.status_code == 200:
|
|
|
print(" ✓ Django服务器运行正常")
|
|
|
return True
|
|
|
else:
|
|
|
print(f" ✗ 服务器响应异常: {response.status_code}")
|
|
|
return False
|
|
|
except requests.RequestException as e:
|
|
|
print(f" ✗ 无法连接到Django服务器: {e}")
|
|
|
print(" 💡 请确保Django服务器正在运行: python manage.py runserver")
|
|
|
return False
|
|
|
|
|
|
def test_original_data_api(self):
|
|
|
"""测试原始数据获取API"""
|
|
|
print("\n📊 测试原始数据获取API...")
|
|
|
|
|
|
try:
|
|
|
response = requests.get(f"{API_BASE}/original-data/", timeout=10)
|
|
|
|
|
|
if response.status_code == 200:
|
|
|
data = response.json()
|
|
|
|
|
|
if data['status'] == 'success':
|
|
|
count = data['count']
|
|
|
records = data['data']
|
|
|
|
|
|
print(f" ✓ 成功获取 {count} 条原始数据")
|
|
|
print(" 📋 原始数据样例:")
|
|
|
for i, record in enumerate(records[:3]):
|
|
|
print(f" {i+1}. ID: {record['id']}, Text: {record['text'][:50]}...")
|
|
|
|
|
|
self.test_results['api_tests']['original_data'] = {
|
|
|
'success': True,
|
|
|
'count': count,
|
|
|
'sample': records[:3]
|
|
|
}
|
|
|
|
|
|
return True
|
|
|
else:
|
|
|
print(f" ✗ API返回错误: {data.get('message', '未知错误')}")
|
|
|
return False
|
|
|
else:
|
|
|
print(f" ✗ API请求失败: {response.status_code}")
|
|
|
return False
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f" ✗ 原始数据API测试失败: {e}")
|
|
|
self.test_results['api_tests']['original_data'] = {
|
|
|
'success': False,
|
|
|
'error': str(e)
|
|
|
}
|
|
|
return False
|
|
|
|
|
|
def test_preprocessing_step(self):
|
|
|
"""测试数据预处理步骤"""
|
|
|
print("\n🔄 测试数据预处理步骤...")
|
|
|
|
|
|
try:
|
|
|
response = requests.post(f"{API_BASE}/preprocess/",
|
|
|
json={},
|
|
|
headers={'Content-Type': 'application/json'},
|
|
|
timeout=10)
|
|
|
|
|
|
if response.status_code == 200:
|
|
|
data = response.json()
|
|
|
|
|
|
if data['status'] == 'success':
|
|
|
processed_count = data['data']['processed_count']
|
|
|
cleaning_rate = data['data']['cleaning_rate']
|
|
|
|
|
|
print(f" ✓ 预处理成功: 处理了 {processed_count} 条记录")
|
|
|
print(f" 📊 数据清理率: {cleaning_rate}%")
|
|
|
|
|
|
self.test_results['processing_steps']['preprocess'] = {
|
|
|
'success': True,
|
|
|
'processed_count': processed_count,
|
|
|
'cleaning_rate': cleaning_rate
|
|
|
}
|
|
|
|
|
|
return True
|
|
|
else:
|
|
|
print(f" ✗ 预处理失败: {data.get('message', '未知错误')}")
|
|
|
return False
|
|
|
else:
|
|
|
print(f" ✗ 预处理请求失败: {response.status_code}")
|
|
|
return False
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f" ✗ 预处理测试失败: {e}")
|
|
|
self.test_results['processing_steps']['preprocess'] = {
|
|
|
'success': False,
|
|
|
'error': str(e)
|
|
|
}
|
|
|
return False
|
|
|
|
|
|
def test_merge_step(self):
|
|
|
"""测试格式合并步骤"""
|
|
|
print("\n🔗 测试格式合并步骤...")
|
|
|
|
|
|
try:
|
|
|
response = requests.post(f"{API_BASE}/merge/",
|
|
|
json={},
|
|
|
headers={'Content-Type': 'application/json'},
|
|
|
timeout=10)
|
|
|
|
|
|
if response.status_code == 200:
|
|
|
data = response.json()
|
|
|
|
|
|
if data['status'] == 'success':
|
|
|
merged_records = data['data']['merged_records']
|
|
|
success_rate = data['data']['merge_success_rate']
|
|
|
|
|
|
print(f" ✓ 格式合并成功: 合并了 {merged_records} 条记录")
|
|
|
print(f" 📊 合并成功率: {success_rate}%")
|
|
|
|
|
|
self.test_results['processing_steps']['merge'] = {
|
|
|
'success': True,
|
|
|
'merged_records': merged_records,
|
|
|
'success_rate': success_rate
|
|
|
}
|
|
|
|
|
|
return True
|
|
|
else:
|
|
|
print(f" ✗ 格式合并失败: {data.get('message', '未知错误')}")
|
|
|
return False
|
|
|
else:
|
|
|
print(f" ✗ 格式合并请求失败: {response.status_code}")
|
|
|
return False
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f" ✗ 格式合并测试失败: {e}")
|
|
|
self.test_results['processing_steps']['merge'] = {
|
|
|
'success': False,
|
|
|
'error': str(e)
|
|
|
}
|
|
|
return False
|
|
|
|
|
|
def test_correction_step(self):
|
|
|
"""测试单词纠错步骤"""
|
|
|
print("\n📝 测试单词纠错步骤...")
|
|
|
|
|
|
try:
|
|
|
response = requests.post(f"{API_BASE}/correct/",
|
|
|
json={},
|
|
|
headers={'Content-Type': 'application/json'},
|
|
|
timeout=10)
|
|
|
|
|
|
if response.status_code == 200:
|
|
|
data = response.json()
|
|
|
|
|
|
if data['status'] == 'success':
|
|
|
corrected_words = data['data']['corrected_words']
|
|
|
correction_types = data['data']['correction_types']
|
|
|
|
|
|
print(f" ✓ 单词纠错成功: 纠正了 {corrected_words} 个单词")
|
|
|
print(f" 📋 纠错类型: {', '.join(correction_types)}")
|
|
|
|
|
|
self.test_results['processing_steps']['correction'] = {
|
|
|
'success': True,
|
|
|
'corrected_words': corrected_words,
|
|
|
'correction_types': correction_types
|
|
|
}
|
|
|
|
|
|
return True
|
|
|
else:
|
|
|
print(f" ✗ 单词纠错失败: {data.get('message', '未知错误')}")
|
|
|
return False
|
|
|
else:
|
|
|
print(f" ✗ 单词纠错请求失败: {response.status_code}")
|
|
|
return False
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f" ✗ 单词纠错测试失败: {e}")
|
|
|
self.test_results['processing_steps']['correction'] = {
|
|
|
'success': False,
|
|
|
'error': str(e)
|
|
|
}
|
|
|
return False
|
|
|
|
|
|
def test_ai_analysis_step(self):
|
|
|
"""测试AI分析步骤"""
|
|
|
print("\n🤖 测试AI分析步骤...")
|
|
|
|
|
|
try:
|
|
|
response = requests.post(f"{API_BASE}/analyze/",
|
|
|
json={},
|
|
|
headers={'Content-Type': 'application/json'},
|
|
|
timeout=30) # AI处理可能需要更长时间
|
|
|
|
|
|
if response.status_code == 200:
|
|
|
data = response.json()
|
|
|
|
|
|
if data['status'] == 'success':
|
|
|
if 'analysis_summary' in data['data']:
|
|
|
# 模拟分析结果
|
|
|
summary = data['data']['analysis_summary']
|
|
|
print(f" ✓ AI分析完成(模拟结果)")
|
|
|
print(f" 📊 呼号提取: {summary.get('call_signs_extracted', 0)}")
|
|
|
print(f" 📊 行为识别: {summary.get('behaviors_identified', 0)}")
|
|
|
print(f" 📊 高度检测: {summary.get('flight_levels_detected', 0)}")
|
|
|
else:
|
|
|
# 实际AI处理结果
|
|
|
print(f" ✓ AI分析完成(实际处理)")
|
|
|
print(f" 📊 处理结果: {data['data']}")
|
|
|
|
|
|
self.test_results['processing_steps']['ai_analysis'] = {
|
|
|
'success': True,
|
|
|
'result': data['data']
|
|
|
}
|
|
|
|
|
|
return True
|
|
|
else:
|
|
|
print(f" ✗ AI分析失败: {data.get('message', '未知错误')}")
|
|
|
return False
|
|
|
else:
|
|
|
print(f" ✗ AI分析请求失败: {response.status_code}")
|
|
|
return False
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f" ✗ AI分析测试失败: {e}")
|
|
|
self.test_results['processing_steps']['ai_analysis'] = {
|
|
|
'success': False,
|
|
|
'error': str(e)
|
|
|
}
|
|
|
return False
|
|
|
|
|
|
def test_processed_data_api(self):
|
|
|
"""测试处理后数据获取API"""
|
|
|
print("\n📋 测试处理后数据获取API...")
|
|
|
|
|
|
try:
|
|
|
response = requests.get(f"{API_BASE}/processed-data/", timeout=10)
|
|
|
|
|
|
if response.status_code == 200:
|
|
|
data = response.json()
|
|
|
|
|
|
if data['status'] == 'success':
|
|
|
count = data['count']
|
|
|
records = data['data']
|
|
|
|
|
|
print(f" ✓ 成功获取 {count} 条处理后数据")
|
|
|
|
|
|
if records:
|
|
|
print(" 📋 处理后数据样例:")
|
|
|
for i, record in enumerate(records[:3]):
|
|
|
print(f" {i+1}. 呼号: {record.get('Call Sign', 'N/A')}")
|
|
|
print(f" 行为: {record.get('Behavior', 'N/A')}")
|
|
|
print(f" 高度: {record.get('Flight Level', 'N/A')}")
|
|
|
print(f" 位置: {record.get('Location', 'N/A')}")
|
|
|
print()
|
|
|
|
|
|
self.test_results['api_tests']['processed_data'] = {
|
|
|
'success': True,
|
|
|
'count': count,
|
|
|
'sample': records[:3] if records else []
|
|
|
}
|
|
|
|
|
|
return True
|
|
|
else:
|
|
|
print(f" ⚠️ 处理后数据为空或获取失败: {data.get('message', '未知错误')}")
|
|
|
return False
|
|
|
else:
|
|
|
print(f" ✗ 处理后数据API请求失败: {response.status_code}")
|
|
|
return False
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f" ✗ 处理后数据API测试失败: {e}")
|
|
|
self.test_results['api_tests']['processed_data'] = {
|
|
|
'success': False,
|
|
|
'error': str(e)
|
|
|
}
|
|
|
return False
|
|
|
|
|
|
def test_statistics_api(self):
|
|
|
"""测试统计信息API"""
|
|
|
print("\n📊 测试统计信息API...")
|
|
|
|
|
|
try:
|
|
|
response = requests.get(f"{API_BASE}/statistics/", timeout=10)
|
|
|
|
|
|
if response.status_code == 200:
|
|
|
data = response.json()
|
|
|
|
|
|
if data['status'] == 'success':
|
|
|
stats = data['statistics']
|
|
|
|
|
|
print(f" ✓ 统计信息获取成功")
|
|
|
print(f" 📊 原始数据: {stats.get('original_count', 0)} 条")
|
|
|
print(f" 📊 处理数据: {stats.get('extracted_count', 0)} 条")
|
|
|
print(f" 📊 有效数据: {stats.get('valid_count', 0)} 条")
|
|
|
print(f" 📊 无效数据: {stats.get('invalid_count', 0)} 条")
|
|
|
print(f" 📊 提取率: {stats.get('extraction_rate', 0)}%")
|
|
|
print(f" 📊 验证率: {stats.get('validation_rate', 0)}%")
|
|
|
|
|
|
self.test_results['api_tests']['statistics'] = {
|
|
|
'success': True,
|
|
|
'statistics': stats
|
|
|
}
|
|
|
|
|
|
return True
|
|
|
else:
|
|
|
print(f" ✗ 统计信息获取失败: {data.get('message', '未知错误')}")
|
|
|
return False
|
|
|
else:
|
|
|
print(f" ✗ 统计信息API请求失败: {response.status_code}")
|
|
|
return False
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f" ✗ 统计信息API测试失败: {e}")
|
|
|
self.test_results['api_tests']['statistics'] = {
|
|
|
'success': False,
|
|
|
'error': str(e)
|
|
|
}
|
|
|
return False
|
|
|
|
|
|
def test_complete_processing_api(self):
|
|
|
"""测试完整处理流程API"""
|
|
|
print("\n🚀 测试完整处理流程API...")
|
|
|
|
|
|
try:
|
|
|
response = requests.post(f"{API_BASE}/process-data/",
|
|
|
timeout=60) # 完整处理可能需要较长时间
|
|
|
|
|
|
if response.status_code == 200:
|
|
|
data = response.json()
|
|
|
|
|
|
if data['status'] == 'success':
|
|
|
print(f" ✓ 完整处理流程成功")
|
|
|
|
|
|
# 显示处理结果
|
|
|
if 'processed_count' in data:
|
|
|
print(f" 📊 处理数量: {data['processed_count']}")
|
|
|
if 'valid_count' in data:
|
|
|
print(f" 📊 有效数量: {data['valid_count']}")
|
|
|
if 'invalid_count' in data:
|
|
|
print(f" 📊 无效数量: {data['invalid_count']}")
|
|
|
|
|
|
self.test_results['processing_steps']['complete_process'] = {
|
|
|
'success': True,
|
|
|
'result': data
|
|
|
}
|
|
|
|
|
|
return True
|
|
|
else:
|
|
|
print(f" ✗ 完整处理失败: {data.get('message', '未知错误')}")
|
|
|
return False
|
|
|
else:
|
|
|
print(f" ✗ 完整处理API请求失败: {response.status_code}")
|
|
|
return False
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f" ✗ 完整处理API测试失败: {e}")
|
|
|
self.test_results['processing_steps']['complete_process'] = {
|
|
|
'success': False,
|
|
|
'error': str(e)
|
|
|
}
|
|
|
return False
|
|
|
|
|
|
def check_database_tables(self):
|
|
|
"""检查数据库表状态"""
|
|
|
print("\n🗄️ 检查数据库表状态...")
|
|
|
|
|
|
tables_to_check = [
|
|
|
'prewashed_table',
|
|
|
'processed_table',
|
|
|
'precessed_table', # 旧的拼写错误表名
|
|
|
'final_table',
|
|
|
'quarantine_table'
|
|
|
]
|
|
|
|
|
|
table_status = {}
|
|
|
|
|
|
try:
|
|
|
with connection.cursor() as cursor:
|
|
|
for table in tables_to_check:
|
|
|
try:
|
|
|
cursor.execute(f"SELECT COUNT(*) FROM {table}")
|
|
|
count = cursor.fetchone()[0]
|
|
|
table_status[table] = {'exists': True, 'count': count}
|
|
|
print(f" ✓ {table}: {count} 条记录")
|
|
|
except Exception:
|
|
|
table_status[table] = {'exists': False, 'count': 0}
|
|
|
print(f" - {table}: 表不存在")
|
|
|
|
|
|
self.test_results['data_validation']['table_status'] = table_status
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f" ✗ 数据库表检查失败: {e}")
|
|
|
self.test_results['data_validation']['table_status'] = {
|
|
|
'error': str(e)
|
|
|
}
|
|
|
|
|
|
def generate_test_report(self):
|
|
|
"""生成测试报告"""
|
|
|
print("\n" + "="*80)
|
|
|
print("📋 数据处理流程测试报告")
|
|
|
print("="*80)
|
|
|
|
|
|
# 测试数据准备
|
|
|
setup = self.test_results.get('setup', {})
|
|
|
if setup.get('success'):
|
|
|
print(f"\n✅ 测试数据准备: 成功 ({setup.get('data_count', 0)} 条)")
|
|
|
else:
|
|
|
print(f"\n❌ 测试数据准备: 失败")
|
|
|
|
|
|
# 处理步骤测试
|
|
|
steps = self.test_results.get('processing_steps', {})
|
|
|
print(f"\n🔄 处理步骤测试:")
|
|
|
|
|
|
step_names = {
|
|
|
'preprocess': '数据预处理',
|
|
|
'merge': '格式合并',
|
|
|
'correction': '单词纠错',
|
|
|
'ai_analysis': 'AI分析',
|
|
|
'complete_process': '完整处理流程'
|
|
|
}
|
|
|
|
|
|
for step_key, step_name in step_names.items():
|
|
|
if step_key in steps:
|
|
|
if steps[step_key].get('success'):
|
|
|
print(f" ✅ {step_name}: 成功")
|
|
|
else:
|
|
|
print(f" ❌ {step_name}: 失败")
|
|
|
else:
|
|
|
print(f" ⏭️ {step_name}: 跳过")
|
|
|
|
|
|
# API测试
|
|
|
api_tests = self.test_results.get('api_tests', {})
|
|
|
print(f"\n🌐 API测试:")
|
|
|
|
|
|
api_names = {
|
|
|
'original_data': '原始数据获取',
|
|
|
'processed_data': '处理后数据获取',
|
|
|
'statistics': '统计信息获取'
|
|
|
}
|
|
|
|
|
|
for api_key, api_name in api_names.items():
|
|
|
if api_key in api_tests:
|
|
|
if api_tests[api_key].get('success'):
|
|
|
print(f" ✅ {api_name}: 成功")
|
|
|
else:
|
|
|
print(f" ❌ {api_name}: 失败")
|
|
|
else:
|
|
|
print(f" ⏭️ {api_name}: 跳过")
|
|
|
|
|
|
# 数据验证
|
|
|
validation = self.test_results.get('data_validation', {})
|
|
|
if 'table_status' in validation:
|
|
|
print(f"\n🗄️ 数据库表状态:")
|
|
|
for table, status in validation['table_status'].items():
|
|
|
if status.get('exists'):
|
|
|
print(f" ✅ {table}: {status.get('count', 0)} 条记录")
|
|
|
else:
|
|
|
print(f" ➖ {table}: 不存在")
|
|
|
|
|
|
# 总体评估
|
|
|
total_tests = 0
|
|
|
passed_tests = 0
|
|
|
|
|
|
# 统计各类测试
|
|
|
if setup.get('success'):
|
|
|
passed_tests += 1
|
|
|
total_tests += 1
|
|
|
|
|
|
for step_result in steps.values():
|
|
|
if step_result.get('success'):
|
|
|
passed_tests += 1
|
|
|
total_tests += 1
|
|
|
|
|
|
for api_result in api_tests.values():
|
|
|
if api_result.get('success'):
|
|
|
passed_tests += 1
|
|
|
total_tests += 1
|
|
|
|
|
|
success_rate = (passed_tests / total_tests * 100) if total_tests > 0 else 0
|
|
|
|
|
|
print(f"\n🎯 总体测试结果: {passed_tests}/{total_tests} 通过 ({success_rate:.1f}%)")
|
|
|
|
|
|
if success_rate >= 80:
|
|
|
print("🏆 优秀!数据处理流程运行良好")
|
|
|
elif success_rate >= 60:
|
|
|
print("✅ 良好!大部分功能正常")
|
|
|
elif success_rate >= 40:
|
|
|
print("⚠️ 一般,部分功能需要检查")
|
|
|
else:
|
|
|
print("❌ 需要修复多个问题")
|
|
|
|
|
|
# 保存详细报告
|
|
|
report_file = Path(__file__).parent / "data_processing_test_report.json"
|
|
|
with open(report_file, 'w', encoding='utf-8') as f:
|
|
|
json.dump(self.test_results, f, indent=2, ensure_ascii=False)
|
|
|
print(f"\n📄 详细报告已保存: {report_file}")
|
|
|
|
|
|
def run_complete_test(self):
|
|
|
"""运行完整的数据处理流程测试"""
|
|
|
print("🎯 开始数据处理流程完整测试")
|
|
|
print("="*80)
|
|
|
|
|
|
# 1. 准备测试数据
|
|
|
if not self.setup_test_data():
|
|
|
print("❌ 测试数据准备失败,无法继续测试")
|
|
|
return False
|
|
|
|
|
|
# 2. 检查服务器状态
|
|
|
if not self.check_server_status():
|
|
|
print("❌ Django服务器未运行,无法进行API测试")
|
|
|
print("💡 请先启动服务器: cd Django123/atc_extractor/backend && python manage.py runserver")
|
|
|
self.check_database_tables()
|
|
|
self.generate_test_report()
|
|
|
return False
|
|
|
|
|
|
# 3. 测试原始数据API
|
|
|
self.test_original_data_api()
|
|
|
|
|
|
# 4. 按顺序测试各个处理步骤
|
|
|
self.test_preprocessing_step()
|
|
|
time.sleep(1) # 避免请求过快
|
|
|
|
|
|
self.test_merge_step()
|
|
|
time.sleep(1)
|
|
|
|
|
|
self.test_correction_step()
|
|
|
time.sleep(1)
|
|
|
|
|
|
self.test_ai_analysis_step()
|
|
|
time.sleep(2)
|
|
|
|
|
|
# 5. 测试完整处理流程
|
|
|
self.test_complete_processing_api()
|
|
|
time.sleep(2)
|
|
|
|
|
|
# 6. 测试处理结果API
|
|
|
self.test_processed_data_api()
|
|
|
time.sleep(1)
|
|
|
|
|
|
# 7. 测试统计信息API
|
|
|
self.test_statistics_api()
|
|
|
|
|
|
# 8. 检查数据库表状态
|
|
|
self.check_database_tables()
|
|
|
|
|
|
# 9. 生成测试报告
|
|
|
self.generate_test_report()
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
def main():
|
|
|
"""主函数"""
|
|
|
tester = DataProcessingFlowTest()
|
|
|
success = tester.run_complete_test()
|
|
|
|
|
|
print("\n" + "="*80)
|
|
|
if success:
|
|
|
print("✅ 数据处理流程测试完成!")
|
|
|
else:
|
|
|
print("⚠️ 测试完成,但可能存在问题需要解决")
|
|
|
|
|
|
return 0 if success else 1
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
sys.exit(main()) |