From 7e7e11970b3413be9b98cd4ff7c957ed95fda383 Mon Sep 17 00:00:00 2001 From: p5rqvkz4o <493566904@qq.com> Date: Thu, 15 May 2025 20:24:52 +0800 Subject: [PATCH] =?UTF-8?q?=E8=BF=9E=E6=8E=A5=E6=95=B0=E6=8D=AE=E5=BA=93?= =?UTF-8?q?=E3=80=81=E5=A4=84=E7=90=86=E3=80=82=E8=A6=81=E6=B1=82=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=BA=93=E4=B8=AD=E5=AD=98=E5=9C=A8only=5Ftext?= =?UTF-8?q?=E8=A1=A8=EF=BC=88=E5=8F=AF=E8=87=AA=E8=A1=8C=E4=BF=AE=E6=94=B9?= =?UTF-8?q?=E5=8F=82=E6=95=B0=EF=BC=89=EF=BC=8C=E5=8D=B3=E5=8F=AA=E5=8C=85?= =?UTF-8?q?=E5=90=ABid=E5=92=8Ctext=E7=9A=84=E5=A4=84=E7=90=86=E5=90=8E?= =?UTF-8?q?=E8=A1=A8=E6=A0=BC=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Code_Reticent/CallSign.py | 234 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 234 insertions(+) create mode 100644 Code_Reticent/CallSign.py diff --git a/Code_Reticent/CallSign.py b/Code_Reticent/CallSign.py new file mode 100644 index 0000000..ae9e441 --- /dev/null +++ b/Code_Reticent/CallSign.py @@ -0,0 +1,234 @@ +# -*- coding: utf-8 -*- +""" +Created on Wed May 14 08:06:07 2025 + +@author: 缄默 +""" + +import ollama +import concurrent.futures +import json +from typing import Dict, Any, Tuple +import csv +import re + +import pymysql + +# ==================== 数据库 ==================== +# 数据库配置(需根据实际环境修改) +DB_CONFIG = { + 'host': 'localhost', + 'user': 'root', + 'password': '111111', + 'database': 'atc', + 'charset': 'utf8mb4' +} + +# 读取函数实现 +def read_from_table(table_name, key_name): + """ + 读取指定表的指定列数据 + :param table_name: 目标表名 + :param key_name: 目标字段名 + :return: 包含所有字段值的列表 + """ + try: + conn = pymysql.connect(**DB_CONFIG) + cursor = conn.cursor() + + # 使用反引号包裹防止SQL注入 + query = f"SELECT `{key_name}` FROM `{table_name}`" + cursor.execute(query) + + # 提取结果并转换为列表 + return [row[0] for row in cursor.fetchall()] + except Exception as e: + print(f"Database error: {str(e)}") + return [] + finally: + if 'conn' in locals() and conn.open: + cursor.close() + conn.close() + +# 写入函数实现 +def write_to_table(data, headers, table_name): + """ + 写入数据到指定表(自动建表) + :param data: 二维数据列表 + :param headers: 表头列表 + :param table_name: 目标表名 + """ + try: + conn = pymysql.connect(**DB_CONFIG) + cursor = conn.cursor() + + # 自动建表逻辑 + columns = [f"`{col}` TEXT" for col in headers] + create_sql = f"CREATE TABLE IF NOT EXISTS `{table_name}` ({','.join(columns)})" + cursor.execute(create_sql) + + # 数据预处理(填充None) + processed_data = [] + for row in data: + extended_row = list(row) + [None] * (len(headers) - len(row)) + processed_data.append(extended_row) + + # 构造插入语句 + fields = ', '.join([f'`{h}`' for h in headers]) + placeholders = ', '.join(['%s'] * len(headers)) + insert_sql = f"INSERT INTO `{table_name}` ({fields}) VALUES ({placeholders})" + + # 批量插入数据 + cursor.executemany(insert_sql, processed_data) + conn.commit() + except Exception as e: + print(f"Database error: {str(e)}") + conn.rollback() + finally: + if 'conn' in locals() and conn.open: + cursor.close() + conn.close() + + +HANDER = ["ID","CallSignal","Behavior","FlightLevel","Location","Time"] +# ==================== 读入文件/处理 ==================== +def read_csv_to_array(file_path): + with open(file_path, 'r', newline='', encoding='utf-8') as file: + csv_reader = csv.reader(file) + data = list(csv_reader) + return data +def extract_result(text): + # 匹配 "result:{...}" 格式的部分 + match = re.search(r'result:(\s*\{.*?\})', text, re.DOTALL) + if not match: + return "" + + json_str = match.group(1).strip() + + return json_str +def parse_input(s): + # 去掉最外层的大括号 + s = s.strip('{}') + # 去掉首尾的方括号 + if len(s) >= 2 and s[0] == '[' and s[-1] == ']': + s = s[1:-1] + # 分割成各个行 + rows = s.split('],[') + # 分割每行的字段并生成二维列表 + return [row.split(',') for row in rows] +# ==================== 核心处理模块 ==================== +class CallSignExtractor: + def __init__(self, model1="deepseek-r1:8b", model2="qwen2"): + self.model1 = model1 + self.model2 = model2 + self.prompt_template = """As an aviation communications analyst, extract all flight communications data from the following ATC dialogue following these strict rules: +Data Extraction Requirements: +Identify all aircraft call signs (e.g. “Lufthansa Seven Thirty Nine”) using standard aviation patterns. +Extraction is performed for each occurrence of the callsign: +Behavior: action phrase following the callsign (climb/descend/left turn/pointing/hold, etc.) +Flight level: number extracted from the phrase “FL310”/“level three zero zero zero”,only output number,such as "three zero zero zero" (NULL if not present) +Position: NAV waypoint (3-5 letter code) or airport name (NULL if none) +Time: UTC timestamp (if explicitly mentioned) (NULL if not present) + +Format Rules: +Multiple entries for same callsign must be listed separately +Output must be ordered: [id,callsign,behavior,flight_level,location,time] +Use NULL for missing information +Strict CSV format inside result brackets with NO additional text + +Processing Logic: +Analyze context before/after callsign mentions +Preserve original callsign wording exactly +Treat multi-line interactions as single context when connected + +Example Input: +"APP-pZiJZT", "AT: lufthansa two juliett whiskey climb FL250\nPI: turn left direct kolad lufthansa two juliett whiskey" + +Example Output: +result:{{[APP-pZiJZT,lufthansa two juliett whiskey,climbing,two five zero,NULL,NULL],[APP-pZiJZT,lufthansa two juliett whiskey,turn left,NULL,kolad,NULL]}}/result + +Process this transmission: +ID:{cid} +Text:{ctext}""" + + def _call_model(self, model_name: str, prompt: str) -> str: + """调用Ollama模型并返回响应""" + try: + response = ollama.generate(model=model_name, prompt=prompt) + rs = extract_result(response["response"]) + #print(rs,type(rs)) + return rs + except Exception as e: + print(f"模型调用错误 ({model_name}): {e}") + return "" + + def _dual_model_inference(self, prompt: str) -> Tuple[str, str]: + """并行执行双模型推理""" + with concurrent.futures.ThreadPoolExecutor() as executor: + future1 = executor.submit(self._call_model, self.model1, prompt) + future2 = executor.submit(self._call_model, self.model2, prompt) + return future1.result(), future2.result() + + def _model_inference(self, prompt: str) -> Tuple[str, str]: + """并行执行双模型推理""" + with concurrent.futures.ThreadPoolExecutor() as executor: + future = executor.submit(self._call_model, self.model2, prompt) + return future.result() + + def _merge_results(self, result1: str, result2: str) -> Dict[str, str]: + """智能融合双模型结果""" + try: + json1 = json.loads(result1) if result1.strip() else {} + except json.JSONDecodeError: + json1 = {} + + try: + json2 = json.loads(result2) if result2.strip() else {} + except json.JSONDecodeError: + json2 = {} + + # 合并策略:保留所有识别结果,冲突时优先模型1 + merged = {**json1, **json2} # 后者覆盖前者 + return merged + + def extract_call_signs_DoubleModel(self, cid: str,ctext: str) -> Dict[str, str]: + """执行完整提取流程""" + prompt = self.prompt_template.format(cid=cid,ctext=ctext) + result1, result2 = self._dual_model_inference(prompt) + return self._merge_results(result1, result2) + def extract_call_signs(self, cid: str,ctext: str) -> Dict[str, str]: + """执行完整提取流程""" + prompt = self.prompt_template.format(cid=cid,ctext=ctext) + result = self._model_inference(prompt) + return result + + + +# ==================== 主执行流程 ==================== +def main(): + # 示例输入数据 + #input_data = read_csv_to_array(r'./output_lite.csv') + + # 预处理 + id_data = read_from_table("only_text", "id") + text_data = read_from_table("only_text", "text") + data = [[id_data[i],text_data[i]] for i in range(len(id_data))] + #print(raw_text) #把id去掉,只返回text字段 + # 初始化提取器 + extractor = CallSignExtractor() + all_result = [] + # 执行提取 + for i in range(len(data)): + raw_id,raw_text = data[i][0],data[i][1] + #result = extractor.extract_call_signs_DoubleModel(raw_id,raw_text) + result = extractor.extract_call_signs(raw_id,raw_text) + # 格式化输出 + result_list = parse_input(result) + print(result_list) + for row in result_list: + all_result.append(row) + break; + write_to_table(all_result,HANDER, "CsExtraction") + +if __name__ == "__main__": + main() \ No newline at end of file