You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

195 lines
7.3 KiB

#!/usr/bin/env python
"""
Batch AI processing script - optimized version.

Reads raw records from the "prewashed_table", runs call-sign extraction in
batches, saves results, then triggers data validation.
"""
import os
import sys
import django
import time
# NOTE(review): ThreadPoolExecutor/TimeoutError/as_completed are imported but
# never used below — processing is deliberately sequential (see batch loop).
from concurrent.futures import ThreadPoolExecutor, TimeoutError, as_completed
# Set up the Django environment (must happen before importing project modules).
sys.path.append('/home/hzk/项目/moxun-1/信息抽取+数据检验/Django123/atc_extractor/backend')
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'backend.settings')
django.setup()
from extractor.infoextractor_czhwjq import CallSignExtractor, read_from_table, write_to_table, parse_input
def process_batch(extractor, batch_data, batch_id):
    """Process one batch of records through the extractor.

    Args:
        extractor: object exposing ``extract_call_signs(raw_id, raw_text)``.
        batch_data: iterable of ``(raw_id, raw_text)`` pairs.
        batch_id: 1-based batch number, used only for progress messages.

    Returns:
        list: flattened extraction rows accumulated from the whole batch.
        Records whose extraction raises are logged and skipped, so a single
        bad record never aborts the batch.
    """
    print(f"🔄 处理批次 {batch_id}")
    batch_results = []
    for i, (raw_id, raw_text) in enumerate(batch_data):
        try:
            result = extractor.extract_call_signs(raw_id, raw_text)
            # A falsy result means "nothing extracted" — skip parsing.
            if result:
                result_list = parse_input(result)
                batch_results.extend(result_list)
            # Lightweight progress ping every 5 records.
            if (i + 1) % 5 == 0:
                print(f" 批次 {batch_id}: 已处理 {i+1}/{len(batch_data)}")
        except Exception as e:
            # Best-effort: log and continue with the next record.
            print(f" ❌ 批次 {batch_id}{i+1} 条失败: {e}")
            continue
    print(f"✅ 批次 {batch_id} 完成: {len(batch_results)} 条结果")
    return batch_results
def run_full_ai_processing():
    """Run the full AI extraction pipeline over ``prewashed_table``.

    Steps: read all rows, split into fixed-size batches, process each batch
    sequentially (to stay under API rate limits), checkpoint intermediate
    results every 5 batches, save the final output, then run validation.

    Returns:
        dict: summary with ``original_count``, ``extracted_count`` and
        ``processing_time`` (seconds), or ``None`` when the source table
        is empty.
    """
    print("🚀 开始完整AI数据处理...")
    # 1. Read all source data.
    print("1. 读取所有数据...")
    id_data = read_from_table("prewashed_table", "id")
    text_data = read_from_table("prewashed_table", "text")
    data = [(id_data[i], text_data[i]) for i in range(len(id_data))]
    if len(data) == 0:
        print("❌ 没有数据")
        return
    print(f"📊 总数据: {len(data)}")
    # 2. Split into batches of 50.
    batch_size = 50
    batches = [data[i:i+batch_size] for i in range(0, len(data), batch_size)]
    print(f"📦 分成 {len(batches)} 个批次,每批最多 {batch_size}")
    # 3. Initialize the extractor and accumulators.
    extractor = CallSignExtractor()
    all_results = []
    start_time = time.time()
    # 4. Process batch-by-batch (sequential to avoid API rate limits).
    for batch_id, batch in enumerate(batches, 1):
        try:
            batch_results = process_batch(extractor, batch, batch_id)
            all_results.extend(batch_results)
            # Progress report; cap the count at the true data size for the
            # (possibly shorter) final batch.
            processed_count = min(batch_id * batch_size, len(data))
            elapsed_time = time.time() - start_time
            avg_time = elapsed_time / processed_count
            remaining_time = avg_time * (len(data) - processed_count)
            print(f"📈 进度: {processed_count}/{len(data)} ({processed_count/len(data)*100:.1f}%)")
            print(f"⏱️ 已用时: {elapsed_time:.1f}s, 预计剩余: {remaining_time:.1f}s")
            print(f"🎯 当前提取: {len(all_results)}")
            print("-" * 50)
            # Checkpoint every 5 batches so a crash loses little work.
            if batch_id % 5 == 0:
                print("💾 中间保存...")
                save_results(all_results, f"temp_batch_{batch_id}")
        except Exception as e:
            # One failed batch must not abort the whole run.
            print(f"❌ 批次 {batch_id} 处理失败: {e}")
            continue
    # 5. Final save and summary.
    total_time = time.time() - start_time
    print(f"\n🎉 AI处理完成!")
    print(f"⏱️ 总时间: {total_time:.1f} 秒 ({total_time/60:.1f} 分钟)")
    print(f"📊 原始数据: {len(data)}")
    print(f"🎯 提取结果: {len(all_results)}")
    print(f"📈 平均提取率: {len(all_results)/len(data)*100:.1f}%")
    if all_results:
        # NOTE(review): "precessed_table" looks like a typo for
        # "processed_table", but it is used consistently by the validator
        # below, so it is kept as-is.
        save_results(all_results, "precessed_table")
        print("✅ 结果已保存到 precessed_table")
        # Chain straight into the validation stage.
        print("\n🔍 开始数据验证...")
        run_data_validation()
    return {
        "original_count": len(data),
        "extracted_count": len(all_results),
        "processing_time": total_time
    }
