You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

761 lines
24 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# P2P Network Communication - Image Processor Module
"""
图片处理模块
负责图片格式检测、缩略图生成和图片压缩
需求: 5.1, 5.5
"""
import io
import logging
import os
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Optional, Tuple, Union
# Pillow is an optional dependency at import time: the module must still be
# importable without it, so the failure is recorded in PIL_AVAILABLE and
# placeholder names are bound instead of letting ImportError propagate.
try:
    from PIL import Image, UnidentifiedImageError
except ImportError:
    # Placeholders so later references to these names still resolve.
    Image = None
    UnidentifiedImageError = Exception
    PIL_AVAILABLE = False
else:
    PIL_AVAILABLE = True
from config import ClientConfig
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class ImageFormat(Enum):
    """Image formats recognised by this module.

    ``UNKNOWN`` is the value used when a format cannot be identified.
    """

    JPEG = "jpeg"
    PNG = "png"
    GIF = "gif"
    BMP = "bmp"
    UNKNOWN = "unknown"
class ImageProcessorError(Exception):
    """Base class for every error raised by the image processing module."""
class UnsupportedFormatError(ImageProcessorError):
    """Raised when an image is in a format that is not supported."""
class ImageNotFoundError(ImageProcessorError):
    """Raised when the image file does not exist."""
class ThumbnailGenerationError(ImageProcessorError):
    """Raised when a thumbnail cannot be generated."""
class CompressionError(ImageProcessorError):
    """Raised when an image cannot be compressed."""
@dataclass
class ImageInfo:
    """Metadata describing a single image file."""

    path: str
    format: ImageFormat
    width: int
    height: int
    file_size: int
    mode: str  # image mode string, e.g. RGB, RGBA, L

    @property
    def aspect_ratio(self) -> float:
        """Width divided by height, or 0.0 when the height is zero."""
        return self.width / self.height if self.height != 0 else 0.0

    @property
    def is_large(self) -> bool:
        """Whether the file size exceeds 5 MB."""
        five_megabytes = 5 * 1024 * 1024
        return self.file_size > five_megabytes

    def to_dict(self) -> dict:
        """Return the stored fields plus the derived properties as a dict."""
        return dict(
            path=self.path,
            format=self.format.value,
            width=self.width,
            height=self.height,
            file_size=self.file_size,
            mode=self.mode,
            aspect_ratio=self.aspect_ratio,
            is_large=self.is_large,
        )
# File-header magic numbers: leading bytes -> image format.
IMAGE_SIGNATURES = {
    b'\xff\xd8\xff': ImageFormat.JPEG,
    b'\x89PNG\r\n\x1a\n': ImageFormat.PNG,
    b'GIF87a': ImageFormat.GIF,
    b'GIF89a': ImageFormat.GIF,
    b'BM': ImageFormat.BMP,
}

# Image format -> file extensions (lower-case, including the leading dot).
FORMAT_EXTENSIONS = {
    ImageFormat.JPEG: ['.jpg', '.jpeg', '.jpe', '.jfif'],
    ImageFormat.PNG: ['.png'],
    ImageFormat.GIF: ['.gif'],
    ImageFormat.BMP: ['.bmp', '.dib'],
}

# PIL format name -> image format.
PIL_FORMAT_MAP = {
    'JPEG': ImageFormat.JPEG,
    'PNG': ImageFormat.PNG,
    'GIF': ImageFormat.GIF,
    'BMP': ImageFormat.BMP,
}

