# P2P Network Communication - Image Processor Module
"""
Image processing module.

Handles image format detection, thumbnail generation and image compression.

Requirements: 5.1, 5.5
"""

import io
import logging
import os
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Optional, Tuple, Union

try:
    from PIL import Image, UnidentifiedImageError
    PIL_AVAILABLE = True
except ImportError:
    # Pillow is optional at import time; ImageProcessor.__init__ refuses to
    # run without it.
    PIL_AVAILABLE = False
    Image = None
    UnidentifiedImageError = Exception

from config import ClientConfig

# Module-level logger
logger = logging.getLogger(__name__)


class ImageFormat(Enum):
    """Enumeration of the image formats this module knows about."""
    JPEG = "jpeg"
    PNG = "png"
    GIF = "gif"
    BMP = "bmp"
    UNKNOWN = "unknown"


class ImageProcessorError(Exception):
    """Base class for all image processing errors."""


class UnsupportedFormatError(ImageProcessorError):
    """Raised when an image format is not supported."""


class ImageNotFoundError(ImageProcessorError):
    """Raised when the image file does not exist."""


class ThumbnailGenerationError(ImageProcessorError):
    """Raised when thumbnail generation fails."""


class CompressionError(ImageProcessorError):
    """Raised when image compression fails."""


@dataclass
class ImageInfo:
    """Descriptive information about an image file."""
    path: str
    format: ImageFormat
    width: int
    height: int
    file_size: int
    mode: str  # RGB, RGBA, L, etc.

    @property
    def aspect_ratio(self) -> float:
        """Return width / height, or 0.0 when the height is zero."""
        if self.height == 0:
            return 0.0
        return self.width / self.height

    @property
    def is_large(self) -> bool:
        """Return True when the file is larger than 5 MB."""
        return self.file_size > 5 * 1024 * 1024

    def to_dict(self) -> dict:
        """Return the fields and the derived properties as a plain dict."""
        return {
            "path": self.path,
            "format": self.format.value,
            "width": self.width,
            "height": self.height,
            "file_size": self.file_size,
            "mode": self.mode,
            "aspect_ratio": self.aspect_ratio,
            "is_large": self.is_large,
        }


# File-header magic numbers -> format
IMAGE_SIGNATURES = {
    b'\xff\xd8\xff': ImageFormat.JPEG,
    b'\x89PNG\r\n\x1a\n': ImageFormat.PNG,
    b'GIF87a': ImageFormat.GIF,
    b'GIF89a': ImageFormat.GIF,
    b'BM': ImageFormat.BMP,
}

# Format -> file extensions
FORMAT_EXTENSIONS = {
    ImageFormat.JPEG: ['.jpg', '.jpeg', '.jpe', '.jfif'],
    ImageFormat.PNG: ['.png'],
    ImageFormat.GIF: ['.gif'],
    ImageFormat.BMP: ['.bmp', '.dib'],
}

# PIL format name -> format
PIL_FORMAT_MAP = {
    'JPEG': ImageFormat.JPEG,
    'PNG': ImageFormat.PNG,
    'GIF': ImageFormat.GIF,
    'BMP': ImageFormat.BMP,
}

# Reverse mapping: format -> PIL format name
FORMAT_TO_PIL = {
    ImageFormat.JPEG: 'JPEG',
    ImageFormat.PNG: 'PNG',
    ImageFormat.GIF: 'GIF',
    ImageFormat.BMP: 'BMP',
}


