确保媒体模块测试通过

main
杨博文 4 months ago
parent f9bd845a11
commit 0b2994e594

@ -25,3 +25,19 @@ from client.file_transfer import (
TransferState,
ProgressCallback,
)
from client.media_player import (
MediaPlayer,
AudioPlayer,
VideoPlayer,
MediaType,
PlaybackState,
AudioFormat,
VideoFormat,
MediaInfo,
MediaPlayerError,
MediaNotFoundError,
UnsupportedMediaFormatError,
PlaybackError,
MediaLoadError,
)

@ -0,0 +1,760 @@
# P2P Network Communication - Image Processor Module
"""
图片处理模块
负责图片格式检测缩略图生成和图片压缩
需求: 5.1, 5.5
"""
import io
import logging
import os
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Optional, Tuple, Union
try:
from PIL import Image, UnidentifiedImageError
PIL_AVAILABLE = True
except ImportError:
PIL_AVAILABLE = False
Image = None
UnidentifiedImageError = Exception
from config import ClientConfig
# 设置日志
logger = logging.getLogger(__name__)
class ImageFormat(Enum):
"""支持的图片格式枚举"""
JPEG = "jpeg"
PNG = "png"
GIF = "gif"
BMP = "bmp"
UNKNOWN = "unknown"
class ImageProcessorError(Exception):
"""图片处理错误基类"""
pass
class UnsupportedFormatError(ImageProcessorError):
"""不支持的图片格式错误"""
pass
class ImageNotFoundError(ImageProcessorError):
"""图片文件不存在错误"""
pass
class ThumbnailGenerationError(ImageProcessorError):
"""缩略图生成错误"""
pass
class CompressionError(ImageProcessorError):
"""图片压缩错误"""
pass
@dataclass
class ImageInfo:
"""图片信息数据结构"""
path: str
format: ImageFormat
width: int
height: int
file_size: int
mode: str # RGB, RGBA, L, etc.
@property
def aspect_ratio(self) -> float:
"""获取宽高比"""
if self.height == 0:
return 0.0
return self.width / self.height
@property
def is_large(self) -> bool:
"""判断是否为大图片超过5MB"""
return self.file_size > 5 * 1024 * 1024
def to_dict(self) -> dict:
"""转换为字典"""
return {
"path": self.path,
"format": self.format.value,
"width": self.width,
"height": self.height,
"file_size": self.file_size,
"mode": self.mode,
"aspect_ratio": self.aspect_ratio,
"is_large": self.is_large,
}
# 文件头魔数映射
IMAGE_SIGNATURES = {
b'\xff\xd8\xff': ImageFormat.JPEG,
b'\x89PNG\r\n\x1a\n': ImageFormat.PNG,
b'GIF87a': ImageFormat.GIF,
b'GIF89a': ImageFormat.GIF,
b'BM': ImageFormat.BMP,
}
# 格式到扩展名映射
FORMAT_EXTENSIONS = {
ImageFormat.JPEG: ['.jpg', '.jpeg', '.jpe', '.jfif'],
ImageFormat.PNG: ['.png'],
ImageFormat.GIF: ['.gif'],
ImageFormat.BMP: ['.bmp', '.dib'],
}
# PIL格式名称映射
PIL_FORMAT_MAP = {
'JPEG': ImageFormat.JPEG,
'PNG': ImageFormat.PNG,
'GIF': ImageFormat.GIF,
'BMP': ImageFormat.BMP,
}
# 反向映射ImageFormat到PIL格式名称
FORMAT_TO_PIL = {
ImageFormat.JPEG: 'JPEG',
ImageFormat.PNG: 'PNG',
ImageFormat.GIF: 'GIF',
ImageFormat.BMP: 'BMP',
}
class ImageProcessor:
"""
图片处理器
负责:
- 图片格式检测 (需求 5.1)
- 缩略图生成 (需求 5.2, 5.3)
- 图片压缩 (需求 5.5)
"""
# 支持的图片格式
SUPPORTED_FORMATS = {ImageFormat.JPEG, ImageFormat.PNG, ImageFormat.GIF, ImageFormat.BMP}
# 默认缩略图大小
DEFAULT_THUMBNAIL_SIZE = (200, 200)
# 默认压缩质量 (JPEG)
DEFAULT_QUALITY = 85
# 大图片阈值 (5MB)
LARGE_IMAGE_THRESHOLD = 5 * 1024 * 1024
def __init__(self, config: Optional[ClientConfig] = None):
"""
初始化图片处理器
Args:
config: 客户端配置
"""
if not PIL_AVAILABLE:
raise ImageProcessorError("Pillow library is not installed. Please install it with: pip install Pillow")
self.config = config or ClientConfig()
self._thumbnail_size = getattr(self.config, 'thumbnail_size', self.DEFAULT_THUMBNAIL_SIZE)
# 确保缓存目录存在
self._cache_dir = Path(self.config.cache_dir)
self._cache_dir.mkdir(parents=True, exist_ok=True)
logger.info("ImageProcessor initialized")
# ==================== 格式检测 (需求 5.1) ====================
def detect_format_from_header(self, file_path: str) -> ImageFormat:
"""
通过文件头魔数检测图片格式
实现图片格式检测JPGPNGGIFBMP(需求 5.1)
Args:
file_path: 文件路径
Returns:
检测到的图片格式
Raises:
ImageNotFoundError: 文件不存在
"""
if not os.path.exists(file_path):
raise ImageNotFoundError(f"Image file not found: {file_path}")
try:
with open(file_path, 'rb') as f:
header = f.read(16) # 读取前16字节
# 检查各种格式的魔数
for signature, fmt in IMAGE_SIGNATURES.items():
if header.startswith(signature):
return fmt
return ImageFormat.UNKNOWN
except Exception as e:
logger.error(f"Failed to detect image format: {e}")
return ImageFormat.UNKNOWN
def detect_format_from_extension(self, file_path: str) -> ImageFormat:
"""
通过文件扩展名检测图片格式
Args:
file_path: 文件路径
Returns:
检测到的图片格式
"""
ext = Path(file_path).suffix.lower()
for fmt, extensions in FORMAT_EXTENSIONS.items():
if ext in extensions:
return fmt
return ImageFormat.UNKNOWN
def detect_format(self, file_path: str) -> ImageFormat:
"""
检测图片格式优先使用文件头其次使用扩展名
实现图片格式检测JPGPNGGIFBMP(需求 5.1)
Args:
file_path: 文件路径
Returns:
检测到的图片格式
Raises:
ImageNotFoundError: 文件不存在
"""
# 优先使用文件头检测
fmt = self.detect_format_from_header(file_path)
if fmt != ImageFormat.UNKNOWN:
return fmt
# 回退到扩展名检测
return self.detect_format_from_extension(file_path)
def is_supported_format(self, file_path: str) -> bool:
"""
检查文件是否为支持的图片格式
Args:
file_path: 文件路径
Returns:
如果是支持的格式返回True否则返回False
"""
try:
fmt = self.detect_format(file_path)
return fmt in self.SUPPORTED_FORMATS
except ImageNotFoundError:
return False
def get_image_info(self, file_path: str) -> ImageInfo:
"""
获取图片详细信息
Args:
file_path: 文件路径
Returns:
图片信息
Raises:
ImageNotFoundError: 文件不存在
UnsupportedFormatError: 不支持的格式
"""
if not os.path.exists(file_path):
raise ImageNotFoundError(f"Image file not found: {file_path}")
try:
with Image.open(file_path) as img:
pil_format = img.format
fmt = PIL_FORMAT_MAP.get(pil_format, ImageFormat.UNKNOWN)
if fmt == ImageFormat.UNKNOWN:
# 尝试通过文件头检测
fmt = self.detect_format(file_path)
return ImageInfo(
path=file_path,
format=fmt,
width=img.width,
height=img.height,
file_size=os.path.getsize(file_path),
mode=img.mode,
)
except UnidentifiedImageError:
raise UnsupportedFormatError(f"Unsupported image format: {file_path}")
except Exception as e:
logger.error(f"Failed to get image info: {e}")
raise ImageProcessorError(f"Failed to get image info: {e}")
# ==================== 缩略图生成 (需求 5.2, 5.3) ====================
def generate_thumbnail(
self,
file_path: str,
output_path: Optional[str] = None,
size: Optional[Tuple[int, int]] = None,
preserve_aspect_ratio: bool = True
) -> str:
"""
生成图片缩略图
实现图片缩略图生成 (需求 5.2, 5.3)
Args:
file_path: 原始图片路径
output_path: 输出路径可选默认保存到缓存目录
size: 缩略图大小 (width, height)默认 (200, 200)
preserve_aspect_ratio: 是否保持宽高比
Returns:
缩略图文件路径
Raises:
ImageNotFoundError: 文件不存在
ThumbnailGenerationError: 缩略图生成失败
"""
if not os.path.exists(file_path):
raise ImageNotFoundError(f"Image file not found: {file_path}")
size = size or self._thumbnail_size
try:
with Image.open(file_path) as img:
# 处理RGBA模式的图片转换为RGB以支持JPEG保存
if img.mode in ('RGBA', 'LA', 'P'):
# 创建白色背景
background = Image.new('RGB', img.size, (255, 255, 255))
if img.mode == 'P':
img = img.convert('RGBA')
background.paste(img, mask=img.split()[-1] if img.mode == 'RGBA' else None)
img = background
elif img.mode != 'RGB':
img = img.convert('RGB')
# 生成缩略图
if preserve_aspect_ratio:
img.thumbnail(size, Image.Resampling.LANCZOS)
else:
img = img.resize(size, Image.Resampling.LANCZOS)
# 确定输出路径
if output_path is None:
file_name = Path(file_path).stem
output_path = str(self._cache_dir / f"{file_name}_thumb.jpg")
# 确保输出目录存在
os.makedirs(os.path.dirname(output_path) or '.', exist_ok=True)
# 保存缩略图
img.save(output_path, 'JPEG', quality=85)
logger.info(f"Thumbnail generated: {output_path}")
return output_path
except UnidentifiedImageError:
raise ThumbnailGenerationError(f"Cannot open image: {file_path}")
except Exception as e:
logger.error(f"Failed to generate thumbnail: {e}")
raise ThumbnailGenerationError(f"Failed to generate thumbnail: {e}")
def generate_thumbnail_bytes(
self,
file_path: str,
size: Optional[Tuple[int, int]] = None,
format: str = 'JPEG',
quality: int = 85
) -> bytes:
"""
生成缩略图并返回字节数据
Args:
file_path: 原始图片路径
size: 缩略图大小
format: 输出格式
quality: 压缩质量
Returns:
缩略图字节数据
"""
if not os.path.exists(file_path):
raise ImageNotFoundError(f"Image file not found: {file_path}")
size = size or self._thumbnail_size
try:
with Image.open(file_path) as img:
# 处理不同模式
if img.mode in ('RGBA', 'LA', 'P'):
background = Image.new('RGB', img.size, (255, 255, 255))
if img.mode == 'P':
img = img.convert('RGBA')
background.paste(img, mask=img.split()[-1] if img.mode == 'RGBA' else None)
img = background
elif img.mode != 'RGB':
img = img.convert('RGB')
img.thumbnail(size, Image.Resampling.LANCZOS)
buffer = io.BytesIO()
img.save(buffer, format=format, quality=quality)
return buffer.getvalue()
except Exception as e:
logger.error(f"Failed to generate thumbnail bytes: {e}")
raise ThumbnailGenerationError(f"Failed to generate thumbnail: {e}")
# ==================== 图片压缩 (需求 5.5) ====================
def compress_image(
self,
file_path: str,
output_path: Optional[str] = None,
quality: int = 85,
max_size: Optional[Tuple[int, int]] = None,
target_file_size: Optional[int] = None
) -> str:
"""
压缩图片
实现图片压缩功能 (需求 5.5)
WHERE 图片文件较大超过5MB THEN File_Transfer_Module SHALL 提供压缩选项以加快传输
Args:
file_path: 原始图片路径
output_path: 输出路径可选
quality: 压缩质量 (1-100)仅对JPEG有效
max_size: 最大尺寸 (width, height)超过则缩放
target_file_size: 目标文件大小字节会自动调整质量
Returns:
压缩后的文件路径
Raises:
ImageNotFoundError: 文件不存在
CompressionError: 压缩失败
"""
if not os.path.exists(file_path):
raise ImageNotFoundError(f"Image file not found: {file_path}")
try:
with Image.open(file_path) as img:
original_format = img.format
# 处理不同模式
if img.mode in ('RGBA', 'LA', 'P'):
background = Image.new('RGB', img.size, (255, 255, 255))
if img.mode == 'P':
img = img.convert('RGBA')
background.paste(img, mask=img.split()[-1] if img.mode == 'RGBA' else None)
img = background
elif img.mode != 'RGB':
img = img.convert('RGB')
# 如果指定了最大尺寸,进行缩放
if max_size:
img.thumbnail(max_size, Image.Resampling.LANCZOS)
# 确定输出路径
if output_path is None:
file_name = Path(file_path).stem
output_path = str(self._cache_dir / f"{file_name}_compressed.jpg")
# 确保输出目录存在
os.makedirs(os.path.dirname(output_path) or '.', exist_ok=True)
# 如果指定了目标文件大小,自动调整质量
if target_file_size:
quality = self._find_optimal_quality(img, target_file_size)
# 保存压缩后的图片
img.save(output_path, 'JPEG', quality=quality, optimize=True)
original_size = os.path.getsize(file_path)
compressed_size = os.path.getsize(output_path)
compression_ratio = (1 - compressed_size / original_size) * 100
logger.info(f"Image compressed: {file_path} -> {output_path} "
f"({original_size} -> {compressed_size} bytes, "
f"{compression_ratio:.1f}% reduction)")
return output_path
except UnidentifiedImageError:
raise CompressionError(f"Cannot open image: {file_path}")
except Exception as e:
logger.error(f"Failed to compress image: {e}")
raise CompressionError(f"Failed to compress image: {e}")
def _find_optimal_quality(
self,
img: 'Image.Image',
target_size: int,
min_quality: int = 10,
max_quality: int = 95
) -> int:
"""
二分查找最优压缩质量以达到目标文件大小
Args:
img: PIL Image对象
target_size: 目标文件大小字节
min_quality: 最小质量
max_quality: 最大质量
Returns:
最优质量值
"""
low, high = min_quality, max_quality
best_quality = max_quality
while low <= high:
mid = (low + high) // 2
buffer = io.BytesIO()
img.save(buffer, 'JPEG', quality=mid, optimize=True)
size = buffer.tell()
if size <= target_size:
best_quality = mid
low = mid + 1
else:
high = mid - 1
return best_quality
def compress_image_bytes(
self,
image_data: bytes,
quality: int = 85,
max_size: Optional[Tuple[int, int]] = None
) -> bytes:
"""
压缩图片字节数据
Args:
image_data: 原始图片字节数据
quality: 压缩质量
max_size: 最大尺寸
Returns:
压缩后的字节数据
"""
try:
with Image.open(io.BytesIO(image_data)) as img:
# 处理不同模式
if img.mode in ('RGBA', 'LA', 'P'):
background = Image.new('RGB', img.size, (255, 255, 255))
if img.mode == 'P':
img = img.convert('RGBA')
background.paste(img, mask=img.split()[-1] if img.mode == 'RGBA' else None)
img = background
elif img.mode != 'RGB':
img = img.convert('RGB')
if max_size:
img.thumbnail(max_size, Image.Resampling.LANCZOS)
buffer = io.BytesIO()
img.save(buffer, 'JPEG', quality=quality, optimize=True)
return buffer.getvalue()
except Exception as e:
logger.error(f"Failed to compress image bytes: {e}")
raise CompressionError(f"Failed to compress image: {e}")
def should_compress(self, file_path: str) -> bool:
"""
判断图片是否需要压缩
WHERE 图片文件较大超过5MB THEN File_Transfer_Module SHALL 提供压缩选项
Args:
file_path: 文件路径
Returns:
如果文件大于5MB返回True
"""
try:
file_size = os.path.getsize(file_path)
return file_size > self.LARGE_IMAGE_THRESHOLD
except Exception:
return False
# ==================== 图片操作 (需求 5.6) ====================
def rotate_image(
self,
file_path: str,
angle: float,
output_path: Optional[str] = None,
expand: bool = True
) -> str:
"""
旋转图片
实现图片旋转操作 (需求 5.6)
Args:
file_path: 原始图片路径
angle: 旋转角度逆时针
output_path: 输出路径
expand: 是否扩展画布以容纳旋转后的图片
Returns:
旋转后的文件路径
"""
if not os.path.exists(file_path):
raise ImageNotFoundError(f"Image file not found: {file_path}")
try:
with Image.open(file_path) as img:
rotated = img.rotate(angle, expand=expand, resample=Image.Resampling.BICUBIC)
if output_path is None:
file_name = Path(file_path).stem
ext = Path(file_path).suffix
output_path = str(self._cache_dir / f"{file_name}_rotated{ext}")
os.makedirs(os.path.dirname(output_path) or '.', exist_ok=True)
# 保持原始格式
fmt = img.format or 'JPEG'
if fmt == 'JPEG' and rotated.mode == 'RGBA':
rotated = rotated.convert('RGB')
rotated.save(output_path, fmt)
logger.info(f"Image rotated {angle} degrees: {output_path}")
return output_path
except Exception as e:
logger.error(f"Failed to rotate image: {e}")
raise ImageProcessorError(f"Failed to rotate image: {e}")
def resize_image(
self,
file_path: str,
size: Tuple[int, int],
output_path: Optional[str] = None,
preserve_aspect_ratio: bool = True
) -> str:
"""
调整图片大小
实现图片缩放操作 (需求 5.6)
Args:
file_path: 原始图片路径
size: 目标大小 (width, height)
output_path: 输出路径
preserve_aspect_ratio: 是否保持宽高比
Returns:
调整后的文件路径
"""
if not os.path.exists(file_path):
raise ImageNotFoundError(f"Image file not found: {file_path}")
try:
with Image.open(file_path) as img:
if preserve_aspect_ratio:
img.thumbnail(size, Image.Resampling.LANCZOS)
resized = img.copy()
else:
resized = img.resize(size, Image.Resampling.LANCZOS)
if output_path is None:
file_name = Path(file_path).stem
ext = Path(file_path).suffix
output_path = str(self._cache_dir / f"{file_name}_resized{ext}")
os.makedirs(os.path.dirname(output_path) or '.', exist_ok=True)
# 保持原始格式
fmt = img.format or 'JPEG'
if fmt == 'JPEG' and resized.mode == 'RGBA':
resized = resized.convert('RGB')
resized.save(output_path, fmt)
logger.info(f"Image resized to {resized.size}: {output_path}")
return output_path
except Exception as e:
logger.error(f"Failed to resize image: {e}")
raise ImageProcessorError(f"Failed to resize image: {e}")
# ==================== 工具方法 ====================
def get_supported_formats(self) -> list:
"""
获取支持的图片格式列表
Returns:
支持的格式列表
"""
return [fmt.value for fmt in self.SUPPORTED_FORMATS]
def get_supported_extensions(self) -> list:
"""
获取支持的文件扩展名列表
Returns:
支持的扩展名列表
"""
extensions = []
for fmt in self.SUPPORTED_FORMATS:
extensions.extend(FORMAT_EXTENSIONS.get(fmt, []))
return extensions
def clear_cache(self) -> int:
"""
清理缓存目录中的临时文件
Returns:
删除的文件数量
"""
count = 0
try:
for file in self._cache_dir.glob("*_thumb.*"):
file.unlink()
count += 1
for file in self._cache_dir.glob("*_compressed.*"):
file.unlink()
count += 1
for file in self._cache_dir.glob("*_rotated.*"):
file.unlink()
count += 1
for file in self._cache_dir.glob("*_resized.*"):
file.unlink()
count += 1
logger.info(f"Cleared {count} cached files")
return count
except Exception as e:
logger.error(f"Failed to clear cache: {e}")
return count

