You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

121 lines
4.1 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python
"""
Test version of the AI processing script - adds timeouts and error handling.
"""
import os
import sys
import django
import time
# NOTE: this TimeoutError is concurrent.futures' exception and shadows the
# builtin of the same name for the rest of this module.
from concurrent.futures import ThreadPoolExecutor, TimeoutError
# Set up the Django environment.
# NOTE(review): this is a machine-specific absolute path, so the backend
# package is only importable on a host with this exact directory layout.
sys.path.append('/home/hzk/项目/moxun-1/信息抽取+数据检验/Django123/atc_extractor/backend')
# setdefault keeps any DJANGO_SETTINGS_MODULE already set in the environment.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'backend.settings')
django.setup()
# Imported only after django.setup(); presumably the extractor module needs
# Django to be configured before it can be loaded - TODO confirm.
from extractor.infoextractor_czhwjq import CallSignExtractor, read_from_table, write_to_table, parse_input
def process_single_record(extractor, data_item, idx):
    """Extract and parse the call-sign rows for a single record.

    This function applies no timeout itself; any time limit has to be
    enforced by the caller.

    Args:
        extractor: Object exposing ``extract_call_signs(record_id, text)``.
        data_item: Indexable record; element 0 is the record id and
            element 1 is the raw text.
        idx: Zero-based position of the record, used only for the progress
            message (printed one-based).

    Returns:
        Whatever ``parse_input`` returns for the extractor output, or an
        empty list when the extractor output is falsy or any exception is
        raised along the way.
    """
    try:
        record_id, record_text = data_item[0], data_item[1]
        print(f"处理第 {idx+1} 条记录: {record_id}")

        # Ask the AI model for its raw extraction output.
        raw_output = extractor.extract_call_signs(record_id, record_text)
        if raw_output:
            # Turn the raw model output into result rows.
            parsed_rows = parse_input(raw_output)
            print(f" ✅ 解析出 {len(parsed_rows)} 条记录")
            return parsed_rows

        print(" ❌ AI返回空结果")
    except Exception as exc:
        # Best effort: one bad record must not abort the whole run.
        print(f" ❌ 处理失败: {exc}")
    return []
def test_ai_processing(limit=10, timeout=30):
    """Smoke-test the AI extraction pipeline on the first few stored records.

    Reads the ``id`` and ``text`` columns of ``prewashed_table``, runs
    ``process_single_record`` on the first ``limit`` records (each in its
    own worker thread so that a slow call can be abandoned), prints progress
    and a summary, and writes the extracted rows to ``test_precessed_table``.

    Args:
        limit: Maximum number of records to process. Defaults to 10.
        timeout: Seconds to wait for a single record before skipping it.
            Defaults to 30.

    Returns:
        ``None`` when the source table yields no records. Otherwise a dict
        with the keys ``processed_records``, ``extracted_records``,
        ``extraction_rate`` (percent) and ``processing_time`` (seconds).
    """
    print("开始测试AI处理流程...")

    # Load the raw data: one id column and one text column.
    print("1. 读取原始数据...")
    id_data = read_from_table("prewashed_table", "id")
    text_data = read_from_table("prewashed_table", "text")
    # zip pairs the columns positionally and stops at the shorter one, so a
    # length mismatch cannot raise IndexError.
    data = [[record_id, text] for record_id, text in zip(id_data, text_data)]
    if not data:
        print("❌ 没有原始数据")
        return None
    print(f"📊 原始数据: {len(data)}")

    # Only a small prefix is processed; this is a test run.
    test_data = data[:limit]
    print(f"🧪 测试处理前 {len(test_data)}")

    extractor = CallSignExtractor()
    all_result = []
    idx = 0  # running row number, prepended to every accepted row

    print("2. 开始AI处理...")
    start_time = time.time()
    for i, data_item in enumerate(test_data):
        # One single-thread executor per record. It is deliberately NOT used
        # as a context manager: leaving a ``with`` block calls
        # shutdown(wait=True), which would block until the stuck call
        # returned and make the timeout below ineffective.
        executor = ThreadPoolExecutor(max_workers=1)
        try:
            future = executor.submit(process_single_record, extractor, data_item, i)
            result_list = future.result(timeout=timeout)
            for row in result_list:
                # Keep only rows that carry all expected fields.
                if row and len(row) >= 5:
                    idx += 1
                    all_result.append([idx] + row)
        except TimeoutError:
            print(f" ⏰ 第 {i+1} 条记录处理超时,跳过")
            continue
        except Exception as e:
            print(f" ❌ 第 {i+1} 条记录处理失败: {e}")
            continue
        finally:
            # Release the executor without waiting for the worker.
            # NOTE(review): a timed-out call cannot be killed; it keeps
            # running in the background, shares ``extractor`` with later
            # records, and the interpreter may still wait for it at exit.
            executor.shutdown(wait=False)
    processing_time = time.time() - start_time

    # Computed once and guarded, so limit=0 cannot divide by zero.
    extraction_rate = len(all_result) / len(test_data) * 100 if test_data else 0

    print("3. 处理完成:")
    print(f" ⏱️ 处理时间: {processing_time:.2f}")
    print(f" 📈 原始记录: {len(test_data)}")
    print(f" 📊 提取结果: {len(all_result)}")
    print(f" 🎯 提取率: {extraction_rate:.1f}%")

    if all_result:
        # Show the first few extracted rows.
        print("4. 提取结果示例:")
        for i, result in enumerate(all_result[:3]):
            print(f" 记录{i+1}: {result}")

        # Persist the results to the test table; a failure is reported but
        # does not discard the summary returned below.
        try:
            headers = ["num", "id", "Call Sign", "Behavior", "Flight Level", "Location", "Time"]
            write_to_table(all_result, headers, "test_precessed_table")
            print("✅ 结果已写入 test_precessed_table 表")
        except Exception as e:
            print(f"❌ 写入数据库失败: {e}")

    return {
        "processed_records": len(test_data),
        "extracted_records": len(all_result),
        "extraction_rate": extraction_rate,
        "processing_time": processing_time,
    }
if __name__ == "__main__":
    # Script entry point: run the smoke test and echo its summary
    # (None when the source table had no records).
    summary = test_ai_processing()
    print(f"\n🎉 测试完成: {summary}")