class ImageProcessor:
    """
    Image processor.

    Responsible for:
    - image format detection (requirement 5.1)
    - thumbnail generation (requirements 5.2, 5.3)
    - image compression (requirement 5.5)
    """

    # Supported image formats
    SUPPORTED_FORMATS = {ImageFormat.JPEG, ImageFormat.PNG, ImageFormat.GIF, ImageFormat.BMP}

    # Default thumbnail size
    DEFAULT_THUMBNAIL_SIZE = (200, 200)

    # Default compression quality (JPEG)
    DEFAULT_QUALITY = 85

    # Large-image threshold (5 MB)
    LARGE_IMAGE_THRESHOLD = 5 * 1024 * 1024

    def __init__(self, config: Optional[ClientConfig] = None):
        """
        Initialise the processor and make sure the cache directory exists.

        Args:
            config: Client configuration; a default ``ClientConfig()`` is
                created when omitted.

        Raises:
            ImageProcessorError: Pillow is not installed.
        """
        if not PIL_AVAILABLE:
            raise ImageProcessorError(
                "Pillow library is not installed. "
                "Please install it with: pip install Pillow"
            )

        self.config = config or ClientConfig()
        self._thumbnail_size = getattr(
            self.config, 'thumbnail_size', self.DEFAULT_THUMBNAIL_SIZE
        )

        # Make sure the cache directory exists
        self._cache_dir = Path(self.config.cache_dir)
        self._cache_dir.mkdir(parents=True, exist_ok=True)

        logger.info("ImageProcessor initialized")

    # ==================== Internal helpers ====================

    @staticmethod
    def _flatten_to_rgb(img: 'Image.Image') -> 'Image.Image':
        """
        Return an RGB rendition of ``img`` suitable for JPEG encoding.

        Images that may carry transparency (RGBA, LA, P) are composited onto
        a white background using their alpha channel; every other non-RGB
        mode is converted directly. An image that is already RGB is returned
        unchanged (same object).
        """
        if img.mode == 'RGB':
            return img
        if img.mode in ('RGBA', 'LA', 'P'):
            # Normalise to RGBA so an alpha band is always available as mask.
            rgba = img if img.mode == 'RGBA' else img.convert('RGBA')
            background = Image.new('RGB', rgba.size, (255, 255, 255))
            background.paste(rgba, mask=rgba.split()[-1])
            return background
        return img.convert('RGB')

    # ==================== Format detection (requirement 5.1) ====================

    def detect_format_from_header(self, file_path: str) -> ImageFormat:
        """
        Detect the image format from the file's magic number.

        Implements format detection for JPG, PNG, GIF, BMP (requirement 5.1).

        Args:
            file_path: Path of the file to inspect.

        Returns:
            The detected format, or ``ImageFormat.UNKNOWN`` when no signature
            matches or the file cannot be read.

        Raises:
            ImageNotFoundError: The file does not exist.
        """
        if not os.path.exists(file_path):
            raise ImageNotFoundError(f"Image file not found: {file_path}")

        try:
            with open(file_path, 'rb') as f:
                header = f.read(16)  # the longest signature is 8 bytes
        except OSError as e:
            logger.error(f"Failed to detect image format: {e}")
            return ImageFormat.UNKNOWN

        # Compare against every known magic number
        for signature, fmt in IMAGE_SIGNATURES.items():
            if header.startswith(signature):
                return fmt
        return ImageFormat.UNKNOWN

    def detect_format_from_extension(self, file_path: str) -> ImageFormat:
        """
        Detect the image format from the file extension (case-insensitive).

        Args:
            file_path: Path of the file.

        Returns:
            The matching format, or ``ImageFormat.UNKNOWN``.
        """
        ext = Path(file_path).suffix.lower()
        for fmt, extensions in FORMAT_EXTENSIONS.items():
            if ext in extensions:
                return fmt
        return ImageFormat.UNKNOWN

    def detect_format(self, file_path: str) -> ImageFormat:
        """
        Detect the image format, preferring the header over the extension.

        Implements format detection for JPG, PNG, GIF, BMP (requirement 5.1).

        Args:
            file_path: Path of the file.

        Returns:
            The detected format.

        Raises:
            ImageNotFoundError: The file does not exist.
        """
        # Header first ...
        fmt = self.detect_format_from_header(file_path)
        if fmt != ImageFormat.UNKNOWN:
            return fmt
        # ... then fall back to the extension
        return self.detect_format_from_extension(file_path)

    def is_supported_format(self, file_path: str) -> bool:
        """
        Check whether the file is in a supported image format.

        Args:
            file_path: Path of the file.

        Returns:
            True for a supported format; False otherwise, including when the
            file does not exist.
        """
        try:
            return self.detect_format(file_path) in self.SUPPORTED_FORMATS
        except ImageNotFoundError:
            return False

    def get_image_info(self, file_path: str) -> ImageInfo:
        """
        Collect detailed information about an image.

        Args:
            file_path: Path of the image.

        Returns:
            An ``ImageInfo`` describing the image.

        Raises:
            ImageNotFoundError: The file does not exist.
            UnsupportedFormatError: Pillow cannot identify the image.
            ImageProcessorError: Any other failure while reading the image.
        """
        if not os.path.exists(file_path):
            raise ImageNotFoundError(f"Image file not found: {file_path}")

        try:
            with Image.open(file_path) as img:
                fmt = PIL_FORMAT_MAP.get(img.format, ImageFormat.UNKNOWN)
                if fmt == ImageFormat.UNKNOWN:
                    # Pillow reported a format we do not map; try our own detection
                    fmt = self.detect_format(file_path)

                return ImageInfo(
                    path=file_path,
                    format=fmt,
                    width=img.width,
                    height=img.height,
                    file_size=os.path.getsize(file_path),
                    mode=img.mode,
                )
        except UnidentifiedImageError as e:
            raise UnsupportedFormatError(f"Unsupported image format: {file_path}") from e
        except Exception as e:
            logger.error(f"Failed to get image info: {e}")
            raise ImageProcessorError(f"Failed to get image info: {e}") from e

    # ==================== Thumbnail generation (requirements 5.2, 5.3) ====================

    def generate_thumbnail(
        self,
        file_path: str,
        output_path: Optional[str] = None,
        size: Optional[Tuple[int, int]] = None,
        preserve_aspect_ratio: bool = True
    ) -> str:
        """
        Generate a JPEG thumbnail of an image (requirements 5.2, 5.3).

        Args:
            file_path: Path of the source image.
            output_path: Destination path; defaults to
                ``<cache_dir>/<stem>_thumb.jpg``.
            size: Thumbnail size (width, height); defaults to the configured
                thumbnail size.
            preserve_aspect_ratio: Fit inside ``size`` keeping the aspect
                ratio when True; stretch to exactly ``size`` when False.

        Returns:
            Path of the thumbnail file.

        Raises:
            ImageNotFoundError: The file does not exist.
            ThumbnailGenerationError: Thumbnail generation failed.
        """
        if not os.path.exists(file_path):
            raise ImageNotFoundError(f"Image file not found: {file_path}")

        size = size or self._thumbnail_size

        try:
            with Image.open(file_path) as src:
                # JPEG cannot store alpha, so flatten onto white first
                img = self._flatten_to_rgb(src)

                if preserve_aspect_ratio:
                    img.thumbnail(size, Image.Resampling.LANCZOS)
                else:
                    img = img.resize(size, Image.Resampling.LANCZOS)

                if output_path is None:
                    file_name = Path(file_path).stem
                    output_path = str(self._cache_dir / f"{file_name}_thumb.jpg")

                # Make sure the output directory exists
                os.makedirs(os.path.dirname(output_path) or '.', exist_ok=True)

                img.save(output_path, 'JPEG', quality=self.DEFAULT_QUALITY)

            logger.info(f"Thumbnail generated: {output_path}")
            return output_path

        except UnidentifiedImageError as e:
            raise ThumbnailGenerationError(f"Cannot open image: {file_path}") from e
        except Exception as e:
            logger.error(f"Failed to generate thumbnail: {e}")
            raise ThumbnailGenerationError(f"Failed to generate thumbnail: {e}") from e

    def generate_thumbnail_bytes(
        self,
        file_path: str,
        size: Optional[Tuple[int, int]] = None,
        format: str = 'JPEG',
        quality: int = 85
    ) -> bytes:
        """
        Generate a thumbnail and return it as encoded bytes.

        The image is always flattened to RGB and scaled keeping its aspect
        ratio before it is encoded.

        Args:
            file_path: Path of the source image.
            size: Thumbnail size; defaults to the configured thumbnail size.
            format: Pillow output format name.
            quality: Encoder quality passed to Pillow.

        Returns:
            The encoded thumbnail.

        Raises:
            ImageNotFoundError: The file does not exist.
            ThumbnailGenerationError: Thumbnail generation failed.
        """
        if not os.path.exists(file_path):
            raise ImageNotFoundError(f"Image file not found: {file_path}")

        size = size or self._thumbnail_size

        try:
            with Image.open(file_path) as src:
                img = self._flatten_to_rgb(src)
                img.thumbnail(size, Image.Resampling.LANCZOS)

                buffer = io.BytesIO()
                img.save(buffer, format=format, quality=quality)
                return buffer.getvalue()

        except Exception as e:
            logger.error(f"Failed to generate thumbnail bytes: {e}")
            raise ThumbnailGenerationError(f"Failed to generate thumbnail: {e}") from e

    # ==================== Image compression (requirement 5.5) ====================

    def compress_image(
        self,
        file_path: str,
        output_path: Optional[str] = None,
        quality: int = 85,
        max_size: Optional[Tuple[int, int]] = None,
        target_file_size: Optional[int] = None
    ) -> str:
        """
        Compress an image to JPEG (requirement 5.5).

        WHERE an image file is large (over 5 MB) THEN File_Transfer_Module
        SHALL offer a compression option to speed up the transfer.

        Args:
            file_path: Path of the source image.
            output_path: Destination path; defaults to
                ``<cache_dir>/<stem>_compressed.jpg``.
            quality: JPEG quality (1-100).
            max_size: Maximum (width, height); larger images are scaled down
                keeping the aspect ratio.
            target_file_size: Target size in bytes; when given, ``quality``
                is replaced by the best quality that fits the target.

        Returns:
            Path of the compressed file.

        Raises:
            ImageNotFoundError: The file does not exist.
            CompressionError: Compression failed.
        """
        if not os.path.exists(file_path):
            raise ImageNotFoundError(f"Image file not found: {file_path}")

        try:
            # Measure before saving: output_path may be the input file itself.
            original_size = os.path.getsize(file_path)

            with Image.open(file_path) as src:
                img = self._flatten_to_rgb(src)

                # Scale down when a maximum size was requested
                if max_size:
                    img.thumbnail(max_size, Image.Resampling.LANCZOS)

                if output_path is None:
                    file_name = Path(file_path).stem
                    output_path = str(self._cache_dir / f"{file_name}_compressed.jpg")

                # Make sure the output directory exists
                os.makedirs(os.path.dirname(output_path) or '.', exist_ok=True)

                # A target file size overrides the requested quality
                if target_file_size:
                    quality = self._find_optimal_quality(img, target_file_size)

                img.save(output_path, 'JPEG', quality=quality, optimize=True)

            compressed_size = os.path.getsize(output_path)
            if original_size:
                compression_ratio = (1 - compressed_size / original_size) * 100
            else:
                compression_ratio = 0.0

            logger.info(f"Image compressed: {file_path} -> {output_path} "
                        f"({original_size} -> {compressed_size} bytes, "
                        f"{compression_ratio:.1f}% reduction)")

            return output_path

        except UnidentifiedImageError as e:
            raise CompressionError(f"Cannot open image: {file_path}") from e
        except Exception as e:
            logger.error(f"Failed to compress image: {e}")
            raise CompressionError(f"Failed to compress image: {e}") from e

    def _find_optimal_quality(
        self,
        img: 'Image.Image',
        target_size: int,
        min_quality: int = 10,
        max_quality: int = 95
    ) -> int:
        """
        Binary-search the highest JPEG quality whose output fits ``target_size``.

        Args:
            img: PIL image to encode.
            target_size: Target file size in bytes.
            min_quality: Lowest quality to try.
            max_quality: Highest quality to try.

        Returns:
            The highest quality in ``[min_quality, max_quality]`` whose encoded
            size is at most ``target_size``. When no quality in the range
            fits, ``min_quality`` is returned as the closest achievable
            setting.
        """
        low, high = min_quality, max_quality
        # Fall back to the smallest output if the target cannot be met.
        best_quality = min_quality

        while low <= high:
            mid = (low + high) // 2

            buffer = io.BytesIO()
            img.save(buffer, 'JPEG', quality=mid, optimize=True)
            size = buffer.tell()

            if size <= target_size:
                # Fits: remember it and try for a higher quality
                best_quality = mid
                low = mid + 1
            else:
                high = mid - 1

        return best_quality

    def compress_image_bytes(
        self,
        image_data: bytes,
        quality: int = 85,
        max_size: Optional[Tuple[int, int]] = None
    ) -> bytes:
        """
        Compress in-memory image data to JPEG.

        Args:
            image_data: Encoded source image bytes.
            quality: JPEG quality.
            max_size: Maximum (width, height); larger images are scaled down.

        Returns:
            The JPEG-encoded bytes.

        Raises:
            CompressionError: Compression failed.
        """
        try:
            with Image.open(io.BytesIO(image_data)) as src:
                img = self._flatten_to_rgb(src)

                if max_size:
                    img.thumbnail(max_size, Image.Resampling.LANCZOS)

                buffer = io.BytesIO()
                img.save(buffer, 'JPEG', quality=quality, optimize=True)
                return buffer.getvalue()

        except Exception as e:
            logger.error(f"Failed to compress image bytes: {e}")
            raise CompressionError(f"Failed to compress image: {e}") from e

    def should_compress(self, file_path: str) -> bool:
        """
        Decide whether an image should be offered for compression.

        WHERE an image file is large (over 5 MB) THEN File_Transfer_Module
        SHALL offer a compression option.

        Args:
            file_path: Path of the file.

        Returns:
            True when the file is larger than ``LARGE_IMAGE_THRESHOLD``;
            False otherwise or when its size cannot be read.
        """
        try:
            return os.path.getsize(file_path) > self.LARGE_IMAGE_THRESHOLD
        except (OSError, TypeError, ValueError):
            return False

    # ==================== Image operations (requirement 5.6) ====================

    def rotate_image(
        self,
        file_path: str,
        angle: float,
        output_path: Optional[str] = None,
        expand: bool = True
    ) -> str:
        """
        Rotate an image (requirement 5.6).

        Args:
            file_path: Path of the source image.
            angle: Rotation angle in degrees, counter-clockwise.
            output_path: Destination path; defaults to
                ``<cache_dir>/<stem>_rotated<ext>``.
            expand: Grow the canvas so the whole rotated image fits.

        Returns:
            Path of the rotated file.

        Raises:
            ImageNotFoundError: The file does not exist.
            ImageProcessorError: Rotation failed.
        """
        if not os.path.exists(file_path):
            raise ImageNotFoundError(f"Image file not found: {file_path}")

        try:
            with Image.open(file_path) as img:
                rotated = img.rotate(angle, expand=expand, resample=Image.Resampling.BICUBIC)

                if output_path is None:
                    source = Path(file_path)
                    output_path = str(self._cache_dir / f"{source.stem}_rotated{source.suffix}")

                os.makedirs(os.path.dirname(output_path) or '.', exist_ok=True)

                # Keep the original format; JPEG cannot store an alpha channel
                fmt = img.format or 'JPEG'
                if fmt == 'JPEG' and rotated.mode == 'RGBA':
                    rotated = rotated.convert('RGB')

                rotated.save(output_path, fmt)

            logger.info(f"Image rotated {angle} degrees: {output_path}")
            return output_path

        except Exception as e:
            logger.error(f"Failed to rotate image: {e}")
            raise ImageProcessorError(f"Failed to rotate image: {e}") from e

    def resize_image(
        self,
        file_path: str,
        size: Tuple[int, int],
        output_path: Optional[str] = None,
        preserve_aspect_ratio: bool = True
    ) -> str:
        """
        Resize an image (requirement 5.6).

        Args:
            file_path: Path of the source image.
            size: Target size (width, height).
            output_path: Destination path; defaults to
                ``<cache_dir>/<stem>_resized<ext>``.
            preserve_aspect_ratio: Fit inside ``size`` keeping the aspect
                ratio when True; stretch to exactly ``size`` when False.

        Returns:
            Path of the resized file.

        Raises:
            ImageNotFoundError: The file does not exist.
            ImageProcessorError: Resizing failed.
        """
        if not os.path.exists(file_path):
            raise ImageNotFoundError(f"Image file not found: {file_path}")

        try:
            with Image.open(file_path) as img:
                if preserve_aspect_ratio:
                    # thumbnail() works in place on the opened image
                    img.thumbnail(size, Image.Resampling.LANCZOS)
                    resized = img.copy()
                else:
                    resized = img.resize(size, Image.Resampling.LANCZOS)

                if output_path is None:
                    source = Path(file_path)
                    output_path = str(self._cache_dir / f"{source.stem}_resized{source.suffix}")

                os.makedirs(os.path.dirname(output_path) or '.', exist_ok=True)

                # Keep the original format; JPEG cannot store an alpha channel
                fmt = img.format or 'JPEG'
                if fmt == 'JPEG' and resized.mode == 'RGBA':
                    resized = resized.convert('RGB')

                resized.save(output_path, fmt)

            logger.info(f"Image resized to {resized.size}: {output_path}")
            return output_path

        except Exception as e:
            logger.error(f"Failed to resize image: {e}")
            raise ImageProcessorError(f"Failed to resize image: {e}") from e

    # ==================== Utilities ====================

    def get_supported_formats(self) -> list:
        """
        Return the names of the supported image formats.

        Returns:
            The format values (e.g. ``"jpeg"``), sorted so the order is
            stable between runs.
        """
        return sorted(fmt.value for fmt in self.SUPPORTED_FORMATS)

    def get_supported_extensions(self) -> list:
        """
        Return the file extensions of all supported formats.

        Returns:
            The extensions, in ``FORMAT_EXTENSIONS`` definition order.
        """
        return [
            ext
            for fmt, extensions in FORMAT_EXTENSIONS.items()
            if fmt in self.SUPPORTED_FORMATS
            for ext in extensions
        ]

    def clear_cache(self) -> int:
        """
        Delete the temporary files this processor writes to the cache directory.

        Deletion is best-effort: a file that cannot be removed is logged and
        skipped so the remaining files are still cleaned up.

        Returns:
            The number of files deleted.
        """
        patterns = ("*_thumb.*", "*_compressed.*", "*_rotated.*", "*_resized.*")
        count = 0
        try:
            for pattern in patterns:
                for file in self._cache_dir.glob(pattern):
                    try:
                        file.unlink()
                    except OSError as e:
                        logger.error(f"Failed to clear cache: {e}")
                    else:
                        count += 1

            logger.info(f"Cleared {count} cached files")
            return count
        except Exception as e:
            logger.error(f"Failed to clear cache: {e}")
            return count