You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
195 lines
7.3 KiB
195 lines
7.3 KiB
#!/usr/bin/env python
"""Batch AI processing script - optimized version."""

import os
import sys
import django
import time
# NOTE(review): ThreadPoolExecutor, TimeoutError and as_completed are imported
# but never used in this file — likely leftovers from a concurrent variant.
from concurrent.futures import ThreadPoolExecutor, TimeoutError, as_completed

# Configure the Django environment BEFORE importing any project modules:
# put the backend package on sys.path, point at its settings, then boot Django.
sys.path.append('/home/hzk/项目/moxun-1/信息抽取+数据检验/Django123/atc_extractor/backend')
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'backend.settings')
django.setup()

# Project helpers: the AI extractor plus table read/write and output parsing.
from extractor.infoextractor_czhwjq import CallSignExtractor, read_from_table, write_to_table, parse_input
|
|
|
|
def process_batch(extractor, batch_data, batch_id):
    """Run the extractor over one batch of (id, text) records.

    Each successful extraction is parsed with parse_input and accumulated;
    failures on individual records are logged and skipped. Returns the flat
    list of parsed result rows for this batch.
    """
    print(f"🔄 处理批次 {batch_id}")
    batch_results = []
    total = len(batch_data)

    for count, (raw_id, raw_text) in enumerate(batch_data, 1):
        try:
            extracted = extractor.extract_call_signs(raw_id, raw_text)
            if extracted:
                batch_results.extend(parse_input(extracted))

            # Lightweight progress line every 5 records.
            if count % 5 == 0:
                print(f" 批次 {batch_id}: 已处理 {count}/{total} 条")

        except Exception as e:
            # One bad record must not abort the whole batch.
            print(f" ❌ 批次 {batch_id} 第 {count} 条失败: {e}")
            continue

    print(f"✅ 批次 {batch_id} 完成: {len(batch_results)} 条结果")
    return batch_results
|
|
|
|
def run_full_ai_processing():
    """Run the full AI extraction pipeline: read, batch, extract, save, validate.

    Returns a summary dict with original_count, extracted_count and
    processing_time, or None when the source table is empty.
    """
    print("🚀 开始完整AI数据处理...")

    # 1. Read all rows. The id and text columns are fetched separately and
    #    zipped positionally — assumes both reads return rows in the same order.
    print("1. 读取所有数据...")
    id_data = read_from_table("prewashed_table", "id")
    text_data = read_from_table("prewashed_table", "text")
    data = [(id_data[i], text_data[i]) for i in range(len(id_data))]

    if len(data) == 0:
        print("❌ 没有数据")
        return

    print(f"📊 总数据: {len(data)} 条")

    # 2. Split the data into batches of 50 records each.
    batch_size = 50
    batches = [data[i:i+batch_size] for i in range(0, len(data), batch_size)]
    print(f"📦 分成 {len(batches)} 个批次,每批最多 {batch_size} 条")

    # 3. A single extractor instance is reused across all batches.
    extractor = CallSignExtractor()
    all_results = []
    start_time = time.time()

    # 4. Process batches sequentially (avoids hammering the AI API with
    #    concurrent requests and tripping rate limits).
    for batch_id, batch in enumerate(batches, 1):
        try:
            batch_results = process_batch(extractor, batch, batch_id)
            all_results.extend(batch_results)

            # Progress report; clamp the count on the (possibly short) last batch.
            processed_count = batch_id * batch_size
            if processed_count > len(data):
                processed_count = len(data)

            elapsed_time = time.time() - start_time
            avg_time = elapsed_time / processed_count
            remaining_time = avg_time * (len(data) - processed_count)

            print(f"📈 进度: {processed_count}/{len(data)} ({processed_count/len(data)*100:.1f}%)")
            print(f"⏱️ 已用时: {elapsed_time:.1f}s, 预计剩余: {remaining_time:.1f}s")
            print(f"🎯 当前提取: {len(all_results)} 条")
            print("-" * 50)

            # Checkpoint every 5 batches so a crash does not lose everything.
            # Note: each checkpoint re-saves ALL results accumulated so far.
            if batch_id % 5 == 0:
                print("💾 中间保存...")
                save_results(all_results, f"temp_batch_{batch_id}")

        except Exception as e:
            # A failed batch is logged and skipped; remaining batches still run.
            print(f"❌ 批次 {batch_id} 处理失败: {e}")
            continue

    # 5. Final save and summary.
    total_time = time.time() - start_time
    print(f"\n🎉 AI处理完成!")
    print(f"⏱️ 总时间: {total_time:.1f} 秒 ({total_time/60:.1f} 分钟)")
    print(f"📊 原始数据: {len(data)} 条")
    print(f"🎯 提取结果: {len(all_results)} 条")
    print(f"📈 平均提取率: {len(all_results)/len(data)*100:.1f}%")

    if all_results:
        # NOTE(review): table name "precessed_table" looks like a typo for
        # "processed_table", but the validator below reads the same name, so it
        # is deliberately left unchanged here.
        save_results(all_results, "precessed_table")
        print("✅ 结果已保存到 precessed_table")

        # Chain straight into validation of the freshly saved results.
        print("\n🔍 开始数据验证...")
        run_data_validation()

    return {
        "original_count": len(data),
        "extracted_count": len(all_results),
        "processing_time": total_time
    }
