#!/usr/bin/env python
"""Batch AI processing script - optimized version.

Reads pre-washed ATC records, runs call-sign extraction in sequential
batches (to stay under API rate limits), checkpoints intermediate
results, and finally validates the extracted rows.
"""
import os
import sys
import django
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError, as_completed

# Configure the Django environment before importing project modules.
sys.path.append('/home/hzk/项目/moxun-1/信息抽取+数据检验/Django123/atc_extractor/backend')
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'backend.settings')
django.setup()

from extractor.infoextractor_czhwjq import CallSignExtractor, read_from_table, write_to_table, parse_input


def process_batch(extractor, batch_data, batch_id):
    """Run call-sign extraction over one batch of records.

    Args:
        extractor: CallSignExtractor instance reused for every record.
        batch_data: iterable of (raw_id, raw_text) pairs.
        batch_id: 1-based batch index, used only for progress logging.

    Returns:
        list: parsed extraction rows accumulated across the batch.
        Records that raise are logged and skipped (best-effort).
    """
    print(f"🔄 处理批次 {batch_id}")
    batch_results = []
    for i, (raw_id, raw_text) in enumerate(batch_data):
        try:
            result = extractor.extract_call_signs(raw_id, raw_text)
            if result:
                batch_results.extend(parse_input(result))
            # Lightweight heartbeat every 5 records.
            if (i + 1) % 5 == 0:
                print(f" 批次 {batch_id}: 已处理 {i+1}/{len(batch_data)} 条")
        except Exception as e:
            # One bad record must not abort the whole batch.
            print(f" ❌ 批次 {batch_id} 第 {i+1} 条失败: {e}")
            continue
    print(f"✅ 批次 {batch_id} 完成: {len(batch_results)} 条结果")
    return batch_results


def run_full_ai_processing():
    """Run the complete AI extraction pipeline over prewashed_table.

    Processes all records in sequential batches of 50, checkpoints
    every 5 batches, saves final results, then triggers validation.

    Returns:
        dict with original_count, extracted_count and processing_time,
        or None when the source table is empty.
    """
    print("🚀 开始完整AI数据处理...")

    # 1. Read all data; pair ids with texts positionally.
    print("1. 读取所有数据...")
    id_data = read_from_table("prewashed_table", "id")
    text_data = read_from_table("prewashed_table", "text")
    data = list(zip(id_data, text_data))

    if not data:
        print("❌ 没有数据")
        return

    print(f"📊 总数据: {len(data)} 条")

    # 2. Split into batches of 50 records each.
    batch_size = 50
    batches = [data[i:i + batch_size] for i in range(0, len(data), batch_size)]
    print(f"📦 分成 {len(batches)} 个批次,每批最多 {batch_size} 条")

    # 3. Initialize extractor and progress bookkeeping.
    extractor = CallSignExtractor()
    all_results = []
    start_time = time.time()

    # 4. Process batch-by-batch (sequential on purpose: too much
    #    concurrency trips the upstream API's rate limits).
    for batch_id, batch in enumerate(batches, 1):
        try:
            batch_results = process_batch(extractor, batch, batch_id)
            all_results.extend(batch_results)

            # Progress / ETA estimate. processed_count >= batch_size > 0
            # here, so the division below is safe.
            processed_count = min(batch_id * batch_size, len(data))
            elapsed_time = time.time() - start_time
            avg_time = elapsed_time / processed_count
            remaining_time = avg_time * (len(data) - processed_count)
            print(f"📈 进度: {processed_count}/{len(data)} ({processed_count/len(data)*100:.1f}%)")
            print(f"⏱️ 已用时: {elapsed_time:.1f}s, 预计剩余: {remaining_time:.1f}s")
            print(f"🎯 当前提取: {len(all_results)} 条")
            print("-" * 50)

            # Checkpoint every 5 batches so a crash loses little work.
            if batch_id % 5 == 0:
                print("💾 中间保存...")
                save_results(all_results, f"temp_batch_{batch_id}")
        except Exception as e:
            print(f"❌ 批次 {batch_id} 处理失败: {e}")
            continue

    # 5. Final save and summary.
    total_time = time.time() - start_time
    print(f"\n🎉 AI处理完成!")
    print(f"⏱️ 总时间: {total_time:.1f} 秒 ({total_time/60:.1f} 分钟)")
    print(f"📊 原始数据: {len(data)} 条")
    print(f"🎯 提取结果: {len(all_results)} 条")
    print(f"📈 平均提取率: {len(all_results)/len(data)*100:.1f}%")

    if all_results:
        # NOTE(review): "precessed_table" looks like a typo of
        # "processed_table", but run_data_validation reads the same
        # name, so it is kept for compatibility with the database.
        save_results(all_results, "precessed_table")
        print("✅ 结果已保存到 precessed_table")

        # Continue straight into data validation.
        print("\n🔍 开始数据验证...")
        run_data_validation()

    return {
        "original_count": len(data),
        "extracted_count": len(all_results),
        "processing_time": total_time,
    }


