|
|
|
@ -0,0 +1,531 @@
|
|
|
|
|
"""
|
|
|
|
|
改进版AI处理逻辑
|
|
|
|
|
解决数据质量问题,优化AI提取和验证流程
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
import re
|
|
|
|
|
import json
|
|
|
|
|
import pandas as pd
|
|
|
|
|
from typing import Dict, List, Tuple, Optional
|
|
|
|
|
from django.conf import settings
|
|
|
|
|
from django.db import connection
|
|
|
|
|
from openai import OpenAI
|
|
|
|
|
import logging
|
|
|
|
|
|
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
class ImprovedCallSignExtractor:
    """Extract structured flight information from ATC radio transcripts.

    The extractor first asks a chat-completion model for a JSON array of
    records and normalizes the answer. If the model call fails, or no usable
    JSON can be recovered from its reply, a regex-based fallback is used.

    Every record returned by the public API is a dict with the keys
    ``call_sign``, ``behavior``, ``flight_level``, ``location`` and ``time``.
    """

    # Prompt sent to the model. ``{text}`` is the only placeholder. The other
    # braces are literal JSON, so the template must NOT be passed through
    # ``str.format`` (see ``_build_prompt``).
    PROMPT_TEMPLATE = "\n".join([
        'You are an expert aviation communications analyst. Extract flight information from ATC radio communications with high accuracy.',
        '',
        'EXTRACTION RULES:',
        '1. CALL SIGN: Extract exact aircraft identifier (e.g., "CCA123", "United 456")',
        '2. BEHAVIOR: Extract primary action using base verb form:',
        '   - "climbing" → "climb"',
        '   - "descending" → "descend"',
        '   - "turning" → "turn"',
        '   - "maintaining" → "maintain"',
        '   - "contact" → "contact"',
        '   - "cleared" → "clear"',
        '3. FLIGHT LEVEL: Extract altitude as words (e.g., "FL350" → "three five zero")',
        '4. LOCATION: Extract waypoints, airports, or navigation fixes',
        '5. TIME: Extract any time references (UTC format preferred)',
        '',
        'OUTPUT FORMAT: Return JSON array with objects containing:',
        '{',
        '    "call_sign": "exact call sign",',
        '    "behavior": "normalized action",',
        '    "flight_level": "altitude in words or null",',
        '    "location": "position or null",',
        '    "time": "time or null"',
        '}',
        '',
        'EXAMPLES:',
        'Input: "CCA123 climb to flight level 350"',
        'Output: [{"call_sign": "CCA123", "behavior": "climb", "flight_level": "three five zero", "location": null, "time": null}]',
        '',
        'Input: "CSN456 descend to flight level 280"',
        'Output: [{"call_sign": "CSN456", "behavior": "descend", "flight_level": "two eight zero", "location": null, "time": null}]',
        '',
        'INPUT TEXT: {text}',
        '',
        'Extract all flight communications from this text. Return only the JSON array.',
    ])

    # Regexes tried, in order, to locate JSON inside a free-form model reply.
    _JSON_PATTERNS = (
        r'\[.*?\]',                    # plain JSON array
        r'\{.*?\}',                    # single JSON object
        r'```json\s*(\[.*?\])\s*```',  # fenced markdown block tagged json
        r'```\s*(\[.*?\])\s*```',      # untagged fenced block
    )

    # Regexes tried, in order, by the fallback call-sign extraction.
    _CALL_SIGN_PATTERNS = (
        r'\b[A-Z]{2,3}\s?\d{1,4}[A-Z]?\b',  # CCA123, CSN456
        r'\b(?:Air\s+China|China\s+Southern|Hainan\s+Airlines)\s+\d+\b',  # airline name + number
        r'\b\w+\s+\d{1,4}\b',  # generic "word number"
    )

    # Inflected verb -> base verb.
    _VERB_MAP = {
        'climbing': 'climb',
        'descending': 'descend',
        'turning': 'turn',
        'maintaining': 'maintain',
        'holding': 'hold',
        'contacting': 'contact',
        'cleared': 'clear',
        'proceeding': 'proceed',
        'approaching': 'approach',
        'departing': 'depart',
    }

    # Decimal digit -> spoken word.
    _DIGIT_WORDS = {
        '0': 'zero', '1': 'one', '2': 'two', '3': 'three', '4': 'four',
        '5': 'five', '6': 'six', '7': 'seven', '8': 'eight', '9': 'nine',
    }

    # Spoken digit -> canonical spoken digit ("niner" is radio usage for 9).
    _SPOKEN_DIGITS = {
        'zero': 'zero', 'one': 'one', 'two': 'two', 'three': 'three',
        'four': 'four', 'five': 'five', 'six': 'six', 'seven': 'seven',
        'eight': 'eight', 'nine': 'nine', 'niner': 'nine',
    }

    # Verbs searched for by the fallback extraction.
    _FALLBACK_BEHAVIORS = (
        'climb', 'descend', 'turn', 'maintain',
        'contact', 'clear', 'hold', 'approach',
    )

    def __init__(self):
        """Create the API client from ``settings.AI_CONFIG``."""
        self.model = settings.AI_CONFIG['model_name']
        self.client = OpenAI(
            base_url=settings.AI_CONFIG['base_url'],
            api_key=settings.AI_CONFIG['api_key']
        )
        # Kept as an instance attribute so existing code can still override it.
        self.prompt_template = self.PROMPT_TEMPLATE

    def _build_prompt(self, text: str) -> str:
        """Insert *text* into the prompt template.

        Plain substitution is used instead of ``str.format`` because the
        template contains literal JSON braces, which ``format`` would try to
        parse as replacement fields and fail on.
        """
        return self.prompt_template.replace('{text}', text)

    @classmethod
    def _parse_json_payload(cls, raw: str) -> Optional[List[Dict]]:
        """Recover a non-empty list of JSON objects from a model reply.

        The whole reply is tried first, then every match of
        ``_JSON_PATTERNS``. A single object is wrapped in a list, and
        non-object list entries are discarded.

        Returns:
            The first non-empty list of dicts found, or ``None``.
        """
        cleaned = raw.strip()
        candidates = [cleaned]
        for pattern in cls._JSON_PATTERNS:
            candidates.extend(re.findall(pattern, cleaned, re.DOTALL))

        for candidate in candidates:
            try:
                parsed = json.loads(candidate)
            except json.JSONDecodeError:
                continue
            if isinstance(parsed, dict):
                parsed = [parsed]
            if not isinstance(parsed, list):
                continue
            records = [entry for entry in parsed if isinstance(entry, dict)]
            if records:
                return records
        return None

    def extract_from_text(self, text: str) -> List[Dict]:
        """Extract flight records from one transcript.

        Args:
            text: Raw ATC communication text.

        Returns:
            A list of normalized record dicts. Empty or non-string input
            yields an empty list without calling the model. Any model or
            parsing failure falls back to ``_fallback_extraction``.
        """
        if not isinstance(text, str) or not text.strip():
            return []

        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": self._build_prompt(text)}],
                temperature=0.1,  # low randomness for repeatable extraction
                max_tokens=1000
            )
            # ``content`` may be None when the model returns no text.
            result_text = (response.choices[0].message.content or '').strip()
        except Exception as e:
            logger.error(f"AI提取失败: {e}")
            return self._fallback_extraction(text)

        try:
            extracted_data = self._parse_json_payload(result_text)
            if extracted_data:
                return self._validate_extraction(extracted_data)
            logger.warning(f"未找到有效JSON格式: {result_text[:200]}...")
        except Exception as e:
            logger.error(f"JSON解析异常: {e}")
        return self._fallback_extraction(text)

    def _validate_extraction(self, data: List[Dict]) -> List[Dict]:
        """Keep records that have both a call sign and a behavior.

        Non-dict entries and JSON ``null`` values are tolerated: they are
        skipped or treated as empty instead of raising.
        """
        validated = []
        for item in data:
            if not isinstance(item, dict):
                continue
            call_sign = str(item.get('call_sign') or '').strip()
            behavior = str(item.get('behavior') or '').strip()
            if not (call_sign and behavior):
                continue
            validated.append({
                'call_sign': self._normalize_call_sign(call_sign),
                'behavior': self._normalize_behavior(behavior),
                'flight_level': self._normalize_flight_level(item.get('flight_level')),
                'location': self._normalize_location(item.get('location')),
                'time': self._normalize_time(item.get('time')),
            })
        return validated

    def _normalize_call_sign(self, call_sign: str) -> str:
        """Trim the call sign and collapse whitespace runs to one space."""
        return re.sub(r'\s+', ' ', call_sign.strip())

    def _normalize_behavior(self, behavior: str) -> str:
        """Lower-case the verb and map known inflected forms to the base form."""
        behavior = behavior.lower().strip()
        return self._VERB_MAP.get(behavior, behavior)

    def _normalize_flight_level(self, flight_level) -> Optional[str]:
        """Return the flight level as space-separated digit words.

        The first digit run is spelled out ("FL350" -> "three five zero").
        A value that is already spoken ("three five zero"), which is the
        form the prompt asks the model for, is kept. Anything else gives
        ``None``.
        """
        if not flight_level or flight_level == 'null':
            return None

        fl_str = str(flight_level).lower().strip()

        numbers = re.findall(r'\d+', fl_str)
        if numbers:
            return ' '.join(self._DIGIT_WORDS.get(digit, digit) for digit in numbers[0])

        spoken = [
            self._SPOKEN_DIGITS[token]
            for token in re.findall(r'[a-z]+', fl_str)
            if token in self._SPOKEN_DIGITS
        ]
        return ' '.join(spoken) if spoken else None

    def _normalize_location(self, location) -> Optional[str]:
        """Return the trimmed location if it has at least 3 characters, else ``None``."""
        if not location or location == 'null':
            return None

        loc_str = str(location).strip()
        return loc_str if len(loc_str) >= 3 else None

    def _normalize_time(self, time) -> Optional[str]:
        """Return the trimmed time if it starts with an ``NN:NN`` pattern, else ``None``."""
        if not time or time == 'null':
            return None

        time_str = str(time).strip()
        # Loose format check only: two digits, a colon, two digits at the start.
        return time_str if re.match(r'\d{2}:\d{2}', time_str) else None

    def _fallback_extraction(self, text: str) -> List[Dict]:
        """Regex-based extraction used when the model path fails.

        Patterns are tried from most to least specific. Only the first match
        of the first matching pattern is reported, so a transcript yields at
        most one record and looser patterns cannot add spurious entries such
        as "level 350".
        """
        logger.info("使用备用提取方法")

        for pattern in self._CALL_SIGN_PATTERNS:
            match = re.search(pattern, text, re.IGNORECASE)
            if match is None:
                continue
            call_sign = match.group().strip()
            if not call_sign:
                continue
            return [{
                'call_sign': call_sign,
                'behavior': self._extract_behavior_around(text, match.start(), match.end()),
                'flight_level': None,
                'location': None,
                'time': None,
            }]
        return []

    def _extract_behavior_around(self, text: str, start: int, end: int) -> str:
        """Return the first known verb in the 50 characters after the call sign.

        Args:
            text: Full transcript.
            start: Start offset of the call sign (unused, kept for the interface).
            end: End offset of the call sign; the search window begins here.

        Returns:
            The verb whose occurrence is earliest in the window, or
            ``'unknown'`` if none is present.
        """
        after_text = text[end:end + 50].lower()
        hits = [
            (after_text.find(verb), verb)
            for verb in self._FALLBACK_BEHAVIORS
            if verb in after_text
        ]
        return min(hits)[1] if hits else 'unknown'
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ImprovedCallSignValidator:
    """Check extracted call signs and shape records for storage.

    On construction the validator loads the ``air_company`` table into a
    DataFrame and builds a phonetic-alphabet lookup.
    """

    def __init__(self):
        """Load the airline table, then build the phonetic lookup."""
        self.airline_data = self._load_airline_data()
        self.phonetic_map = self._init_phonetic_map()

    def _load_airline_data(self) -> pd.DataFrame:
        """Read every row of ``air_company`` into a DataFrame.

        Returns:
            A DataFrame whose columns follow the cursor description, or an
            empty DataFrame if anything fails (the error is logged).
        """
        try:
            with connection.cursor() as cur:
                cur.execute("SELECT * FROM air_company")
                header = [column[0] for column in cur.description]
                rows = cur.fetchall()
            return pd.DataFrame(rows, columns=header)
        except Exception as e:
            logger.error(f"加载航空公司数据失败: {e}")
            return pd.DataFrame()

    def _init_phonetic_map(self) -> Dict[str, str]:
        """Return a mapping from phonetic code word to its upper-case letter."""
        code_words = (
            'alpha', 'bravo', 'charlie', 'delta', 'echo', 'foxtrot', 'golf',
            'hotel', 'india', 'juliet', 'kilo', 'lima', 'mike', 'november',
            'oscar', 'papa', 'quebec', 'romeo', 'sierra', 'tango', 'uniform',
            'victor', 'whiskey', 'xray', 'yankee', 'zulu',
        )
        # Each code word stands for its own initial letter.
        return {word: word[0].upper() for word in code_words}

    def validate_call_sign(self, call_sign: str) -> Tuple[bool, str]:
        """Apply loose format checks to a call sign.

        Returns:
            ``(is_valid, reason)``. A call sign passes when it is non-empty,
            at least 3 characters long, and contains both an ASCII letter
            and a digit.
        """
        if not call_sign:
            return False, "呼号为空"

        # Deliberately lenient minimum length.
        if len(call_sign) < 3:
            return False, "呼号过短"

        lacks_letter = re.search(r'[A-Za-z]', call_sign) is None
        lacks_digit = re.search(r'\d', call_sign) is None
        if lacks_letter or lacks_digit:
            return False, "呼号格式不标准"

        return True, "有效呼号"

    def process_extracted_data(self, extracted_list: List[Dict], original_id: str) -> List[Dict]:
        """Turn extracted records into storage rows tagged with a validity flag.

        Args:
            extracted_list: Records with the extractor's lower-case keys.
            original_id: Identifier of the source row, copied into every output row.

        Returns:
            One dict per input record, with storage column names plus
            ``is_valid`` and ``validation_reason``.
        """
        rows = []
        for record in extracted_list:
            sign = record.get('call_sign', '')
            ok, why = self.validate_call_sign(sign)
            rows.append(dict([
                ('id', original_id),
                ('Call Sign', sign),
                ('Behavior', record.get('behavior', '')),
                ('Flight Level', record.get('flight_level', '')),
                ('Location', record.get('location', '')),
                ('Time', record.get('time', '')),
                ('is_valid', ok),
                ('validation_reason', why),
            ]))
        return rows
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def improved_process_data():
    """Run the full extraction and validation pipeline.

    Reads ``(id, text)`` rows from ``prewashed_table``, extracts flight
    records from each text, validates the call signs, and writes the results:
    all records go to ``processed_table``, valid ones to ``final_table`` and
    invalid ones to ``quarantine_table``.

    Returns:
        dict: ``status`` is ``"success"`` or ``"error"``. A success result
        also carries the counts (``original_count``, ``extracted_count``,
        ``processed_count``, ``valid_count``, ``invalid_count``), the two
        percentage rates and a human-readable ``message``. An error result
        carries only ``message``. Exceptions are logged and reported through
        the result rather than raised.
    """
    logger.info("开始改进版数据处理流程...")

    try:
        # Load the raw transcripts.
        with connection.cursor() as cursor:
            cursor.execute("SELECT id, text FROM prewashed_table")
            raw_data = cursor.fetchall()

        if not raw_data:
            return {
                "status": "error",
                "message": "prewashed_table中没有数据,请先上传文件"
            }

        logger.info(f"读取到 {len(raw_data)} 条原始数据")

        extractor = ImprovedCallSignExtractor()
        validator = ImprovedCallSignValidator()

        all_processed = []
        extraction_count = 0

        # Process row by row so one bad transcript cannot abort the whole run.
        for original_id, text in raw_data:
            try:
                extracted_list = extractor.extract_from_text(text)
                extraction_count += len(extracted_list)

                processed_list = validator.process_extracted_data(extracted_list, original_id)
                all_processed.extend(processed_list)

                logger.debug(f"处理 {original_id}: 提取 {len(extracted_list)} 条,处理 {len(processed_list)} 条")

            except Exception as e:
                logger.error(f"处理 {original_id} 时出错: {e}")
                continue

        if not all_processed:
            return {
                "status": "error",
                "message": "AI提取未产生有效数据"
            }

        df = pd.DataFrame(all_processed)

        # Split on the validator's flag using a boolean mask.
        is_valid = df['is_valid'].astype(bool)
        valid_df = df[is_valid].drop(columns=['is_valid', 'validation_reason'])
        invalid_df = df[~is_valid].drop(columns=['is_valid'])

        processed_count = len(df)
        valid_count = len(valid_df)
        invalid_count = len(invalid_df)

        # processed_table holds every extracted record, valid or not.
        _write_processed_data(df)

        # Always rewrite both result tables, even with an empty frame.
        # Each writer clears its table first, so skipping the call would
        # leave rows from a previous run behind.
        _write_final_data(valid_df)
        _write_quarantine_data(invalid_df)

        logger.info(f"处理完成: 总计 {processed_count} 条,有效 {valid_count} 条,无效 {invalid_count} 条")

        return {
            "status": "success",
            "original_count": len(raw_data),
            "extracted_count": extraction_count,
            "processed_count": processed_count,
            "valid_count": valid_count,
            "invalid_count": invalid_count,
            "extraction_rate": round(extraction_count / len(raw_data) * 100, 2) if len(raw_data) > 0 else 0,
            "validation_rate": round(valid_count / processed_count * 100, 2) if processed_count > 0 else 0,
            "message": f"AI信息抽取和验证完成:从 {len(raw_data)} 条原始记录中提取 {extraction_count} 个结果,验证通过 {valid_count} 条"
        }

    except Exception as e:
        logger.error(f"数据处理失败: {e}")
        return {
            "status": "error",
            "message": f"数据处理过程中发生错误: {str(e)}"
        }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _write_processed_data(df: pd.DataFrame):
    """Rebuild ``processed_table`` from *df*.

    The table is dropped, recreated and filled with the ``id``,
    ``Call Sign``, ``Behavior``, ``Flight Level``, ``Location`` and ``Time``
    columns of *df*. Empty or missing optional values are stored as NULL.

    Failures are logged, not raised (best-effort write).
    """
    insert_sql = """
        INSERT INTO processed_table (id, `Call Sign`, Behavior, `Flight Level`, Location, Time)
        VALUES (%s, %s, %s, %s, %s, %s)
    """

    def _nullable(value):
        # Map None, NaN and empty values to SQL NULL.
        if value is None or pd.isna(value):
            return None
        return value or None

    try:
        # Build the parameters before touching the table, so a malformed
        # frame cannot leave the table dropped and empty.
        params = [
            (
                row['id'],
                row['Call Sign'],
                row['Behavior'],
                _nullable(row['Flight Level']),
                _nullable(row['Location']),
                _nullable(row['Time']),
            )
            for _, row in df.iterrows()
        ]

        with connection.cursor() as cursor:
            cursor.execute("DROP TABLE IF EXISTS processed_table")
            cursor.execute("""
                CREATE TABLE processed_table (
                    num INT AUTO_INCREMENT PRIMARY KEY,
                    id VARCHAR(50),
                    `Call Sign` VARCHAR(100),
                    Behavior VARCHAR(50),
                    `Flight Level` VARCHAR(50),
                    Location VARCHAR(100),
                    Time VARCHAR(50)
                )
            """)

            # One batched call instead of one round trip per row.
            if params:
                cursor.executemany(insert_sql, params)

        logger.info(f"写入 processed_table: {len(df)} 条记录")

    except Exception as e:
        logger.error(f"写入处理后数据失败: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _write_final_data(df: pd.DataFrame):
    """Replace the contents of ``final_table`` with the rows of *df*.

    The delete and the inserts run in one transaction, so a failure
    part-way leaves the previous contents in place instead of an empty or
    half-filled table. Empty or missing optional values are stored as NULL.
    An empty *df* simply clears the table.

    Failures are logged, not raised (best-effort write).
    """
    # Local import: the module only imports ``connection`` at file level.
    from django.db import transaction

    insert_sql = """
        INSERT INTO final_table (id, `Call Sign`, Behavior, `Flight Level`, Location, Time)
        VALUES (%s, %s, %s, %s, %s, %s)
    """

    def _nullable(value):
        # Map None, NaN and empty values to SQL NULL.
        if value is None or pd.isna(value):
            return None
        return value or None

    try:
        params = [
            (
                row['id'],
                row['Call Sign'],
                row['Behavior'],
                _nullable(row['Flight Level']),
                _nullable(row['Location']),
                _nullable(row['Time']),
            )
            for _, row in df.iterrows()
        ]

        with transaction.atomic():
            with connection.cursor() as cursor:
                cursor.execute("DELETE FROM final_table")
                # One batched call instead of one round trip per row.
                if params:
                    cursor.executemany(insert_sql, params)

        logger.info(f"写入 final_table: {len(df)} 条记录")

    except Exception as e:
        logger.error(f"写入最终数据失败: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _write_quarantine_data(df: pd.DataFrame):
    """Replace the contents of ``quarantine_table`` with the rows of *df*.

    The delete and the inserts run in one transaction, so a failure
    part-way leaves the previous contents in place instead of an empty or
    half-filled table. Empty or missing optional values are stored as NULL.
    An empty *df* simply clears the table. Only the six record columns are
    written; any extra columns in *df* are ignored.

    Failures are logged, not raised (best-effort write).
    """
    # Local import: the module only imports ``connection`` at file level.
    from django.db import transaction

    insert_sql = """
        INSERT INTO quarantine_table (id, `Call Sign`, Behavior, `Flight Level`, Location, Time)
        VALUES (%s, %s, %s, %s, %s, %s)
    """

    def _nullable(value):
        # Map None, NaN and empty values to SQL NULL.
        if value is None or pd.isna(value):
            return None
        return value or None

    try:
        params = [
            (
                row['id'],
                row['Call Sign'],
                row['Behavior'],
                _nullable(row['Flight Level']),
                _nullable(row['Location']),
                _nullable(row['Time']),
            )
            for _, row in df.iterrows()
        ]

        with transaction.atomic():
            with connection.cursor() as cursor:
                cursor.execute("DELETE FROM quarantine_table")
                # One batched call instead of one round trip per row.
                if params:
                    cursor.executemany(insert_sql, params)

        logger.info(f"写入 quarantine_table: {len(df)} 条记录")

    except Exception as e:
        logger.error(f"写入隔离数据失败: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 为了向后兼容,提供旧接口
|
|
|
|
|
def process_data():
    """Legacy entry point kept for backward compatibility.

    Delegates to ``improved_process_data`` and returns its result unchanged.
    """
    outcome = improved_process_data()
    return outcome
|