File diff suppressed because it is too large Load Diff

@ -11,6 +11,10 @@ PyQt6>=6.5.0
PyAudio>=0.2.13
opencv-python>=4.8.0
ffmpeg-python>=0.2.0
mutagen>=1.47.0
# Image Processing
Pillow>=10.0.0
# Encryption
cryptography>=41.0.0

@ -0,0 +1,572 @@
# P2P Network Communication - Image Processor Tests
"""
图片处理模块测试
测试图片格式检测缩略图生成和图片压缩功能
需求: 5.1, 5.5
"""
import io
import os
import tempfile
import pytest
from pathlib import Path
from unittest.mock import patch, MagicMock
# 尝试导入PIL如果不可用则跳过测试
try:
from PIL import Image
PIL_AVAILABLE = True
except ImportError:
PIL_AVAILABLE = False
from client.image_processor import (
ImageProcessor,
ImageFormat,
ImageInfo,
ImageProcessorError,
UnsupportedFormatError,
ImageNotFoundError,
ThumbnailGenerationError,
CompressionError,
IMAGE_SIGNATURES,
FORMAT_EXTENSIONS,
)
from config import ClientConfig
# 如果PIL不可用跳过所有测试
pytestmark = pytest.mark.skipif(not PIL_AVAILABLE, reason="Pillow not installed")
class TestImageProcessorInit:
"""图片处理器初始化测试"""
def test_init_with_default_config(self):
"""测试使用默认配置初始化"""
processor = ImageProcessor()
assert processor.config is not None
assert processor._thumbnail_size == (200, 200)
def test_init_with_custom_config(self):
"""测试使用自定义配置初始化"""
config = ClientConfig(thumbnail_size=(100, 100))
processor = ImageProcessor(config=config)
assert processor._thumbnail_size == (100, 100)
def test_supported_formats(self):
"""测试支持的格式列表"""
processor = ImageProcessor()
assert ImageFormat.JPEG in processor.SUPPORTED_FORMATS
assert ImageFormat.PNG in processor.SUPPORTED_FORMATS
assert ImageFormat.GIF in processor.SUPPORTED_FORMATS
assert ImageFormat.BMP in processor.SUPPORTED_FORMATS
class TestFormatDetection:
"""图片格式检测测试"""
def setup_method(self):
"""每个测试前创建处理器"""
self.processor = ImageProcessor()
self.temp_files = []
def teardown_method(self):
"""每个测试后清理临时文件"""
for f in self.temp_files:
if os.path.exists(f):
os.unlink(f)
def _create_test_image(self, format: str, size: tuple = (100, 100)) -> str:
"""创建测试图片"""
img = Image.new('RGB', size, color='red')
with tempfile.NamedTemporaryFile(suffix=f'.{format.lower()}', delete=False) as f:
temp_path = f.name
img.save(temp_path, format)
self.temp_files.append(temp_path)
return temp_path
def test_detect_jpeg_format(self):
"""测试检测JPEG格式"""
temp_path = self._create_test_image('JPEG')
fmt = self.processor.detect_format(temp_path)
assert fmt == ImageFormat.JPEG
def test_detect_png_format(self):
"""测试检测PNG格式"""
temp_path = self._create_test_image('PNG')
fmt = self.processor.detect_format(temp_path)
assert fmt == ImageFormat.PNG
def test_detect_gif_format(self):
"""测试检测GIF格式"""
temp_path = self._create_test_image('GIF')
fmt = self.processor.detect_format(temp_path)
assert fmt == ImageFormat.GIF
def test_detect_bmp_format(self):
"""测试检测BMP格式"""
temp_path = self._create_test_image('BMP')
fmt = self.processor.detect_format(temp_path)
assert fmt == ImageFormat.BMP
def test_detect_format_nonexistent_file(self):
"""测试检测不存在的文件"""
with pytest.raises(ImageNotFoundError):
self.processor.detect_format("/nonexistent/image.jpg")
def test_detect_format_from_extension(self):
"""测试通过扩展名检测格式"""
assert self.processor.detect_format_from_extension("test.jpg") == ImageFormat.JPEG
assert self.processor.detect_format_from_extension("test.jpeg") == ImageFormat.JPEG
assert self.processor.detect_format_from_extension("test.png") == ImageFormat.PNG
assert self.processor.detect_format_from_extension("test.gif") == ImageFormat.GIF
assert self.processor.detect_format_from_extension("test.bmp") == ImageFormat.BMP
assert self.processor.detect_format_from_extension("test.txt") == ImageFormat.UNKNOWN
def test_is_supported_format(self):
"""测试检查是否为支持的格式"""
temp_path = self._create_test_image('JPEG')
assert self.processor.is_supported_format(temp_path) is True
assert self.processor.is_supported_format("/nonexistent/file.jpg") is False
def test_get_image_info(self):
"""测试获取图片信息"""
temp_path = self._create_test_image('JPEG', size=(200, 150))
info = self.processor.get_image_info(temp_path)
assert info.format == ImageFormat.JPEG
assert info.width == 200
assert info.height == 150
assert info.mode == 'RGB'
assert info.file_size > 0
def test_get_image_info_nonexistent(self):
"""测试获取不存在文件的信息"""
with pytest.raises(ImageNotFoundError):
self.processor.get_image_info("/nonexistent/image.jpg")
class TestThumbnailGeneration:
"""缩略图生成测试"""
def setup_method(self):
"""每个测试前创建处理器"""
self.processor = ImageProcessor()
self.temp_files = []
def teardown_method(self):
"""每个测试后清理临时文件"""
for f in self.temp_files:
if os.path.exists(f):
os.unlink(f)
# 清理缓存
self.processor.clear_cache()
def _create_test_image(self, format: str, size: tuple = (800, 600)) -> str:
"""创建测试图片"""
img = Image.new('RGB', size, color='blue')
with tempfile.NamedTemporaryFile(suffix=f'.{format.lower()}', delete=False) as f:
temp_path = f.name
img.save(temp_path, format)
self.temp_files.append(temp_path)
return temp_path
def test_generate_thumbnail_default_size(self):
"""测试生成默认大小的缩略图"""
temp_path = self._create_test_image('JPEG')
thumb_path = self.processor.generate_thumbnail(temp_path)
self.temp_files.append(thumb_path)
assert os.path.exists(thumb_path)
with Image.open(thumb_path) as thumb:
assert thumb.width <= 200
assert thumb.height <= 200
def test_generate_thumbnail_custom_size(self):
"""测试生成自定义大小的缩略图"""
temp_path = self._create_test_image('JPEG')
thumb_path = self.processor.generate_thumbnail(temp_path, size=(100, 100))
self.temp_files.append(thumb_path)
with Image.open(thumb_path) as thumb:
assert thumb.width <= 100
assert thumb.height <= 100
def test_generate_thumbnail_preserve_aspect_ratio(self):
"""测试保持宽高比生成缩略图"""
temp_path = self._create_test_image('JPEG', size=(800, 400))
thumb_path = self.processor.generate_thumbnail(
temp_path,
size=(200, 200),
preserve_aspect_ratio=True
)
self.temp_files.append(thumb_path)
with Image.open(thumb_path) as thumb:
# 宽高比应该接近2:1
ratio = thumb.width / thumb.height
assert 1.9 <= ratio <= 2.1
def test_generate_thumbnail_custom_output_path(self):
"""测试指定输出路径生成缩略图"""
temp_path = self._create_test_image('JPEG')
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f:
output_path = f.name
self.temp_files.append(output_path)
result_path = self.processor.generate_thumbnail(temp_path, output_path=output_path)
assert result_path == output_path
assert os.path.exists(output_path)
def test_generate_thumbnail_nonexistent_file(self):
"""测试为不存在的文件生成缩略图"""
with pytest.raises(ImageNotFoundError):
self.processor.generate_thumbnail("/nonexistent/image.jpg")
def test_generate_thumbnail_bytes(self):
"""测试生成缩略图字节数据"""
temp_path = self._create_test_image('JPEG')
thumb_bytes = self.processor.generate_thumbnail_bytes(temp_path)
assert isinstance(thumb_bytes, bytes)
assert len(thumb_bytes) > 0
# 验证是有效的JPEG
with Image.open(io.BytesIO(thumb_bytes)) as img:
assert img.format == 'JPEG'
def test_generate_thumbnail_rgba_image(self):
"""测试为RGBA图片生成缩略图"""
# 创建RGBA图片
img = Image.new('RGBA', (400, 300), color=(255, 0, 0, 128))
with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as f:
temp_path = f.name
img.save(temp_path, 'PNG')
self.temp_files.append(temp_path)
thumb_path = self.processor.generate_thumbnail(temp_path)
self.temp_files.append(thumb_path)
assert os.path.exists(thumb_path)
class TestImageCompression:
"""图片压缩测试"""
def setup_method(self):
"""每个测试前创建处理器"""
self.processor = ImageProcessor()
self.temp_files = []
def teardown_method(self):
"""每个测试后清理临时文件"""
for f in self.temp_files:
if os.path.exists(f):
os.unlink(f)
self.processor.clear_cache()
def _create_test_image(self, size: tuple = (1000, 800)) -> str:
"""创建测试图片"""
# 创建有内容的图片以便压缩有效果
img = Image.new('RGB', size)
pixels = img.load()
for i in range(size[0]):
for j in range(size[1]):
pixels[i, j] = ((i * j) % 256, (i + j) % 256, (i - j) % 256)
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f:
temp_path = f.name
img.save(temp_path, 'JPEG', quality=100)
self.temp_files.append(temp_path)
return temp_path
def test_compress_image_default_quality(self):
"""测试默认质量压缩"""
temp_path = self._create_test_image()
original_size = os.path.getsize(temp_path)
compressed_path = self.processor.compress_image(temp_path)
self.temp_files.append(compressed_path)
assert os.path.exists(compressed_path)
compressed_size = os.path.getsize(compressed_path)
# 压缩后应该更小
assert compressed_size <= original_size
def test_compress_image_low_quality(self):
"""测试低质量压缩"""
temp_path = self._create_test_image()
compressed_path = self.processor.compress_image(temp_path, quality=30)
self.temp_files.append(compressed_path)
assert os.path.exists(compressed_path)
def test_compress_image_with_max_size(self):
"""测试带最大尺寸的压缩"""
temp_path = self._create_test_image(size=(2000, 1500))
compressed_path = self.processor.compress_image(
temp_path,
max_size=(800, 600)
)
self.temp_files.append(compressed_path)
with Image.open(compressed_path) as img:
assert img.width <= 800
assert img.height <= 600
def test_compress_image_custom_output_path(self):
"""测试指定输出路径压缩"""
temp_path = self._create_test_image()
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f:
output_path = f.name
self.temp_files.append(output_path)
result_path = self.processor.compress_image(temp_path, output_path=output_path)
assert result_path == output_path
assert os.path.exists(output_path)
def test_compress_image_nonexistent_file(self):
"""测试压缩不存在的文件"""
with pytest.raises(ImageNotFoundError):
self.processor.compress_image("/nonexistent/image.jpg")
def test_compress_image_bytes(self):
"""测试压缩图片字节数据"""
temp_path = self._create_test_image()
with open(temp_path, 'rb') as f:
original_bytes = f.read()
compressed_bytes = self.processor.compress_image_bytes(original_bytes, quality=50)
assert isinstance(compressed_bytes, bytes)
assert len(compressed_bytes) < len(original_bytes)
def test_should_compress(self):
"""测试判断是否需要压缩"""
# 创建小文件
small_img = Image.new('RGB', (10, 10), color='red')
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f:
small_path = f.name
small_img.save(small_path, 'JPEG')
self.temp_files.append(small_path)
assert self.processor.should_compress(small_path) is False
assert self.processor.should_compress("/nonexistent/file.jpg") is False
class TestImageOperations:
"""图片操作测试"""
def setup_method(self):
"""每个测试前创建处理器"""
self.processor = ImageProcessor()
self.temp_files = []
def teardown_method(self):
"""每个测试后清理临时文件"""
for f in self.temp_files:
if os.path.exists(f):
os.unlink(f)
self.processor.clear_cache()
def _create_test_image(self, size: tuple = (400, 300)) -> str:
"""创建测试图片"""
img = Image.new('RGB', size, color='green')
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f:
temp_path = f.name
img.save(temp_path, 'JPEG')
self.temp_files.append(temp_path)
return temp_path
def test_rotate_image_90_degrees(self):
"""测试旋转90度"""
temp_path = self._create_test_image(size=(400, 200))
rotated_path = self.processor.rotate_image(temp_path, 90)
self.temp_files.append(rotated_path)
with Image.open(rotated_path) as img:
# 旋转90度后宽高交换
assert img.height >= 400 or img.width >= 200
def test_rotate_image_nonexistent_file(self):
"""测试旋转不存在的文件"""
with pytest.raises(ImageNotFoundError):
self.processor.rotate_image("/nonexistent/image.jpg", 90)
def test_resize_image(self):
"""测试调整图片大小"""
temp_path = self._create_test_image(size=(800, 600))
resized_path = self.processor.resize_image(temp_path, (400, 300))
self.temp_files.append(resized_path)
with Image.open(resized_path) as img:
assert img.width <= 400
assert img.height <= 300
def test_resize_image_nonexistent_file(self):
"""测试调整不存在文件的大小"""
with pytest.raises(ImageNotFoundError):
self.processor.resize_image("/nonexistent/image.jpg", (100, 100))
class TestImageInfo:
"""ImageInfo数据类测试"""
def test_image_info_creation(self):
"""测试创建ImageInfo"""
info = ImageInfo(
path="/path/to/image.jpg",
format=ImageFormat.JPEG,
width=800,
height=600,
file_size=100000,
mode="RGB"
)
assert info.path == "/path/to/image.jpg"
assert info.format == ImageFormat.JPEG
assert info.width == 800
assert info.height == 600
def test_aspect_ratio(self):
"""测试宽高比计算"""
info = ImageInfo(
path="/path/to/image.jpg",
format=ImageFormat.JPEG,
width=800,
height=400,
file_size=100000,
mode="RGB"
)
assert info.aspect_ratio == 2.0
def test_is_large(self):
"""测试大图片判断"""
small_info = ImageInfo(
path="/path/to/small.jpg",
format=ImageFormat.JPEG,
width=100,
height=100,
file_size=1000,
mode="RGB"
)
large_info = ImageInfo(
path="/path/to/large.jpg",
format=ImageFormat.JPEG,
width=4000,
height=3000,
file_size=6 * 1024 * 1024, # 6MB
mode="RGB"
)
assert small_info.is_large is False
assert large_info.is_large is True
def test_to_dict(self):
"""测试转换为字典"""
info = ImageInfo(
path="/path/to/image.jpg",
format=ImageFormat.JPEG,
width=800,
height=600,
file_size=100000,
mode="RGB"
)
d = info.to_dict()
assert d["path"] == "/path/to/image.jpg"
assert d["format"] == "jpeg"
assert d["width"] == 800
assert d["height"] == 600
class TestUtilityMethods:
"""工具方法测试"""
def setup_method(self):
"""每个测试前创建处理器"""
self.processor = ImageProcessor()
def teardown_method(self):
"""每个测试后清理"""
self.processor.clear_cache()
def test_get_supported_formats(self):
"""测试获取支持的格式"""
formats = self.processor.get_supported_formats()
assert "jpeg" in formats
assert "png" in formats
assert "gif" in formats
assert "bmp" in formats
def test_get_supported_extensions(self):
"""测试获取支持的扩展名"""
extensions = self.processor.get_supported_extensions()
assert ".jpg" in extensions
assert ".jpeg" in extensions
assert ".png" in extensions
assert ".gif" in extensions
assert ".bmp" in extensions
def test_clear_cache(self):
"""测试清理缓存"""
# 创建一些缓存文件
cache_dir = self.processor._cache_dir
cache_dir.mkdir(parents=True, exist_ok=True)
test_files = [
cache_dir / "test_thumb.jpg",
cache_dir / "test_compressed.jpg",
]
for f in test_files:
f.touch()
count = self.processor.clear_cache()
assert count >= 2
for f in test_files:
assert not f.exists()

