|
|
# P2P Network Communication - Image Processor Tests
|
|
|
"""
|
|
|
图片处理模块测试
|
|
|
测试图片格式检测、缩略图生成和图片压缩功能
|
|
|
|
|
|
需求: 5.1, 5.5
|
|
|
"""
|
|
|
|
|
|
import io
|
|
|
import os
|
|
|
import tempfile
|
|
|
import pytest
|
|
|
from pathlib import Path
|
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
|
|
# 尝试导入PIL,如果不可用则跳过测试
|
|
|
try:
|
|
|
from PIL import Image
|
|
|
PIL_AVAILABLE = True
|
|
|
except ImportError:
|
|
|
PIL_AVAILABLE = False
|
|
|
|
|
|
from client.image_processor import (
|
|
|
ImageProcessor,
|
|
|
ImageFormat,
|
|
|
ImageInfo,
|
|
|
ImageProcessorError,
|
|
|
UnsupportedFormatError,
|
|
|
ImageNotFoundError,
|
|
|
ThumbnailGenerationError,
|
|
|
CompressionError,
|
|
|
IMAGE_SIGNATURES,
|
|
|
FORMAT_EXTENSIONS,
|
|
|
)
|
|
|
from config import ClientConfig
|
|
|
|
|
|
|
|
|
# 如果PIL不可用,跳过所有测试
|
|
|
pytestmark = pytest.mark.skipif(not PIL_AVAILABLE, reason="Pillow not installed")
|
|
|
|
|
|
|
|
|
class TestImageProcessorInit:
|
|
|
"""图片处理器初始化测试"""
|
|
|
|
|
|
def test_init_with_default_config(self):
|
|
|
"""测试使用默认配置初始化"""
|
|
|
processor = ImageProcessor()
|
|
|
|
|
|
assert processor.config is not None
|
|
|
assert processor._thumbnail_size == (200, 200)
|
|
|
|
|
|
def test_init_with_custom_config(self):
|
|
|
"""测试使用自定义配置初始化"""
|
|
|
config = ClientConfig(thumbnail_size=(100, 100))
|
|
|
processor = ImageProcessor(config=config)
|
|
|
|
|
|
assert processor._thumbnail_size == (100, 100)
|
|
|
|
|
|
def test_supported_formats(self):
|
|
|
"""测试支持的格式列表"""
|
|
|
processor = ImageProcessor()
|
|
|
|
|
|
assert ImageFormat.JPEG in processor.SUPPORTED_FORMATS
|
|
|
assert ImageFormat.PNG in processor.SUPPORTED_FORMATS
|
|
|
assert ImageFormat.GIF in processor.SUPPORTED_FORMATS
|
|
|
assert ImageFormat.BMP in processor.SUPPORTED_FORMATS
|
|
|
|
|
|
|
|
|
class TestFormatDetection:
|
|
|
"""图片格式检测测试"""
|
|
|
|
|
|
def setup_method(self):
|
|
|
"""每个测试前创建处理器"""
|
|
|
self.processor = ImageProcessor()
|
|
|
self.temp_files = []
|
|
|
|
|
|
def teardown_method(self):
|
|
|
"""每个测试后清理临时文件"""
|
|
|
for f in self.temp_files:
|
|
|
if os.path.exists(f):
|
|
|
os.unlink(f)
|
|
|
|
|
|
def _create_test_image(self, format: str, size: tuple = (100, 100)) -> str:
|
|
|
"""创建测试图片"""
|
|
|
img = Image.new('RGB', size, color='red')
|
|
|
|
|
|
with tempfile.NamedTemporaryFile(suffix=f'.{format.lower()}', delete=False) as f:
|
|
|
temp_path = f.name
|
|
|
|
|
|
img.save(temp_path, format)
|
|
|
self.temp_files.append(temp_path)
|
|
|
return temp_path
|
|
|
|
|
|
def test_detect_jpeg_format(self):
|
|
|
"""测试检测JPEG格式"""
|
|
|
temp_path = self._create_test_image('JPEG')
|
|
|
|
|
|
fmt = self.processor.detect_format(temp_path)
|
|
|
|
|
|
assert fmt == ImageFormat.JPEG
|
|
|
|
|
|
def test_detect_png_format(self):
|
|
|
"""测试检测PNG格式"""
|
|
|
temp_path = self._create_test_image('PNG')
|
|
|
|
|
|
fmt = self.processor.detect_format(temp_path)
|
|
|
|
|
|
assert fmt == ImageFormat.PNG
|
|
|
|
|
|
def test_detect_gif_format(self):
|
|
|
"""测试检测GIF格式"""
|
|
|
temp_path = self._create_test_image('GIF')
|
|
|
|
|
|
fmt = self.processor.detect_format(temp_path)
|
|
|
|
|
|
assert fmt == ImageFormat.GIF
|
|
|
|
|
|
def test_detect_bmp_format(self):
|
|
|
"""测试检测BMP格式"""
|
|
|
temp_path = self._create_test_image('BMP')
|
|
|
|
|
|
fmt = self.processor.detect_format(temp_path)
|
|
|
|
|
|
assert fmt == ImageFormat.BMP
|
|
|
|
|
|
def test_detect_format_nonexistent_file(self):
|
|
|
"""测试检测不存在的文件"""
|
|
|
with pytest.raises(ImageNotFoundError):
|
|
|
self.processor.detect_format("/nonexistent/image.jpg")
|
|
|
|
|
|
def test_detect_format_from_extension(self):
|
|
|
"""测试通过扩展名检测格式"""
|
|
|
assert self.processor.detect_format_from_extension("test.jpg") == ImageFormat.JPEG
|
|
|
assert self.processor.detect_format_from_extension("test.jpeg") == ImageFormat.JPEG
|
|
|
assert self.processor.detect_format_from_extension("test.png") == ImageFormat.PNG
|
|
|
assert self.processor.detect_format_from_extension("test.gif") == ImageFormat.GIF
|
|
|
assert self.processor.detect_format_from_extension("test.bmp") == ImageFormat.BMP
|
|
|
assert self.processor.detect_format_from_extension("test.txt") == ImageFormat.UNKNOWN
|
|
|
|
|
|
def test_is_supported_format(self):
|
|
|
"""测试检查是否为支持的格式"""
|
|
|
temp_path = self._create_test_image('JPEG')
|
|
|
|
|
|
assert self.processor.is_supported_format(temp_path) is True
|
|
|
assert self.processor.is_supported_format("/nonexistent/file.jpg") is False
|
|
|
|
|
|
def test_get_image_info(self):
|
|
|
"""测试获取图片信息"""
|
|
|
temp_path = self._create_test_image('JPEG', size=(200, 150))
|
|
|
|
|
|
info = self.processor.get_image_info(temp_path)
|
|
|
|
|
|
assert info.format == ImageFormat.JPEG
|
|
|
assert info.width == 200
|
|
|
assert info.height == 150
|
|
|
assert info.mode == 'RGB'
|
|
|
assert info.file_size > 0
|
|
|
|
|
|
def test_get_image_info_nonexistent(self):
|
|
|
"""测试获取不存在文件的信息"""
|
|
|
with pytest.raises(ImageNotFoundError):
|
|
|
self.processor.get_image_info("/nonexistent/image.jpg")
|
|
|
|
|
|
|
|
|
class TestThumbnailGeneration:
|
|
|
"""缩略图生成测试"""
|
|
|
|
|
|
def setup_method(self):
|
|
|
"""每个测试前创建处理器"""
|
|
|
self.processor = ImageProcessor()
|
|
|
self.temp_files = []
|
|
|
|
|
|
def teardown_method(self):
|
|
|
"""每个测试后清理临时文件"""
|
|
|
for f in self.temp_files:
|
|
|
if os.path.exists(f):
|
|
|
os.unlink(f)
|
|
|
# 清理缓存
|
|
|
self.processor.clear_cache()
|
|
|
|
|
|
def _create_test_image(self, format: str, size: tuple = (800, 600)) -> str:
|
|
|
"""创建测试图片"""
|
|
|
img = Image.new('RGB', size, color='blue')
|
|
|
|
|
|
with tempfile.NamedTemporaryFile(suffix=f'.{format.lower()}', delete=False) as f:
|
|
|
temp_path = f.name
|
|
|
|
|
|
img.save(temp_path, format)
|
|
|
self.temp_files.append(temp_path)
|
|
|
return temp_path
|
|
|
|
|
|
def test_generate_thumbnail_default_size(self):
|
|
|
"""测试生成默认大小的缩略图"""
|
|
|
temp_path = self._create_test_image('JPEG')
|
|
|
|
|
|
thumb_path = self.processor.generate_thumbnail(temp_path)
|
|
|
self.temp_files.append(thumb_path)
|
|
|
|
|
|
assert os.path.exists(thumb_path)
|
|
|
|
|
|
with Image.open(thumb_path) as thumb:
|
|
|
assert thumb.width <= 200
|
|
|
assert thumb.height <= 200
|
|
|
|
|
|
def test_generate_thumbnail_custom_size(self):
|
|
|
"""测试生成自定义大小的缩略图"""
|
|
|
temp_path = self._create_test_image('JPEG')
|
|
|
|
|
|
thumb_path = self.processor.generate_thumbnail(temp_path, size=(100, 100))
|
|
|
self.temp_files.append(thumb_path)
|
|
|
|
|
|
with Image.open(thumb_path) as thumb:
|
|
|
assert thumb.width <= 100
|
|
|
assert thumb.height <= 100
|
|
|
|
|
|
def test_generate_thumbnail_preserve_aspect_ratio(self):
|
|
|
"""测试保持宽高比生成缩略图"""
|
|
|
temp_path = self._create_test_image('JPEG', size=(800, 400))
|
|
|
|
|
|
thumb_path = self.processor.generate_thumbnail(
|
|
|
temp_path,
|
|
|
size=(200, 200),
|
|
|
preserve_aspect_ratio=True
|
|
|
)
|
|
|
self.temp_files.append(thumb_path)
|
|
|
|
|
|
with Image.open(thumb_path) as thumb:
|
|
|
# 宽高比应该接近2:1
|
|
|
ratio = thumb.width / thumb.height
|
|
|
assert 1.9 <= ratio <= 2.1
|
|
|
|
|
|
def test_generate_thumbnail_custom_output_path(self):
|
|
|
"""测试指定输出路径生成缩略图"""
|
|
|
temp_path = self._create_test_image('JPEG')
|
|
|
|
|
|
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f:
|
|
|
output_path = f.name
|
|
|
self.temp_files.append(output_path)
|
|
|
|
|
|
result_path = self.processor.generate_thumbnail(temp_path, output_path=output_path)
|
|
|
|
|
|
assert result_path == output_path
|
|
|
assert os.path.exists(output_path)
|
|
|
|
|
|
def test_generate_thumbnail_nonexistent_file(self):
|
|
|
"""测试为不存在的文件生成缩略图"""
|
|
|
with pytest.raises(ImageNotFoundError):
|
|
|
self.processor.generate_thumbnail("/nonexistent/image.jpg")
|
|
|
|
|
|
def test_generate_thumbnail_bytes(self):
|
|
|
"""测试生成缩略图字节数据"""
|
|
|
temp_path = self._create_test_image('JPEG')
|
|
|
|
|
|
thumb_bytes = self.processor.generate_thumbnail_bytes(temp_path)
|
|
|
|
|
|
assert isinstance(thumb_bytes, bytes)
|
|
|
assert len(thumb_bytes) > 0
|
|
|
|
|
|
# 验证是有效的JPEG
|
|
|
with Image.open(io.BytesIO(thumb_bytes)) as img:
|
|
|
assert img.format == 'JPEG'
|
|
|
|
|
|
def test_generate_thumbnail_rgba_image(self):
|
|
|
"""测试为RGBA图片生成缩略图"""
|
|
|
# 创建RGBA图片
|
|
|
img = Image.new('RGBA', (400, 300), color=(255, 0, 0, 128))
|
|
|
|
|
|
with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as f:
|
|
|
temp_path = f.name
|
|
|
|
|
|
img.save(temp_path, 'PNG')
|
|
|
self.temp_files.append(temp_path)
|
|
|
|
|
|
thumb_path = self.processor.generate_thumbnail(temp_path)
|
|
|
self.temp_files.append(thumb_path)
|
|
|
|
|
|
assert os.path.exists(thumb_path)
|
|
|
|
|
|
|
|
|
class TestImageCompression:
|
|
|
"""图片压缩测试"""
|
|
|
|
|
|
def setup_method(self):
|
|
|
"""每个测试前创建处理器"""
|
|
|
self.processor = ImageProcessor()
|
|
|
self.temp_files = []
|
|
|
|
|
|
def teardown_method(self):
|
|
|
"""每个测试后清理临时文件"""
|
|
|
for f in self.temp_files:
|
|
|
if os.path.exists(f):
|
|
|
os.unlink(f)
|
|
|
self.processor.clear_cache()
|
|
|
|
|
|
def _create_test_image(self, size: tuple = (1000, 800)) -> str:
|
|
|
"""创建测试图片"""
|
|
|
# 创建有内容的图片以便压缩有效果
|
|
|
img = Image.new('RGB', size)
|
|
|
pixels = img.load()
|
|
|
for i in range(size[0]):
|
|
|
for j in range(size[1]):
|
|
|
pixels[i, j] = ((i * j) % 256, (i + j) % 256, (i - j) % 256)
|
|
|
|
|
|
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f:
|
|
|
temp_path = f.name
|
|
|
|
|
|
img.save(temp_path, 'JPEG', quality=100)
|
|
|
self.temp_files.append(temp_path)
|
|
|
return temp_path
|
|
|
|
|
|
def test_compress_image_default_quality(self):
|
|
|
"""测试默认质量压缩"""
|
|
|
temp_path = self._create_test_image()
|
|
|
original_size = os.path.getsize(temp_path)
|
|
|
|
|
|
compressed_path = self.processor.compress_image(temp_path)
|
|
|
self.temp_files.append(compressed_path)
|
|
|
|
|
|
assert os.path.exists(compressed_path)
|
|
|
compressed_size = os.path.getsize(compressed_path)
|
|
|
|
|
|
# 压缩后应该更小
|
|
|
assert compressed_size <= original_size
|
|
|
|
|
|
def test_compress_image_low_quality(self):
|
|
|
"""测试低质量压缩"""
|
|
|
temp_path = self._create_test_image()
|
|
|
|
|
|
compressed_path = self.processor.compress_image(temp_path, quality=30)
|
|
|
self.temp_files.append(compressed_path)
|
|
|
|
|
|
assert os.path.exists(compressed_path)
|
|
|
|
|
|
def test_compress_image_with_max_size(self):
|
|
|
"""测试带最大尺寸的压缩"""
|
|
|
temp_path = self._create_test_image(size=(2000, 1500))
|
|
|
|
|
|
compressed_path = self.processor.compress_image(
|
|
|
temp_path,
|
|
|
max_size=(800, 600)
|
|
|
)
|
|
|
self.temp_files.append(compressed_path)
|
|
|
|
|
|
with Image.open(compressed_path) as img:
|
|
|
assert img.width <= 800
|
|
|
assert img.height <= 600
|
|
|
|
|
|
def test_compress_image_custom_output_path(self):
|
|
|
"""测试指定输出路径压缩"""
|
|
|
temp_path = self._create_test_image()
|
|
|
|
|
|
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f:
|
|
|
output_path = f.name
|
|
|
self.temp_files.append(output_path)
|
|
|
|
|
|
result_path = self.processor.compress_image(temp_path, output_path=output_path)
|
|
|
|
|
|
assert result_path == output_path
|
|
|
assert os.path.exists(output_path)
|
|
|
|
|
|
def test_compress_image_nonexistent_file(self):
|
|
|
"""测试压缩不存在的文件"""
|
|
|
with pytest.raises(ImageNotFoundError):
|
|
|
self.processor.compress_image("/nonexistent/image.jpg")
|
|
|
|
|
|
def test_compress_image_bytes(self):
|
|
|
"""测试压缩图片字节数据"""
|
|
|
temp_path = self._create_test_image()
|
|
|
|
|
|
with open(temp_path, 'rb') as f:
|
|
|
original_bytes = f.read()
|
|
|
|
|
|
compressed_bytes = self.processor.compress_image_bytes(original_bytes, quality=50)
|
|
|
|
|
|
assert isinstance(compressed_bytes, bytes)
|
|
|
assert len(compressed_bytes) < len(original_bytes)
|
|
|
|
|
|
def test_should_compress(self):
|
|
|
"""测试判断是否需要压缩"""
|
|
|
# 创建小文件
|
|
|
small_img = Image.new('RGB', (10, 10), color='red')
|
|
|
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f:
|
|
|
small_path = f.name
|
|
|
small_img.save(small_path, 'JPEG')
|
|
|
self.temp_files.append(small_path)
|
|
|
|
|
|
assert self.processor.should_compress(small_path) is False
|
|
|
assert self.processor.should_compress("/nonexistent/file.jpg") is False
|
|
|
|
|
|
|
|
|
class TestImageOperations:
|
|
|
"""图片操作测试"""
|
|
|
|
|
|
def setup_method(self):
|
|
|
"""每个测试前创建处理器"""
|
|
|
self.processor = ImageProcessor()
|
|
|
self.temp_files = []
|
|
|
|
|
|
def teardown_method(self):
|
|
|
"""每个测试后清理临时文件"""
|
|
|
for f in self.temp_files:
|
|
|
if os.path.exists(f):
|
|
|
os.unlink(f)
|
|
|
self.processor.clear_cache()
|
|
|
|
|
|
def _create_test_image(self, size: tuple = (400, 300)) -> str:
|
|
|
"""创建测试图片"""
|
|
|
img = Image.new('RGB', size, color='green')
|
|
|
|
|
|
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f:
|
|
|
temp_path = f.name
|
|
|
|
|
|
img.save(temp_path, 'JPEG')
|
|
|
self.temp_files.append(temp_path)
|
|
|
return temp_path
|
|
|
|
|
|
def test_rotate_image_90_degrees(self):
|
|
|
"""测试旋转90度"""
|
|
|
temp_path = self._create_test_image(size=(400, 200))
|
|
|
|
|
|
rotated_path = self.processor.rotate_image(temp_path, 90)
|
|
|
self.temp_files.append(rotated_path)
|
|
|
|
|
|
with Image.open(rotated_path) as img:
|
|
|
# 旋转90度后宽高交换
|
|
|
assert img.height >= 400 or img.width >= 200
|
|
|
|
|
|
def test_rotate_image_nonexistent_file(self):
|
|
|
"""测试旋转不存在的文件"""
|
|
|
with pytest.raises(ImageNotFoundError):
|
|
|
self.processor.rotate_image("/nonexistent/image.jpg", 90)
|
|
|
|
|
|
def test_resize_image(self):
|
|
|
"""测试调整图片大小"""
|
|
|
temp_path = self._create_test_image(size=(800, 600))
|
|
|
|
|
|
resized_path = self.processor.resize_image(temp_path, (400, 300))
|
|
|
self.temp_files.append(resized_path)
|
|
|
|
|
|
with Image.open(resized_path) as img:
|
|
|
assert img.width <= 400
|
|
|
assert img.height <= 300
|
|
|
|
|
|
def test_resize_image_nonexistent_file(self):
|
|
|
"""测试调整不存在文件的大小"""
|
|
|
with pytest.raises(ImageNotFoundError):
|
|
|
self.processor.resize_image("/nonexistent/image.jpg", (100, 100))
|
|
|
|
|
|
|
|
|
class TestImageInfo:
|
|
|
"""ImageInfo数据类测试"""
|
|
|
|
|
|
def test_image_info_creation(self):
|
|
|
"""测试创建ImageInfo"""
|
|
|
info = ImageInfo(
|
|
|
path="/path/to/image.jpg",
|
|
|
format=ImageFormat.JPEG,
|
|
|
width=800,
|
|
|
height=600,
|
|
|
file_size=100000,
|
|
|
mode="RGB"
|
|
|
)
|
|
|
|
|
|
assert info.path == "/path/to/image.jpg"
|
|
|
assert info.format == ImageFormat.JPEG
|
|
|
assert info.width == 800
|
|
|
assert info.height == 600
|
|
|
|
|
|
def test_aspect_ratio(self):
|
|
|
"""测试宽高比计算"""
|
|
|
info = ImageInfo(
|
|
|
path="/path/to/image.jpg",
|
|
|
format=ImageFormat.JPEG,
|
|
|
width=800,
|
|
|
height=400,
|
|
|
file_size=100000,
|
|
|
mode="RGB"
|
|
|
)
|
|
|
|
|
|
assert info.aspect_ratio == 2.0
|
|
|
|
|
|
def test_is_large(self):
|
|
|
"""测试大图片判断"""
|
|
|
small_info = ImageInfo(
|
|
|
path="/path/to/small.jpg",
|
|
|
format=ImageFormat.JPEG,
|
|
|
width=100,
|
|
|
height=100,
|
|
|
file_size=1000,
|
|
|
mode="RGB"
|
|
|
)
|
|
|
|
|
|
large_info = ImageInfo(
|
|
|
path="/path/to/large.jpg",
|
|
|
format=ImageFormat.JPEG,
|
|
|
width=4000,
|
|
|
height=3000,
|
|
|
file_size=6 * 1024 * 1024, # 6MB
|
|
|
mode="RGB"
|
|
|
)
|
|
|
|
|
|
assert small_info.is_large is False
|
|
|
assert large_info.is_large is True
|
|
|
|
|
|
def test_to_dict(self):
|
|
|
"""测试转换为字典"""
|
|
|
info = ImageInfo(
|
|
|
path="/path/to/image.jpg",
|
|
|
format=ImageFormat.JPEG,
|
|
|
width=800,
|
|
|
height=600,
|
|
|
file_size=100000,
|
|
|
mode="RGB"
|
|
|
)
|
|
|
|
|
|
d = info.to_dict()
|
|
|
|
|
|
assert d["path"] == "/path/to/image.jpg"
|
|
|
assert d["format"] == "jpeg"
|
|
|
assert d["width"] == 800
|
|
|
assert d["height"] == 600
|
|
|
|
|
|
|
|
|
class TestUtilityMethods:
|
|
|
"""工具方法测试"""
|
|
|
|
|
|
def setup_method(self):
|
|
|
"""每个测试前创建处理器"""
|
|
|
self.processor = ImageProcessor()
|
|
|
|
|
|
def teardown_method(self):
|
|
|
"""每个测试后清理"""
|
|
|
self.processor.clear_cache()
|
|
|
|
|
|
def test_get_supported_formats(self):
|
|
|
"""测试获取支持的格式"""
|
|
|
formats = self.processor.get_supported_formats()
|
|
|
|
|
|
assert "jpeg" in formats
|
|
|
assert "png" in formats
|
|
|
assert "gif" in formats
|
|
|
assert "bmp" in formats
|
|
|
|
|
|
def test_get_supported_extensions(self):
|
|
|
"""测试获取支持的扩展名"""
|
|
|
extensions = self.processor.get_supported_extensions()
|
|
|
|
|
|
assert ".jpg" in extensions
|
|
|
assert ".jpeg" in extensions
|
|
|
assert ".png" in extensions
|
|
|
assert ".gif" in extensions
|
|
|
assert ".bmp" in extensions
|
|
|
|
|
|
def test_clear_cache(self):
|
|
|
"""测试清理缓存"""
|
|
|
# 创建一些缓存文件
|
|
|
cache_dir = self.processor._cache_dir
|
|
|
cache_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
test_files = [
|
|
|
cache_dir / "test_thumb.jpg",
|
|
|
cache_dir / "test_compressed.jpg",
|
|
|
]
|
|
|
|
|
|
for f in test_files:
|
|
|
f.touch()
|
|
|
|
|
|
count = self.processor.clear_cache()
|
|
|
|
|
|
assert count >= 2
|
|
|
for f in test_files:
|
|
|
assert not f.exists()
|