|
|
|
|
def save_results(results, table_name):
    """Prefix each result row with a 1-based sequence number and persist it.

    Any failure (including database errors) is reported but not re-raised, so
    a broken checkpoint save never aborts the surrounding processing loop.
    """
    try:
        # Column layout expected by write_to_table.
        headers = ["num", "id", "Call Sign", "Behavior", "Flight Level", "Location", "Time"]
        numbered_results = [[num] + row for num, row in enumerate(results, 1)]
        write_to_table(numbered_results, headers, table_name)
        print(f"💾 保存了 {len(numbered_results)} 条记录到 {table_name}")
    except Exception as e:
        print(f"❌ 保存失败: {e}")
|
|
|
|
def run_data_validation():
    """Validate the extracted rows and route them to final/quarantine tables.

    Reads precessed_table, runs every row through CallSignValidator, then
    writes rows flagged valid to final_table and the rest to quarantine_table.
    All errors are reported but swallowed (best-effort step).
    """
    try:
        from extractor.data_examination_lyh import CallSignValidator, read_table, write_final_table, write_quarantine_table
        import pandas as pd

        validator = CallSignValidator()
        input_df = read_table("precessed_table")

        if input_df.empty:
            print("❌ precessed_table 为空")
            return

        print(f"🔍 验证 {len(input_df)} 条AI提取结果...")

        # Map the validator's field names onto the table's column names.
        column_map = {
            'ID': 'id',
            'CallSignal': 'Call Sign',
            'Behavior': 'Behavior',
            'FlightLevel': 'Flight Level',
            'Location': 'Location',
            'Time': 'Time',
        }
        processed_data = [
            validator.process_row({field: row[col] for field, col in column_map.items()})
            for _, row in input_df.iterrows()
        ]

        result_df = pd.DataFrame(processed_data)
        # Explicit == True / == False comparisons are kept on purpose: they only
        # match genuine booleans, which may differ from plain truthiness.
        valid_df = result_df[result_df['is_valid'] == True].drop(columns=['is_valid'])
        invalid_df = result_df[result_df['is_valid'] == False].drop(columns=['is_valid'])

        if not valid_df.empty:
            write_final_table(valid_df)
            print(f"✅ {len(valid_df)} 条有效数据写入 final_table")

        if not invalid_df.empty:
            write_quarantine_table(invalid_df)
            print(f"⚠️ {len(invalid_df)} 条无效数据写入 quarantine_table")

        print(f"🎯 验证完成: {len(valid_df)} 有效 + {len(invalid_df)} 无效")

    except Exception as e:
        print(f"❌ 数据验证失败: {e}")
|
|
|
|
if __name__ == "__main__":
|
|
# 询问用户是否确认运行(因为会花费很长时间)
|
|
print("⚠️ 警告: 完整处理769条数据预计需要50-60分钟")
|
|
print("🧪 建议先运行小批量测试")
|
|
|
|
choice = input("\n选择运行模式:\n1. 完整处理 (769条, ~60分钟)\n2. 小批量测试 (100条, ~10分钟)\n3. 退出\n请输入 (1/2/3): ")
|
|
|
|
if choice == "1":
|
|
print("🚀 开始完整处理...")
|
|
result = run_full_ai_processing()
|
|
print(f"🎉 最终结果: {result}")
|
|
elif choice == "2":
|
|
print("🧪 开始小批量测试...")
|
|
# 修改为只处理前100条
|
|
import extractor.infoextractor_czhwjq as processor
|
|
# 这里可以添加小批量测试逻辑
|
|
print("小批量测试需要单独实现")
|
|
else:
|
|
print("👋 退出") |