def save_results(results, table_name):
    """Save extraction rows to the database table ``table_name``.

    Each row is prefixed with a 1-based running sequence number before being
    written under a fixed header set. Failures are logged, never raised, so
    a broken checkpoint save cannot kill the long-running pipeline.

    Args:
        results: list of list-like rows ``[id, call_sign, behavior, ...]``.
        table_name: destination table name.

    Returns:
        None.
    """
    try:
        # Prepend a 1-based sequence number to every row.
        numbered_results = [[i] + result for i, result in enumerate(results, 1)]
        headers = ["num", "id", "Call Sign", "Behavior", "Flight Level", "Location", "Time"]
        write_to_table(numbered_results, headers, table_name)
        print(f"💾 保存了 {len(numbered_results)} 条记录到 {table_name}")
    except Exception as e:
        # Deliberately swallow: a failed save is reported but non-fatal.
        print(f"❌ 保存失败: {e}")
def run_data_validation():
    """Validate AI-extracted rows and split them into final/quarantine tables.

    Reads ``precessed_table``, runs each row through ``CallSignValidator``,
    then writes rows flagged valid to ``final_table`` and the rest to
    ``quarantine_table``. Any failure (including missing project modules)
    is logged and swallowed so the caller keeps running.

    Returns:
        None.
    """
    try:
        # Imported lazily so the script can start even if the validator
        # module is unavailable.
        from extractor.data_examination_lyh import CallSignValidator, read_table, write_final_table, write_quarantine_table
        import pandas as pd
        validator = CallSignValidator()
        input_df = read_table("precessed_table")
        if input_df.empty:
            print("❌ precessed_table 为空")
            return
        print(f"🔍 验证 {len(input_df)} 条AI提取结果...")
        # Re-map DB column names to the validator's expected keys.
        processed_data = []
        for _, row in input_df.iterrows():
            validation_row = {
                'ID': row['id'],
                'CallSignal': row['Call Sign'],
                'Behavior': row['Behavior'],
                'FlightLevel': row['Flight Level'],
                'Location': row['Location'],
                'Time': row['Time']
            }
            processed_result = validator.process_row(validation_row)
            processed_data.append(processed_result)
        result_df = pd.DataFrame(processed_data)
        # Split on the validator's verdict; drop the helper column before save.
        valid_df = result_df[result_df['is_valid'] == True].drop(columns=['is_valid'])
        invalid_df = result_df[result_df['is_valid'] == False].drop(columns=['is_valid'])
        if not valid_df.empty:
            write_final_table(valid_df)
            print(f"{len(valid_df)} 条有效数据写入 final_table")
        if not invalid_df.empty:
            write_quarantine_table(invalid_df)
            print(f"⚠️ {len(invalid_df)} 条无效数据写入 quarantine_table")
        print(f"🎯 验证完成: {len(valid_df)} 有效 + {len(invalid_df)} 无效")
    except Exception as e:
        # Best-effort stage: report and return rather than crash the pipeline.
        print(f"❌ 数据验证失败: {e}")
if __name__ == "__main__":
# 询问用户是否确认运行(因为会花费很长时间)
print("⚠️ 警告: 完整处理769条数据预计需要50-60分钟")
print("🧪 建议先运行小批量测试")
choice = input("\n选择运行模式:\n1. 完整处理 (769条, ~60分钟)\n2. 小批量测试 (100条, ~10分钟)\n3. 退出\n请输入 (1/2/3): ")
if choice == "1":
print("🚀 开始完整处理...")
result = run_full_ai_processing()
print(f"🎉 最终结果: {result}")
elif choice == "2":
print("🧪 开始小批量测试...")
# 修改为只处理前100条
import extractor.infoextractor_czhwjq as processor
# 这里可以添加小批量测试逻辑
print("小批量测试需要单独实现")
else:
print("👋 退出")