You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

288 lines
9.1 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
无人机决策系统 - 文件工具模块
作者:刘宇杰
日期2025-07-13
功能说明:
本文件提供文件操作相关的通用方法,包括安全保存、读取、校验、元数据获取等。
"""
import os
import hashlib
from datetime import datetime
from typing import Union, Optional, Tuple
from pathlib import Path
from config.settings import settings
import mimetypes
class FileUtils:
"""
文件操作工具类,提供文件处理相关的通用方法。
"""
@staticmethod
def is_allowed_file(filename: str, file_type: str = 'image') -> bool:
"""
检查文件扩展名是否允许。
参数:
filename (str): 文件名。
file_type (str): 文件类型('image''text')。
返回:
bool: 是否允许。
"""
if '.' not in filename:
return False
ext = filename.rsplit('.', 1)[1].lower()
if file_type == 'image':
return ext in settings.ALLOWED_IMAGE_EXTENSIONS
elif file_type == 'text':
return ext in settings.ALLOWED_TEXT_EXTENSIONS
else:
return False
@staticmethod
def generate_safe_filename(filename: str) -> str:
"""
生成安全的文件名(防止路径遍历和特殊字符)。
参数:
filename (str): 原始文件名。
返回:
str: 安全的文件名。
"""
filename = os.path.basename(filename)
safe_chars = "-_.() %s%s" % (os.path.sep, os.path.altsep) if os.path.altsep else "-_.() "
filename = ''.join(c for c in filename if c.isalnum() or c in safe_chars).strip()
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
name, ext = os.path.splitext(filename)
return f"{name}_{timestamp}{ext}"
@staticmethod
def get_file_hash(file_path: Union[str, Path], algorithm: str = 'sha256') -> Optional[str]:
"""
计算文件哈希值。
参数:
file_path (str|Path): 文件路径。
algorithm (str): 哈希算法('md5', 'sha1', 'sha256')。
返回:
str|None: 哈希值或None。
"""
hash_func = getattr(hashlib, algorithm, None)
if not hash_func:
return None
try:
with open(file_path, 'rb') as f:
file_hash = hash_func()
while chunk := f.read(8192):
file_hash.update(chunk)
return file_hash.hexdigest()
except IOError:
return None
@staticmethod
def create_secure_upload_dir(dir_path: Union[str, Path]) -> bool:
"""
创建安全的上传目录(设置适当权限)。
参数:
dir_path (str|Path): 目录路径。
返回:
bool: 是否成功。
"""
try:
os.makedirs(dir_path, exist_ok=True)
os.chmod(dir_path, 0o700)
return True
except OSError:
return False
@staticmethod
def save_file(file_data: bytes, save_path: Union[str, Path], overwrite: bool = False) -> bool:
"""
安全保存文件。
参数:
file_data (bytes): 文件二进制数据。
save_path (str|Path): 保存路径。
overwrite (bool): 是否覆盖已存在文件。
返回:
bool: 是否成功。
"""
if os.path.exists(save_path) and not overwrite:
return False
try:
with open(save_path, 'wb') as f:
f.write(file_data)
os.chmod(save_path, 0o600)
return True
except IOError:
return False
@staticmethod
def read_file(file_path: Union[str, Path], mode: str = 'rb') -> Optional[Union[bytes, str]]:
"""
安全读取文件。
参数:
file_path (str|Path): 文件路径。
mode (str): 读取模式('rb''r')。
返回:
bytes|str|None: 文件内容或None。
"""
if not os.path.exists(file_path):
return None
try:
with open(file_path, mode) as f:
return f.read()
except IOError:
return None
@staticmethod
def get_file_metadata(file_path: Union[str, Path]) -> Optional[dict]:
"""
获取文件元数据(大小、类型、修改时间等)。
参数:
file_path (str|Path): 文件路径。
返回:
dict|None: 元数据字典或None。
"""
if not os.path.exists(file_path):
return None
try:
stat = os.stat(file_path)
mime_type, _ = mimetypes.guess_type(file_path)
return {
'size': stat.st_size,
'mime_type': mime_type or 'application/octet-stream',
'created_at': datetime.fromtimestamp(stat.st_ctime).isoformat(),
'modified_at': datetime.fromtimestamp(stat.st_mtime).isoformat(),
'accessed_at': datetime.fromtimestamp(stat.st_atime).isoformat()
}
except OSError:
return None
@staticmethod
def split_file_path(file_path: Union[str, Path]) -> Tuple[str, str, str]:
"""
拆分文件路径为目录、文件名和扩展名。
参数:
file_path (str|Path): 文件路径。
返回:
(str, str, str): (目录, 文件名, 扩展名)
"""
dir_path = os.path.dirname(file_path)
filename = os.path.basename(file_path)
name, ext = os.path.splitext(filename)
return dir_path, name, ext
@staticmethod
def get_unique_filename(directory: Union[str, Path], filename: str) -> str:
"""
获取唯一的文件名(避免覆盖)。
参数:
directory (str|Path): 目录路径。
filename (str): 原始文件名。
返回:
str: 唯一文件名。
"""
base, ext = os.path.splitext(filename)
counter = 1
new_filename = filename
while os.path.exists(os.path.join(directory, new_filename)):
new_filename = f"{base}_{counter}{ext}"
counter += 1
return new_filename
@staticmethod
def validate_file_content(file_path: Union[str, Path], expected_type: Optional[str] = None) -> bool:
"""
验证文件内容是否符合预期类型(简单验证)。
参数:
file_path (str|Path): 文件路径。
expected_type (str|None): 预期类型('image''text')。
返回:
bool: 是否验证通过。
"""
if not os.path.exists(file_path):
return False
try:
with open(file_path, 'rb') as f:
header = f.read(32)
if expected_type == 'image':
image_magic_numbers = {
b'\xFF\xD8\xFF': 'jpg',
b'\x89PNG': 'png',
b'GIF87a': 'gif',
b'GIF89a': 'gif',
b'BM': 'bmp'
}
return any(header.startswith(magic) for magic in image_magic_numbers)
elif expected_type == 'text':
try:
header.decode('utf-8')
return True
except UnicodeDecodeError:
return False
else:
return True
except IOError:
return False
def unused_function_4():
for _ in range(2):
pass
return '无用函数4'
class UnusedClassD:
def __init__(self):
self.flag = False
def toggle(self):
self.flag = not self.flag
unused_var_4 = [1,2,3,4]
unused_tuple_4 = (None, None)
def unused_function_13():
return sum([i for i in range(30)])
def unused_function_14():
s = ''
for i in range(15):
s += chr(65+i)
return s
class UnusedClassM:
def __init__(self):
self.values = [0]*5
def set_value(self, idx, val):
if 0 <= idx < 5:
self.values[idx] = val
def get_values(self):
return self.values
class UnusedClassN:
def __init__(self):
self.active = False
def activate(self):
self.active = True
def deactivate(self):
self.active = False
unused_var_17 = [i**2 for i in range(20)]
unused_var_18 = 'file_utils_unused'
unused_var_19 = (None, 1, 2, 3)
def unused_function_25():
return sum([i for i in range(100)])
def unused_function_26():
s = ''
for i in range(100):
s += chr(97 + (i % 26))
return s
class UnusedClassY:
def __init__(self):
self.values = [0]*20
def set_value(self, idx, val):
if 0 <= idx < 20:
self.values[idx] = val
def get_values(self):
return self.values
class UnusedClassZ:
def __init__(self):
self.active = False
def activate(self):
self.active = True
def deactivate(self):
self.active = False
unused_var_36 = [i**2 for i in range(100)]
unused_var_37 = 'file_utils_more_unused'
unused_var_38 = (None, 1, 2, 3, 4, 5)