# Reverse mapping: image format -> PIL format name. PIL_FORMAT_MAP is
# one-to-one, so inverting it yields the same entries in the same order.
FORMAT_TO_PIL = {fmt: pil_name for pil_name, fmt in PIL_FORMAT_MAP.items()}
class ImageProcessor:
    """
    Image processor.

    Responsibilities:
    - image format detection (requirement 5.1)
    - thumbnail generation (requirements 5.2, 5.3)
    - image compression (requirement 5.5)
    - rotate / resize operations (requirement 5.6)

    Files produced without an explicit ``output_path`` are written to the
    cache directory taken from ``config.cache_dir``.
    """

    # Supported image formats.
    SUPPORTED_FORMATS = {ImageFormat.JPEG, ImageFormat.PNG, ImageFormat.GIF, ImageFormat.BMP}
    # Default thumbnail bounding box (width, height).
    DEFAULT_THUMBNAIL_SIZE = (200, 200)
    # Default JPEG quality.
    DEFAULT_QUALITY = 85
    # Files larger than this (5 MB) are considered large.
    LARGE_IMAGE_THRESHOLD = 5 * 1024 * 1024
    # Name suffixes of the files this class writes into the cache directory;
    # clear_cache() removes files matching "*_<suffix>.*".
    _CACHE_SUFFIXES = ("thumb", "compressed", "rotated", "resized")

    def __init__(self, config: Optional[ClientConfig] = None):
        """
        Initialise the image processor.

        Args:
            config: Client configuration; a default ``ClientConfig()`` is
                created when omitted.

        Raises:
            ImageProcessorError: Pillow is not installed.
        """
        if not PIL_AVAILABLE:
            raise ImageProcessorError("Pillow library is not installed. Please install it with: pip install Pillow")
        self.config = config or ClientConfig()
        self._thumbnail_size = getattr(self.config, 'thumbnail_size', self.DEFAULT_THUMBNAIL_SIZE)
        # Make sure the cache directory exists.
        self._cache_dir = Path(self.config.cache_dir)
        self._cache_dir.mkdir(parents=True, exist_ok=True)
        logger.info("ImageProcessor initialized")

    # ==================== Internal helpers ====================

    @staticmethod
    def _flatten_to_rgb(img: 'Image.Image') -> 'Image.Image':
        """
        Return an RGB version of ``img`` suitable for saving as JPEG.

        Images in mode RGBA, LA or P are composited onto a white background
        using their alpha band as the mask. LA images are converted to RGBA
        first so their alpha is honoured as well (previously the alpha band
        of LA images was not used as a mask). Any other non-RGB mode is
        converted directly. An image that is already RGB is returned
        unchanged (same object).
        """
        if img.mode == 'RGB':
            return img
        if img.mode in ('RGBA', 'LA', 'P'):
            rgba = img if img.mode == 'RGBA' else img.convert('RGBA')
            background = Image.new('RGB', rgba.size, (255, 255, 255))
            background.paste(rgba, mask=rgba.split()[-1])
            return background
        return img.convert('RGB')

    # ==================== Format detection (requirement 5.1) ====================

    def detect_format_from_header(self, file_path: str) -> ImageFormat:
        """
        Detect the image format from the file's magic number.

        Args:
            file_path: Path of the file to inspect.

        Returns:
            The detected format, or ``ImageFormat.UNKNOWN`` when no known
            signature matches or the file cannot be read.

        Raises:
            ImageNotFoundError: The file does not exist.
        """
        if not os.path.exists(file_path):
            raise ImageNotFoundError(f"Image file not found: {file_path}")
        try:
            with open(file_path, 'rb') as f:
                header = f.read(16)  # the longest known signature is 8 bytes
        except OSError as e:
            # Best effort: an unreadable file is reported as unknown.
            logger.error(f"Failed to detect image format: {e}")
            return ImageFormat.UNKNOWN
        for signature, fmt in IMAGE_SIGNATURES.items():
            if header.startswith(signature):
                return fmt
        return ImageFormat.UNKNOWN

    def detect_format_from_extension(self, file_path: str) -> ImageFormat:
        """
        Detect the image format from the file extension (case-insensitive).

        Args:
            file_path: Path of the file.

        Returns:
            The matching format, or ``ImageFormat.UNKNOWN``.
        """
        ext = Path(file_path).suffix.lower()
        for fmt, extensions in FORMAT_EXTENSIONS.items():
            if ext in extensions:
                return fmt
        return ImageFormat.UNKNOWN

    def detect_format(self, file_path: str) -> ImageFormat:
        """
        Detect the image format, preferring the file header over the extension.

        Args:
            file_path: Path of the file.

        Returns:
            The detected format, or ``ImageFormat.UNKNOWN``.

        Raises:
            ImageNotFoundError: The file does not exist.
        """
        fmt = self.detect_format_from_header(file_path)
        if fmt != ImageFormat.UNKNOWN:
            return fmt
        # Fall back to the extension when the header is not recognised.
        return self.detect_format_from_extension(file_path)

    def is_supported_format(self, file_path: str) -> bool:
        """
        Check whether the file is in a supported image format.

        Args:
            file_path: Path of the file.

        Returns:
            True for a supported format; False otherwise, including when
            the file does not exist.
        """
        try:
            return self.detect_format(file_path) in self.SUPPORTED_FORMATS
        except ImageNotFoundError:
            return False

    def get_image_info(self, file_path: str) -> ImageInfo:
        """
        Collect detailed information about an image.

        Args:
            file_path: Path of the image.

        Returns:
            An ``ImageInfo`` describing the image.

        Raises:
            ImageNotFoundError: The file does not exist.
            UnsupportedFormatError: Pillow cannot identify the image.
            ImageProcessorError: Any other failure while reading the image.
        """
        if not os.path.exists(file_path):
            raise ImageNotFoundError(f"Image file not found: {file_path}")
        try:
            with Image.open(file_path) as img:
                fmt = PIL_FORMAT_MAP.get(img.format, ImageFormat.UNKNOWN)
                if fmt == ImageFormat.UNKNOWN:
                    # Pillow reported a format outside the map; fall back to
                    # header / extension detection.
                    fmt = self.detect_format(file_path)
                return ImageInfo(
                    path=file_path,
                    format=fmt,
                    width=img.width,
                    height=img.height,
                    file_size=os.path.getsize(file_path),
                    mode=img.mode,
                )
        except UnidentifiedImageError as e:
            raise UnsupportedFormatError(f"Unsupported image format: {file_path}") from e
        except Exception as e:
            logger.error(f"Failed to get image info: {e}")
            raise ImageProcessorError(f"Failed to get image info: {e}") from e

    # ==================== Thumbnail generation (requirements 5.2, 5.3) ====================

    def generate_thumbnail(
        self,
        file_path: str,
        output_path: Optional[str] = None,
        size: Optional[Tuple[int, int]] = None,
        preserve_aspect_ratio: bool = True
    ) -> str:
        """
        Generate a JPEG thumbnail file for an image.

        Args:
            file_path: Path of the source image.
            output_path: Destination path; defaults to
                ``<cache_dir>/<stem>_thumb.jpg``.
            size: Thumbnail size (width, height); defaults to the configured
                thumbnail size.
            preserve_aspect_ratio: When True the image is shrunk to fit
                inside ``size``; when False it is resized to exactly ``size``.

        Returns:
            Path of the thumbnail file.

        Raises:
            ImageNotFoundError: The file does not exist.
            ThumbnailGenerationError: The thumbnail could not be generated.
        """
        if not os.path.exists(file_path):
            raise ImageNotFoundError(f"Image file not found: {file_path}")
        size = size or self._thumbnail_size
        try:
            with Image.open(file_path) as img:
                # JPEG has no alpha channel, so flatten to RGB first.
                img = self._flatten_to_rgb(img)
                if preserve_aspect_ratio:
                    img.thumbnail(size, Image.Resampling.LANCZOS)
                else:
                    img = img.resize(size, Image.Resampling.LANCZOS)
                if output_path is None:
                    file_name = Path(file_path).stem
                    output_path = str(self._cache_dir / f"{file_name}_thumb.jpg")
                # Make sure the output directory exists.
                os.makedirs(os.path.dirname(output_path) or '.', exist_ok=True)
                img.save(output_path, 'JPEG', quality=self.DEFAULT_QUALITY)
                logger.info(f"Thumbnail generated: {output_path}")
                return output_path
        except UnidentifiedImageError as e:
            raise ThumbnailGenerationError(f"Cannot open image: {file_path}") from e
        except Exception as e:
            logger.error(f"Failed to generate thumbnail: {e}")
            raise ThumbnailGenerationError(f"Failed to generate thumbnail: {e}") from e

    def generate_thumbnail_bytes(
        self,
        file_path: str,
        size: Optional[Tuple[int, int]] = None,
        format: str = 'JPEG',
        quality: int = 85
    ) -> bytes:
        """
        Generate a thumbnail and return it as encoded bytes.

        The image is always flattened to RGB before encoding, regardless of
        the requested output format.

        Args:
            file_path: Path of the source image.
            size: Bounding box (width, height); defaults to the configured
                thumbnail size.
            format: Output format name passed to Pillow.
            quality: Encoder quality passed to Pillow.

        Returns:
            The encoded thumbnail.

        Raises:
            ImageNotFoundError: The file does not exist.
            ThumbnailGenerationError: The thumbnail could not be generated.
        """
        if not os.path.exists(file_path):
            raise ImageNotFoundError(f"Image file not found: {file_path}")
        size = size or self._thumbnail_size
        try:
            with Image.open(file_path) as img:
                img = self._flatten_to_rgb(img)
                img.thumbnail(size, Image.Resampling.LANCZOS)
                buffer = io.BytesIO()
                img.save(buffer, format=format, quality=quality)
                return buffer.getvalue()
        except Exception as e:
            logger.error(f"Failed to generate thumbnail bytes: {e}")
            raise ThumbnailGenerationError(f"Failed to generate thumbnail: {e}") from e

    # ==================== Image compression (requirement 5.5) ====================

    def compress_image(
        self,
        file_path: str,
        output_path: Optional[str] = None,
        quality: int = 85,
        max_size: Optional[Tuple[int, int]] = None,
        target_file_size: Optional[int] = None
    ) -> str:
        """
        Compress an image to a JPEG file.

        Requirement 5.5: for large images (over 5 MB) a compression option
        is offered to speed up transfer.

        Args:
            file_path: Path of the source image.
            output_path: Destination path; defaults to
                ``<cache_dir>/<stem>_compressed.jpg``.
            quality: JPEG quality; ignored when ``target_file_size`` is given.
            max_size: Maximum size (width, height); larger images are
                shrunk to fit.
            target_file_size: Target size in bytes; when given, the quality
                is chosen automatically to approach it.

        Returns:
            Path of the compressed file.

        Raises:
            ImageNotFoundError: The file does not exist.
            CompressionError: Compression failed.
        """
        if not os.path.exists(file_path):
            raise ImageNotFoundError(f"Image file not found: {file_path}")
        try:
            with Image.open(file_path) as img:
                img = self._flatten_to_rgb(img)
                if max_size:
                    img.thumbnail(max_size, Image.Resampling.LANCZOS)
                if output_path is None:
                    file_name = Path(file_path).stem
                    output_path = str(self._cache_dir / f"{file_name}_compressed.jpg")
                os.makedirs(os.path.dirname(output_path) or '.', exist_ok=True)
                if target_file_size:
                    quality = self._find_optimal_quality(img, target_file_size)
                img.save(output_path, 'JPEG', quality=quality, optimize=True)
                original_size = os.path.getsize(file_path)
                compressed_size = os.path.getsize(output_path)
                # Guard the division; the ratio is only used for logging.
                if original_size:
                    compression_ratio = (1 - compressed_size / original_size) * 100
                else:
                    compression_ratio = 0.0
                logger.info(f"Image compressed: {file_path} -> {output_path} "
                            f"({original_size} -> {compressed_size} bytes, "
                            f"{compression_ratio:.1f}% reduction)")
                return output_path
        except UnidentifiedImageError as e:
            raise CompressionError(f"Cannot open image: {file_path}") from e
        except Exception as e:
            logger.error(f"Failed to compress image: {e}")
            raise CompressionError(f"Failed to compress image: {e}") from e

    def _find_optimal_quality(
        self,
        img: 'Image.Image',
        target_size: int,
        min_quality: int = 10,
        max_quality: int = 95
    ) -> int:
        """
        Binary-search the highest JPEG quality whose output fits ``target_size``.

        Args:
            img: PIL image to encode.
            target_size: Target encoded size in bytes.
            min_quality: Lowest quality to consider.
            max_quality: Highest quality to consider.

        Returns:
            The highest quality in ``[min_quality, max_quality]`` whose
            encoded size is at most ``target_size``. When no quality in the
            range fits, ``min_quality`` is returned as the closest
            achievable result.
        """
        low, high = min_quality, max_quality
        # Fall back to the smallest encoding when nothing fits; starting from
        # max_quality would select the largest output in exactly that case.
        best_quality = min_quality
        while low <= high:
            mid = (low + high) // 2
            buffer = io.BytesIO()
            img.save(buffer, 'JPEG', quality=mid, optimize=True)
            size = buffer.tell()
            if size <= target_size:
                best_quality = mid
                low = mid + 1
            else:
                high = mid - 1
        return best_quality

    def compress_image_bytes(
        self,
        image_data: bytes,
        quality: int = 85,
        max_size: Optional[Tuple[int, int]] = None
    ) -> bytes:
        """
        Compress in-memory image data to JPEG bytes.

        Args:
            image_data: Encoded source image.
            quality: JPEG quality.
            max_size: Maximum size (width, height); larger images are
                shrunk to fit.

        Returns:
            The JPEG-encoded result.

        Raises:
            CompressionError: Compression failed.
        """
        try:
            with Image.open(io.BytesIO(image_data)) as img:
                img = self._flatten_to_rgb(img)
                if max_size:
                    img.thumbnail(max_size, Image.Resampling.LANCZOS)
                buffer = io.BytesIO()
                img.save(buffer, 'JPEG', quality=quality, optimize=True)
                return buffer.getvalue()
        except Exception as e:
            logger.error(f"Failed to compress image bytes: {e}")
            raise CompressionError(f"Failed to compress image: {e}") from e

    def should_compress(self, file_path: str) -> bool:
        """
        Decide whether an image is large enough to offer compression.

        Args:
            file_path: Path of the file.

        Returns:
            True when the file is larger than ``LARGE_IMAGE_THRESHOLD``;
            False otherwise, including when its size cannot be read.
        """
        try:
            return os.path.getsize(file_path) > self.LARGE_IMAGE_THRESHOLD
        except (OSError, TypeError, ValueError):
            return False

    # ==================== Image operations (requirement 5.6) ====================

    def rotate_image(
        self,
        file_path: str,
        angle: float,
        output_path: Optional[str] = None,
        expand: bool = True
    ) -> str:
        """
        Rotate an image and save it in its original format.

        Args:
            file_path: Path of the source image.
            angle: Rotation angle in degrees, counter-clockwise.
            output_path: Destination path; defaults to
                ``<cache_dir>/<stem>_rotated<ext>``.
            expand: Whether to enlarge the canvas to hold the rotated image.

        Returns:
            Path of the rotated file.

        Raises:
            ImageNotFoundError: The file does not exist.
            ImageProcessorError: Rotation failed.
        """
        if not os.path.exists(file_path):
            raise ImageNotFoundError(f"Image file not found: {file_path}")
        try:
            with Image.open(file_path) as img:
                rotated = img.rotate(angle, expand=expand, resample=Image.Resampling.BICUBIC)
                if output_path is None:
                    source = Path(file_path)
                    output_path = str(self._cache_dir / f"{source.stem}_rotated{source.suffix}")
                os.makedirs(os.path.dirname(output_path) or '.', exist_ok=True)
                # Keep the source format; JPEG cannot store an alpha channel.
                fmt = img.format or 'JPEG'
                if fmt == 'JPEG' and rotated.mode == 'RGBA':
                    rotated = rotated.convert('RGB')
                rotated.save(output_path, fmt)
                logger.info(f"Image rotated {angle} degrees: {output_path}")
                return output_path
        except Exception as e:
            logger.error(f"Failed to rotate image: {e}")
            raise ImageProcessorError(f"Failed to rotate image: {e}") from e

    def resize_image(
        self,
        file_path: str,
        size: Tuple[int, int],
        output_path: Optional[str] = None,
        preserve_aspect_ratio: bool = True
    ) -> str:
        """
        Resize an image and save it in its original format.

        Args:
            file_path: Path of the source image.
            size: Target size (width, height).
            output_path: Destination path; defaults to
                ``<cache_dir>/<stem>_resized<ext>``.
            preserve_aspect_ratio: When True the image is reduced to fit
                inside ``size`` via ``Image.thumbnail``; when False it is
                resized to exactly ``size``.

        Returns:
            Path of the resized file.

        Raises:
            ImageNotFoundError: The file does not exist.
            ImageProcessorError: Resizing failed.
        """
        if not os.path.exists(file_path):
            raise ImageNotFoundError(f"Image file not found: {file_path}")
        try:
            with Image.open(file_path) as img:
                # Keep the source format; read it before any transformation.
                fmt = img.format or 'JPEG'
                if preserve_aspect_ratio:
                    # thumbnail() works in place; the image is saved below,
                    # while the file is still open, so no copy is needed.
                    img.thumbnail(size, Image.Resampling.LANCZOS)
                    resized = img
                else:
                    resized = img.resize(size, Image.Resampling.LANCZOS)
                if output_path is None:
                    source = Path(file_path)
                    output_path = str(self._cache_dir / f"{source.stem}_resized{source.suffix}")
                os.makedirs(os.path.dirname(output_path) or '.', exist_ok=True)
                if fmt == 'JPEG' and resized.mode == 'RGBA':
                    resized = resized.convert('RGB')
                resized.save(output_path, fmt)
                logger.info(f"Image resized to {resized.size}: {output_path}")
                return output_path
        except Exception as e:
            logger.error(f"Failed to resize image: {e}")
            raise ImageProcessorError(f"Failed to resize image: {e}") from e

    # ==================== Utilities ====================

    def get_supported_formats(self) -> list:
        """
        List the supported image formats.

        Returns:
            The format values (e.g. ``"jpeg"``), sorted so the order is
            stable between runs.
        """
        return sorted(fmt.value for fmt in self.SUPPORTED_FORMATS)

    def get_supported_extensions(self) -> list:
        """
        List the file extensions of the supported formats.

        Returns:
            Extensions grouped by format, with formats visited in sorted
            order so the result is stable between runs.
        """
        extensions = []
        for fmt in sorted(self.SUPPORTED_FORMATS, key=lambda f: f.value):
            extensions.extend(FORMAT_EXTENSIONS.get(fmt, []))
        return extensions

    def clear_cache(self) -> int:
        """
        Remove the generated files from the cache directory.

        Only files matching ``*_thumb.*``, ``*_compressed.*``,
        ``*_rotated.*`` and ``*_resized.*`` are removed. A file that cannot
        be deleted is logged and skipped so the remaining files are still
        processed.

        Returns:
            Number of files deleted.
        """
        count = 0
        try:
            for suffix in self._CACHE_SUFFIXES:
                # Materialise the matches before deleting so the directory
                # is not modified while it is being scanned.
                for file in list(self._cache_dir.glob(f"*_{suffix}.*")):
                    try:
                        file.unlink()
                    except OSError as e:
                        logger.error(f"Failed to clear cache: {e}")
                        continue
                    count += 1
        except OSError as e:
            # Scanning the directory itself failed; report what was removed.
            logger.error(f"Failed to clear cache: {e}")
            return count
        logger.info(f"Cleared {count} cached files")
        return count