import os
import zipfile
import tempfile
from typing import Union, List, Optional, Tuple


class FileParser:
    """Extract plain text (and, for .docx, embedded images) from files.

    Supported extensions are ``.txt``, ``.docx``, ``.pdf`` and ``.html``.
    Third-party parsers (python-docx, PyPDF2, BeautifulSoup4, chardet) are
    imported lazily inside the methods that need them, so the class can be
    imported without them installed.
    """

    # Largest input file accepted by validate_file_path (10 MB).
    MAX_FILE_SIZE = 10 * 1024 * 1024

    # Extensions of word/media/ entries that are returned as images.
    _IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.gif', '.bmp')

    @staticmethod
    def _text_parsers() -> dict:
        """Return the mapping of lowercase file extension to text parser."""
        return {
            '.txt': FileParser.parse_txt,
            '.docx': FileParser.parse_docx,
            '.pdf': FileParser.parse_pdf,
            '.html': FileParser.parse_html,
        }

    @staticmethod
    def parse_file(file_path: str) -> str:
        """Parse a file and return its text content.

        Args:
            file_path: Path of the file to parse; the parser is chosen from
                its (case-insensitive) extension.

        Returns:
            The extracted text.

        Raises:
            ValueError: If ``file_path`` fails validate_file_path.
            Exception: If the extension is unsupported or the underlying
                parser fails; the original error is attached as the cause.
        """
        if not FileParser.validate_file_path(file_path):
            raise ValueError(f"Invalid file path: {file_path}")

        ext = os.path.splitext(file_path)[1].lower()

        try:
            parser = FileParser._text_parsers().get(ext)
            if parser is None:
                raise ValueError(f"Unsupported file format: {ext}")
            return parser(file_path)
        except Exception as e:
            # Uniform error type for callers; chain so the root cause is kept.
            raise Exception(f"Error parsing file {file_path}: {str(e)}") from e

    @staticmethod
    def parse_and_convert_to_txt(file_path: str, output_dir: Optional[str] = None) -> dict:
        """Parse a file and write its text to a newly created .txt file.

        This method never raises: every failure is reported through the
        returned dictionary.

        Args:
            file_path: Path of the input file.
            output_dir: Directory in which the .txt file is created. It is
                created if it does not exist. When None (or empty), the
                system temporary directory is used.

        Returns:
            On success, a dict with:
                - 'success': True
                - 'txt_path': path of the generated .txt file
                - 'images': list of (file name, binary data) tuples; only
                  populated for .docx input
                - 'content': the extracted text
                - 'original_ext': lowercase extension of the input file
                - 'is_temp_file': True
            On failure, a dict with 'success': False and 'error': message.
        """
        try:
            if not FileParser.validate_file_path(file_path):
                return {
                    'success': False,
                    'error': f"Invalid file path: {file_path}"
                }

            ext = os.path.splitext(file_path)[1].lower()
            parser = FileParser._text_parsers().get(ext)
            if parser is None:
                return {
                    'success': False,
                    'error': f"Unsupported file format: {ext}"
                }

            content = parser(file_path)
            # Image extraction is only implemented for .docx; other formats
            # yield an empty list.
            if ext == '.docx':
                images = FileParser.extract_images_from_docx(file_path)
            else:
                images = []

            # Treat an empty string like None so tempfile picks its default.
            target_dir = output_dir or None
            if target_dir is not None:
                os.makedirs(target_dir, exist_ok=True)

            base_name = os.path.splitext(os.path.basename(file_path))[0]

            # delete=False: the file outlives this call and is NOT removed
            # automatically; the caller is responsible for deleting txt_path.
            with tempfile.NamedTemporaryFile(
                mode='w',
                encoding='utf-8',
                suffix=f'_{base_name}_converted.txt',
                dir=target_dir,
                delete=False,
            ) as temp_file:
                temp_file.write(content)
                txt_path = temp_file.name

            return {
                'success': True,
                'txt_path': txt_path,
                'images': images,
                'content': content,
                'original_ext': ext,
                'is_temp_file': True  # created via tempfile; caller must clean up
            }

        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }

    @staticmethod
    def parse_txt(file_path: str) -> str:
        """Read a text file and return its content with '\\n' line endings.

        Args:
            file_path: Path of the text file.

        Returns:
            The decoded text. Undecodable bytes are dropped.

        Raises:
            ValueError: If ``file_path`` fails validate_file_path.
        """
        if not FileParser.validate_file_path(file_path):
            raise ValueError(f"Invalid file path: {file_path}")

        encoding = FileParser.detect_file_encoding(file_path)

        with open(file_path, 'r', encoding=encoding, errors='ignore') as f:
            content = f.read()

        # Normalize any remaining CRLF / CR line endings to LF.
        return content.replace('\r\n', '\n').replace('\r', '\n')

    @staticmethod
    def detect_file_encoding(file_path: str) -> str:
        """Guess the text encoding of a file.

        The file is read once as bytes and strictly decoded with UTF-8, GBK
        and GB2312 in that order; the first that succeeds wins. If none
        does, chardet is consulted when installed. The fallback is 'utf-8'.

        Args:
            file_path: Path of the file to inspect.

        Returns:
            A codec name. 'utf-8-sig' is returned for UTF-8 data that starts
            with a byte order mark, so readers do not get a leading U+FEFF.
        """
        with open(file_path, 'rb') as f:
            raw_data = f.read()

        for candidate in ('utf-8', 'gbk', 'gb2312'):
            try:
                raw_data.decode(candidate)
            except UnicodeDecodeError:
                continue
            if candidate == 'utf-8' and raw_data.startswith(b'\xef\xbb\xbf'):
                return 'utf-8-sig'
            return candidate

        # Optional dependency: fall through silently when it is missing.
        try:
            import chardet
        except ImportError:
            pass
        else:
            result = chardet.detect(raw_data[:1024])
            if result and result['encoding']:
                return result['encoding']

        return 'utf-8'

    @staticmethod
    def extract_images_from_docx(file_path: str) -> List[Tuple[str, bytes]]:
        """Extract embedded images from a Word document.

        A .docx file is a ZIP archive; non-empty entries under ``word/media/``
        with a .png, .jpg, .jpeg, .gif or .bmp extension are returned.

        Args:
            file_path: Path of the Word document.

        Returns:
            A list of (image file name, image binary data) tuples in archive
            order.

        Raises:
            ValueError: If ``file_path`` fails validate_file_path.
            Exception: If the archive cannot be read; the original error is
                attached as the cause.
        """
        if not FileParser.validate_file_path(file_path):
            raise ValueError(f"Invalid file path: {file_path}")

        images = []

        try:
            with zipfile.ZipFile(file_path, 'r') as zip_file:
                for file_info in zip_file.infolist():
                    file_name = file_info.filename
                    if not file_name.startswith('word/media/') or file_info.file_size <= 0:
                        continue
                    # Check the extension before reading so unwanted media
                    # payloads are never decompressed.
                    image_ext = os.path.splitext(file_name)[1].lower()
                    if image_ext not in FileParser._IMAGE_EXTENSIONS:
                        continue
                    images.append((os.path.basename(file_name), zip_file.read(file_name)))

            return images

        except Exception as e:
            raise Exception(f"Error extracting images from docx file {file_path}: {str(e)}") from e

    @staticmethod
    def parse_docx(file_path: str) -> str:
        """Extract the paragraph text of a .docx file.

        Paragraphs are joined with '\\n'. Paragraphs that are empty or
        whitespace-only become empty lines so the spacing is kept.

        Args:
            file_path: Path of the Word document.

        Returns:
            The document text.

        Raises:
            ValueError: If ``file_path`` fails validate_file_path.
            ImportError: If python-docx is not installed.
            Exception: If the document cannot be parsed; the original error
                is attached as the cause.
        """
        if not FileParser.validate_file_path(file_path):
            raise ValueError(f"Invalid file path: {file_path}")

        try:
            from docx import Document
        except ImportError as e:
            raise ImportError("python-docx library is required for parsing .docx files. Please install it using 'pip install python-docx'") from e

        try:
            doc = Document(file_path)

            paragraphs = []
            for paragraph in doc.paragraphs:
                text = paragraph.text
                # Keep original text for real paragraphs, blank line otherwise.
                paragraphs.append(text if text.strip() else "")

            return '\n'.join(paragraphs)

        except Exception as e:
            raise Exception(f"Error parsing docx file {file_path}: {str(e)}") from e

    @staticmethod
    def parse_pdf(file_path: str) -> str:
        """Extract the text of a PDF file.

        Pages that yield no text are skipped; the remaining page texts are
        separated by a blank line, with no trailing separator.

        Args:
            file_path: Path of the PDF file.

        Returns:
            The extracted text.

        Raises:
            ValueError: If ``file_path`` fails validate_file_path.
            ImportError: If PyPDF2 is not installed.
            Exception: If the PDF cannot be parsed; the original error is
                attached as the cause.
        """
        if not FileParser.validate_file_path(file_path):
            raise ValueError(f"Invalid file path: {file_path}")

        try:
            import PyPDF2
        except ImportError as e:
            raise ImportError("PyPDF2 library is required for parsing .pdf files. Please install it using 'pip install PyPDF2'") from e

        try:
            with open(file_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                # Extraction must happen while the file is still open.
                page_texts = [page.extract_text() for page in pdf_reader.pages]

            return "\n\n".join(text for text in page_texts if text)

        except Exception as e:
            raise Exception(f"Error parsing pdf file {file_path}: {str(e)}") from e

    @staticmethod
    def parse_html(file_path: str) -> str:
        """Extract the visible text of an HTML file.

        ``<script>`` and ``<style>`` elements are removed, then the text is
        split into lines and into phrases separated by two spaces; empty
        pieces are dropped and the rest joined with '\\n'.

        Args:
            file_path: Path of the HTML file.

        Returns:
            The extracted text.

        Raises:
            ValueError: If ``file_path`` fails validate_file_path.
            ImportError: If BeautifulSoup4 is not installed.
            Exception: If the file cannot be read or parsed; the original
                error is attached as the cause.
        """
        if not FileParser.validate_file_path(file_path):
            raise ValueError(f"Invalid file path: {file_path}")

        try:
            from bs4 import BeautifulSoup
        except ImportError as e:
            raise ImportError("BeautifulSoup4 library is required for parsing .html files. Please install it using 'pip install beautifulsoup4'") from e

        try:
            encoding = FileParser.detect_file_encoding(file_path)

            with open(file_path, 'r', encoding=encoding, errors='ignore') as f:
                html_content = f.read()

            soup = BeautifulSoup(html_content, 'html.parser')

            # Drop non-visible content before extracting text.
            for tag in soup(["script", "style"]):
                tag.decompose()

            text = soup.get_text()

            lines = (line.strip() for line in text.splitlines())
            # Split on two spaces, not one: a single-space split would put
            # every word on its own line.
            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
            return '\n'.join(chunk for chunk in chunks if chunk)

        except Exception as e:
            raise Exception(f"Error parsing html file {file_path}: {str(e)}") from e

    @staticmethod
    def validate_file_path(file_path: str) -> bool:
        """Check whether a path points to a readable file of acceptable size.

        Args:
            file_path: Path to check.

        Returns:
            True if the path is an existing regular file, is readable, and is
            no larger than MAX_FILE_SIZE bytes; False otherwise, including
            when ``file_path`` is not a valid path value (e.g. None).
        """
        try:
            # isfile() is False for missing paths and for directories.
            if not os.path.isfile(file_path):
                return False
            if not os.access(file_path, os.R_OK):
                return False
            return os.path.getsize(file_path) <= FileParser.MAX_FILE_SIZE
        except (OSError, TypeError, ValueError):
            # Non-path input, or the file vanished between checks.
            return False