#!/usr/bin/env python
"""Test version of the AI processing script, with timeouts and error handling."""
import os
import sys
import django
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError

# Set up the Django environment. This must happen before importing any
# module that relies on configured Django settings.
sys.path.append('/home/hzk/项目/moxun-1/信息抽取+数据检验/Django123/atc_extractor/backend')
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'backend.settings')
django.setup()

from extractor.infoextractor_czhwjq import (  # noqa: E402
    CallSignExtractor,
    read_from_table,
    write_to_table,
    parse_input,
)

# Number of leading records processed by a default test run.
DEFAULT_TEST_LIMIT = 10
# Maximum number of seconds to wait for a single record by default.
DEFAULT_RECORD_TIMEOUT = 30


def process_single_record(extractor, data_item, idx):
    """Run the extractor on one record and parse its output.

    This function does not enforce a timeout itself; the caller is
    responsible for bounding how long it waits for the result.

    Args:
        extractor: Object exposing ``extract_call_signs(raw_id, raw_text)``.
        data_item: Sequence whose first two items are the record id and text.
        idx: Zero-based position of the record, used only for progress output.

    Returns:
        The list produced by ``parse_input`` for the extractor output, or an
        empty list when the extractor returns a falsy result or any exception
        is raised (best effort: failures are printed, never propagated).
    """
    try:
        raw_id, raw_text = data_item[0], data_item[1]
        print(f"处理第 {idx+1} 条记录: {raw_id}")

        # Call the AI model.
        result = extractor.extract_call_signs(raw_id, raw_text)
        if not result:
            print(" ❌ AI返回空结果")
            return []

        # Parse the raw model output into rows.
        result_list = parse_input(result)
        print(f" ✅ 解析出 {len(result_list)} 条记录")
        return result_list
    except Exception as e:
        print(f" ❌ 处理失败: {e}")
        return []


def _run_with_timeout(func, timeout, *args):
    """Call ``func(*args)`` in a worker thread, waiting at most ``timeout`` seconds.

    Raises ``concurrent.futures.TimeoutError`` when the call does not finish
    in time; exceptions raised by ``func`` propagate unchanged.
    """
    executor = ThreadPoolExecutor(max_workers=1)
    try:
        future = executor.submit(func, *args)
        return future.result(timeout=timeout)
    finally:
        # wait=False is essential: using the executor as a context manager
        # would call shutdown(wait=True) and block until the stuck worker
        # finishes, which defeats the timeout entirely.
        # NOTE: a Python thread cannot be killed, so a timed-out call keeps
        # running in the background (and keeps using the objects passed to
        # it); the interpreter may still wait for it at exit.
        executor.shutdown(wait=False)


def test_ai_processing(limit=DEFAULT_TEST_LIMIT, timeout=DEFAULT_RECORD_TIMEOUT):
    """Run the AI extraction pipeline on a small sample and report statistics.

    Reads the ``id`` and ``text`` columns of ``prewashed_table``, processes
    the first ``limit`` records one at a time (each bounded by ``timeout``),
    prints a summary, and writes the extracted rows to
    ``test_precessed_table``.

    Args:
        limit: How many leading records to process (``None`` processes all).
        timeout: Seconds to wait for each record before skipping it.

    Returns:
        ``None`` when there is no source data; otherwise a dict with the keys
        ``processed_records``, ``extracted_records``, ``extraction_rate``
        (percent) and ``processing_time`` (seconds).
    """
    print("开始测试AI处理流程...")

    # Read the raw data.
    print("1. 读取原始数据...")
    id_data = read_from_table("prewashed_table", "id")
    text_data = read_from_table("prewashed_table", "text")
    if len(id_data) != len(text_data):
        # Pairing by position is only meaningful for equally long columns;
        # make the mismatch visible instead of raising IndexError or
        # silently dropping rows.
        print(f"⚠️ id 与 text 数量不一致: {len(id_data)} vs {len(text_data)},按较短者截断")
    data = [[raw_id, raw_text] for raw_id, raw_text in zip(id_data, text_data)]

    if not data:
        print("❌ 没有原始数据")
        return

    print(f"📊 原始数据: {len(data)} 条")

    # Only process the leading records as a test sample.
    test_data = data[:limit]
    print(f"🧪 测试处理前 {len(test_data)} 条")

    # Initialise the extractor once and reuse it for every record.
    extractor = CallSignExtractor()

    all_result = []
    idx = 0  # running row number stored in the "num" column

    print("2. 开始AI处理...")
    start_time = time.time()

    for i, data_item in enumerate(test_data):
        try:
            result_list = _run_with_timeout(
                process_single_record, timeout, extractor, data_item, i
            )

            # Collect the rows that look complete.
            # NOTE(review): the header list below has 6 columns after "num",
            # yet rows with only 5 items are accepted here — confirm which
            # row width write_to_table expects.
            for row in result_list:
                if row and len(row) >= 5:
                    idx += 1
                    all_result.append([idx] + row)
        except TimeoutError:
            print(f" ⏰ 第 {i+1} 条记录处理超时,跳过")
            continue
        except Exception as e:
            print(f" ❌ 第 {i+1} 条记录处理失败: {e}")
            continue

    end_time = time.time()
    processing_time = end_time - start_time

    # Computed once and guarded so an empty sample (e.g. limit=0) cannot
    # trigger a division by zero.
    extraction_rate = len(all_result) / len(test_data) * 100 if test_data else 0.0

    print("3. 处理完成:")
    print(f" ⏱️ 处理时间: {processing_time:.2f} 秒")
    print(f" 📈 原始记录: {len(test_data)} 条")
    print(f" 📊 提取结果: {len(all_result)} 条")
    print(f" 🎯 提取率: {extraction_rate:.1f}%")

    if all_result:
        # Show the first few extracted rows.
        print("4. 提取结果示例:")
        for i, result in enumerate(all_result[:3]):
            print(f" 记录{i+1}: {result}")

        # Write the results to the test table; a failure here is reported
        # but does not discard the statistics returned below.
        try:
            headers = ["num", "id", "Call Sign", "Behavior", "Flight Level", "Location", "Time"]
            write_to_table(all_result, headers, "test_precessed_table")
            print("✅ 结果已写入 test_precessed_table 表")
        except Exception as e:
            print(f"❌ 写入数据库失败: {e}")

    return {
        "processed_records": len(test_data),
        "extracted_records": len(all_result),
        "extraction_rate": extraction_rate,
        "processing_time": processing_time,
    }


if __name__ == "__main__":
    result = test_ai_processing()
    print(f"\n🎉 测试完成: {result}")