def save_results(results, table_name):
    """Persist extraction rows to *table_name*, prefixing a 1-based num.

    Args:
        results: list of row lists matching the headers below (minus num).
        table_name: destination database table name.

    Failures are logged, not raised, so checkpointing never kills a run.
    """
    try:
        # Prepend a sequential row number to each result row.
        numbered_results = [[i] + result for i, result in enumerate(results, 1)]
        headers = ["num", "id", "Call Sign", "Behavior", "Flight Level", "Location", "Time"]
        write_to_table(numbered_results, headers, table_name)
        print(f"💾 保存了 {len(numbered_results)} 条记录到 {table_name}")
    except Exception as e:
        print(f"❌ 保存失败: {e}")


def run_data_validation():
    """Validate AI-extracted rows and split them into final/quarantine.

    Reads precessed_table, runs each row through CallSignValidator,
    writes valid rows to final_table and invalid ones to
    quarantine_table. All failures are logged rather than raised.
    """
    try:
        # Imported lazily so the extraction phase can run even if the
        # validation module is unavailable.
        from extractor.data_examination_lyh import CallSignValidator, read_table, write_final_table, write_quarantine_table
        import pandas as pd

        validator = CallSignValidator()
        input_df = read_table("precessed_table")

        if input_df.empty:
            print("❌ precessed_table 为空")
            return

        print(f"🔍 验证 {len(input_df)} 条AI提取结果...")

        processed_data = []
        for _, row in input_df.iterrows():
            # Remap column names to the validator's expected keys.
            validation_row = {
                'ID': row['id'],
                'CallSignal': row['Call Sign'],
                'Behavior': row['Behavior'],
                'FlightLevel': row['Flight Level'],
                'Location': row['Location'],
                'Time': row['Time'],
            }
            processed_data.append(validator.process_row(validation_row))

        result_df = pd.DataFrame(processed_data)
        # NOTE(review): comparing with == True/False is kept as-is in
        # case process_row ever returns non-bool truthy values.
        valid_df = result_df[result_df['is_valid'] == True].drop(columns=['is_valid'])
        invalid_df = result_df[result_df['is_valid'] == False].drop(columns=['is_valid'])

        if not valid_df.empty:
            write_final_table(valid_df)
            print(f"✅ {len(valid_df)} 条有效数据写入 final_table")
        if not invalid_df.empty:
            write_quarantine_table(invalid_df)
            print(f"⚠️ {len(invalid_df)} 条无效数据写入 quarantine_table")

        print(f"🎯 验证完成: {len(valid_df)} 有效 + {len(invalid_df)} 无效")
    except Exception as e:
        print(f"❌ 数据验证失败: {e}")


if __name__ == "__main__":
    # Ask for confirmation first: a full run takes a long time.
    print("⚠️ 警告: 完整处理769条数据预计需要50-60分钟")
    print("🧪 建议先运行小批量测试")

    choice = input("\n选择运行模式:\n1. 完整处理 (769条, ~60分钟)\n2. 小批量测试 (100条, ~10分钟)\n3. 退出\n请输入 (1/2/3): ")

    if choice == "1":
        print("🚀 开始完整处理...")
        result = run_full_ai_processing()
        print(f"🎉 最终结果: {result}")
    elif choice == "2":
        print("🧪 开始小批量测试...")
        # Placeholder: small-batch mode is not implemented yet.
        import extractor.infoextractor_czhwjq as processor
        print("小批量测试需要单独实现")
    else:
        print("👋 退出")