|
|
#!/usr/bin/env python3
|
|
|
"""
|
|
|
Django后端综合测试套件 - 专业级测试框架
|
|
|
测试目标:检测并验证所有API接口、数据库操作、业务逻辑的正确性
|
|
|
"""
|
|
|
|
|
|
import os
|
|
|
import sys
|
|
|
import django
|
|
|
import unittest
|
|
|
import json
|
|
|
import tempfile
|
|
|
import io
|
|
|
from unittest.mock import Mock, patch, MagicMock
|
|
|
from django.test import TestCase, Client
|
|
|
from django.db import connection
|
|
|
from django.conf import settings
|
|
|
import pandas as pd
|
|
|
|
|
|
# 设置Django环境
|
|
|
sys.path.append('/home/hzk/项目/moxun-1/信息抽取+数据检验/Django123/atc_extractor/backend')
|
|
|
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'backend.settings')
|
|
|
django.setup()
|
|
|
|
|
|
from extractor.views import *
|
|
|
from extractor.services import process_data
|
|
|
from extractor.infoextractor_czhwjq import CallSignExtractor
|
|
|
|
|
|
class DatabaseTestCase(TestCase):
|
|
|
"""数据库连接和表结构测试"""
|
|
|
|
|
|
def setUp(self):
|
|
|
"""测试前准备:创建测试数据"""
|
|
|
self.client = Client()
|
|
|
# 创建测试用的基础表
|
|
|
with connection.cursor() as cursor:
|
|
|
# 创建测试用的prewashed_table
|
|
|
cursor.execute("""
|
|
|
CREATE TABLE IF NOT EXISTS prewashed_table (
|
|
|
id VARCHAR(255) NOT NULL,
|
|
|
text TEXT,
|
|
|
PRIMARY KEY (id)
|
|
|
)
|
|
|
""")
|
|
|
|
|
|
# 插入测试数据
|
|
|
cursor.execute("""
|
|
|
INSERT IGNORE INTO prewashed_table (id, text) VALUES
|
|
|
('test1', 'CCA123 climb to flight level 350'),
|
|
|
('test2', 'CSN456 descend to flight level 280'),
|
|
|
('test3', 'CHH789 maintain heading 090')
|
|
|
""")
|
|
|
|
|
|
def test_database_connection(self):
|
|
|
"""测试数据库连接"""
|
|
|
try:
|
|
|
with connection.cursor() as cursor:
|
|
|
cursor.execute("SELECT 1")
|
|
|
result = cursor.fetchone()
|
|
|
self.assertEqual(result[0], 1)
|
|
|
except Exception as e:
|
|
|
self.fail(f"数据库连接失败: {e}")
|
|
|
|
|
|
def test_table_existence(self):
|
|
|
"""测试必要的表是否存在"""
|
|
|
required_tables = ['prewashed_table', 'air_company']
|
|
|
|
|
|
with connection.cursor() as cursor:
|
|
|
cursor.execute("SHOW TABLES")
|
|
|
existing_tables = [table[0] for table in cursor.fetchall()]
|
|
|
|
|
|
for table in required_tables:
|
|
|
if table in existing_tables:
|
|
|
print(f"✓ 表 {table} 存在")
|
|
|
else:
|
|
|
print(f"✗ 表 {table} 不存在")
|
|
|
|
|
|
def test_table_structure(self):
|
|
|
"""测试表结构的完整性"""
|
|
|
with connection.cursor() as cursor:
|
|
|
# 测试prewashed_table结构
|
|
|
cursor.execute("DESCRIBE prewashed_table")
|
|
|
columns = [col[0] for col in cursor.fetchall()]
|
|
|
self.assertIn('id', columns)
|
|
|
self.assertIn('text', columns)
|
|
|
|
|
|
|
|
|
class APIEndpointTestCase(TestCase):
|
|
|
"""API接口功能测试"""
|
|
|
|
|
|
def setUp(self):
|
|
|
self.client = Client()
|
|
|
# 准备测试数据
|
|
|
with connection.cursor() as cursor:
|
|
|
cursor.execute("""
|
|
|
CREATE TABLE IF NOT EXISTS prewashed_table (
|
|
|
id VARCHAR(255) NOT NULL,
|
|
|
text TEXT,
|
|
|
PRIMARY KEY (id)
|
|
|
)
|
|
|
""")
|
|
|
cursor.execute("""
|
|
|
INSERT IGNORE INTO prewashed_table (id, text) VALUES
|
|
|
('api_test1', 'CCA123 climb to flight level 350'),
|
|
|
('api_test2', 'CSN456 descend to flight level 280')
|
|
|
""")
|
|
|
|
|
|
def test_health_check_endpoint(self):
|
|
|
"""测试健康检查接口"""
|
|
|
response = self.client.get('/api/health/')
|
|
|
self.assertEqual(response.status_code, 200)
|
|
|
data = response.json()
|
|
|
self.assertEqual(data['status'], 'ok')
|
|
|
print("✓ 健康检查接口正常")
|
|
|
|
|
|
def test_original_data_endpoint(self):
|
|
|
"""测试原始数据获取接口"""
|
|
|
response = self.client.get('/api/original-data/')
|
|
|
self.assertEqual(response.status_code, 200)
|
|
|
data = response.json()
|
|
|
self.assertEqual(data['status'], 'success')
|
|
|
self.assertIsInstance(data['data'], list)
|
|
|
self.assertGreater(data['count'], 0)
|
|
|
print("✓ 原始数据接口正常")
|
|
|
|
|
|
def test_statistics_endpoint(self):
|
|
|
"""测试统计信息接口"""
|
|
|
response = self.client.get('/api/statistics/')
|
|
|
self.assertEqual(response.status_code, 200)
|
|
|
data = response.json()
|
|
|
self.assertEqual(data['status'], 'success')
|
|
|
self.assertIn('statistics', data)
|
|
|
print("✓ 统计接口正常")
|
|
|
|
|
|
def test_preprocess_endpoint(self):
|
|
|
"""测试预处理接口"""
|
|
|
response = self.client.post('/api/preprocess/',
|
|
|
json.dumps({'test': 'data'}),
|
|
|
content_type='application/json')
|
|
|
self.assertIn(response.status_code, [200, 400]) # 200成功或400数据相关错误
|
|
|
data = response.json()
|
|
|
self.assertIn(data['status'], ['success', 'error'])
|
|
|
print("✓ 预处理接口响应正常")
|
|
|
|
|
|
def test_merge_endpoint(self):
|
|
|
"""测试格式合并接口"""
|
|
|
response = self.client.post('/api/merge/',
|
|
|
json.dumps({'test': 'data'}),
|
|
|
content_type='application/json')
|
|
|
self.assertIn(response.status_code, [200, 400])
|
|
|
data = response.json()
|
|
|
self.assertIn(data['status'], ['success', 'error'])
|
|
|
print("✓ 格式合并接口响应正常")
|
|
|
|
|
|
def test_correct_endpoint(self):
|
|
|
"""测试单词纠错接口"""
|
|
|
response = self.client.post('/api/correct/',
|
|
|
json.dumps({'test': 'data'}),
|
|
|
content_type='application/json')
|
|
|
self.assertIn(response.status_code, [200, 400])
|
|
|
data = response.json()
|
|
|
self.assertIn(data['status'], ['success', 'error'])
|
|
|
print("✓ 单词纠错接口响应正常")
|
|
|
|
|
|
|
|
|
class FileUploadTestCase(TestCase):
|
|
|
"""文件上传功能测试"""
|
|
|
|
|
|
def setUp(self):
|
|
|
self.client = Client()
|
|
|
|
|
|
def test_csv_file_upload(self):
|
|
|
"""测试CSV文件上传"""
|
|
|
# 创建测试CSV文件
|
|
|
csv_content = "id,text\ntest1,CCA123 climb to FL350\ntest2,CSN456 descend to FL280"
|
|
|
csv_file = io.StringIO(csv_content)
|
|
|
csv_file.name = 'test_data.csv'
|
|
|
|
|
|
response = self.client.post('/api/upload/', {
|
|
|
'file': io.BytesIO(csv_content.encode('utf-8'))
|
|
|
})
|
|
|
|
|
|
# 检查响应状态
|
|
|
self.assertIn(response.status_code, [200, 400, 500])
|
|
|
print(f"✓ CSV上传接口响应状态: {response.status_code}")
|
|
|
|
|
|
def test_invalid_file_upload(self):
|
|
|
"""测试无效文件上传"""
|
|
|
response = self.client.post('/api/upload/', {})
|
|
|
self.assertEqual(response.status_code, 400)
|
|
|
data = response.json()
|
|
|
self.assertEqual(data['status'], 'error')
|
|
|
print("✓ 无效文件上传处理正确")
|
|
|
|
|
|
|
|
|
class DataProcessingTestCase(TestCase):
|
|
|
"""数据处理业务逻辑测试"""
|
|
|
|
|
|
def setUp(self):
|
|
|
self.client = Client()
|
|
|
# 准备测试数据
|
|
|
with connection.cursor() as cursor:
|
|
|
cursor.execute("""
|
|
|
CREATE TABLE IF NOT EXISTS prewashed_table (
|
|
|
id VARCHAR(255) NOT NULL,
|
|
|
text TEXT,
|
|
|
PRIMARY KEY (id)
|
|
|
)
|
|
|
""")
|
|
|
cursor.execute("DELETE FROM prewashed_table")
|
|
|
cursor.execute("""
|
|
|
INSERT INTO prewashed_table (id, text) VALUES
|
|
|
('dp_test1', 'CCA123 climb to flight level 350'),
|
|
|
('dp_test2', 'CSN456 descend to flight level 280'),
|
|
|
('dp_test3', 'CHH789 maintain heading 090')
|
|
|
""")
|
|
|
|
|
|
@patch('extractor.services.ai_process_data')
|
|
|
def test_data_processing_with_mock(self, mock_ai_process):
|
|
|
"""测试数据处理流程(使用Mock)"""
|
|
|
# Mock AI处理结果
|
|
|
mock_ai_process.return_value = {
|
|
|
'status': 'success',
|
|
|
'processed_count': 3,
|
|
|
'valid_count': 2,
|
|
|
'invalid_count': 1
|
|
|
}
|
|
|
|
|
|
response = self.client.post('/api/process-data/')
|
|
|
self.assertEqual(response.status_code, 200)
|
|
|
data = response.json()
|
|
|
self.assertEqual(data['status'], 'success')
|
|
|
print("✓ 数据处理流程测试通过")
|
|
|
|
|
|
def test_empty_data_processing(self):
|
|
|
"""测试空数据处理"""
|
|
|
# 清空数据
|
|
|
with connection.cursor() as cursor:
|
|
|
cursor.execute("DELETE FROM prewashed_table")
|
|
|
|
|
|
response = self.client.post('/api/preprocess/')
|
|
|
self.assertEqual(response.status_code, 400)
|
|
|
data = response.json()
|
|
|
self.assertEqual(data['status'], 'error')
|
|
|
self.assertIn('没有找到需要预处理的原始数据', data['message'])
|
|
|
print("✓ 空数据处理逻辑正确")
|
|
|
|
|
|
|
|
|
class ErrorHandlingTestCase(TestCase):
|
|
|
"""错误处理测试"""
|
|
|
|
|
|
def setUp(self):
|
|
|
self.client = Client()
|
|
|
|
|
|
@patch('extractor.views.connection')
|
|
|
def test_database_error_handling(self, mock_connection):
|
|
|
"""测试数据库连接错误处理"""
|
|
|
# 模拟数据库连接错误
|
|
|
mock_cursor = Mock()
|
|
|
mock_cursor.execute.side_effect = Exception("Database connection failed")
|
|
|
mock_connection.cursor.return_value.__enter__.return_value = mock_cursor
|
|
|
|
|
|
response = self.client.get('/api/statistics/')
|
|
|
self.assertEqual(response.status_code, 500)
|
|
|
data = response.json()
|
|
|
self.assertEqual(data['status'], 'error')
|
|
|
print("✓ 数据库错误处理正确")
|
|
|
|
|
|
def test_invalid_json_handling(self):
|
|
|
"""测试无效JSON处理"""
|
|
|
response = self.client.post('/api/preprocess/',
|
|
|
'invalid json',
|
|
|
content_type='application/json')
|
|
|
# Django会自动处理JSON解析错误,返回400
|
|
|
self.assertIn(response.status_code, [400, 500])
|
|
|
print("✓ 无效JSON处理正确")
|
|
|
|
|
|
|
|
|
class SecurityTestCase(TestCase):
|
|
|
"""安全性测试"""
|
|
|
|
|
|
def setUp(self):
|
|
|
self.client = Client()
|
|
|
|
|
|
def test_sql_injection_protection(self):
|
|
|
"""测试SQL注入防护"""
|
|
|
# 尝试SQL注入攻击
|
|
|
malicious_data = {"id": "1'; DROP TABLE prewashed_table; --"}
|
|
|
response = self.client.post('/api/preprocess/',
|
|
|
json.dumps(malicious_data),
|
|
|
content_type='application/json')
|
|
|
|
|
|
# 确保表仍然存在
|
|
|
with connection.cursor() as cursor:
|
|
|
cursor.execute("SHOW TABLES LIKE 'prewashed_table'")
|
|
|
result = cursor.fetchone()
|
|
|
self.assertIsNotNone(result)
|
|
|
print("✓ SQL注入防护有效")
|
|
|
|
|
|
def test_settings_security(self):
|
|
|
"""测试Django设置安全性"""
|
|
|
# 检查调试模式(生产环境应该关闭)
|
|
|
if settings.DEBUG:
|
|
|
print("⚠️ WARNING: DEBUG模式在生产环境中应该关闭")
|
|
|
|
|
|
# 检查SECRET_KEY
|
|
|
if 'django-insecure' in settings.SECRET_KEY:
|
|
|
print("⚠️ WARNING: 使用默认SECRET_KEY,生产环境需要更换")
|
|
|
|
|
|
print("✓ 安全设置检查完成")
|
|
|
|
|
|
|
|
|
class IntegrationTestCase(TestCase):
|
|
|
"""集成测试"""
|
|
|
|
|
|
def setUp(self):
|
|
|
self.client = Client()
|
|
|
|
|
|
def test_complete_workflow(self):
|
|
|
"""测试完整的工作流程"""
|
|
|
# 1. 检查健康状态
|
|
|
health_response = self.client.get('/api/health/')
|
|
|
self.assertEqual(health_response.status_code, 200)
|
|
|
|
|
|
# 2. 检查原始数据
|
|
|
data_response = self.client.get('/api/original-data/')
|
|
|
self.assertEqual(data_response.status_code, 200)
|
|
|
|
|
|
# 3. 执行预处理(如果有数据的话)
|
|
|
preprocess_response = self.client.post('/api/preprocess/',
|
|
|
json.dumps({}),
|
|
|
content_type='application/json')
|
|
|
self.assertIn(preprocess_response.status_code, [200, 400])
|
|
|
|
|
|
# 4. 获取统计信息
|
|
|
stats_response = self.client.get('/api/statistics/')
|
|
|
self.assertEqual(stats_response.status_code, 200)
|
|
|
|
|
|
print("✓ 完整工作流程测试通过")
|
|
|
|
|
|
|
|
|
def run_comprehensive_tests():
|
|
|
"""运行全面测试套件"""
|
|
|
print("🚀 开始运行Django后端综合测试...")
|
|
|
print("=" * 60)
|
|
|
|
|
|
# 创建测试套件
|
|
|
test_suite = unittest.TestSuite()
|
|
|
|
|
|
# 添加所有测试类
|
|
|
test_classes = [
|
|
|
DatabaseTestCase,
|
|
|
APIEndpointTestCase,
|
|
|
FileUploadTestCase,
|
|
|
DataProcessingTestCase,
|
|
|
ErrorHandlingTestCase,
|
|
|
SecurityTestCase,
|
|
|
IntegrationTestCase
|
|
|
]
|
|
|
|
|
|
for test_class in test_classes:
|
|
|
tests = unittest.TestLoader().loadTestsFromTestCase(test_class)
|
|
|
test_suite.addTests(tests)
|
|
|
|
|
|
# 运行测试
|
|
|
runner = unittest.TextTestRunner(verbosity=2)
|
|
|
result = runner.run(test_suite)
|
|
|
|
|
|
print("=" * 60)
|
|
|
print(f"测试完成: {result.testsRun} 个测试")
|
|
|
print(f"成功: {result.testsRun - len(result.failures) - len(result.errors)}")
|
|
|
print(f"失败: {len(result.failures)}")
|
|
|
print(f"错误: {len(result.errors)}")
|
|
|
|
|
|
if result.failures:
|
|
|
print("\n失败的测试:")
|
|
|
for test, traceback in result.failures:
|
|
|
print(f"- {test}: {traceback}")
|
|
|
|
|
|
if result.errors:
|
|
|
print("\n错误的测试:")
|
|
|
for test, traceback in result.errors:
|
|
|
print(f"- {test}: {traceback}")
|
|
|
|
|
|
return result.wasSuccessful()
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
success = run_comprehensive_tests()
|
|
|
if success:
|
|
|
print("\n🎉 所有测试通过!")
|
|
|
else:
|
|
|
print("\n❌ 部分测试失败,需要修复问题")
|
|
|
|
|
|
sys.exit(0 if success else 1) |