# P2P Network Communication - Media Player Module """ 媒体播放器模块 负责音频和视频文件的播放控制 需求: 6.1, 6.2, 6.3, 6.4, 6.5, 6.7 """ import logging import os import threading import time from dataclasses import dataclass from enum import Enum from pathlib import Path from typing import Callable, Optional, Tuple from config import ClientConfig # 设置日志 logger = logging.getLogger(__name__) # ==================== 枚举类型 ==================== class MediaType(Enum): """媒体类型枚举""" AUDIO = "audio" VIDEO = "video" UNKNOWN = "unknown" class PlaybackState(Enum): """播放状态枚举""" STOPPED = "stopped" PLAYING = "playing" PAUSED = "paused" LOADING = "loading" ERROR = "error" class AudioFormat(Enum): """支持的音频格式枚举 (需求 6.1)""" MP3 = "mp3" WAV = "wav" AAC = "aac" FLAC = "flac" UNKNOWN = "unknown" class VideoFormat(Enum): """支持的视频格式枚举 (需求 6.2)""" MP4 = "mp4" AVI = "avi" MKV = "mkv" MOV = "mov" UNKNOWN = "unknown" # ==================== 异常类 ==================== class MediaPlayerError(Exception): """媒体播放器错误基类""" pass class MediaNotFoundError(MediaPlayerError): """媒体文件不存在错误""" pass class UnsupportedMediaFormatError(MediaPlayerError): """不支持的媒体格式错误""" pass class PlaybackError(MediaPlayerError): """播放错误""" pass class MediaLoadError(MediaPlayerError): """媒体加载错误""" pass # ==================== 数据结构 ==================== @dataclass class MediaInfo: """媒体信息数据结构""" path: str media_type: MediaType format: str duration: float # 时长(秒) file_size: int # 音频属性 sample_rate: Optional[int] = None channels: Optional[int] = None bitrate: Optional[int] = None # 视频属性 width: Optional[int] = None height: Optional[int] = None fps: Optional[float] = None def to_dict(self) -> dict: """转换为字典""" return { "path": self.path, "media_type": self.media_type.value, "format": self.format, "duration": self.duration, "file_size": self.file_size, "sample_rate": self.sample_rate, "channels": self.channels, "bitrate": self.bitrate, "width": self.width, "height": self.height, "fps": self.fps, } # ==================== 格式映射 ==================== # 音频文件扩展名映射 AUDIO_EXTENSIONS = { '.mp3': AudioFormat.MP3, '.wav': AudioFormat.WAV, '.aac': AudioFormat.AAC, '.m4a': AudioFormat.AAC, '.flac': AudioFormat.FLAC, } # 视频文件扩展名映射 VIDEO_EXTENSIONS = { '.mp4': VideoFormat.MP4, '.avi': VideoFormat.AVI, '.mkv': VideoFormat.MKV, '.mov': VideoFormat.MOV, } # 支持的音频格式集合 SUPPORTED_AUDIO_FORMATS = {AudioFormat.MP3, AudioFormat.WAV, AudioFormat.AAC, AudioFormat.FLAC} # 支持的视频格式集合 SUPPORTED_VIDEO_FORMATS = {VideoFormat.MP4, VideoFormat.AVI, VideoFormat.MKV, VideoFormat.MOV} # 回调类型定义 StateChangeCallback = Callable[[PlaybackState], None] PositionChangeCallback = Callable[[float], None] # ==================== 音频播放器 (需求 6.1, 6.3, 6.5) ==================== class AudioPlayer: """ 音频播放器 实现音频文件的加载和播放控制 需求: - 6.1: 支持常见音频格式(MP3、WAV、AAC、FLAC) - 6.3: 提供内置播放器进行播放 - 6.5: 提供播放、暂停、进度拖动、音量调节功能 """ # 支持的音频格式 SUPPORTED_FORMATS = SUPPORTED_AUDIO_FORMATS def __init__(self, config: Optional[ClientConfig] = None): """ 初始化音频播放器 Args: config: 客户端配置 """ self.config = config or ClientConfig() # 播放状态 self._state: PlaybackState = PlaybackState.STOPPED self._current_file: Optional[str] = None self._media_info: Optional[MediaInfo] = None # 播放控制 self._position: float = 0.0 # 当前播放位置(秒) self._duration: float = 0.0 # 总时长(秒) self._volume: float = 1.0 # 音量 (0.0 - 1.0) # 线程控制 self._playback_thread: Optional[threading.Thread] = None self._stop_event = threading.Event() self._pause_event = threading.Event() self._pause_event.set() # 初始不暂停 # 回调函数 self._state_callback: Optional[StateChangeCallback] = None self._position_callback: Optional[PositionChangeCallback] = None # 锁 self._lock = threading.RLock() logger.info("AudioPlayer initialized") # ==================== 属性 ==================== @property def state(self) -> PlaybackState: """获取当前播放状态""" return self._state @property def position(self) -> float: """获取当前播放位置(秒)""" return self._position @property def duration(self) -> float: """获取媒体总时长(秒)""" return self._duration @property def volume(self) -> float: """获取当前音量 (0.0 - 1.0)""" return self._volume @property def is_playing(self) -> bool: """是否正在播放""" return self._state == PlaybackState.PLAYING @property def is_paused(self) -> bool: """是否已暂停""" return self._state == PlaybackState.PAUSED @property def current_file(self) -> Optional[str]: """获取当前加载的文件路径""" return self._current_file @property def media_info(self) -> Optional[MediaInfo]: """获取当前媒体信息""" return self._media_info # ==================== 回调设置 ==================== def set_state_callback(self, callback: Optional[StateChangeCallback]) -> None: """设置状态变化回调""" self._state_callback = callback def set_position_callback(self, callback: Optional[PositionChangeCallback]) -> None: """设置播放位置变化回调""" self._position_callback = callback def _notify_state_change(self, state: PlaybackState) -> None: """通知状态变化""" self._state = state if self._state_callback: try: self._state_callback(state) except Exception as e: logger.error(f"State callback error: {e}") def _notify_position_change(self, position: float) -> None: """通知播放位置变化""" self._position = position if self._position_callback: try: self._position_callback(position) except Exception as e: logger.error(f"Position callback error: {e}") # ==================== 格式检测 ==================== def detect_format(self, file_path: str) -> AudioFormat: """ 检测音频文件格式 Args: file_path: 文件路径 Returns: 音频格式 """ ext = Path(file_path).suffix.lower() return AUDIO_EXTENSIONS.get(ext, AudioFormat.UNKNOWN) def is_supported_format(self, file_path: str) -> bool: """ 检查是否为支持的音频格式 Args: file_path: 文件路径 Returns: 是否支持 """ fmt = self.detect_format(file_path) return fmt in self.SUPPORTED_FORMATS # ==================== 加载音频 (需求 6.1) ==================== def load_audio(self, file_path: str) -> bool: """ 加载音频文件 实现 load_audio() 加载音频 (需求 6.1) WHEN 用户选择发送音频文件 THEN File_Transfer_Module SHALL 支持常见音频格式 Args: file_path: 音频文件路径 Returns: 加载是否成功 Raises: MediaNotFoundError: 文件不存在 UnsupportedMediaFormatError: 不支持的格式 MediaLoadError: 加载失败 """ with self._lock: # 检查文件是否存在 if not os.path.exists(file_path): raise MediaNotFoundError(f"Audio file not found: {file_path}") # 检查格式是否支持 audio_format = self.detect_format(file_path) if audio_format not in self.SUPPORTED_FORMATS: raise UnsupportedMediaFormatError( f"Unsupported audio format: {audio_format.value}. " f"Supported formats: {[f.value for f in self.SUPPORTED_FORMATS]}" ) # 停止当前播放 if self._state in (PlaybackState.PLAYING, PlaybackState.PAUSED): self.stop() self._notify_state_change(PlaybackState.LOADING) try: # 获取文件信息 file_size = os.path.getsize(file_path) # 尝试获取音频时长和详细信息 duration, sample_rate, channels, bitrate = self._get_audio_info(file_path) self._media_info = MediaInfo( path=file_path, media_type=MediaType.AUDIO, format=audio_format.value, duration=duration, file_size=file_size, sample_rate=sample_rate, channels=channels, bitrate=bitrate, ) self._current_file = file_path self._duration = duration self._position = 0.0 self._notify_state_change(PlaybackState.STOPPED) logger.info(f"Audio loaded: {file_path} (duration: {duration:.2f}s)") return True except Exception as e: self._notify_state_change(PlaybackState.ERROR) logger.error(f"Failed to load audio: {e}") raise MediaLoadError(f"Failed to load audio: {e}") def _get_audio_info(self, file_path: str) -> Tuple[float, Optional[int], Optional[int], Optional[int]]: """ 获取音频文件信息 Args: file_path: 文件路径 Returns: (duration, sample_rate, channels, bitrate) """ # 尝试使用 mutagen 获取音频信息 try: from mutagen import File as MutagenFile audio = MutagenFile(file_path) if audio is not None: duration = audio.info.length if hasattr(audio.info, 'length') else 0.0 sample_rate = getattr(audio.info, 'sample_rate', None) channels = getattr(audio.info, 'channels', None) bitrate = getattr(audio.info, 'bitrate', None) return duration, sample_rate, channels, bitrate except ImportError: logger.debug("mutagen not available, using fallback") except Exception as e: logger.debug(f"mutagen failed: {e}") # 回退:基于文件大小估算时长 file_size = os.path.getsize(file_path) ext = Path(file_path).suffix.lower() # 估算比特率(粗略估计) estimated_bitrate = { '.mp3': 128000, # 128 kbps '.wav': 1411200, # CD quality '.aac': 128000, '.flac': 800000, }.get(ext, 128000) duration = (file_size * 8) / estimated_bitrate return duration, None, None, estimated_bitrate // 1000 # ==================== 播放控制 (需求 6.3, 6.5) ==================== def play(self) -> None: """ 开始播放 实现播放控制 play (需求 6.3, 6.5) WHEN 音频文件接收完成 THEN Media_Player SHALL 提供内置播放器进行播放 WHILE 播放音视频 THEN Media_Player SHALL 提供播放、暂停、进度拖动、音量调节功能 Raises: PlaybackError: 没有加载媒体或播放失败 """ with self._lock: if self._current_file is None: raise PlaybackError("No audio file loaded") if self._state == PlaybackState.PLAYING: logger.debug("Already playing") return if self._state == PlaybackState.PAUSED: # 从暂停恢复 self._pause_event.set() self._notify_state_change(PlaybackState.PLAYING) logger.info("Playback resumed") return # 开始新的播放 self._stop_event.clear() self._pause_event.set() self._playback_thread = threading.Thread( target=self._playback_loop, daemon=True ) self._playback_thread.start() self._notify_state_change(PlaybackState.PLAYING) logger.info(f"Playback started: {self._current_file}") def pause(self) -> None: """ 暂停播放 实现播放控制 pause (需求 6.5) WHILE 播放音视频 THEN Media_Player SHALL 提供播放、暂停、进度拖动、音量调节功能 """ with self._lock: if self._state != PlaybackState.PLAYING: logger.debug("Not playing, cannot pause") return self._pause_event.clear() self._notify_state_change(PlaybackState.PAUSED) logger.info("Playback paused") def stop(self) -> None: """ 停止播放 实现播放控制 stop (需求 6.5) """ with self._lock: if self._state == PlaybackState.STOPPED: logger.debug("Already stopped") return self._stop_event.set() self._pause_event.set() # 确保线程不会卡在暂停状态 if self._playback_thread and self._playback_thread.is_alive(): self._playback_thread.join(timeout=1.0) self._position = 0.0 self._notify_state_change(PlaybackState.STOPPED) logger.info("Playback stopped") def seek(self, position: float) -> None: """ 跳转到指定位置 实现进度拖动 (需求 6.5) WHILE 播放音视频 THEN Media_Player SHALL 提供播放、暂停、进度拖动、音量调节功能 Args: position: 目标位置(秒) Raises: PlaybackError: 没有加载媒体 """ with self._lock: if self._current_file is None: raise PlaybackError("No audio file loaded") # 限制在有效范围内 position = max(0.0, min(position, self._duration)) self._position = position self._notify_position_change(position) logger.info(f"Seeked to position: {position:.2f}s") def set_volume(self, volume: float) -> None: """ 设置音量 实现音量调节 (需求 6.5) WHILE 播放音视频 THEN Media_Player SHALL 提供播放、暂停、进度拖动、音量调节功能 Args: volume: 音量值 (0.0 - 1.0) """ with self._lock: self._volume = max(0.0, min(1.0, volume)) logger.debug(f"Volume set to: {self._volume:.2f}") def get_duration(self) -> float: """ 获取媒体时长 Returns: 时长(秒) """ return self._duration def get_position(self) -> float: """ 获取当前播放位置 Returns: 当前位置(秒) """ return self._position def _playback_loop(self) -> None: """ 播放循环(在单独线程中运行) 模拟音频播放进度更新 """ update_interval = 0.1 # 100ms 更新一次 while not self._stop_event.is_set(): # 等待暂停事件 self._pause_event.wait() if self._stop_event.is_set(): break # 更新播放位置 with self._lock: self._position += update_interval if self._position >= self._duration: # 播放完成 self._position = self._duration self._notify_position_change(self._position) self._notify_state_change(PlaybackState.STOPPED) logger.info("Playback completed") break self._notify_position_change(self._position) time.sleep(update_interval) # ==================== 工具方法 ==================== def get_supported_formats(self) -> list: """ 获取支持的音频格式列表 Returns: 支持的格式列表 """ return [fmt.value for fmt in self.SUPPORTED_FORMATS] def get_supported_extensions(self) -> list: """ 获取支持的文件扩展名列表 Returns: 支持的扩展名列表 """ return list(AUDIO_EXTENSIONS.keys()) def release(self) -> None: """ 释放资源 """ self.stop() self._current_file = None self._media_info = None self._duration = 0.0 self._position = 0.0 logger.info("AudioPlayer resources released") # ==================== 视频播放器 (需求 6.2, 6.4, 6.7) ==================== class VideoPlayer: """ 视频播放器 实现视频文件的加载和播放控制 需求: - 6.2: 支持常见视频格式(MP4、AVI、MKV、MOV) - 6.4: 提供内置播放器进行播放 - 6.5: 提供播放、暂停、进度拖动、音量调节功能 - 6.7: 支持全屏模式 """ # 支持的视频格式 SUPPORTED_FORMATS = SUPPORTED_VIDEO_FORMATS def __init__(self, config: Optional[ClientConfig] = None): """ 初始化视频播放器 Args: config: 客户端配置 """ self.config = config or ClientConfig() # 播放状态 self._state: PlaybackState = PlaybackState.STOPPED self._current_file: Optional[str] = None self._media_info: Optional[MediaInfo] = None # 播放控制 self._position: float = 0.0 # 当前播放位置(秒) self._duration: float = 0.0 # 总时长(秒) self._volume: float = 1.0 # 音量 (0.0 - 1.0) # 视频属性 self._width: int = 0 self._height: int = 0 self._fps: float = 0.0 self._fullscreen: bool = False # 线程控制 self._playback_thread: Optional[threading.Thread] = None self._stop_event = threading.Event() self._pause_event = threading.Event() self._pause_event.set() # 初始不暂停 # 回调函数 self._state_callback: Optional[StateChangeCallback] = None self._position_callback: Optional[PositionChangeCallback] = None self._frame_callback: Optional[Callable[[bytes, int, int], None]] = None # 锁 self._lock = threading.RLock() logger.info("VideoPlayer initialized") # ==================== 属性 ==================== @property def state(self) -> PlaybackState: """获取当前播放状态""" return self._state @property def position(self) -> float: """获取当前播放位置(秒)""" return self._position @property def duration(self) -> float: """获取媒体总时长(秒)""" return self._duration @property def volume(self) -> float: """获取当前音量 (0.0 - 1.0)""" return self._volume @property def is_playing(self) -> bool: """是否正在播放""" return self._state == PlaybackState.PLAYING @property def is_paused(self) -> bool: """是否已暂停""" return self._state == PlaybackState.PAUSED @property def is_fullscreen(self) -> bool: """是否全屏模式""" return self._fullscreen @property def current_file(self) -> Optional[str]: """获取当前加载的文件路径""" return self._current_file @property def media_info(self) -> Optional[MediaInfo]: """获取当前媒体信息""" return self._media_info @property def video_size(self) -> Tuple[int, int]: """获取视频尺寸 (width, height)""" return (self._width, self._height) @property def fps(self) -> float: """获取视频帧率""" return self._fps # ==================== 回调设置 ==================== def set_state_callback(self, callback: Optional[StateChangeCallback]) -> None: """设置状态变化回调""" self._state_callback = callback def set_position_callback(self, callback: Optional[PositionChangeCallback]) -> None: """设置播放位置变化回调""" self._position_callback = callback def set_frame_callback(self, callback: Optional[Callable[[bytes, int, int], None]]) -> None: """ 设置帧渲染回调 Args: callback: 回调函数,参数为 (frame_data, width, height) """ self._frame_callback = callback def _notify_state_change(self, state: PlaybackState) -> None: """通知状态变化""" self._state = state if self._state_callback: try: self._state_callback(state) except Exception as e: logger.error(f"State callback error: {e}") def _notify_position_change(self, position: float) -> None: """通知播放位置变化""" self._position = position if self._position_callback: try: self._position_callback(position) except Exception as e: logger.error(f"Position callback error: {e}") # ==================== 格式检测 ==================== def detect_format(self, file_path: str) -> VideoFormat: """ 检测视频文件格式 Args: file_path: 文件路径 Returns: 视频格式 """ ext = Path(file_path).suffix.lower() return VIDEO_EXTENSIONS.get(ext, VideoFormat.UNKNOWN) def is_supported_format(self, file_path: str) -> bool: """ 检查是否为支持的视频格式 Args: file_path: 文件路径 Returns: 是否支持 """ fmt = self.detect_format(file_path) return fmt in self.SUPPORTED_FORMATS # ==================== 加载视频 (需求 6.2) ==================== def load_video(self, file_path: str) -> bool: """ 加载视频文件 实现 load_video() 加载视频 (需求 6.2) WHEN 用户选择发送视频文件 THEN File_Transfer_Module SHALL 支持常见视频格式 Args: file_path: 视频文件路径 Returns: 加载是否成功 Raises: MediaNotFoundError: 文件不存在 UnsupportedMediaFormatError: 不支持的格式 MediaLoadError: 加载失败 """ with self._lock: # 检查文件是否存在 if not os.path.exists(file_path): raise MediaNotFoundError(f"Video file not found: {file_path}") # 检查格式是否支持 video_format = self.detect_format(file_path) if video_format not in self.SUPPORTED_FORMATS: raise UnsupportedMediaFormatError( f"Unsupported video format: {video_format.value}. " f"Supported formats: {[f.value for f in self.SUPPORTED_FORMATS]}" ) # 停止当前播放 if self._state in (PlaybackState.PLAYING, PlaybackState.PAUSED): self.stop() self._notify_state_change(PlaybackState.LOADING) try: # 获取文件信息 file_size = os.path.getsize(file_path) # 尝试获取视频时长和详细信息 duration, width, height, fps = self._get_video_info(file_path) self._media_info = MediaInfo( path=file_path, media_type=MediaType.VIDEO, format=video_format.value, duration=duration, file_size=file_size, width=width, height=height, fps=fps, ) self._current_file = file_path self._duration = duration self._position = 0.0 self._width = width or 0 self._height = height or 0 self._fps = fps or 0.0 self._notify_state_change(PlaybackState.STOPPED) logger.info(f"Video loaded: {file_path} (duration: {duration:.2f}s, " f"size: {width}x{height}, fps: {fps:.2f})") return True except Exception as e: self._notify_state_change(PlaybackState.ERROR) logger.error(f"Failed to load video: {e}") raise MediaLoadError(f"Failed to load video: {e}") def _get_video_info(self, file_path: str) -> Tuple[float, Optional[int], Optional[int], Optional[float]]: """ 获取视频文件信息 Args: file_path: 文件路径 Returns: (duration, width, height, fps) """ # 尝试使用 OpenCV 获取视频信息 try: import cv2 cap = cv2.VideoCapture(file_path) if cap.isOpened(): fps = cap.get(cv2.CAP_PROP_FPS) frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT) width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) duration = frame_count / fps if fps > 0 else 0.0 cap.release() return duration, width, height, fps except ImportError: logger.debug("OpenCV not available, using fallback") except Exception as e: logger.debug(f"OpenCV failed: {e}") # 回退:基于文件大小估算 file_size = os.path.getsize(file_path) # 假设平均比特率为 5 Mbps estimated_bitrate = 5 * 1024 * 1024 duration = (file_size * 8) / estimated_bitrate # 默认视频属性 return duration, 1920, 1080, 30.0 # ==================== 播放控制 (需求 6.4, 6.5) ==================== def play(self) -> None: """ 开始播放 实现视频渲染和播放控制 (需求 6.4) WHEN 视频文件接收完成 THEN Media_Player SHALL 提供内置播放器进行播放 Raises: PlaybackError: 没有加载媒体或播放失败 """ with self._lock: if self._current_file is None: raise PlaybackError("No video file loaded") if self._state == PlaybackState.PLAYING: logger.debug("Already playing") return if self._state == PlaybackState.PAUSED: # 从暂停恢复 self._pause_event.set() self._notify_state_change(PlaybackState.PLAYING) logger.info("Playback resumed") return # 开始新的播放 self._stop_event.clear() self._pause_event.set() self._playback_thread = threading.Thread( target=self._playback_loop, daemon=True ) self._playback_thread.start() self._notify_state_change(PlaybackState.PLAYING) logger.info(f"Playback started: {self._current_file}") def pause(self) -> None: """ 暂停播放 实现播放控制 pause (需求 6.5) """ with self._lock: if self._state != PlaybackState.PLAYING: logger.debug("Not playing, cannot pause") return self._pause_event.clear() self._notify_state_change(PlaybackState.PAUSED) logger.info("Playback paused") def stop(self) -> None: """ 停止播放 实现播放控制 stop (需求 6.5) """ with self._lock: if self._state == PlaybackState.STOPPED: logger.debug("Already stopped") return self._stop_event.set() self._pause_event.set() # 确保线程不会卡在暂停状态 if self._playback_thread and self._playback_thread.is_alive(): self._playback_thread.join(timeout=1.0) self._position = 0.0 self._notify_state_change(PlaybackState.STOPPED) logger.info("Playback stopped") def seek(self, position: float) -> None: """ 跳转到指定位置 实现进度拖动 (需求 6.5) Args: position: 目标位置(秒) Raises: PlaybackError: 没有加载媒体 """ with self._lock: if self._current_file is None: raise PlaybackError("No video file loaded") # 限制在有效范围内 position = max(0.0, min(position, self._duration)) self._position = position self._notify_position_change(position) logger.info(f"Seeked to position: {position:.2f}s") def set_volume(self, volume: float) -> None: """ 设置音量 实现音量调节 (需求 6.5) Args: volume: 音量值 (0.0 - 1.0) """ with self._lock: self._volume = max(0.0, min(1.0, volume)) logger.debug(f"Volume set to: {self._volume:.2f}") def get_duration(self) -> float: """ 获取媒体时长 Returns: 时长(秒) """ return self._duration def get_position(self) -> float: """ 获取当前播放位置 Returns: 当前位置(秒) """ return self._position # ==================== 全屏模式 (需求 6.7) ==================== def set_fullscreen(self, enabled: bool) -> None: """ 设置全屏模式 实现全屏模式 (需求 6.7) WHEN 播放视频 THEN Media_Player SHALL 支持全屏模式 Args: enabled: 是否启用全屏 """ with self._lock: self._fullscreen = enabled logger.info(f"Fullscreen mode: {'enabled' if enabled else 'disabled'}") def toggle_fullscreen(self) -> bool: """ 切换全屏模式 Returns: 切换后的全屏状态 """ with self._lock: self._fullscreen = not self._fullscreen logger.info(f"Fullscreen toggled: {'enabled' if self._fullscreen else 'disabled'}") return self._fullscreen # ==================== 播放循环 ==================== def _playback_loop(self) -> None: """ 播放循环(在单独线程中运行) 模拟视频播放进度更新 """ frame_interval = 1.0 / max(self._fps, 30.0) if self._fps > 0 else 1.0 / 30.0 while not self._stop_event.is_set(): # 等待暂停事件 self._pause_event.wait() if self._stop_event.is_set(): break # 更新播放位置 with self._lock: self._position += frame_interval if self._position >= self._duration: # 播放完成 self._position = self._duration self._notify_position_change(self._position) self._notify_state_change(PlaybackState.STOPPED) logger.info("Playback completed") break self._notify_position_change(self._position) time.sleep(frame_interval) # ==================== 工具方法 ==================== def get_supported_formats(self) -> list: """ 获取支持的视频格式列表 Returns: 支持的格式列表 """ return [fmt.value for fmt in self.SUPPORTED_FORMATS] def get_supported_extensions(self) -> list: """ 获取支持的文件扩展名列表 Returns: 支持的扩展名列表 """ return list(VIDEO_EXTENSIONS.keys()) def release(self) -> None: """ 释放资源 """ self.stop() self._current_file = None self._media_info = None self._duration = 0.0 self._position = 0.0 self._width = 0 self._height = 0 self._fps = 0.0 self._fullscreen = False logger.info("VideoPlayer resources released") # ==================== 统一媒体播放器 ==================== class MediaPlayer: """ 统一媒体播放器 整合音频和视频播放功能,提供统一的接口 需求: - 6.1: 支持常见音频格式 - 6.2: 支持常见视频格式 - 6.3, 6.4: 提供内置播放器 - 6.5: 提供播放控制功能 - 6.7: 支持全屏模式 """ def __init__(self, config: Optional[ClientConfig] = None): """ 初始化媒体播放器 Args: config: 客户端配置 """ self.config = config or ClientConfig() # 内部播放器 self._audio_player = AudioPlayer(config) self._video_player = VideoPlayer(config) # 当前活动的播放器 self._active_player: Optional[str] = None # "audio" or "video" logger.info("MediaPlayer initialized") # ==================== 属性 ==================== @property def state(self) -> PlaybackState: """获取当前播放状态""" if self._active_player == "audio": return self._audio_player.state elif self._active_player == "video": return self._video_player.state return PlaybackState.STOPPED @property def position(self) -> float: """获取当前播放位置(秒)""" if self._active_player == "audio": return self._audio_player.position elif self._active_player == "video": return self._video_player.position return 0.0 @property def duration(self) -> float: """获取媒体总时长(秒)""" if self._active_player == "audio": return self._audio_player.duration elif self._active_player == "video": return self._video_player.duration return 0.0 @property def volume(self) -> float: """获取当前音量""" if self._active_player == "audio": return self._audio_player.volume elif self._active_player == "video": return self._video_player.volume return 1.0 @property def is_playing(self) -> bool: """是否正在播放""" return self.state == PlaybackState.PLAYING @property def is_paused(self) -> bool: """是否已暂停""" return self.state == PlaybackState.PAUSED @property def media_type(self) -> Optional[MediaType]: """获取当前媒体类型""" if self._active_player == "audio": return MediaType.AUDIO elif self._active_player == "video": return MediaType.VIDEO return None @property def audio_player(self) -> AudioPlayer: """获取音频播放器实例""" return self._audio_player @property def video_player(self) -> VideoPlayer: """获取视频播放器实例""" return self._video_player # ==================== 媒体检测 ==================== def detect_media_type(self, file_path: str) -> MediaType: """ 检测媒体文件类型 Args: file_path: 文件路径 Returns: 媒体类型 """ ext = Path(file_path).suffix.lower() if ext in AUDIO_EXTENSIONS: return MediaType.AUDIO elif ext in VIDEO_EXTENSIONS: return MediaType.VIDEO return MediaType.UNKNOWN def is_supported(self, file_path: str) -> bool: """ 检查文件是否为支持的媒体格式 Args: file_path: 文件路径 Returns: 是否支持 """ media_type = self.detect_media_type(file_path) if media_type == MediaType.AUDIO: return self._audio_player.is_supported_format(file_path) elif media_type == MediaType.VIDEO: return self._video_player.is_supported_format(file_path) return False # ==================== 加载媒体 ==================== def load_audio(self, file_path: str) -> bool: """ 加载音频文件 Args: file_path: 音频文件路径 Returns: 加载是否成功 """ # 停止当前播放 self.stop() result = self._audio_player.load_audio(file_path) if result: self._active_player = "audio" return result def load_video(self, file_path: str) -> bool: """ 加载视频文件 Args: file_path: 视频文件路径 Returns: 加载是否成功 """ # 停止当前播放 self.stop() result = self._video_player.load_video(file_path) if result: self._active_player = "video" return result def load(self, file_path: str) -> bool: """ 自动检测并加载媒体文件 Args: file_path: 媒体文件路径 Returns: 加载是否成功 Raises: UnsupportedMediaFormatError: 不支持的格式 """ media_type = self.detect_media_type(file_path) if media_type == MediaType.AUDIO: return self.load_audio(file_path) elif media_type == MediaType.VIDEO: return self.load_video(file_path) else: raise UnsupportedMediaFormatError(f"Unsupported media format: {file_path}") # ==================== 播放控制 ==================== def play(self) -> None: """开始播放""" if self._active_player == "audio": self._audio_player.play() elif self._active_player == "video": self._video_player.play() else: raise PlaybackError("No media loaded") def pause(self) -> None: """暂停播放""" if self._active_player == "audio": self._audio_player.pause() elif self._active_player == "video": self._video_player.pause() def stop(self) -> None: """停止播放""" if self._active_player == "audio": self._audio_player.stop() elif self._active_player == "video": self._video_player.stop() def seek(self, position: float) -> None: """ 跳转到指定位置 Args: position: 目标位置(秒) """ if self._active_player == "audio": self._audio_player.seek(position) elif self._active_player == "video": self._video_player.seek(position) else: raise PlaybackError("No media loaded") def set_volume(self, volume: float) -> None: """ 设置音量 Args: volume: 音量值 (0.0 - 1.0) """ if self._active_player == "audio": self._audio_player.set_volume(volume) elif self._active_player == "video": self._video_player.set_volume(volume) def get_duration(self) -> float: """获取媒体时长""" return self.duration def get_position(self) -> float: """获取当前播放位置""" return self.position def set_fullscreen(self, enabled: bool) -> None: """ 设置全屏模式(仅视频) Args: enabled: 是否启用全屏 """ if self._active_player == "video": self._video_player.set_fullscreen(enabled) # ==================== 回调设置 ==================== def set_state_callback(self, callback: Optional[StateChangeCallback]) -> None: """设置状态变化回调""" self._audio_player.set_state_callback(callback) self._video_player.set_state_callback(callback) def set_position_callback(self, callback: Optional[PositionChangeCallback]) -> None: """设置播放位置变化回调""" self._audio_player.set_position_callback(callback) self._video_player.set_position_callback(callback) # ==================== 工具方法 ==================== def get_supported_audio_formats(self) -> list: """获取支持的音频格式列表""" return self._audio_player.get_supported_formats() def get_supported_video_formats(self) -> list: """获取支持的视频格式列表""" return self._video_player.get_supported_formats() def get_all_supported_extensions(self) -> list: """获取所有支持的文件扩展名""" return (self._audio_player.get_supported_extensions() + self._video_player.get_supported_extensions()) def release(self) -> None: """释放所有资源""" self._audio_player.release() self._video_player.release() self._active_player = None logger.info("MediaPlayer resources released")