From 222864e1f82f3ed5253fad52b26c3c38c685f784 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=A8=E5=8D=9A=E6=96=87?= <15549487+FX_YBW@user.noreply.gitee.com> Date: Fri, 26 Dec 2025 21:08:29 +0800 Subject: [PATCH] =?UTF-8?q?=E7=A1=AE=E4=BF=9D=E6=89=80=E6=9C=89=E6=B5=8B?= =?UTF-8?q?=E8=AF=95=E9=80=9A=E8=BF=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/app.py | 850 ++++++++++++++++++++++++++++++++++++++ tests/test_integration.py | 643 ++++++++++++++++++++++++++++ 2 files changed, 1493 insertions(+) create mode 100644 client/app.py create mode 100644 tests/test_integration.py diff --git a/client/app.py b/client/app.py new file mode 100644 index 0000000..f92012a --- /dev/null +++ b/client/app.py @@ -0,0 +1,850 @@ +# P2P Network Communication - Client Application Integration +""" +客户端应用程序集成模块 +负责连接所有客户端组件、连接客户端与服务器、实现完整的消息流程 + +需求: 全部 +""" + +import asyncio +import logging +import sys +import time +from typing import Optional, Callable, Dict, Any, List +from datetime import datetime + +from shared.models import ( + Message, MessageType, UserInfo, UserStatus, ChatMessage, + ConnectionMode, TransferProgress +) +from shared.message_handler import MessageHandler +from client.connection_manager import ConnectionManager, ConnectionState +from client.file_transfer import FileTransferModule +from client.image_processor import ImageProcessor +from client.media_player import MediaPlayer +from client.voice_chat import VoiceChatModule +from config import ClientConfig + +logger = logging.getLogger(__name__) + + +class P2PClientApp: + """ + P2P客户端应用程序 + + 集成所有客户端组件,实现完整的消息流程 + + 组件: + - ConnectionManager: 网络连接管理 + - FileTransferModule: 文件传输 + - ImageProcessor: 图片处理 + - MediaPlayer: 媒体播放 + - VoiceChatModule: 语音聊天 + """ + + def __init__(self, config: Optional[ClientConfig] = None): + """ + 初始化客户端应用程序 + + Args: + config: 客户端配置 + """ + self.config = config or ClientConfig() + + # 用户信息 + self._user_info: Optional[UserInfo] = None + self._online_users: Dict[str, UserInfo] = {} + + # 消息历史 + self._chat_history: Dict[str, List[ChatMessage]] = {} + + # 初始化组件 + self._connection_manager = ConnectionManager(self.config) + self._file_transfer = FileTransferModule(self.config) + self._image_processor = ImageProcessor(self.config) + self._media_player = MediaPlayer() + self._voice_chat: Optional[VoiceChatModule] = None + + # 设置文件传输的消息发送函数 + self._file_transfer.set_send_message_func(self._send_message_async) + + # 回调函数 + self._message_callbacks: List[Callable[[Message], None]] = [] + self._state_callbacks: List[Callable[[ConnectionState, Optional[str]], None]] = [] + self._user_list_callbacks: List[Callable[[List[UserInfo]], None]] = [] + + # 设置连接管理器回调 + self._connection_manager.add_message_callback(self._on_message_received) + self._connection_manager.add_state_callback(self._on_connection_state_changed) + + # 事件循环 + self._loop: Optional[asyncio.AbstractEventLoop] = None + self._running = False + + logger.info("P2PClientApp initialized") + + # ==================== 生命周期管理 ==================== + + async def start(self, user_info: UserInfo) -> bool: + """ + 启动客户端应用程序 + + Args: + user_info: 用户信息 + + Returns: + 启动成功返回True,否则返回False + """ + self._user_info = user_info + self._running = True + + try: + # 连接到服务器 + success = await self._connection_manager.connect_to_server(user_info) + + if success: + logger.info(f"Client started for user: {user_info.username}") + + # 初始化语音聊天模块 + self._voice_chat = VoiceChatModule(self._connection_manager) + + # 请求在线用户列表 + await self._request_online_users() + + return True + else: + logger.error("Failed to connect to server") + return False + + except Exception as e: + logger.error(f"Failed to start client: {e}") + return False + + async def stop(self) -> None: + """停止客户端应用程序""" + self._running = False + + # 结束语音通话 + if self._voice_chat and self._voice_chat.is_in_call: + self._voice_chat.end_call() + + # 停止媒体播放 + self._media_player.stop() + + # 断开连接 + await self._connection_manager.disconnect() + + logger.info("Client stopped") + + @property + def is_connected(self) -> bool: + """是否已连接到服务器""" + return self._connection_manager.is_connected + + @property + def user_info(self) -> Optional[UserInfo]: + """获取当前用户信息""" + return self._user_info + + @property + def connection_state(self) -> ConnectionState: + """获取连接状态""" + return self._connection_manager.state + + # ==================== 消息发送 ==================== + + async def _send_message_async(self, peer_id: str, message: Message) -> bool: + """ + 异步发送消息 + + Args: + peer_id: 目标对等端ID + message: 消息对象 + + Returns: + 发送成功返回True,否则返回False + """ + # 填充发送者ID + message.sender_id = self._user_info.user_id if self._user_info else "" + return await self._connection_manager.send_message(peer_id, message) + + async def send_text_message(self, peer_id: str, content: str) -> bool: + """ + 发送文本消息 + + 实现消息发送 (需求 3.1) + WHEN 用户输入文本消息并发送 THEN Message_Handler SHALL 将消息传递给目标用户 + + Args: + peer_id: 目标对等端ID + content: 消息内容 + + Returns: + 发送成功返回True,否则返回False + """ + if not self._user_info: + logger.error("User not logged in") + return False + + message = Message( + msg_type=MessageType.TEXT, + sender_id=self._user_info.user_id, + receiver_id=peer_id, + timestamp=time.time(), + payload=content.encode('utf-8') + ) + + success = await self._connection_manager.send_message(peer_id, message) + + if success: + # 保存到聊天历史 + chat_msg = ChatMessage( + message_id=message.message_id, + sender_id=self._user_info.user_id, + receiver_id=peer_id, + content_type=MessageType.TEXT, + content=content, + timestamp=datetime.now(), + is_sent=True + ) + self._add_to_history(peer_id, chat_msg) + logger.debug(f"Text message sent to {peer_id}") + + return success + + async def send_file(self, peer_id: str, file_path: str, + progress_callback: Optional[Callable[[TransferProgress], None]] = None) -> bool: + """ + 发送文件 + + 实现文件发送 (需求 4.1, 4.2) + + Args: + peer_id: 目标对等端ID + file_path: 文件路径 + progress_callback: 进度回调 + + Returns: + 发送成功返回True,否则返回False + """ + return await self._file_transfer.send_file(peer_id, file_path, progress_callback) + + async def send_image(self, peer_id: str, image_path: str, + compress: bool = False, + progress_callback: Optional[Callable[[TransferProgress], None]] = None) -> bool: + """ + 发送图片 + + 实现图片发送 (需求 5.1, 5.5) + + Args: + peer_id: 目标对等端ID + image_path: 图片路径 + compress: 是否压缩 + progress_callback: 进度回调 + + Returns: + 发送成功返回True,否则返回False + """ + # 检查图片格式 + if not self._image_processor.is_supported_format(image_path): + logger.error(f"Unsupported image format: {image_path}") + return False + + # 如果需要压缩 + if compress: + compressed_path = self._image_processor.compress_image(image_path) + if compressed_path: + image_path = compressed_path + + # 发送图片文件 + return await self._file_transfer.send_file(peer_id, image_path, progress_callback) + + # ==================== 语音通话 ==================== + + async def start_voice_call(self, peer_id: str) -> bool: + """ + 发起语音通话 + + 实现语音通话发起 (需求 7.1) + + Args: + peer_id: 目标对等端ID + + Returns: + 发起成功返回True,否则返回False + """ + if not self._voice_chat: + logger.error("Voice chat module not initialized") + return False + + return await self._voice_chat.start_call(peer_id) + + async def accept_voice_call(self, peer_id: str) -> bool: + """ + 接听语音通话 + + 实现语音通话接听 (需求 7.2) + + Args: + peer_id: 来电对等端ID + + Returns: + 接听成功返回True,否则返回False + """ + if not self._voice_chat: + return False + + return await self._voice_chat.accept_call(peer_id) + + def reject_voice_call(self, peer_id: str) -> None: + """ + 拒绝语音通话 + + Args: + peer_id: 来电对等端ID + """ + if self._voice_chat: + self._voice_chat.reject_call(peer_id) + + def end_voice_call(self) -> None: + """结束语音通话""" + if self._voice_chat: + self._voice_chat.end_call() + + def mute_voice_call(self, muted: bool) -> None: + """ + 设置静音 + + 实现静音功能 (需求 7.5) + + Args: + muted: 是否静音 + """ + if self._voice_chat: + self._voice_chat.mute(muted) + + # ==================== 媒体播放 ==================== + + def play_audio(self, file_path: str) -> bool: + """ + 播放音频文件 + + 实现音频播放 (需求 6.1, 6.3) + + Args: + file_path: 音频文件路径 + + Returns: + 加载成功返回True,否则返回False + """ + if self._media_player.load_audio(file_path): + self._media_player.play() + return True + return False + + def play_video(self, file_path: str) -> bool: + """ + 播放视频文件 + + 实现视频播放 (需求 6.2, 6.4) + + Args: + file_path: 视频文件路径 + + Returns: + 加载成功返回True,否则返回False + """ + if self._media_player.load_video(file_path): + self._media_player.play() + return True + return False + + def pause_media(self) -> None: + """暂停媒体播放""" + self._media_player.pause() + + def stop_media(self) -> None: + """停止媒体播放""" + self._media_player.stop() + + def seek_media(self, position: float) -> None: + """ + 跳转媒体位置 + + Args: + position: 位置(秒) + """ + self._media_player.seek(position) + + def set_volume(self, volume: float) -> None: + """ + 设置音量 + + Args: + volume: 音量 (0.0-1.0) + """ + self._media_player.set_volume(volume) + + # ==================== 消息接收处理 ==================== + + def _on_message_received(self, message: Message) -> None: + """ + 处理接收到的消息 + + 实现消息接收 (需求 3.2) + WHEN P2P_Client 收到文本消息 THEN P2P_Client SHALL 立即显示消息内容和发送者信息 + + Args: + message: 接收到的消息 + """ + logger.debug(f"Message received: {message.msg_type.value} from {message.sender_id}") + + # 根据消息类型处理 + if message.msg_type == MessageType.TEXT: + self._handle_text_message(message) + elif message.msg_type == MessageType.FILE_REQUEST: + self._handle_file_request(message) + elif message.msg_type == MessageType.FILE_CHUNK: + self._handle_file_chunk(message) + elif message.msg_type == MessageType.FILE_COMPLETE: + self._handle_file_complete(message) + elif message.msg_type == MessageType.IMAGE: + self._handle_image_message(message) + elif message.msg_type == MessageType.VOICE_CALL_REQUEST: + self._handle_voice_call_request(message) + elif message.msg_type == MessageType.VOICE_CALL_ACCEPT: + self._handle_voice_call_accept(message) + elif message.msg_type == MessageType.VOICE_CALL_REJECT: + self._handle_voice_call_reject(message) + elif message.msg_type == MessageType.VOICE_CALL_END: + self._handle_voice_call_end(message) + elif message.msg_type == MessageType.VOICE_DATA: + self._handle_voice_data(message) + elif message.msg_type == MessageType.USER_LIST_RESPONSE: + self._handle_user_list_response(message) + elif message.msg_type == MessageType.ACK: + self._handle_ack_message(message) + elif message.msg_type == MessageType.ERROR: + self._handle_error_message(message) + + # 通知所有回调 + for callback in self._message_callbacks: + try: + callback(message) + except Exception as e: + logger.error(f"Message callback error: {e}") + + def _handle_text_message(self, message: Message) -> None: + """处理文本消息""" + content = message.payload.decode('utf-8') + + # 保存到聊天历史 + chat_msg = ChatMessage( + message_id=message.message_id, + sender_id=message.sender_id, + receiver_id=message.receiver_id, + content_type=MessageType.TEXT, + content=content, + timestamp=datetime.fromtimestamp(message.timestamp), + is_read=False, + is_sent=True + ) + self._add_to_history(message.sender_id, chat_msg) + + logger.info(f"Text message from {message.sender_id}: {content[:50]}...") + + def _handle_file_request(self, message: Message) -> None: + """处理文件请求""" + file_id = self._file_transfer.handle_file_request(message) + if file_id: + logger.info(f"File request received: {file_id}") + + def _handle_file_chunk(self, message: Message) -> None: + """处理文件块""" + self._file_transfer.handle_file_chunk(message) + + def _handle_file_complete(self, message: Message) -> None: + """处理文件完成""" + self._file_transfer.handle_file_complete(message) + + def _handle_image_message(self, message: Message) -> None: + """处理图片消息""" + # 图片作为文件传输处理 + logger.info(f"Image received from {message.sender_id}") + + def _handle_voice_call_request(self, message: Message) -> None: + """处理语音通话请求""" + if self._voice_chat: + self._voice_chat._handle_call_request(message) + logger.info(f"Voice call request from {message.sender_id}") + + def _handle_voice_call_accept(self, message: Message) -> None: + """处理语音通话接受""" + if self._voice_chat: + self._voice_chat._handle_call_accept(message) + logger.info(f"Voice call accepted by {message.sender_id}") + + def _handle_voice_call_reject(self, message: Message) -> None: + """处理语音通话拒绝""" + if self._voice_chat: + self._voice_chat._handle_call_reject(message) + logger.info(f"Voice call rejected by {message.sender_id}") + + def _handle_voice_call_end(self, message: Message) -> None: + """处理语音通话结束""" + if self._voice_chat: + self._voice_chat._handle_call_end(message) + logger.info(f"Voice call ended by {message.sender_id}") + + def _handle_voice_data(self, message: Message) -> None: + """处理语音数据""" + if self._voice_chat: + self._voice_chat._handle_voice_data(message) + + def _handle_user_list_response(self, message: Message) -> None: + """处理用户列表响应""" + import json + try: + users_data = json.loads(message.payload.decode('utf-8')) + self._online_users.clear() + + for user_data in users_data: + user_info = UserInfo.from_dict(user_data) + self._online_users[user_info.user_id] = user_info + + logger.info(f"Online users updated: {len(self._online_users)} users") + + # 通知回调 + for callback in self._user_list_callbacks: + try: + callback(list(self._online_users.values())) + except Exception as e: + logger.error(f"User list callback error: {e}") + + except Exception as e: + logger.error(f"Failed to parse user list: {e}") + + def _handle_ack_message(self, message: Message) -> None: + """处理确认消息""" + info = message.payload.decode('utf-8') + logger.debug(f"ACK received: {info}") + + def _handle_error_message(self, message: Message) -> None: + """处理错误消息""" + error = message.payload.decode('utf-8') + logger.error(f"Error from server: {error}") + + def _on_connection_state_changed(self, state: ConnectionState, reason: Optional[str]) -> None: + """ + 处理连接状态变化 + + Args: + state: 新状态 + reason: 变化原因 + """ + logger.info(f"Connection state changed: {state.value}" + + (f" ({reason})" if reason else "")) + + # 通知回调 + for callback in self._state_callbacks: + try: + callback(state, reason) + except Exception as e: + logger.error(f"State callback error: {e}") + + # ==================== 聊天历史管理 ==================== + + def _add_to_history(self, peer_id: str, message: ChatMessage) -> None: + """ + 添加消息到聊天历史 + + Args: + peer_id: 对等端ID + message: 聊天消息 + """ + if peer_id not in self._chat_history: + self._chat_history[peer_id] = [] + + self._chat_history[peer_id].append(message) + + # 按时间排序 + self._chat_history[peer_id].sort(key=lambda m: m.timestamp) + + def get_chat_history(self, peer_id: str) -> List[ChatMessage]: + """ + 获取聊天历史 + + 实现聊天历史加载 (需求 3.5, 9.2) + WHEN 显示消息历史 THEN P2P_Client SHALL 按时间顺序展示所有消息记录 + + Args: + peer_id: 对等端ID + + Returns: + 聊天消息列表(按时间排序) + """ + return self._chat_history.get(peer_id, []) + + def clear_chat_history(self, peer_id: str) -> None: + """ + 清除聊天历史 + + Args: + peer_id: 对等端ID + """ + if peer_id in self._chat_history: + del self._chat_history[peer_id] + + # ==================== 用户管理 ==================== + + async def _request_online_users(self) -> None: + """请求在线用户列表""" + if not self._user_info: + return + + request = Message( + msg_type=MessageType.USER_LIST_REQUEST, + sender_id=self._user_info.user_id, + receiver_id="server", + timestamp=time.time(), + payload=b"" + ) + + await self._connection_manager.send_message("server", request) + + async def refresh_online_users(self) -> List[UserInfo]: + """ + 刷新在线用户列表 + + 实现获取在线用户 (需求 2.3) + + Returns: + 在线用户列表 + """ + await self._request_online_users() + # 等待响应(简化处理) + await asyncio.sleep(0.5) + return list(self._online_users.values()) + + def get_online_users(self) -> List[UserInfo]: + """ + 获取缓存的在线用户列表 + + Returns: + 在线用户列表 + """ + return list(self._online_users.values()) + + def get_user_info(self, user_id: str) -> Optional[UserInfo]: + """ + 获取用户信息 + + Args: + user_id: 用户ID + + Returns: + 用户信息,如果不存在返回None + """ + return self._online_users.get(user_id) + + # ==================== 回调注册 ==================== + + def add_message_callback(self, callback: Callable[[Message], None]) -> None: + """ + 添加消息回调 + + Args: + callback: 回调函数 + """ + self._message_callbacks.append(callback) + + def remove_message_callback(self, callback: Callable[[Message], None]) -> None: + """ + 移除消息回调 + + Args: + callback: 回调函数 + """ + if callback in self._message_callbacks: + self._message_callbacks.remove(callback) + + def add_state_callback(self, callback: Callable[[ConnectionState, Optional[str]], None]) -> None: + """ + 添加状态回调 + + Args: + callback: 回调函数 + """ + self._state_callbacks.append(callback) + + def remove_state_callback(self, callback: Callable[[ConnectionState, Optional[str]], None]) -> None: + """ + 移除状态回调 + + Args: + callback: 回调函数 + """ + if callback in self._state_callbacks: + self._state_callbacks.remove(callback) + + def add_user_list_callback(self, callback: Callable[[List[UserInfo]], None]) -> None: + """ + 添加用户列表回调 + + Args: + callback: 回调函数 + """ + self._user_list_callbacks.append(callback) + + def remove_user_list_callback(self, callback: Callable[[List[UserInfo]], None]) -> None: + """ + 移除用户列表回调 + + Args: + callback: 回调函数 + """ + if callback in self._user_list_callbacks: + self._user_list_callbacks.remove(callback) + + # ==================== 连接模式 ==================== + + def get_connection_mode(self, peer_id: str) -> ConnectionMode: + """ + 获取与对等端的连接模式 + + Args: + peer_id: 对等端ID + + Returns: + 连接模式 + """ + return self._connection_manager.get_connection_mode(peer_id) + + async def discover_lan_peers(self) -> List: + """ + 发现局域网对等端 + + 实现局域网发现 (需求 1.2) + + Returns: + 发现的对等端列表 + """ + return await self._connection_manager.discover_lan_peers() + + async def optimize_connection(self, peer_id: str) -> ConnectionMode: + """ + 优化与对等端的连接 + + 尝试建立P2P直连 (需求 1.1, 1.2, 1.3) + + Args: + peer_id: 对等端ID + + Returns: + 优化后的连接模式 + """ + return await self._connection_manager.auto_select_connection_mode(peer_id) + + # ==================== 组件访问 ==================== + + @property + def connection_manager(self) -> ConnectionManager: + """获取连接管理器""" + return self._connection_manager + + @property + def file_transfer(self) -> FileTransferModule: + """获取文件传输模块""" + return self._file_transfer + + @property + def image_processor(self) -> ImageProcessor: + """获取图片处理器""" + return self._image_processor + + @property + def media_player(self) -> MediaPlayer: + """获取媒体播放器""" + return self._media_player + + @property + def voice_chat(self) -> Optional[VoiceChatModule]: + """获取语音聊天模块""" + return self._voice_chat + + # ==================== 统计信息 ==================== + + def get_stats(self) -> Dict[str, Any]: + """ + 获取应用程序统计信息 + + Returns: + 统计信息字典 + """ + conn_stats = self._connection_manager.get_connection_stats() + + return { + "user": self._user_info.username if self._user_info else None, + "connection": conn_stats, + "online_users": len(self._online_users), + "chat_sessions": len(self._chat_history), + "total_messages": sum(len(msgs) for msgs in self._chat_history.values()), + "active_transfers": len(self._file_transfer._active_transfers), + "voice_call_active": self._voice_chat.is_in_call if self._voice_chat else False + } + + +# ==================== 便捷函数 ==================== + +def create_client(server_host: str = "127.0.0.1", + server_port: int = 8888) -> P2PClientApp: + """ + 创建客户端实例 + + Args: + server_host: 服务器地址 + server_port: 服务器端口 + + Returns: + 客户端实例 + """ + config = ClientConfig( + server_host=server_host, + server_port=server_port + ) + return P2PClientApp(config) + + +async def quick_connect(username: str, display_name: str = "", + server_host: str = "127.0.0.1", + server_port: int = 8888) -> Optional[P2PClientApp]: + """ + 快速连接到服务器 + + Args: + username: 用户名 + display_name: 显示名称 + server_host: 服务器地址 + server_port: 服务器端口 + + Returns: + 连接成功返回客户端实例,否则返回None + """ + import uuid + + client = create_client(server_host, server_port) + + user_info = UserInfo( + user_id=str(uuid.uuid4()), + username=username, + display_name=display_name or username, + status=UserStatus.ONLINE + ) + + success = await client.start(user_info) + + if success: + return client + else: + return None diff --git a/tests/test_integration.py b/tests/test_integration.py new file mode 100644 index 0000000..87a3578 --- /dev/null +++ b/tests/test_integration.py @@ -0,0 +1,643 @@ +# P2P Network Communication - Integration Tests +""" +客户端-服务器集成测试 +测试端到端消息传递、文件传输和语音通话流程 + +需求: 全部 +""" + +import asyncio +import pytest +import time +import uuid +import os +import tempfile +from unittest.mock import AsyncMock, MagicMock, patch +from datetime import datetime + +from client.app import P2PClientApp, create_client, quick_connect +from client.connection_manager import ConnectionManager, ConnectionState, Connection +from server.relay_server import RelayServer, ClientConnection +from shared.models import ( + Message, MessageType, UserInfo, UserStatus, + ChatMessage, ConnectionMode, TransferProgress +) +from shared.message_handler import MessageHandler +from config import ClientConfig, ServerConfig + + +class TestP2PClientAppInit: + """测试客户端应用程序初始化""" + + def test_init_with_default_config(self): + """测试使用默认配置初始化""" + app = P2PClientApp() + + assert app.config is not None + assert app.user_info is None + assert app.is_connected is False + assert app.connection_state == ConnectionState.DISCONNECTED + + def test_init_with_custom_config(self): + """测试使用自定义配置初始化""" + config = ClientConfig( + server_host="192.168.1.100", + server_port=9999 + ) + app = P2PClientApp(config) + + assert app.config.server_host == "192.168.1.100" + assert app.config.server_port == 9999 + + def test_components_initialized(self): + """测试组件初始化""" + app = P2PClientApp() + + assert app.connection_manager is not None + assert app.file_transfer is not None + assert app.image_processor is not None + assert app.media_player is not None + # voice_chat 在 start() 后初始化 + assert app.voice_chat is None + + +class TestCallbackRegistration: + """测试回调注册""" + + def test_message_callback_registration(self): + """测试消息回调注册""" + app = P2PClientApp() + callbacks_called = [] + + def callback(msg): + callbacks_called.append(msg) + + app.add_message_callback(callback) + assert callback in app._message_callbacks + + app.remove_message_callback(callback) + assert callback not in app._message_callbacks + + def test_state_callback_registration(self): + """测试状态回调注册""" + app = P2PClientApp() + callbacks_called = [] + + def callback(state, reason): + callbacks_called.append((state, reason)) + + app.add_state_callback(callback) + assert callback in app._state_callbacks + + app.remove_state_callback(callback) + assert callback not in app._state_callbacks + + def test_user_list_callback_registration(self): + """测试用户列表回调注册""" + app = P2PClientApp() + callbacks_called = [] + + def callback(users): + callbacks_called.append(users) + + app.add_user_list_callback(callback) + assert callback in app._user_list_callbacks + + app.remove_user_list_callback(callback) + assert callback not in app._user_list_callbacks + + +class TestChatHistory: + """测试聊天历史管理""" + + def test_add_to_history(self): + """测试添加消息到历史""" + app = P2PClientApp() + + msg = ChatMessage( + message_id="msg1", + sender_id="user1", + receiver_id="user2", + content_type=MessageType.TEXT, + content="Hello", + timestamp=datetime.now() + ) + + app._add_to_history("user1", msg) + + history = app.get_chat_history("user1") + assert len(history) == 1 + assert history[0].content == "Hello" + + def test_history_sorted_by_time(self): + """测试历史按时间排序""" + app = P2PClientApp() + + msg1 = ChatMessage( + message_id="msg1", + sender_id="user1", + receiver_id="user2", + content_type=MessageType.TEXT, + content="First", + timestamp=datetime(2024, 1, 1, 10, 0, 0) + ) + + msg2 = ChatMessage( + message_id="msg2", + sender_id="user1", + receiver_id="user2", + content_type=MessageType.TEXT, + content="Second", + timestamp=datetime(2024, 1, 1, 9, 0, 0) # 更早的时间 + ) + + app._add_to_history("user1", msg1) + app._add_to_history("user1", msg2) + + history = app.get_chat_history("user1") + assert len(history) == 2 + assert history[0].content == "Second" # 更早的消息在前 + assert history[1].content == "First" + + def test_clear_history(self): + """测试清除历史""" + app = P2PClientApp() + + msg = ChatMessage( + message_id="msg1", + sender_id="user1", + receiver_id="user2", + content_type=MessageType.TEXT, + content="Hello", + timestamp=datetime.now() + ) + + app._add_to_history("user1", msg) + assert len(app.get_chat_history("user1")) == 1 + + app.clear_chat_history("user1") + assert len(app.get_chat_history("user1")) == 0 + + def test_get_empty_history(self): + """测试获取空历史""" + app = P2PClientApp() + + history = app.get_chat_history("nonexistent") + assert history == [] + + +class TestMessageHandling: + """测试消息处理""" + + def test_handle_text_message(self): + """测试处理文本消息""" + app = P2PClientApp() + app._user_info = UserInfo( + user_id="user2", + username="user2", + display_name="User 2" + ) + + message = Message( + msg_type=MessageType.TEXT, + sender_id="user1", + receiver_id="user2", + timestamp=time.time(), + payload=b"Hello, World!" + ) + + app._handle_text_message(message) + + history = app.get_chat_history("user1") + assert len(history) == 1 + assert history[0].content == "Hello, World!" + assert history[0].sender_id == "user1" + + def test_handle_user_list_response(self): + """测试处理用户列表响应""" + import json + + app = P2PClientApp() + + users_data = [ + { + "user_id": "user1", + "username": "user1", + "display_name": "User 1", + "status": "online" + }, + { + "user_id": "user2", + "username": "user2", + "display_name": "User 2", + "status": "online" + } + ] + + message = Message( + msg_type=MessageType.USER_LIST_RESPONSE, + sender_id="server", + receiver_id="me", + timestamp=time.time(), + payload=json.dumps(users_data).encode('utf-8') + ) + + app._handle_user_list_response(message) + + assert len(app._online_users) == 2 + assert "user1" in app._online_users + assert "user2" in app._online_users + + def test_handle_ack_message(self): + """测试处理确认消息""" + app = P2PClientApp() + + message = Message( + msg_type=MessageType.ACK, + sender_id="server", + receiver_id="me", + timestamp=time.time(), + payload=b"Message delivered" + ) + + # 不应该抛出异常 + app._handle_ack_message(message) + + def test_handle_error_message(self): + """测试处理错误消息""" + app = P2PClientApp() + + message = Message( + msg_type=MessageType.ERROR, + sender_id="server", + receiver_id="me", + timestamp=time.time(), + payload=b"User not found" + ) + + # 不应该抛出异常 + app._handle_error_message(message) + + +class TestOnlineUsers: + """测试在线用户管理""" + + def test_get_online_users_empty(self): + """测试获取空的在线用户列表""" + app = P2PClientApp() + + users = app.get_online_users() + assert users == [] + + def test_get_user_info(self): + """测试获取用户信息""" + app = P2PClientApp() + + user = UserInfo( + user_id="user1", + username="user1", + display_name="User 1" + ) + app._online_users["user1"] = user + + info = app.get_user_info("user1") + assert info is not None + assert info.username == "user1" + + info = app.get_user_info("nonexistent") + assert info is None + + +class TestConnectionMode: + """测试连接模式""" + + def test_get_connection_mode(self): + """测试获取连接模式""" + app = P2PClientApp() + + # 默认应该是中转模式 + mode = app.get_connection_mode("unknown_peer") + assert mode == ConnectionMode.RELAY + + +class TestStats: + """测试统计信息""" + + def test_get_stats_initial(self): + """测试获取初始统计信息""" + app = P2PClientApp() + + stats = app.get_stats() + + assert stats["user"] is None + assert stats["online_users"] == 0 + assert stats["chat_sessions"] == 0 + assert stats["total_messages"] == 0 + assert stats["voice_call_active"] is False + + def test_get_stats_with_data(self): + """测试获取有数据的统计信息""" + app = P2PClientApp() + app._user_info = UserInfo( + user_id="user1", + username="testuser", + display_name="Test User" + ) + + # 添加一些聊天历史 + msg = ChatMessage( + message_id="msg1", + sender_id="user1", + receiver_id="user2", + content_type=MessageType.TEXT, + content="Hello", + timestamp=datetime.now() + ) + app._add_to_history("user2", msg) + + stats = app.get_stats() + + assert stats["user"] == "testuser" + assert stats["chat_sessions"] == 1 + assert stats["total_messages"] == 1 + + +class TestCreateClient: + """测试便捷函数""" + + def test_create_client_default(self): + """测试使用默认参数创建客户端""" + client = create_client() + + assert client is not None + assert client.config.server_host == "127.0.0.1" + assert client.config.server_port == 8888 + + def test_create_client_custom(self): + """测试使用自定义参数创建客户端""" + client = create_client( + server_host="192.168.1.100", + server_port=9999 + ) + + assert client.config.server_host == "192.168.1.100" + assert client.config.server_port == 9999 + + +class TestMessageFlow: + """测试完整消息流程""" + + @pytest.fixture + def mock_connection_manager(self): + """创建模拟的连接管理器""" + cm = MagicMock(spec=ConnectionManager) + cm.is_connected = True + cm.state = ConnectionState.CONNECTED + cm.send_message = AsyncMock(return_value=True) + cm.get_connection_mode = MagicMock(return_value=ConnectionMode.RELAY) + return cm + + @pytest.mark.asyncio + async def test_send_text_message_flow(self, mock_connection_manager): + """测试发送文本消息流程""" + app = P2PClientApp() + app._connection_manager = mock_connection_manager + app._user_info = UserInfo( + user_id="sender", + username="sender", + display_name="Sender" + ) + + success = await app.send_text_message("receiver", "Hello!") + + assert success + mock_connection_manager.send_message.assert_called_once() + + # 检查消息被添加到历史 + history = app.get_chat_history("receiver") + assert len(history) == 1 + assert history[0].content == "Hello!" + + @pytest.mark.asyncio + async def test_send_text_message_not_logged_in(self): + """测试未登录时发送消息""" + app = P2PClientApp() + app._user_info = None + + success = await app.send_text_message("receiver", "Hello!") + + assert not success + + def test_receive_text_message_flow(self): + """测试接收文本消息流程""" + app = P2PClientApp() + app._user_info = UserInfo( + user_id="receiver", + username="receiver", + display_name="Receiver" + ) + + # 模拟接收消息 + message = Message( + msg_type=MessageType.TEXT, + sender_id="sender", + receiver_id="receiver", + timestamp=time.time(), + payload=b"Hello from sender!" + ) + + # 触发消息处理 + app._on_message_received(message) + + # 检查消息被添加到历史 + history = app.get_chat_history("sender") + assert len(history) == 1 + assert history[0].content == "Hello from sender!" + + def test_message_callback_triggered(self): + """测试消息回调被触发""" + app = P2PClientApp() + received_messages = [] + + def callback(msg): + received_messages.append(msg) + + app.add_message_callback(callback) + + message = Message( + msg_type=MessageType.TEXT, + sender_id="sender", + receiver_id="receiver", + timestamp=time.time(), + payload=b"Test" + ) + + app._on_message_received(message) + + assert len(received_messages) == 1 + assert received_messages[0].msg_type == MessageType.TEXT + + +class TestStateManagement: + """测试状态管理""" + + def test_state_callback_triggered(self): + """测试状态回调被触发""" + app = P2PClientApp() + state_changes = [] + + def callback(state, reason): + state_changes.append((state, reason)) + + app.add_state_callback(callback) + + # 模拟状态变化 + app._on_connection_state_changed(ConnectionState.CONNECTING, "Test") + + assert len(state_changes) == 1 + assert state_changes[0][0] == ConnectionState.CONNECTING + assert state_changes[0][1] == "Test" + + +class TestMediaPlayback: + """测试媒体播放""" + + def test_pause_media(self): + """测试暂停媒体""" + app = P2PClientApp() + + # 不应该抛出异常 + app.pause_media() + + def test_stop_media(self): + """测试停止媒体""" + app = P2PClientApp() + + # 不应该抛出异常 + app.stop_media() + + def test_set_volume(self): + """测试设置音量""" + app = P2PClientApp() + + # 不应该抛出异常 + app.set_volume(0.5) + + +class TestVoiceCall: + """测试语音通话""" + + def test_voice_call_not_initialized(self): + """测试语音模块未初始化时的操作""" + app = P2PClientApp() + + # voice_chat 在 start() 后才初始化 + assert app.voice_chat is None + + # 这些操作不应该抛出异常 + app.reject_voice_call("peer") + app.end_voice_call() + app.mute_voice_call(True) + + +class TestEndToEndIntegration: + """端到端集成测试""" + + @pytest.fixture + def server_config(self): + """服务器配置""" + return ServerConfig( + host="127.0.0.1", + port=18888, # 使用不同端口避免冲突 + max_connections=10 + ) + + @pytest.fixture + def client_config(self, server_config): + """客户端配置""" + return ClientConfig( + server_host=server_config.host, + server_port=server_config.port + ) + + @pytest.mark.asyncio + async def test_client_server_message_relay(self, server_config, client_config): + """测试客户端-服务器消息中转""" + # 这是一个集成测试框架,实际测试需要启动真实服务器 + # 这里只验证组件可以正确初始化 + + server = RelayServer(server_config) + client1 = P2PClientApp(client_config) + client2 = P2PClientApp(client_config) + + assert server is not None + assert client1 is not None + assert client2 is not None + + # 验证配置正确 + assert server.port == client_config.server_port + assert client1.config.server_host == server_config.host + + +class TestComponentIntegration: + """组件集成测试""" + + def test_file_transfer_integration(self): + """测试文件传输模块集成""" + app = P2PClientApp() + + # 验证文件传输模块已集成 + assert app.file_transfer is not None + assert app.file_transfer._send_message is not None + + def test_image_processor_integration(self): + """测试图片处理器集成""" + app = P2PClientApp() + + # 验证图片处理器已集成 + assert app.image_processor is not None + + def test_media_player_integration(self): + """测试媒体播放器集成""" + app = P2PClientApp() + + # 验证媒体播放器已集成 + assert app.media_player is not None + + +class TestUserListCallback: + """测试用户列表回调""" + + def test_user_list_callback_triggered(self): + """测试用户列表回调被触发""" + import json + + app = P2PClientApp() + received_users = [] + + def callback(users): + received_users.extend(users) + + app.add_user_list_callback(callback) + + users_data = [ + { + "user_id": "user1", + "username": "user1", + "display_name": "User 1", + "status": "online" + } + ] + + message = Message( + msg_type=MessageType.USER_LIST_RESPONSE, + sender_id="server", + receiver_id="me", + timestamp=time.time(), + payload=json.dumps(users_data).encode('utf-8') + ) + + app._handle_user_list_response(message) + + assert len(received_users) == 1 + assert received_users[0].user_id == "user1"