You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1439 lines
43 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# P2P Network Communication - Media Player Module
"""
媒体播放器模块
负责音频和视频文件的播放控制
需求: 6.1, 6.2, 6.3, 6.4, 6.5, 6.7
"""
import logging
import os
import threading
import time
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Callable, Optional, Tuple
from config import ClientConfig
# 设置日志
logger = logging.getLogger(__name__)
# ==================== 枚举类型 ====================
class MediaType(Enum):
"""媒体类型枚举"""
AUDIO = "audio"
VIDEO = "video"
UNKNOWN = "unknown"
class PlaybackState(Enum):
"""播放状态枚举"""
STOPPED = "stopped"
PLAYING = "playing"
PAUSED = "paused"
LOADING = "loading"
ERROR = "error"
class AudioFormat(Enum):
"""支持的音频格式枚举 (需求 6.1)"""
MP3 = "mp3"
WAV = "wav"
AAC = "aac"
FLAC = "flac"
UNKNOWN = "unknown"
class VideoFormat(Enum):
"""支持的视频格式枚举 (需求 6.2)"""
MP4 = "mp4"
AVI = "avi"
MKV = "mkv"
MOV = "mov"
UNKNOWN = "unknown"
# ==================== 异常类 ====================
class MediaPlayerError(Exception):
"""媒体播放器错误基类"""
pass
class MediaNotFoundError(MediaPlayerError):
"""媒体文件不存在错误"""
pass
class UnsupportedMediaFormatError(MediaPlayerError):
"""不支持的媒体格式错误"""
pass
class PlaybackError(MediaPlayerError):
"""播放错误"""
pass
class MediaLoadError(MediaPlayerError):
"""媒体加载错误"""
pass
# ==================== 数据结构 ====================
@dataclass
class MediaInfo:
"""媒体信息数据结构"""
path: str
media_type: MediaType
format: str
duration: float # 时长(秒)
file_size: int
# 音频属性
sample_rate: Optional[int] = None
channels: Optional[int] = None
bitrate: Optional[int] = None
# 视频属性
width: Optional[int] = None
height: Optional[int] = None
fps: Optional[float] = None
def to_dict(self) -> dict:
"""转换为字典"""
return {
"path": self.path,
"media_type": self.media_type.value,
"format": self.format,
"duration": self.duration,
"file_size": self.file_size,
"sample_rate": self.sample_rate,
"channels": self.channels,
"bitrate": self.bitrate,
"width": self.width,
"height": self.height,
"fps": self.fps,
}
# ==================== 格式映射 ====================
# 音频文件扩展名映射
AUDIO_EXTENSIONS = {
'.mp3': AudioFormat.MP3,
'.wav': AudioFormat.WAV,
'.aac': AudioFormat.AAC,
'.m4a': AudioFormat.AAC,
'.flac': AudioFormat.FLAC,
}
# 视频文件扩展名映射
VIDEO_EXTENSIONS = {
'.mp4': VideoFormat.MP4,
'.avi': VideoFormat.AVI,
'.mkv': VideoFormat.MKV,
'.mov': VideoFormat.MOV,
}
# 支持的音频格式集合
SUPPORTED_AUDIO_FORMATS = {AudioFormat.MP3, AudioFormat.WAV, AudioFormat.AAC, AudioFormat.FLAC}
# 支持的视频格式集合
SUPPORTED_VIDEO_FORMATS = {VideoFormat.MP4, VideoFormat.AVI, VideoFormat.MKV, VideoFormat.MOV}
# 回调类型定义
StateChangeCallback = Callable[[PlaybackState], None]
PositionChangeCallback = Callable[[float], None]
# ==================== 音频播放器 (需求 6.1, 6.3, 6.5) ====================
class AudioPlayer:
"""
音频播放器
实现音频文件的加载和播放控制
需求:
- 6.1: 支持常见音频格式MP3、WAV、AAC、FLAC
- 6.3: 提供内置播放器进行播放
- 6.5: 提供播放、暂停、进度拖动、音量调节功能
"""
# 支持的音频格式
SUPPORTED_FORMATS = SUPPORTED_AUDIO_FORMATS
def __init__(self, config: Optional[ClientConfig] = None):
"""
初始化音频播放器
Args:
config: 客户端配置
"""
self.config = config or ClientConfig()
# 播放状态
self._state: PlaybackState = PlaybackState.STOPPED
self._current_file: Optional[str] = None
self._media_info: Optional[MediaInfo] = None
# 播放控制
self._position: float = 0.0 # 当前播放位置(秒)
self._duration: float = 0.0 # 总时长(秒)
self._volume: float = 1.0 # 音量 (0.0 - 1.0)
# 线程控制
self._playback_thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
self._pause_event = threading.Event()
self._pause_event.set() # 初始不暂停
# 回调函数
self._state_callback: Optional[StateChangeCallback] = None
self._position_callback: Optional[PositionChangeCallback] = None
# 锁
self._lock = threading.RLock()
logger.info("AudioPlayer initialized")
# ==================== 属性 ====================
@property
def state(self) -> PlaybackState:
"""获取当前播放状态"""
return self._state
@property
def position(self) -> float:
"""获取当前播放位置(秒)"""
return self._position
@property
def duration(self) -> float:
"""获取媒体总时长(秒)"""
return self._duration
@property
def volume(self) -> float:
"""获取当前音量 (0.0 - 1.0)"""
return self._volume
@property
def is_playing(self) -> bool:
"""是否正在播放"""
return self._state == PlaybackState.PLAYING
@property
def is_paused(self) -> bool:
"""是否已暂停"""
return self._state == PlaybackState.PAUSED
@property
def current_file(self) -> Optional[str]:
"""获取当前加载的文件路径"""
return self._current_file
@property
def media_info(self) -> Optional[MediaInfo]:
"""获取当前媒体信息"""
return self._media_info
# ==================== 回调设置 ====================
def set_state_callback(self, callback: Optional[StateChangeCallback]) -> None:
"""设置状态变化回调"""
self._state_callback = callback
def set_position_callback(self, callback: Optional[PositionChangeCallback]) -> None:
"""设置播放位置变化回调"""
self._position_callback = callback
def _notify_state_change(self, state: PlaybackState) -> None:
"""通知状态变化"""
self._state = state
if self._state_callback:
try:
self._state_callback(state)
except Exception as e:
logger.error(f"State callback error: {e}")
def _notify_position_change(self, position: float) -> None:
"""通知播放位置变化"""
self._position = position
if self._position_callback:
try:
self._position_callback(position)
except Exception as e:
logger.error(f"Position callback error: {e}")
# ==================== 格式检测 ====================
def detect_format(self, file_path: str) -> AudioFormat:
"""
检测音频文件格式
Args:
file_path: 文件路径
Returns:
音频格式
"""
ext = Path(file_path).suffix.lower()
return AUDIO_EXTENSIONS.get(ext, AudioFormat.UNKNOWN)
def is_supported_format(self, file_path: str) -> bool:
"""
检查是否为支持的音频格式
Args:
file_path: 文件路径
Returns:
是否支持
"""
fmt = self.detect_format(file_path)
return fmt in self.SUPPORTED_FORMATS
# ==================== 加载音频 (需求 6.1) ====================
def load_audio(self, file_path: str) -> bool:
"""
加载音频文件
实现 load_audio() 加载音频 (需求 6.1)
WHEN 用户选择发送音频文件 THEN File_Transfer_Module SHALL 支持常见音频格式
Args:
file_path: 音频文件路径
Returns:
加载是否成功
Raises:
MediaNotFoundError: 文件不存在
UnsupportedMediaFormatError: 不支持的格式
MediaLoadError: 加载失败
"""
with self._lock:
# 检查文件是否存在
if not os.path.exists(file_path):
raise MediaNotFoundError(f"Audio file not found: {file_path}")
# 检查格式是否支持
audio_format = self.detect_format(file_path)
if audio_format not in self.SUPPORTED_FORMATS:
raise UnsupportedMediaFormatError(
f"Unsupported audio format: {audio_format.value}. "
f"Supported formats: {[f.value for f in self.SUPPORTED_FORMATS]}"
)
# 停止当前播放
if self._state in (PlaybackState.PLAYING, PlaybackState.PAUSED):
self.stop()
self._notify_state_change(PlaybackState.LOADING)
try:
# 获取文件信息
file_size = os.path.getsize(file_path)
# 尝试获取音频时长和详细信息
duration, sample_rate, channels, bitrate = self._get_audio_info(file_path)
self._media_info = MediaInfo(
path=file_path,
media_type=MediaType.AUDIO,
format=audio_format.value,
duration=duration,
file_size=file_size,
sample_rate=sample_rate,
channels=channels,
bitrate=bitrate,
)
self._current_file = file_path
self._duration = duration
self._position = 0.0
self._notify_state_change(PlaybackState.STOPPED)
logger.info(f"Audio loaded: {file_path} (duration: {duration:.2f}s)")
return True
except Exception as e:
self._notify_state_change(PlaybackState.ERROR)
logger.error(f"Failed to load audio: {e}")
raise MediaLoadError(f"Failed to load audio: {e}")
def _get_audio_info(self, file_path: str) -> Tuple[float, Optional[int], Optional[int], Optional[int]]:
"""
获取音频文件信息
Args:
file_path: 文件路径
Returns:
(duration, sample_rate, channels, bitrate)
"""
# 尝试使用 mutagen 获取音频信息
try:
from mutagen import File as MutagenFile
audio = MutagenFile(file_path)
if audio is not None:
duration = audio.info.length if hasattr(audio.info, 'length') else 0.0
sample_rate = getattr(audio.info, 'sample_rate', None)
channels = getattr(audio.info, 'channels', None)
bitrate = getattr(audio.info, 'bitrate', None)
return duration, sample_rate, channels, bitrate
except ImportError:
logger.debug("mutagen not available, using fallback")
except Exception as e:
logger.debug(f"mutagen failed: {e}")
# 回退:基于文件大小估算时长
file_size = os.path.getsize(file_path)
ext = Path(file_path).suffix.lower()
# 估算比特率(粗略估计)
estimated_bitrate = {
'.mp3': 128000, # 128 kbps
'.wav': 1411200, # CD quality
'.aac': 128000,
'.flac': 800000,
}.get(ext, 128000)
duration = (file_size * 8) / estimated_bitrate
return duration, None, None, estimated_bitrate // 1000
# ==================== 播放控制 (需求 6.3, 6.5) ====================
def play(self) -> None:
"""
开始播放
实现播放控制 play (需求 6.3, 6.5)
WHEN 音频文件接收完成 THEN Media_Player SHALL 提供内置播放器进行播放
WHILE 播放音视频 THEN Media_Player SHALL 提供播放、暂停、进度拖动、音量调节功能
Raises:
PlaybackError: 没有加载媒体或播放失败
"""
with self._lock:
if self._current_file is None:
raise PlaybackError("No audio file loaded")
if self._state == PlaybackState.PLAYING:
logger.debug("Already playing")
return
if self._state == PlaybackState.PAUSED:
# 从暂停恢复
self._pause_event.set()
self._notify_state_change(PlaybackState.PLAYING)
logger.info("Playback resumed")
return
# 开始新的播放
self._stop_event.clear()
self._pause_event.set()
self._playback_thread = threading.Thread(
target=self._playback_loop,
daemon=True
)
self._playback_thread.start()
self._notify_state_change(PlaybackState.PLAYING)
logger.info(f"Playback started: {self._current_file}")
def pause(self) -> None:
"""
暂停播放
实现播放控制 pause (需求 6.5)
WHILE 播放音视频 THEN Media_Player SHALL 提供播放、暂停、进度拖动、音量调节功能
"""
with self._lock:
if self._state != PlaybackState.PLAYING:
logger.debug("Not playing, cannot pause")
return
self._pause_event.clear()
self._notify_state_change(PlaybackState.PAUSED)
logger.info("Playback paused")
def stop(self) -> None:
"""
停止播放
实现播放控制 stop (需求 6.5)
"""
with self._lock:
if self._state == PlaybackState.STOPPED:
logger.debug("Already stopped")
return
self._stop_event.set()
self._pause_event.set() # 确保线程不会卡在暂停状态
if self._playback_thread and self._playback_thread.is_alive():
self._playback_thread.join(timeout=1.0)
self._position = 0.0
self._notify_state_change(PlaybackState.STOPPED)
logger.info("Playback stopped")
def seek(self, position: float) -> None:
"""
跳转到指定位置
实现进度拖动 (需求 6.5)
WHILE 播放音视频 THEN Media_Player SHALL 提供播放、暂停、进度拖动、音量调节功能
Args:
position: 目标位置(秒)
Raises:
PlaybackError: 没有加载媒体
"""
with self._lock:
if self._current_file is None:
raise PlaybackError("No audio file loaded")
# 限制在有效范围内
position = max(0.0, min(position, self._duration))
self._position = position
self._notify_position_change(position)
logger.info(f"Seeked to position: {position:.2f}s")
def set_volume(self, volume: float) -> None:
"""
设置音量
实现音量调节 (需求 6.5)
WHILE 播放音视频 THEN Media_Player SHALL 提供播放、暂停、进度拖动、音量调节功能
Args:
volume: 音量值 (0.0 - 1.0)
"""
with self._lock:
self._volume = max(0.0, min(1.0, volume))
logger.debug(f"Volume set to: {self._volume:.2f}")
def get_duration(self) -> float:
"""
获取媒体时长
Returns:
时长(秒)
"""
return self._duration
def get_position(self) -> float:
"""
获取当前播放位置
Returns:
当前位置(秒)
"""
return self._position
def _playback_loop(self) -> None:
"""
播放循环(在单独线程中运行)
模拟音频播放进度更新
"""
update_interval = 0.1 # 100ms 更新一次
while not self._stop_event.is_set():
# 等待暂停事件
self._pause_event.wait()
if self._stop_event.is_set():
break
# 更新播放位置
with self._lock:
self._position += update_interval
if self._position >= self._duration:
# 播放完成
self._position = self._duration
self._notify_position_change(self._position)
self._notify_state_change(PlaybackState.STOPPED)
logger.info("Playback completed")
break
self._notify_position_change(self._position)
time.sleep(update_interval)
# ==================== 工具方法 ====================
def get_supported_formats(self) -> list:
"""
获取支持的音频格式列表
Returns:
支持的格式列表
"""
return [fmt.value for fmt in self.SUPPORTED_FORMATS]
def get_supported_extensions(self) -> list:
"""
获取支持的文件扩展名列表
Returns:
支持的扩展名列表
"""
return list(AUDIO_EXTENSIONS.keys())
def release(self) -> None:
"""
释放资源
"""
self.stop()
self._current_file = None
self._media_info = None
self._duration = 0.0
self._position = 0.0
logger.info("AudioPlayer resources released")
# ==================== 视频播放器 (需求 6.2, 6.4, 6.7) ====================
class VideoPlayer:
"""
视频播放器
实现视频文件的加载和播放控制
需求:
- 6.2: 支持常见视频格式MP4、AVI、MKV、MOV
- 6.4: 提供内置播放器进行播放
- 6.5: 提供播放、暂停、进度拖动、音量调节功能
- 6.7: 支持全屏模式
"""
# 支持的视频格式
SUPPORTED_FORMATS = SUPPORTED_VIDEO_FORMATS
def __init__(self, config: Optional[ClientConfig] = None):
"""
初始化视频播放器
Args:
config: 客户端配置
"""
self.config = config or ClientConfig()
# 播放状态
self._state: PlaybackState = PlaybackState.STOPPED
self._current_file: Optional[str] = None
self._media_info: Optional[MediaInfo] = None
# 播放控制
self._position: float = 0.0 # 当前播放位置(秒)
self._duration: float = 0.0 # 总时长(秒)
self._volume: float = 1.0 # 音量 (0.0 - 1.0)
# 视频属性
self._width: int = 0
self._height: int = 0
self._fps: float = 0.0
self._fullscreen: bool = False
# 线程控制
self._playback_thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
self._pause_event = threading.Event()
self._pause_event.set() # 初始不暂停
# 回调函数
self._state_callback: Optional[StateChangeCallback] = None
self._position_callback: Optional[PositionChangeCallback] = None
self._frame_callback: Optional[Callable[[bytes, int, int], None]] = None
# 锁
self._lock = threading.RLock()
logger.info("VideoPlayer initialized")
# ==================== 属性 ====================
@property
def state(self) -> PlaybackState:
"""获取当前播放状态"""
return self._state
@property
def position(self) -> float:
"""获取当前播放位置(秒)"""
return self._position
@property
def duration(self) -> float:
"""获取媒体总时长(秒)"""
return self._duration
@property
def volume(self) -> float:
"""获取当前音量 (0.0 - 1.0)"""
return self._volume
@property
def is_playing(self) -> bool:
"""是否正在播放"""
return self._state == PlaybackState.PLAYING
@property
def is_paused(self) -> bool:
"""是否已暂停"""
return self._state == PlaybackState.PAUSED
@property
def is_fullscreen(self) -> bool:
"""是否全屏模式"""
return self._fullscreen
@property
def current_file(self) -> Optional[str]:
"""获取当前加载的文件路径"""
return self._current_file
@property
def media_info(self) -> Optional[MediaInfo]:
"""获取当前媒体信息"""
return self._media_info
@property
def video_size(self) -> Tuple[int, int]:
"""获取视频尺寸 (width, height)"""
return (self._width, self._height)
@property
def fps(self) -> float:
"""获取视频帧率"""
return self._fps
# ==================== 回调设置 ====================
def set_state_callback(self, callback: Optional[StateChangeCallback]) -> None:
"""设置状态变化回调"""
self._state_callback = callback
def set_position_callback(self, callback: Optional[PositionChangeCallback]) -> None:
"""设置播放位置变化回调"""
self._position_callback = callback
def set_frame_callback(self, callback: Optional[Callable[[bytes, int, int], None]]) -> None:
"""
设置帧渲染回调
Args:
callback: 回调函数,参数为 (frame_data, width, height)
"""
self._frame_callback = callback
def _notify_state_change(self, state: PlaybackState) -> None:
"""通知状态变化"""
self._state = state
if self._state_callback:
try:
self._state_callback(state)
except Exception as e:
logger.error(f"State callback error: {e}")
def _notify_position_change(self, position: float) -> None:
"""通知播放位置变化"""
self._position = position
if self._position_callback:
try:
self._position_callback(position)
except Exception as e:
logger.error(f"Position callback error: {e}")
# ==================== 格式检测 ====================
def detect_format(self, file_path: str) -> VideoFormat:
"""
检测视频文件格式
Args:
file_path: 文件路径
Returns:
视频格式
"""
ext = Path(file_path).suffix.lower()
return VIDEO_EXTENSIONS.get(ext, VideoFormat.UNKNOWN)
def is_supported_format(self, file_path: str) -> bool:
"""
检查是否为支持的视频格式
Args:
file_path: 文件路径
Returns:
是否支持
"""
fmt = self.detect_format(file_path)
return fmt in self.SUPPORTED_FORMATS
# ==================== 加载视频 (需求 6.2) ====================
def load_video(self, file_path: str) -> bool:
"""
加载视频文件
实现 load_video() 加载视频 (需求 6.2)
WHEN 用户选择发送视频文件 THEN File_Transfer_Module SHALL 支持常见视频格式
Args:
file_path: 视频文件路径
Returns:
加载是否成功
Raises:
MediaNotFoundError: 文件不存在
UnsupportedMediaFormatError: 不支持的格式
MediaLoadError: 加载失败
"""
with self._lock:
# 检查文件是否存在
if not os.path.exists(file_path):
raise MediaNotFoundError(f"Video file not found: {file_path}")
# 检查格式是否支持
video_format = self.detect_format(file_path)
if video_format not in self.SUPPORTED_FORMATS:
raise UnsupportedMediaFormatError(
f"Unsupported video format: {video_format.value}. "
f"Supported formats: {[f.value for f in self.SUPPORTED_FORMATS]}"
)
# 停止当前播放
if self._state in (PlaybackState.PLAYING, PlaybackState.PAUSED):
self.stop()
self._notify_state_change(PlaybackState.LOADING)
try:
# 获取文件信息
file_size = os.path.getsize(file_path)
# 尝试获取视频时长和详细信息
duration, width, height, fps = self._get_video_info(file_path)
self._media_info = MediaInfo(
path=file_path,
media_type=MediaType.VIDEO,
format=video_format.value,
duration=duration,
file_size=file_size,
width=width,
height=height,
fps=fps,
)
self._current_file = file_path
self._duration = duration
self._position = 0.0
self._width = width or 0
self._height = height or 0
self._fps = fps or 0.0
self._notify_state_change(PlaybackState.STOPPED)
logger.info(f"Video loaded: {file_path} (duration: {duration:.2f}s, "
f"size: {width}x{height}, fps: {fps:.2f})")
return True
except Exception as e:
self._notify_state_change(PlaybackState.ERROR)
logger.error(f"Failed to load video: {e}")
raise MediaLoadError(f"Failed to load video: {e}")
def _get_video_info(self, file_path: str) -> Tuple[float, Optional[int], Optional[int], Optional[float]]:
"""
获取视频文件信息
Args:
file_path: 文件路径
Returns:
(duration, width, height, fps)
"""
# 尝试使用 OpenCV 获取视频信息
try:
import cv2
cap = cv2.VideoCapture(file_path)
if cap.isOpened():
fps = cap.get(cv2.CAP_PROP_FPS)
frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
duration = frame_count / fps if fps > 0 else 0.0
cap.release()
return duration, width, height, fps
except ImportError:
logger.debug("OpenCV not available, using fallback")
except Exception as e:
logger.debug(f"OpenCV failed: {e}")
# 回退:基于文件大小估算
file_size = os.path.getsize(file_path)
# 假设平均比特率为 5 Mbps
estimated_bitrate = 5 * 1024 * 1024
duration = (file_size * 8) / estimated_bitrate
# 默认视频属性
return duration, 1920, 1080, 30.0
# ==================== 播放控制 (需求 6.4, 6.5) ====================
def play(self) -> None:
"""
开始播放
实现视频渲染和播放控制 (需求 6.4)
WHEN 视频文件接收完成 THEN Media_Player SHALL 提供内置播放器进行播放
Raises:
PlaybackError: 没有加载媒体或播放失败
"""
with self._lock:
if self._current_file is None:
raise PlaybackError("No video file loaded")
if self._state == PlaybackState.PLAYING:
logger.debug("Already playing")
return
if self._state == PlaybackState.PAUSED:
# 从暂停恢复
self._pause_event.set()
self._notify_state_change(PlaybackState.PLAYING)
logger.info("Playback resumed")
return
# 开始新的播放
self._stop_event.clear()
self._pause_event.set()
self._playback_thread = threading.Thread(
target=self._playback_loop,
daemon=True
)
self._playback_thread.start()
self._notify_state_change(PlaybackState.PLAYING)
logger.info(f"Playback started: {self._current_file}")
def pause(self) -> None:
"""
暂停播放
实现播放控制 pause (需求 6.5)
"""
with self._lock:
if self._state != PlaybackState.PLAYING:
logger.debug("Not playing, cannot pause")
return
self._pause_event.clear()
self._notify_state_change(PlaybackState.PAUSED)
logger.info("Playback paused")
def stop(self) -> None:
"""
停止播放
实现播放控制 stop (需求 6.5)
"""
with self._lock:
if self._state == PlaybackState.STOPPED:
logger.debug("Already stopped")
return
self._stop_event.set()
self._pause_event.set() # 确保线程不会卡在暂停状态
if self._playback_thread and self._playback_thread.is_alive():
self._playback_thread.join(timeout=1.0)
self._position = 0.0
self._notify_state_change(PlaybackState.STOPPED)
logger.info("Playback stopped")
def seek(self, position: float) -> None:
"""
跳转到指定位置
实现进度拖动 (需求 6.5)
Args:
position: 目标位置(秒)
Raises:
PlaybackError: 没有加载媒体
"""
with self._lock:
if self._current_file is None:
raise PlaybackError("No video file loaded")
# 限制在有效范围内
position = max(0.0, min(position, self._duration))
self._position = position
self._notify_position_change(position)
logger.info(f"Seeked to position: {position:.2f}s")
def set_volume(self, volume: float) -> None:
"""
设置音量
实现音量调节 (需求 6.5)
Args:
volume: 音量值 (0.0 - 1.0)
"""
with self._lock:
self._volume = max(0.0, min(1.0, volume))
logger.debug(f"Volume set to: {self._volume:.2f}")
def get_duration(self) -> float:
"""
获取媒体时长
Returns:
时长(秒)
"""
return self._duration
def get_position(self) -> float:
"""
获取当前播放位置
Returns:
当前位置(秒)
"""
return self._position
# ==================== 全屏模式 (需求 6.7) ====================
def set_fullscreen(self, enabled: bool) -> None:
"""
设置全屏模式
实现全屏模式 (需求 6.7)
WHEN 播放视频 THEN Media_Player SHALL 支持全屏模式
Args:
enabled: 是否启用全屏
"""
with self._lock:
self._fullscreen = enabled
logger.info(f"Fullscreen mode: {'enabled' if enabled else 'disabled'}")
def toggle_fullscreen(self) -> bool:
"""
切换全屏模式
Returns:
切换后的全屏状态
"""
with self._lock:
self._fullscreen = not self._fullscreen
logger.info(f"Fullscreen toggled: {'enabled' if self._fullscreen else 'disabled'}")
return self._fullscreen
# ==================== 播放循环 ====================
def _playback_loop(self) -> None:
"""
播放循环(在单独线程中运行)
模拟视频播放进度更新
"""
frame_interval = 1.0 / max(self._fps, 30.0) if self._fps > 0 else 1.0 / 30.0
while not self._stop_event.is_set():
# 等待暂停事件
self._pause_event.wait()
if self._stop_event.is_set():
break
# 更新播放位置
with self._lock:
self._position += frame_interval
if self._position >= self._duration:
# 播放完成
self._position = self._duration
self._notify_position_change(self._position)
self._notify_state_change(PlaybackState.STOPPED)
logger.info("Playback completed")
break
self._notify_position_change(self._position)
time.sleep(frame_interval)
# ==================== 工具方法 ====================
def get_supported_formats(self) -> list:
"""
获取支持的视频格式列表
Returns:
支持的格式列表
"""
return [fmt.value for fmt in self.SUPPORTED_FORMATS]
def get_supported_extensions(self) -> list:
"""
获取支持的文件扩展名列表
Returns:
支持的扩展名列表
"""
return list(VIDEO_EXTENSIONS.keys())
def release(self) -> None:
"""
释放资源
"""
self.stop()
self._current_file = None
self._media_info = None
self._duration = 0.0
self._position = 0.0
self._width = 0
self._height = 0
self._fps = 0.0
self._fullscreen = False
logger.info("VideoPlayer resources released")
# ==================== 统一媒体播放器 ====================
class MediaPlayer:
"""
统一媒体播放器
整合音频和视频播放功能,提供统一的接口
需求:
- 6.1: 支持常见音频格式
- 6.2: 支持常见视频格式
- 6.3, 6.4: 提供内置播放器
- 6.5: 提供播放控制功能
- 6.7: 支持全屏模式
"""
def __init__(self, config: Optional[ClientConfig] = None):
"""
初始化媒体播放器
Args:
config: 客户端配置
"""
self.config = config or ClientConfig()
# 内部播放器
self._audio_player = AudioPlayer(config)
self._video_player = VideoPlayer(config)
# 当前活动的播放器
self._active_player: Optional[str] = None # "audio" or "video"
logger.info("MediaPlayer initialized")
# ==================== 属性 ====================
@property
def state(self) -> PlaybackState:
"""获取当前播放状态"""
if self._active_player == "audio":
return self._audio_player.state
elif self._active_player == "video":
return self._video_player.state
return PlaybackState.STOPPED
@property
def position(self) -> float:
"""获取当前播放位置(秒)"""
if self._active_player == "audio":
return self._audio_player.position
elif self._active_player == "video":
return self._video_player.position
return 0.0
@property
def duration(self) -> float:
"""获取媒体总时长(秒)"""
if self._active_player == "audio":
return self._audio_player.duration
elif self._active_player == "video":
return self._video_player.duration
return 0.0
@property
def volume(self) -> float:
"""获取当前音量"""
if self._active_player == "audio":
return self._audio_player.volume
elif self._active_player == "video":
return self._video_player.volume
return 1.0
@property
def is_playing(self) -> bool:
"""是否正在播放"""
return self.state == PlaybackState.PLAYING
@property
def is_paused(self) -> bool:
"""是否已暂停"""
return self.state == PlaybackState.PAUSED
@property
def media_type(self) -> Optional[MediaType]:
"""获取当前媒体类型"""
if self._active_player == "audio":
return MediaType.AUDIO
elif self._active_player == "video":
return MediaType.VIDEO
return None
@property
def audio_player(self) -> AudioPlayer:
"""获取音频播放器实例"""
return self._audio_player
@property
def video_player(self) -> VideoPlayer:
"""获取视频播放器实例"""
return self._video_player
# ==================== 媒体检测 ====================
def detect_media_type(self, file_path: str) -> MediaType:
"""
检测媒体文件类型
Args:
file_path: 文件路径
Returns:
媒体类型
"""
ext = Path(file_path).suffix.lower()
if ext in AUDIO_EXTENSIONS:
return MediaType.AUDIO
elif ext in VIDEO_EXTENSIONS:
return MediaType.VIDEO
return MediaType.UNKNOWN
def is_supported(self, file_path: str) -> bool:
"""
检查文件是否为支持的媒体格式
Args:
file_path: 文件路径
Returns:
是否支持
"""
media_type = self.detect_media_type(file_path)
if media_type == MediaType.AUDIO:
return self._audio_player.is_supported_format(file_path)
elif media_type == MediaType.VIDEO:
return self._video_player.is_supported_format(file_path)
return False
# ==================== 加载媒体 ====================
def load_audio(self, file_path: str) -> bool:
"""
加载音频文件
Args:
file_path: 音频文件路径
Returns:
加载是否成功
"""
# 停止当前播放
self.stop()
result = self._audio_player.load_audio(file_path)
if result:
self._active_player = "audio"
return result
def load_video(self, file_path: str) -> bool:
"""
加载视频文件
Args:
file_path: 视频文件路径
Returns:
加载是否成功
"""
# 停止当前播放
self.stop()
result = self._video_player.load_video(file_path)
if result:
self._active_player = "video"
return result
def load(self, file_path: str) -> bool:
"""
自动检测并加载媒体文件
Args:
file_path: 媒体文件路径
Returns:
加载是否成功
Raises:
UnsupportedMediaFormatError: 不支持的格式
"""
media_type = self.detect_media_type(file_path)
if media_type == MediaType.AUDIO:
return self.load_audio(file_path)
elif media_type == MediaType.VIDEO:
return self.load_video(file_path)
else:
raise UnsupportedMediaFormatError(f"Unsupported media format: {file_path}")
# ==================== 播放控制 ====================
def play(self) -> None:
"""开始播放"""
if self._active_player == "audio":
self._audio_player.play()
elif self._active_player == "video":
self._video_player.play()
else:
raise PlaybackError("No media loaded")
def pause(self) -> None:
"""暂停播放"""
if self._active_player == "audio":
self._audio_player.pause()
elif self._active_player == "video":
self._video_player.pause()
def stop(self) -> None:
"""停止播放"""
if self._active_player == "audio":
self._audio_player.stop()
elif self._active_player == "video":
self._video_player.stop()
def seek(self, position: float) -> None:
"""
跳转到指定位置
Args:
position: 目标位置(秒)
"""
if self._active_player == "audio":
self._audio_player.seek(position)
elif self._active_player == "video":
self._video_player.seek(position)
else:
raise PlaybackError("No media loaded")
def set_volume(self, volume: float) -> None:
"""
设置音量
Args:
volume: 音量值 (0.0 - 1.0)
"""
if self._active_player == "audio":
self._audio_player.set_volume(volume)
elif self._active_player == "video":
self._video_player.set_volume(volume)
def get_duration(self) -> float:
"""获取媒体时长"""
return self.duration
def get_position(self) -> float:
"""获取当前播放位置"""
return self.position
def set_fullscreen(self, enabled: bool) -> None:
"""
设置全屏模式(仅视频)
Args:
enabled: 是否启用全屏
"""
if self._active_player == "video":
self._video_player.set_fullscreen(enabled)
# ==================== 回调设置 ====================
def set_state_callback(self, callback: Optional[StateChangeCallback]) -> None:
"""设置状态变化回调"""
self._audio_player.set_state_callback(callback)
self._video_player.set_state_callback(callback)
def set_position_callback(self, callback: Optional[PositionChangeCallback]) -> None:
"""设置播放位置变化回调"""
self._audio_player.set_position_callback(callback)
self._video_player.set_position_callback(callback)
# ==================== 工具方法 ====================
def get_supported_audio_formats(self) -> list:
"""获取支持的音频格式列表"""
return self._audio_player.get_supported_formats()
def get_supported_video_formats(self) -> list:
"""获取支持的视频格式列表"""
return self._video_player.get_supported_formats()
def get_all_supported_extensions(self) -> list:
"""获取所有支持的文件扩展名"""
return (self._audio_player.get_supported_extensions() +
self._video_player.get_supported_extensions())
def release(self) -> None:
"""释放所有资源"""
self._audio_player.release()
self._video_player.release()
self._active_player = None
logger.info("MediaPlayer resources released")