@ -0,0 +1,629 @@
# P2P Network Communication - Media Player Tests
"""
媒体播放器模块测试
测试音频和视频播放功能
需求: 6.1, 6.2, 6.3, 6.4, 6.5, 6.7
"""
import os
import tempfile
import time
import pytest
from pathlib import Path
from unittest.mock import MagicMock, patch
from client.media_player import (
AudioPlayer,
VideoPlayer,
MediaPlayer,
MediaType,
PlaybackState,
AudioFormat,
VideoFormat,
MediaInfo,
MediaPlayerError,
MediaNotFoundError,
UnsupportedMediaFormatError,
PlaybackError,
MediaLoadError,
AUDIO_EXTENSIONS,
VIDEO_EXTENSIONS,
)
from config import ClientConfig
# ==================== Fixtures ====================
@pytest.fixture
def audio_player():
"""创建音频播放器实例"""
player = AudioPlayer()
yield player
player.release()
@pytest.fixture
def video_player():
"""创建视频播放器实例"""
player = VideoPlayer()
yield player
player.release()
@pytest.fixture
def media_player():
"""创建统一媒体播放器实例"""
player = MediaPlayer()
yield player
player.release()
@pytest.fixture
def temp_audio_file():
"""创建临时音频文件"""
with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as f:
# 写入更多虚拟数据模拟音频文件约1MB估算约60秒
f.write(b'\xff\xfb\x90\x00' + b'\x00' * (1024 * 1024)) # MP3 header + data
temp_path = f.name
yield temp_path
if os.path.exists(temp_path):
os.unlink(temp_path)
@pytest.fixture
def temp_video_file():
"""创建临时视频文件"""
with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as f:
# 写入更多虚拟数据模拟视频文件约5MB估算约8秒
f.write(b'\x00\x00\x00\x1c\x66\x74\x79\x70' + b'\x00' * (5 * 1024 * 1024)) # MP4 header + data
temp_path = f.name
yield temp_path
if os.path.exists(temp_path):
os.unlink(temp_path)
# ==================== AudioPlayer Tests (需求 6.1, 6.3, 6.5) ====================
class TestAudioPlayer:
"""音频播放器测试类"""
def test_init(self, audio_player):
"""测试初始化"""
assert audio_player.state == PlaybackState.STOPPED
assert audio_player.position == 0.0
assert audio_player.duration == 0.0
assert audio_player.volume == 1.0
assert audio_player.current_file is None
def test_detect_format_mp3(self, audio_player):
"""测试MP3格式检测 (需求 6.1)"""
assert audio_player.detect_format("test.mp3") == AudioFormat.MP3
assert audio_player.detect_format("test.MP3") == AudioFormat.MP3
def test_detect_format_wav(self, audio_player):
"""测试WAV格式检测 (需求 6.1)"""
assert audio_player.detect_format("test.wav") == AudioFormat.WAV
def test_detect_format_aac(self, audio_player):
"""测试AAC格式检测 (需求 6.1)"""
assert audio_player.detect_format("test.aac") == AudioFormat.AAC
assert audio_player.detect_format("test.m4a") == AudioFormat.AAC
def test_detect_format_flac(self, audio_player):
"""测试FLAC格式检测 (需求 6.1)"""
assert audio_player.detect_format("test.flac") == AudioFormat.FLAC
def test_detect_format_unknown(self, audio_player):
"""测试未知格式检测"""
assert audio_player.detect_format("test.xyz") == AudioFormat.UNKNOWN
def test_is_supported_format(self, audio_player):
"""测试格式支持检查 (需求 6.1)"""
assert audio_player.is_supported_format("test.mp3") is True
assert audio_player.is_supported_format("test.wav") is True
assert audio_player.is_supported_format("test.aac") is True
assert audio_player.is_supported_format("test.flac") is True
assert audio_player.is_supported_format("test.xyz") is False
def test_load_audio_file_not_found(self, audio_player):
"""测试加载不存在的文件"""
with pytest.raises(MediaNotFoundError):
audio_player.load_audio("nonexistent.mp3")
def test_load_audio_unsupported_format(self, audio_player):
"""测试加载不支持的格式"""
with tempfile.NamedTemporaryFile(suffix='.xyz', delete=False) as f:
f.write(b'test data')
temp_path = f.name
try:
with pytest.raises(UnsupportedMediaFormatError):
audio_player.load_audio(temp_path)
finally:
os.unlink(temp_path)
def test_load_audio_success(self, audio_player, temp_audio_file):
"""测试成功加载音频文件 (需求 6.1)"""
result = audio_player.load_audio(temp_audio_file)
assert result is True
assert audio_player.state == PlaybackState.STOPPED
assert audio_player.current_file == temp_audio_file
assert audio_player.media_info is not None
assert audio_player.media_info.media_type == MediaType.AUDIO
def test_play_without_load(self, audio_player):
"""测试未加载时播放"""
with pytest.raises(PlaybackError):
audio_player.play()
def test_play_pause_stop(self, audio_player, temp_audio_file):
"""测试播放控制 (需求 6.3, 6.5)"""
audio_player.load_audio(temp_audio_file)
# 播放
audio_player.play()
assert audio_player.state == PlaybackState.PLAYING
assert audio_player.is_playing is True
# 暂停
audio_player.pause()
assert audio_player.state == PlaybackState.PAUSED
assert audio_player.is_paused is True
# 恢复播放
audio_player.play()
assert audio_player.state == PlaybackState.PLAYING
# 停止
audio_player.stop()
assert audio_player.state == PlaybackState.STOPPED
# 停止后位置重置为0
assert audio_player.position == 0.0
def test_seek(self, audio_player, temp_audio_file):
"""测试进度跳转 (需求 6.5)"""
audio_player.load_audio(temp_audio_file)
# 跳转到指定位置(在停止状态下)
audio_player.seek(5.0)
assert audio_player.position == 5.0
# 跳转到超出范围的位置应该被限制
audio_player.seek(-1.0)
assert audio_player.position == 0.0
# 跳转到超过时长的位置应该被限制到时长
audio_player.seek(audio_player.duration + 10.0)
assert audio_player.position == audio_player.duration
def test_set_volume(self, audio_player):
"""测试音量控制 (需求 6.5)"""
# 正常范围
audio_player.set_volume(0.5)
assert audio_player.volume == 0.5
# 超出范围应该被限制
audio_player.set_volume(1.5)
assert audio_player.volume == 1.0
audio_player.set_volume(-0.5)
assert audio_player.volume == 0.0
def test_state_callback(self, audio_player, temp_audio_file):
"""测试状态回调"""
states = []
audio_player.set_state_callback(lambda s: states.append(s))
audio_player.load_audio(temp_audio_file)
audio_player.play()
audio_player.stop()
assert PlaybackState.LOADING in states
assert PlaybackState.STOPPED in states
assert PlaybackState.PLAYING in states
def test_get_supported_formats(self, audio_player):
"""测试获取支持的格式列表"""
formats = audio_player.get_supported_formats()
assert 'mp3' in formats
assert 'wav' in formats
assert 'aac' in formats
assert 'flac' in formats
def test_get_supported_extensions(self, audio_player):
"""测试获取支持的扩展名列表"""
extensions = audio_player.get_supported_extensions()
assert '.mp3' in extensions
assert '.wav' in extensions
# ==================== VideoPlayer Tests (需求 6.2, 6.4, 6.7) ====================
class TestVideoPlayer:
"""视频播放器测试类"""
def test_init(self, video_player):
"""测试初始化"""
assert video_player.state == PlaybackState.STOPPED
assert video_player.position == 0.0
assert video_player.duration == 0.0
assert video_player.volume == 1.0
assert video_player.current_file is None
assert video_player.is_fullscreen is False
def test_detect_format_mp4(self, video_player):
"""测试MP4格式检测 (需求 6.2)"""
assert video_player.detect_format("test.mp4") == VideoFormat.MP4
assert video_player.detect_format("test.MP4") == VideoFormat.MP4
def test_detect_format_avi(self, video_player):
"""测试AVI格式检测 (需求 6.2)"""
assert video_player.detect_format("test.avi") == VideoFormat.AVI
def test_detect_format_mkv(self, video_player):
"""测试MKV格式检测 (需求 6.2)"""
assert video_player.detect_format("test.mkv") == VideoFormat.MKV
def test_detect_format_mov(self, video_player):
"""测试MOV格式检测 (需求 6.2)"""
assert video_player.detect_format("test.mov") == VideoFormat.MOV
def test_detect_format_unknown(self, video_player):
"""测试未知格式检测"""
assert video_player.detect_format("test.xyz") == VideoFormat.UNKNOWN
def test_is_supported_format(self, video_player):
"""测试格式支持检查 (需求 6.2)"""
assert video_player.is_supported_format("test.mp4") is True
assert video_player.is_supported_format("test.avi") is True
assert video_player.is_supported_format("test.mkv") is True
assert video_player.is_supported_format("test.mov") is True
assert video_player.is_supported_format("test.xyz") is False
def test_load_video_file_not_found(self, video_player):
"""测试加载不存在的文件"""
with pytest.raises(MediaNotFoundError):
video_player.load_video("nonexistent.mp4")
def test_load_video_unsupported_format(self, video_player):
"""测试加载不支持的格式"""
with tempfile.NamedTemporaryFile(suffix='.xyz', delete=False) as f:
f.write(b'test data')
temp_path = f.name
try:
with pytest.raises(UnsupportedMediaFormatError):
video_player.load_video(temp_path)
finally:
os.unlink(temp_path)
def test_load_video_success(self, video_player, temp_video_file):
"""测试成功加载视频文件 (需求 6.2)"""
result = video_player.load_video(temp_video_file)
assert result is True
assert video_player.state == PlaybackState.STOPPED
assert video_player.current_file == temp_video_file
assert video_player.media_info is not None
assert video_player.media_info.media_type == MediaType.VIDEO
def test_play_without_load(self, video_player):
"""测试未加载时播放"""
with pytest.raises(PlaybackError):
video_player.play()
def test_play_pause_stop(self, video_player, temp_video_file):
"""测试播放控制 (需求 6.4, 6.5)"""
video_player.load_video(temp_video_file)
# 播放
video_player.play()
assert video_player.state == PlaybackState.PLAYING
assert video_player.is_playing is True
# 暂停
video_player.pause()
assert video_player.state == PlaybackState.PAUSED
assert video_player.is_paused is True
# 恢复播放
video_player.play()
assert video_player.state == PlaybackState.PLAYING
# 停止
video_player.stop()
assert video_player.state == PlaybackState.STOPPED
assert video_player.position == 0.0
def test_seek(self, video_player, temp_video_file):
"""测试进度跳转 (需求 6.5)"""
video_player.load_video(temp_video_file)
# 跳转到指定位置(在停止状态下)
video_player.seek(5.0)
assert video_player.position == 5.0
# 跳转到超出范围的位置应该被限制
video_player.seek(-1.0)
assert video_player.position == 0.0
# 跳转到超过时长的位置应该被限制到时长
video_player.seek(video_player.duration + 10.0)
assert video_player.position == video_player.duration
def test_set_volume(self, video_player):
"""测试音量控制 (需求 6.5)"""
# 正常范围
video_player.set_volume(0.5)
assert video_player.volume == 0.5
# 超出范围应该被限制
video_player.set_volume(1.5)
assert video_player.volume == 1.0
video_player.set_volume(-0.5)
assert video_player.volume == 0.0
def test_fullscreen(self, video_player, temp_video_file):
"""测试全屏模式 (需求 6.7)"""
video_player.load_video(temp_video_file)
# 启用全屏
video_player.set_fullscreen(True)
assert video_player.is_fullscreen is True
# 禁用全屏
video_player.set_fullscreen(False)
assert video_player.is_fullscreen is False
# 切换全屏
result = video_player.toggle_fullscreen()
assert result is True
assert video_player.is_fullscreen is True
result = video_player.toggle_fullscreen()
assert result is False
assert video_player.is_fullscreen is False
def test_video_size(self, video_player, temp_video_file):
"""测试视频尺寸获取"""
video_player.load_video(temp_video_file)
width, height = video_player.video_size
assert width > 0
assert height > 0
def test_get_supported_formats(self, video_player):
"""测试获取支持的格式列表"""
formats = video_player.get_supported_formats()
assert 'mp4' in formats
assert 'avi' in formats
assert 'mkv' in formats
assert 'mov' in formats
def test_get_supported_extensions(self, video_player):
"""测试获取支持的扩展名列表"""
extensions = video_player.get_supported_extensions()
assert '.mp4' in extensions
assert '.avi' in extensions
# ==================== MediaPlayer Tests (统一接口) ====================
class TestMediaPlayer:
"""统一媒体播放器测试类"""
def test_init(self, media_player):
"""测试初始化"""
assert media_player.state == PlaybackState.STOPPED
assert media_player.media_type is None
def test_detect_media_type_audio(self, media_player):
"""测试音频类型检测"""
assert media_player.detect_media_type("test.mp3") == MediaType.AUDIO
assert media_player.detect_media_type("test.wav") == MediaType.AUDIO
assert media_player.detect_media_type("test.aac") == MediaType.AUDIO
assert media_player.detect_media_type("test.flac") == MediaType.AUDIO
def test_detect_media_type_video(self, media_player):
"""测试视频类型检测"""
assert media_player.detect_media_type("test.mp4") == MediaType.VIDEO
assert media_player.detect_media_type("test.avi") == MediaType.VIDEO
assert media_player.detect_media_type("test.mkv") == MediaType.VIDEO
assert media_player.detect_media_type("test.mov") == MediaType.VIDEO
def test_detect_media_type_unknown(self, media_player):
"""测试未知类型检测"""
assert media_player.detect_media_type("test.xyz") == MediaType.UNKNOWN
def test_is_supported(self, media_player):
"""测试格式支持检查"""
assert media_player.is_supported("test.mp3") is True
assert media_player.is_supported("test.mp4") is True
assert media_player.is_supported("test.xyz") is False
def test_load_audio(self, media_player, temp_audio_file):
"""测试加载音频"""
result = media_player.load_audio(temp_audio_file)
assert result is True
assert media_player.media_type == MediaType.AUDIO
def test_load_video(self, media_player, temp_video_file):
"""测试加载视频"""
result = media_player.load_video(temp_video_file)
assert result is True
assert media_player.media_type == MediaType.VIDEO
def test_auto_load_audio(self, media_player, temp_audio_file):
"""测试自动检测加载音频"""
result = media_player.load(temp_audio_file)
assert result is True
assert media_player.media_type == MediaType.AUDIO
def test_auto_load_video(self, media_player, temp_video_file):
"""测试自动检测加载视频"""
result = media_player.load(temp_video_file)
assert result is True
assert media_player.media_type == MediaType.VIDEO
def test_auto_load_unsupported(self, media_player):
"""测试自动加载不支持的格式"""
with tempfile.NamedTemporaryFile(suffix='.xyz', delete=False) as f:
f.write(b'test data')
temp_path = f.name
try:
with pytest.raises(UnsupportedMediaFormatError):
media_player.load(temp_path)
finally:
os.unlink(temp_path)
def test_play_without_load(self, media_player):
"""测试未加载时播放"""
with pytest.raises(PlaybackError):
media_player.play()
def test_playback_controls_audio(self, media_player, temp_audio_file):
"""测试音频播放控制"""
media_player.load_audio(temp_audio_file)
media_player.play()
assert media_player.is_playing is True
media_player.pause()
# 给一点时间让状态更新
time.sleep(0.05)
assert media_player.is_paused is True
media_player.stop()
assert media_player.state == PlaybackState.STOPPED
def test_playback_controls_video(self, media_player, temp_video_file):
"""测试视频播放控制"""
media_player.load_video(temp_video_file)
media_player.play()
assert media_player.is_playing is True
media_player.pause()
# 给一点时间让状态更新
time.sleep(0.05)
assert media_player.is_paused is True
media_player.stop()
assert media_player.state == PlaybackState.STOPPED
def test_get_supported_formats(self, media_player):
"""测试获取支持的格式列表"""
audio_formats = media_player.get_supported_audio_formats()
video_formats = media_player.get_supported_video_formats()
all_extensions = media_player.get_all_supported_extensions()
assert 'mp3' in audio_formats
assert 'mp4' in video_formats
assert '.mp3' in all_extensions
assert '.mp4' in all_extensions
def test_release(self, media_player, temp_audio_file):
"""测试资源释放"""
media_player.load_audio(temp_audio_file)
media_player.play()
media_player.release()
assert media_player.state == PlaybackState.STOPPED
# ==================== 属性测试 (需求 6.5) ====================
class TestPlaybackStateTransitions:
"""播放状态转换测试 - 属性 8: 媒体播放器控制一致性"""
def test_audio_state_transitions(self, audio_player, temp_audio_file):
"""
测试音频播放器状态转换的确定性
属性 8: 媒体播放器控制一致性
*对于任意*媒体播放状态播放暂停进度跳转操作应该正确改变播放器状态
且状态转换应该是确定性的
验证: 需求 6.5
"""
audio_player.load_audio(temp_audio_file)
# STOPPED -> PLAYING
audio_player.play()
assert audio_player.state == PlaybackState.PLAYING
# PLAYING -> PAUSED
audio_player.pause()
assert audio_player.state == PlaybackState.PAUSED
# PAUSED -> PLAYING
audio_player.play()
assert audio_player.state == PlaybackState.PLAYING
# PLAYING -> STOPPED
audio_player.stop()
assert audio_player.state == PlaybackState.STOPPED
# STOPPED -> PLAYING -> STOPPED
audio_player.play()
audio_player.stop()
assert audio_player.state == PlaybackState.STOPPED
def test_video_state_transitions(self, video_player, temp_video_file):
"""
测试视频播放器状态转换的确定性
属性 8: 媒体播放器控制一致性
验证: 需求 6.5
"""
video_player.load_video(temp_video_file)
# STOPPED -> PLAYING
video_player.play()
assert video_player.state == PlaybackState.PLAYING
# PLAYING -> PAUSED
video_player.pause()
# 给一点时间让状态更新
time.sleep(0.05)
assert video_player.state == PlaybackState.PAUSED
# PAUSED -> PLAYING
video_player.play()
assert video_player.state == PlaybackState.PLAYING
# PLAYING -> STOPPED
video_player.stop()
assert video_player.state == PlaybackState.STOPPED
def test_idempotent_operations(self, audio_player, temp_audio_file):
"""
测试幂等操作
多次调用相同操作应该保持状态一致
"""
audio_player.load_audio(temp_audio_file)
# 多次播放
audio_player.play()
audio_player.play()
assert audio_player.state == PlaybackState.PLAYING
# 多次暂停
audio_player.pause()
audio_player.pause()
assert audio_player.state == PlaybackState.PAUSED
# 多次停止
audio_player.stop()
audio_player.stop()
assert audio_player.state == PlaybackState.STOPPED
Loading…
Cancel
Save