# P2P Network Communication - Client Application Integration
"""
Client application integration module.

Wires all client components together, connects the client to the server,
and implements the complete message flow.

Requirements: all
"""
import asyncio
import logging
import os
import sys
import time
from typing import Optional, Callable, Dict, Any, List
from datetime import datetime

from shared.models import (
    Message, MessageType, UserInfo, UserStatus,
    ChatMessage, ConnectionMode, TransferProgress
)
from shared.message_handler import MessageHandler
from client.connection_manager import ConnectionManager, ConnectionState
from client.file_transfer import FileTransferModule
from client.image_processor import ImageProcessor
from client.media_player import MediaPlayer
from client.voice_chat import VoiceChatModule
from config import ClientConfig


logger = logging.getLogger(__name__)


class P2PClientApp:
    """
    P2P client application.

    Integrates all client components and implements the complete message flow.

    Components:
    - ConnectionManager: network connection management
    - FileTransferModule: file transfer
    - ImageProcessor: image processing
    - MediaPlayer: media playback
    - VoiceChatModule: voice chat
    """

    def __init__(self, config: Optional[ClientConfig] = None):
        """
        Initialize the client application.

        Args:
            config: Client configuration.
        """
        self.config = config or ClientConfig()

        # User info
        self._user_info: Optional[UserInfo] = None
        self._online_users: Dict[str, UserInfo] = {}

        # Message history, keyed by peer ID
        self._chat_history: Dict[str, List[ChatMessage]] = {}

        # Initialize components
        self._connection_manager = ConnectionManager(self.config)
        self._file_transfer = FileTransferModule(self.config)
        self._image_processor = ImageProcessor(self.config)
        self._media_player = MediaPlayer()
        self._voice_chat: Optional[VoiceChatModule] = None

        # Give the file transfer module its message-sending function
        self._file_transfer.set_send_message_func(self._send_message_async)

        # File-received callbacks
        self._file_received_callbacks: List[Callable[[str, str, str], None]] = []
        self._file_transfer.add_file_received_callback(self._on_file_received)

        # Callback lists
        self._message_callbacks: List[Callable[[Message], None]] = []
        self._state_callbacks: List[Callable[[ConnectionState, Optional[str]], None]] = []
        self._user_list_callbacks: List[Callable[[List[UserInfo]], None]] = []

        # Register connection manager callbacks
        self._connection_manager.add_message_callback(self._on_message_received)
        self._connection_manager.add_state_callback(self._on_connection_state_changed)

        # Event loop
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._running = False

        logger.info("P2PClientApp initialized")

    # ==================== Lifecycle management ====================

    async def start(self, user_info: UserInfo) -> bool:
        """
        Start the client application.

        Args:
            user_info: User information.

        Returns:
            True if startup succeeded, False otherwise.
        """
        self._user_info = user_info
        self._running = True

        try:
            # Connect to the server
            success = await self._connection_manager.connect_to_server(user_info)

            if success:
                logger.info(f"Client started for user: {user_info.username}")

                # Initialize the voice chat module
                self._voice_chat = VoiceChatModule(self.config)
                self._voice_chat.set_user_info(user_info.user_id, user_info.username)
                self._voice_chat.set_send_message_callback(self._send_message_async)
                logger.info(f"Voice chat module initialized for user: {user_info.user_id}")

                # Request the online user list
                await self._request_online_users()

                return True
            else:
                logger.error("Failed to connect to server")
                # Startup failed, so the client is not running
                self._running = False
                return False

        except Exception as e:
            logger.error(f"Failed to start client: {e}")
            # Startup failed, so the client is not running
            self._running = False
            return False

    async def stop(self) -> None:
        """Stop the client application."""
        self._running = False

        # End any active voice call
        if self._voice_chat and self._voice_chat.is_in_call:
            self._voice_chat.end_call()

        # Stop media playback
        self._media_player.stop()

        # Disconnect
        await self._connection_manager.disconnect()

        logger.info("Client stopped")

    @property
    def is_connected(self) -> bool:
        """Whether the client is connected to the server."""
        return self._connection_manager.is_connected

    @property
    def user_info(self) -> Optional[UserInfo]:
        """Current user information."""
        return self._user_info

    @property
    def connection_state(self) -> ConnectionState:
        """Current connection state."""
        return self._connection_manager.state

    # ==================== Sending messages ====================

    async def _send_message_async(self, peer_id: str, message: Message) -> bool:
        """
        Send a message asynchronously.

        Args:
            peer_id: Target peer ID.
            message: Message object.

        Returns:
            True if the message was sent, False otherwise.
        """
        # Fill in the sender ID
        message.sender_id = self._user_info.user_id if self._user_info else ""
        return await self._connection_manager.send_message(peer_id, message)

    async def send_text_message(self, peer_id: str, content: str) -> bool:
        """
        Send a text message.

        Implements message sending (Requirement 3.1).
        WHEN the user enters a text message and sends it
        THEN Message_Handler SHALL deliver the message to the target user.

        Args:
            peer_id: Target peer ID.
            content: Message content.

        Returns:
            True if the message was sent, False otherwise.
        """
        if not self._user_info:
            logger.error("User not logged in")
            return False

        message = Message(
            msg_type=MessageType.TEXT,
            sender_id=self._user_info.user_id,
            receiver_id=peer_id,
            timestamp=time.time(),
            payload=content.encode('utf-8')
        )

        success = await self._connection_manager.send_message(peer_id, message)

        if success:
            # Save to chat history
            chat_msg = ChatMessage(
                message_id=message.message_id,
                sender_id=self._user_info.user_id,
                receiver_id=peer_id,
                content_type=MessageType.TEXT,
                content=content,
                timestamp=datetime.now(),
                is_sent=True
            )
            self._add_to_history(peer_id, chat_msg)
            logger.debug(f"Text message sent to {peer_id}")

        return success

    async def send_file(self, peer_id: str, file_path: str,
                        progress_callback: Optional[Callable[[TransferProgress], None]] = None) -> bool:
        """
        Send a file.

        Implements file sending (Requirements 4.1, 4.2).

        Args:
            peer_id: Target peer ID.
            file_path: Path of the file to send.
            progress_callback: Progress callback.

        Returns:
            True if the file was sent, False otherwise.
        """
        return await self._file_transfer.send_file(peer_id, file_path, progress_callback)

    async def send_image(self, peer_id: str, image_path: str,
                         compress: bool = False,
                         progress_callback: Optional[Callable[[TransferProgress], None]] = None) -> bool:
        """
        Send an image.

        Implements image sending (Requirements 5.1, 5.5).

        Args:
            peer_id: Target peer ID.
            image_path: Path of the image to send.
            compress: Whether to compress the image first.
            progress_callback: Progress callback.

        Returns:
            True if the image was sent, False otherwise.
        """
        # Check the image format
        if not self._image_processor.is_supported_format(image_path):
            logger.error(f"Unsupported image format: {image_path}")
            return False

        # Compress if requested; fall back to the original if compression fails
        if compress:
            compressed_path = self._image_processor.compress_image(image_path)
            if compressed_path:
                image_path = compressed_path

        # Send the image as a file
        return await self._file_transfer.send_file(peer_id, image_path, progress_callback)

    # ==================== Voice calls ====================

    async def start_voice_call(self, peer_id: str) -> bool:
        """
        Start a voice call.

        Implements voice call initiation (Requirement 7.1).

        Args:
            peer_id: Target peer ID.

        Returns:
            True if the call was started, False otherwise.
        """
        if not self._voice_chat:
            logger.error("Voice chat module not initialized")
            return False

        return await self._voice_chat.start_call(peer_id)

    async def accept_voice_call(self, peer_id: str) -> bool:
        """
        Accept a voice call.

        Implements voice call answering (Requirement 7.2).

        Args:
            peer_id: ID of the calling peer.

        Returns:
            True if the call was accepted, False otherwise.
        """
        if not self._voice_chat:
            return False

        return await self._voice_chat.accept_call(peer_id)

    def reject_voice_call(self, peer_id: str) -> None:
        """
        Reject a voice call.

        Args:
            peer_id: ID of the calling peer.
        """
        if self._voice_chat:
            self._voice_chat.reject_call(peer_id)

    def end_voice_call(self) -> None:
        """End the current voice call."""
        if self._voice_chat:
            self._voice_chat.end_call()

    def mute_voice_call(self, muted: bool) -> None:
        """
        Set the mute state.

        Implements muting (Requirement 7.5).

        Args:
            muted: Whether to mute.
        """
        if self._voice_chat:
            self._voice_chat.mute(muted)

    # ==================== Media playback ====================

    def play_audio(self, file_path: str) -> bool:
        """
        Play an audio file.

        Implements audio playback (Requirements 6.1, 6.3).

        Args:
            file_path: Path of the audio file.

        Returns:
            True if the file was loaded, False otherwise.
        """
        if self._media_player.load_audio(file_path):
            self._media_player.play()
            return True
        return False

    def play_video(self, file_path: str) -> bool:
        """
        Play a video file.

        Implements video playback (Requirements 6.2, 6.4).

        Args:
            file_path: Path of the video file.

        Returns:
            True if the file was loaded, False otherwise.
        """
        if self._media_player.load_video(file_path):
            self._media_player.play()
            return True
        return False

    def pause_media(self) -> None:
        """Pause media playback."""
        self._media_player.pause()

    def stop_media(self) -> None:
        """Stop media playback."""
        self._media_player.stop()

    def seek_media(self, position: float) -> None:
        """
        Seek to a position in the media.

        Args:
            position: Position in seconds.
        """
        self._media_player.seek(position)

    def set_volume(self, volume: float) -> None:
        """
        Set the volume.

        Args:
            volume: Volume (0.0-1.0).
        """
        self._media_player.set_volume(volume)

    # ==================== Incoming message handling ====================

    def _on_message_received(self, message: Message) -> None:
        """
        Handle a received message.

        Implements message receiving (Requirement 3.2).
        WHEN P2P_Client receives a text message
        THEN P2P_Client SHALL immediately display the message content and sender information.

        Args:
            message: The received message.
        """
        logger.debug(f"Message received: {message.msg_type.value} from {message.sender_id}")

        # Dispatch on message type
        if message.msg_type == MessageType.TEXT:
            self._handle_text_message(message)
        elif message.msg_type == MessageType.FILE_REQUEST:
            self._handle_file_request(message)
        elif message.msg_type == MessageType.FILE_CHUNK:
            self._handle_file_chunk(message)
        elif message.msg_type == MessageType.FILE_COMPLETE:
            self._handle_file_complete(message)
        elif message.msg_type == MessageType.IMAGE:
            self._handle_image_message(message)
        elif message.msg_type == MessageType.VOICE_CALL_REQUEST:
            self._handle_voice_call_request(message)
        elif message.msg_type == MessageType.VOICE_CALL_ACCEPT:
            self._handle_voice_call_accept(message)
        elif message.msg_type == MessageType.VOICE_CALL_REJECT:
            self._handle_voice_call_reject(message)
        elif message.msg_type == MessageType.VOICE_CALL_END:
            self._handle_voice_call_end(message)
        elif message.msg_type == MessageType.VOICE_DATA:
            self._handle_voice_data(message)
        elif message.msg_type == MessageType.USER_LIST_RESPONSE:
            self._handle_user_list_response(message)
        elif message.msg_type == MessageType.ACK:
            self._handle_ack_message(message)
        elif message.msg_type == MessageType.ERROR:
            self._handle_error_message(message)

        # Notify all callbacks; one failing callback must not block the others
        for callback in self._message_callbacks:
            try:
                callback(message)
            except Exception as e:
                logger.error(f"Message callback error: {e}")

    def _handle_text_message(self, message: Message) -> None:
        """Handle a text message."""
        content = message.payload.decode('utf-8')

        # Save to chat history
        chat_msg = ChatMessage(
            message_id=message.message_id,
            sender_id=message.sender_id,
            receiver_id=message.receiver_id,
            content_type=MessageType.TEXT,
            content=content,
            timestamp=datetime.fromtimestamp(message.timestamp),
            is_read=False,
            is_sent=True
        )
        self._add_to_history(message.sender_id, chat_msg)

        logger.info(f"Text message from {message.sender_id}: {content[:50]}...")

    def _handle_file_request(self, message: Message) -> None:
        """Handle a file request."""
        file_id = self._file_transfer.handle_file_request(message)
        if file_id:
            logger.info(f"File request received: {file_id}")

    def _handle_file_chunk(self, message: Message) -> None:
        """Handle a file chunk."""
        self._file_transfer.handle_file_chunk(message)

    def _handle_file_complete(self, message: Message) -> None:
        """Handle file completion."""
        self._file_transfer.handle_file_complete(message)

    def _handle_image_message(self, message: Message) -> None:
        """Handle an image message."""
        # Images are transferred through the file transfer path
        logger.info(f"Image received from {message.sender_id}")

    def _handle_voice_call_request(self, message: Message) -> None:
        """Handle a voice call request."""
        if self._voice_chat:
            # create_task requires a running event loop
            asyncio.create_task(self._voice_chat._handle_call_request(message))
        logger.info(f"Voice call request from {message.sender_id}")

    def _handle_voice_call_accept(self, message: Message) -> None:
        """Handle a voice call acceptance."""
        if self._voice_chat:
            asyncio.create_task(self._voice_chat._handle_call_accept(message))
        logger.info(f"Voice call accepted by {message.sender_id}")

    def _handle_voice_call_reject(self, message: Message) -> None:
        """Handle a voice call rejection."""
        if self._voice_chat:
            asyncio.create_task(self._voice_chat._handle_call_reject(message))
        logger.info(f"Voice call rejected by {message.sender_id}")

    def _handle_voice_call_end(self, message: Message) -> None:
        """Handle the end of a voice call."""
        if self._voice_chat:
            asyncio.create_task(self._voice_chat._handle_call_end(message))
        logger.info(f"Voice call ended by {message.sender_id}")

    def _handle_voice_data(self, message: Message) -> None:
        """Handle voice data."""
        if self._voice_chat:
            self._voice_chat._handle_voice_data(message)

    def _handle_user_list_response(self, message: Message) -> None:
        """Handle a user list response."""
        import json

        try:
            users_data = json.loads(message.payload.decode('utf-8'))

            self._online_users.clear()
            for user_data in users_data:
                user_info = UserInfo.from_dict(user_data)
                self._online_users[user_info.user_id] = user_info

            logger.info(f"Online users updated: {len(self._online_users)} users")

            # Notify callbacks
            for callback in self._user_list_callbacks:
                try:
                    callback(list(self._online_users.values()))
                except Exception as e:
                    logger.error(f"User list callback error: {e}")

        except Exception as e:
            logger.error(f"Failed to parse user list: {e}")

    def _handle_ack_message(self, message: Message) -> None:
        """Handle an acknowledgement message."""
        info = message.payload.decode('utf-8')
        logger.debug(f"ACK received: {info}")

    def _handle_error_message(self, message: Message) -> None:
        """Handle an error message."""
        error = message.payload.decode('utf-8')
        logger.error(f"Error from server: {error}")

    def _on_file_received(self, sender_id: str, file_name: str, file_path: str) -> None:
        """
        Handle a completed file reception.

        Args:
            sender_id: Sender ID.
            file_name: File name.
            file_path: Path where the file was saved.
        """
        logger.info(f"P2PClientApp: File received from {sender_id}: {file_name} -> {file_path}")

        # Decide whether the file is an image, based on its extension
        image_extensions = {'.png', '.jpg', '.jpeg', '.gif', '.bmp', '.webp'}
        ext = os.path.splitext(file_name)[1].lower()
        content_type = MessageType.IMAGE if ext in image_extensions else MessageType.FILE_REQUEST

        # Save to chat history
        chat_msg = ChatMessage(
            message_id=f"file_{time.time()}",
            sender_id=sender_id,
            receiver_id=self._user_info.user_id if self._user_info else "",
            content_type=content_type,
            content=file_path,  # store the file path
            timestamp=datetime.now(),
            is_read=False,
            is_sent=True
        )
        self._add_to_history(sender_id, chat_msg)

        # Notify callbacks
        logger.info(f"P2PClientApp: Notifying {len(self._file_received_callbacks)} callbacks")
        for callback in self._file_received_callbacks:
            try:
                callback(sender_id, file_name, file_path)
            except Exception as e:
                logger.error(f"File received callback error: {e}")

    def add_file_received_callback(self, callback: Callable[[str, str, str], None]) -> None:
        """Add a file-received callback."""
        self._file_received_callbacks.append(callback)

    def remove_file_received_callback(self, callback: Callable[[str, str, str], None]) -> None:
        """Remove a file-received callback."""
        if callback in self._file_received_callbacks:
            self._file_received_callbacks.remove(callback)

    def _on_connection_state_changed(self, state: ConnectionState,
                                     reason: Optional[str]) -> None:
        """
        Handle a connection state change.

        Args:
            state: New state.
            reason: Reason for the change.
        """
        logger.info(f"Connection state changed: {state.value}" +
                    (f" ({reason})" if reason else ""))

        # Notify callbacks
        for callback in self._state_callbacks:
            try:
                callback(state, reason)
            except Exception as e:
                logger.error(f"State callback error: {e}")

    # ==================== Chat history management ====================

    def _add_to_history(self, peer_id: str, message: ChatMessage) -> None:
        """
        Add a message to the chat history.

        Args:
            peer_id: Peer ID.
            message: Chat message.
        """
        if peer_id not in self._chat_history:
            self._chat_history[peer_id] = []

        self._chat_history[peer_id].append(message)

        # Keep the history sorted by time
        self._chat_history[peer_id].sort(key=lambda m: m.timestamp)

    def get_chat_history(self, peer_id: str) -> List[ChatMessage]:
        """
        Get the chat history.

        Implements chat history loading (Requirements 3.5, 9.2).
        WHEN message history is displayed
        THEN P2P_Client SHALL show all message records in chronological order.

        Args:
            peer_id: Peer ID.

        Returns:
            List of chat messages, sorted by time.
        """
        return self._chat_history.get(peer_id, [])

    def clear_chat_history(self, peer_id: str) -> None:
        """
        Clear the chat history.

        Args:
            peer_id: Peer ID.
        """
        if peer_id in self._chat_history:
            del self._chat_history[peer_id]

    # ==================== User management ====================

    async def _request_online_users(self) -> None:
        """Request the online user list."""
        if not self._user_info:
            return

        request = Message(
            msg_type=MessageType.USER_LIST_REQUEST,
            sender_id=self._user_info.user_id,
            receiver_id="server",
            timestamp=time.time(),
            payload=b""
        )

        await self._connection_manager.send_message("server", request)

    async def refresh_online_users(self) -> List[UserInfo]:
        """
        Refresh the online user list.

        Implements fetching online users (Requirement 2.3).

        Returns:
            List of online users.
        """
        await self._request_online_users()
        # Wait for the response (simplified: a fixed delay, not a real
        # acknowledgement, so the returned list may still be the cached one)
        await asyncio.sleep(0.5)
        return list(self._online_users.values())

    def get_online_users(self) -> List[UserInfo]:
        """
        Get the cached online user list.

        Returns:
            List of online users.
        """
        return list(self._online_users.values())

    def get_user_info(self, user_id: str) -> Optional[UserInfo]:
        """
        Get user information.

        Args:
            user_id: User ID.

        Returns:
            The user information, or None if the user is not known.
        """
        return self._online_users.get(user_id)

    # ==================== Callback registration ====================

    def add_message_callback(self, callback: Callable[[Message], None]) -> None:
        """
        Add a message callback.

        Args:
            callback: Callback function.
        """
        self._message_callbacks.append(callback)

    def remove_message_callback(self, callback: Callable[[Message], None]) -> None:
        """
        Remove a message callback.

        Args:
            callback: Callback function.
        """
        if callback in self._message_callbacks:
            self._message_callbacks.remove(callback)

    def add_state_callback(self, callback: Callable[[ConnectionState, Optional[str]], None]) -> None:
        """
        Add a state callback.

        Args:
            callback: Callback function.
        """
        self._state_callbacks.append(callback)

    def remove_state_callback(self, callback: Callable[[ConnectionState, Optional[str]], None]) -> None:
        """
        Remove a state callback.

        Args:
            callback: Callback function.
        """
        if callback in self._state_callbacks:
            self._state_callbacks.remove(callback)

    def add_user_list_callback(self, callback: Callable[[List[UserInfo]], None]) -> None:
        """
        Add a user list callback.

        Args:
            callback: Callback function.
        """
        self._user_list_callbacks.append(callback)

    def remove_user_list_callback(self, callback: Callable[[List[UserInfo]], None]) -> None:
        """
        Remove a user list callback.

        Args:
            callback: Callback function.
        """
        if callback in self._user_list_callbacks:
            self._user_list_callbacks.remove(callback)

    # ==================== Connection mode ====================

    def get_connection_mode(self, peer_id: str) -> ConnectionMode:
        """
        Get the connection mode used with a peer.

        Args:
            peer_id: Peer ID.

        Returns:
            Connection mode.
        """
        return self._connection_manager.get_connection_mode(peer_id)

    async def discover_lan_peers(self) -> List:
        """
        Discover peers on the local network.

        Implements LAN discovery (Requirement 1.2).

        Returns:
            List of discovered peers.
        """
        return await self._connection_manager.discover_lan_peers()

    async def optimize_connection(self, peer_id: str) -> ConnectionMode:
        """
        Optimize the connection to a peer.

        Tries to establish a direct P2P connection (Requirements 1.1, 1.2, 1.3).

        Args:
            peer_id: Peer ID.

        Returns:
            The connection mode after optimization.
        """
        return await self._connection_manager.auto_select_connection_mode(peer_id)

    # ==================== Component access ====================

    @property
    def connection_manager(self) -> ConnectionManager:
        """The connection manager."""
        return self._connection_manager

    @property
    def file_transfer(self) -> FileTransferModule:
        """The file transfer module."""
        return self._file_transfer

    @property
    def image_processor(self) -> ImageProcessor:
        """The image processor."""
        return self._image_processor

    @property
    def media_player(self) -> MediaPlayer:
        """The media player."""
        return self._media_player

    @property
    def voice_chat(self) -> Optional[VoiceChatModule]:
        """The voice chat module, or None before start() has succeeded."""
        return self._voice_chat

    # ==================== Statistics ====================

    def get_stats(self) -> Dict[str, Any]:
        """
        Get application statistics.

        Returns:
            Dictionary of statistics.
        """
        conn_stats = self._connection_manager.get_connection_stats()

        return {
            "user": self._user_info.username if self._user_info else None,
            "connection": conn_stats,
            "online_users": len(self._online_users),
            "chat_sessions": len(self._chat_history),
            "total_messages": sum(len(msgs) for msgs in self._chat_history.values()),
            "active_transfers": len(self._file_transfer._active_transfers),
            "voice_call_active": self._voice_chat.is_in_call if self._voice_chat else False
        }


# ==================== Convenience functions ====================

def create_client(server_host: str = "127.0.0.1",
                  server_port: int = 8888) -> P2PClientApp:
    """
    Create a client instance.

    Args:
        server_host: Server address.
        server_port: Server port.

    Returns:
        Client instance.
    """
    config = ClientConfig(
        server_host=server_host,
        server_port=server_port
    )
    return P2PClientApp(config)


async def quick_connect(username: str, display_name: str = "",
                        server_host: str = "127.0.0.1",
                        server_port: int = 8888) -> Optional[P2PClientApp]:
    """
    Quickly connect to the server.

    Args:
        username: User name.
        display_name: Display name; defaults to the user name.
        server_host: Server address.
        server_port: Server port.

    Returns:
        The client instance if the connection succeeded, otherwise None.
    """
    import uuid

    client = create_client(server_host, server_port)

    user_info = UserInfo(
        user_id=str(uuid.uuid4()),
        username=username,
        display_name=display_name or username,
        status=UserStatus.ONLINE
    )

    success = await client.start(user_info)

    return client if success else None
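

# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the original module.
#
# refresh_online_users() waits a fixed 0.5 s for the server's reply. One
# possible alternative, assuming the user-list response handler can signal
# that the list was updated, is to wait on an asyncio.Event with a timeout.
# The names UserListWaiter, reset, notify and wait are hypothetical and exist
# only for this example; nothing in the module calls them.
# ---------------------------------------------------------------------------
import asyncio  # already imported at module top; repeated so the sketch runs standalone


class UserListWaiter:
    """Hypothetical helper: lets a coroutine wait until a response arrives."""

    def __init__(self) -> None:
        self._event = asyncio.Event()

    def reset(self) -> None:
        """Call before sending the request, so stale signals are discarded."""
        self._event.clear()

    def notify(self) -> None:
        """Call from the response handler once the user list is updated."""
        self._event.set()

    async def wait(self, timeout: float = 0.5) -> bool:
        """Wait for notify(); return True if it arrived before the timeout."""
        try:
            await asyncio.wait_for(self._event.wait(), timeout)
            return True
        except asyncio.TimeoutError:
            return False


# Possible usage inside the client (assumed wiring, shown as comments only):
#     self._user_list_waiter.reset()
#     await self._request_online_users()
#     await self._user_list_waiter.wait(timeout=0.5)
# with self._user_list_waiter.notify() called at the end of
# _handle_user_list_response().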