You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

905 lines
29 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# P2P Network Communication - Client Application Integration
"""
客户端应用程序集成模块
负责连接所有客户端组件、连接客户端与服务器、实现完整的消息流程
需求: 全部
"""
import asyncio
import logging
import os
import sys
import time
from typing import Optional, Callable, Dict, Any, List
from datetime import datetime
from shared.models import (
Message, MessageType, UserInfo, UserStatus, ChatMessage,
ConnectionMode, TransferProgress
)
from shared.message_handler import MessageHandler
from client.connection_manager import ConnectionManager, ConnectionState
from client.file_transfer import FileTransferModule
from client.image_processor import ImageProcessor
from client.media_player import MediaPlayer
from client.voice_chat import VoiceChatModule
from config import ClientConfig
logger = logging.getLogger(__name__)
class P2PClientApp:
"""
P2P客户端应用程序
集成所有客户端组件,实现完整的消息流程
组件:
- ConnectionManager: 网络连接管理
- FileTransferModule: 文件传输
- ImageProcessor: 图片处理
- MediaPlayer: 媒体播放
- VoiceChatModule: 语音聊天
"""
def __init__(self, config: Optional[ClientConfig] = None):
"""
初始化客户端应用程序
Args:
config: 客户端配置
"""
self.config = config or ClientConfig()
# 用户信息
self._user_info: Optional[UserInfo] = None
self._online_users: Dict[str, UserInfo] = {}
# 消息历史
self._chat_history: Dict[str, List[ChatMessage]] = {}
# 初始化组件
self._connection_manager = ConnectionManager(self.config)
self._file_transfer = FileTransferModule(self.config)
self._image_processor = ImageProcessor(self.config)
self._media_player = MediaPlayer()
self._voice_chat: Optional[VoiceChatModule] = None
# 设置文件传输的消息发送函数
self._file_transfer.set_send_message_func(self._send_message_async)
# 文件接收回调
self._file_received_callbacks: List[Callable[[str, str, str], None]] = []
self._file_transfer.add_file_received_callback(self._on_file_received)
# 回调函数
self._message_callbacks: List[Callable[[Message], None]] = []
self._state_callbacks: List[Callable[[ConnectionState, Optional[str]], None]] = []
self._user_list_callbacks: List[Callable[[List[UserInfo]], None]] = []
# 设置连接管理器回调
self._connection_manager.add_message_callback(self._on_message_received)
self._connection_manager.add_state_callback(self._on_connection_state_changed)
# 事件循环
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._running = False
logger.info("P2PClientApp initialized")
# ==================== 生命周期管理 ====================
async def start(self, user_info: UserInfo) -> bool:
"""
启动客户端应用程序
Args:
user_info: 用户信息
Returns:
启动成功返回True否则返回False
"""
self._user_info = user_info
self._running = True
try:
# 连接到服务器
success = await self._connection_manager.connect_to_server(user_info)
if success:
logger.info(f"Client started for user: {user_info.username}")
# 初始化语音聊天模块
self._voice_chat = VoiceChatModule(self.config)
self._voice_chat.set_user_info(user_info.user_id, user_info.username)
self._voice_chat.set_send_message_callback(self._send_message_async)
logger.info(f"Voice chat module initialized for user: {user_info.user_id}")
# 请求在线用户列表
await self._request_online_users()
return True
else:
logger.error("Failed to connect to server")
return False
except Exception as e:
logger.error(f"Failed to start client: {e}")
return False
async def stop(self) -> None:
"""停止客户端应用程序"""
self._running = False
# 结束语音通话
if self._voice_chat and self._voice_chat.is_in_call:
self._voice_chat.end_call()
# 停止媒体播放
self._media_player.stop()
# 断开连接
await self._connection_manager.disconnect()
logger.info("Client stopped")
@property
def is_connected(self) -> bool:
"""是否已连接到服务器"""
return self._connection_manager.is_connected
@property
def user_info(self) -> Optional[UserInfo]:
"""获取当前用户信息"""
return self._user_info
@property
def connection_state(self) -> ConnectionState:
"""获取连接状态"""
return self._connection_manager.state
# ==================== 消息发送 ====================
async def _send_message_async(self, peer_id: str, message: Message) -> bool:
"""
异步发送消息
Args:
peer_id: 目标对等端ID
message: 消息对象
Returns:
发送成功返回True否则返回False
"""
# 填充发送者ID
message.sender_id = self._user_info.user_id if self._user_info else ""
return await self._connection_manager.send_message(peer_id, message)
async def send_text_message(self, peer_id: str, content: str) -> bool:
"""
发送文本消息
实现消息发送 (需求 3.1)
WHEN 用户输入文本消息并发送 THEN Message_Handler SHALL 将消息传递给目标用户
Args:
peer_id: 目标对等端ID
content: 消息内容
Returns:
发送成功返回True否则返回False
"""
if not self._user_info:
logger.error("User not logged in")
return False
message = Message(
msg_type=MessageType.TEXT,
sender_id=self._user_info.user_id,
receiver_id=peer_id,
timestamp=time.time(),
payload=content.encode('utf-8')
)
success = await self._connection_manager.send_message(peer_id, message)
if success:
# 保存到聊天历史
chat_msg = ChatMessage(
message_id=message.message_id,
sender_id=self._user_info.user_id,
receiver_id=peer_id,
content_type=MessageType.TEXT,
content=content,
timestamp=datetime.now(),
is_sent=True
)
self._add_to_history(peer_id, chat_msg)
logger.debug(f"Text message sent to {peer_id}")
return success
async def send_file(self, peer_id: str, file_path: str,
progress_callback: Optional[Callable[[TransferProgress], None]] = None) -> bool:
"""
发送文件
实现文件发送 (需求 4.1, 4.2)
Args:
peer_id: 目标对等端ID
file_path: 文件路径
progress_callback: 进度回调
Returns:
发送成功返回True否则返回False
"""
return await self._file_transfer.send_file(peer_id, file_path, progress_callback)
async def send_image(self, peer_id: str, image_path: str,
compress: bool = False,
progress_callback: Optional[Callable[[TransferProgress], None]] = None) -> bool:
"""
发送图片
实现图片发送 (需求 5.1, 5.5)
Args:
peer_id: 目标对等端ID
image_path: 图片路径
compress: 是否压缩
progress_callback: 进度回调
Returns:
发送成功返回True否则返回False
"""
# 检查图片格式
if not self._image_processor.is_supported_format(image_path):
logger.error(f"Unsupported image format: {image_path}")
return False
# 如果需要压缩
if compress:
compressed_path = self._image_processor.compress_image(image_path)
if compressed_path:
image_path = compressed_path
# 发送图片文件
return await self._file_transfer.send_file(peer_id, image_path, progress_callback)
# ==================== 语音通话 ====================
async def start_voice_call(self, peer_id: str) -> bool:
"""
发起语音通话
实现语音通话发起 (需求 7.1)
Args:
peer_id: 目标对等端ID
Returns:
发起成功返回True否则返回False
"""
if not self._voice_chat:
logger.error("Voice chat module not initialized")
return False
return await self._voice_chat.start_call(peer_id)
async def accept_voice_call(self, peer_id: str) -> bool:
"""
接听语音通话
实现语音通话接听 (需求 7.2)
Args:
peer_id: 来电对等端ID
Returns:
接听成功返回True否则返回False
"""
if not self._voice_chat:
return False
return await self._voice_chat.accept_call(peer_id)
def reject_voice_call(self, peer_id: str) -> None:
"""
拒绝语音通话
Args:
peer_id: 来电对等端ID
"""
if self._voice_chat:
self._voice_chat.reject_call(peer_id)
def end_voice_call(self) -> None:
"""结束语音通话"""
if self._voice_chat:
self._voice_chat.end_call()
def mute_voice_call(self, muted: bool) -> None:
"""
设置静音
实现静音功能 (需求 7.5)
Args:
muted: 是否静音
"""
if self._voice_chat:
self._voice_chat.mute(muted)
# ==================== 媒体播放 ====================
def play_audio(self, file_path: str) -> bool:
"""
播放音频文件
实现音频播放 (需求 6.1, 6.3)
Args:
file_path: 音频文件路径
Returns:
加载成功返回True否则返回False
"""
if self._media_player.load_audio(file_path):
self._media_player.play()
return True
return False
def play_video(self, file_path: str) -> bool:
"""
播放视频文件
实现视频播放 (需求 6.2, 6.4)
Args:
file_path: 视频文件路径
Returns:
加载成功返回True否则返回False
"""
if self._media_player.load_video(file_path):
self._media_player.play()
return True
return False
def pause_media(self) -> None:
"""暂停媒体播放"""
self._media_player.pause()
def stop_media(self) -> None:
"""停止媒体播放"""
self._media_player.stop()
def seek_media(self, position: float) -> None:
"""
跳转媒体位置
Args:
position: 位置(秒)
"""
self._media_player.seek(position)
def set_volume(self, volume: float) -> None:
"""
设置音量
Args:
volume: 音量 (0.0-1.0)
"""
self._media_player.set_volume(volume)
# ==================== 消息接收处理 ====================
def _on_message_received(self, message: Message) -> None:
"""
处理接收到的消息
实现消息接收 (需求 3.2)
WHEN P2P_Client 收到文本消息 THEN P2P_Client SHALL 立即显示消息内容和发送者信息
Args:
message: 接收到的消息
"""
logger.debug(f"Message received: {message.msg_type.value} from {message.sender_id}")
# 根据消息类型处理
if message.msg_type == MessageType.TEXT:
self._handle_text_message(message)
elif message.msg_type == MessageType.FILE_REQUEST:
self._handle_file_request(message)
elif message.msg_type == MessageType.FILE_CHUNK:
self._handle_file_chunk(message)
elif message.msg_type == MessageType.FILE_COMPLETE:
self._handle_file_complete(message)
elif message.msg_type == MessageType.IMAGE:
self._handle_image_message(message)
elif message.msg_type == MessageType.VOICE_CALL_REQUEST:
self._handle_voice_call_request(message)
elif message.msg_type == MessageType.VOICE_CALL_ACCEPT:
self._handle_voice_call_accept(message)
elif message.msg_type == MessageType.VOICE_CALL_REJECT:
self._handle_voice_call_reject(message)
elif message.msg_type == MessageType.VOICE_CALL_END:
self._handle_voice_call_end(message)
elif message.msg_type == MessageType.VOICE_DATA:
self._handle_voice_data(message)
elif message.msg_type == MessageType.USER_LIST_RESPONSE:
self._handle_user_list_response(message)
elif message.msg_type == MessageType.ACK:
self._handle_ack_message(message)
elif message.msg_type == MessageType.ERROR:
self._handle_error_message(message)
# 通知所有回调
for callback in self._message_callbacks:
try:
callback(message)
except Exception as e:
logger.error(f"Message callback error: {e}")
def _handle_text_message(self, message: Message) -> None:
"""处理文本消息"""
content = message.payload.decode('utf-8')
# 保存到聊天历史
chat_msg = ChatMessage(
message_id=message.message_id,
sender_id=message.sender_id,
receiver_id=message.receiver_id,
content_type=MessageType.TEXT,
content=content,
timestamp=datetime.fromtimestamp(message.timestamp),
is_read=False,
is_sent=True
)
self._add_to_history(message.sender_id, chat_msg)
logger.info(f"Text message from {message.sender_id}: {content[:50]}...")
def _handle_file_request(self, message: Message) -> None:
"""处理文件请求"""
file_id = self._file_transfer.handle_file_request(message)
if file_id:
logger.info(f"File request received: {file_id}")
def _handle_file_chunk(self, message: Message) -> None:
"""处理文件块"""
self._file_transfer.handle_file_chunk(message)
def _handle_file_complete(self, message: Message) -> None:
"""处理文件完成"""
self._file_transfer.handle_file_complete(message)
def _handle_image_message(self, message: Message) -> None:
"""处理图片消息"""
# 图片作为文件传输处理
logger.info(f"Image received from {message.sender_id}")
def _handle_voice_call_request(self, message: Message) -> None:
"""处理语音通话请求"""
if self._voice_chat:
asyncio.create_task(self._voice_chat._handle_call_request(message))
logger.info(f"Voice call request from {message.sender_id}")
def _handle_voice_call_accept(self, message: Message) -> None:
"""处理语音通话接受"""
if self._voice_chat:
asyncio.create_task(self._voice_chat._handle_call_accept(message))
logger.info(f"Voice call accepted by {message.sender_id}")
def _handle_voice_call_reject(self, message: Message) -> None:
"""处理语音通话拒绝"""
if self._voice_chat:
asyncio.create_task(self._voice_chat._handle_call_reject(message))
logger.info(f"Voice call rejected by {message.sender_id}")
def _handle_voice_call_end(self, message: Message) -> None:
"""处理语音通话结束"""
if self._voice_chat:
asyncio.create_task(self._voice_chat._handle_call_end(message))
logger.info(f"Voice call ended by {message.sender_id}")
def _handle_voice_data(self, message: Message) -> None:
"""处理语音数据"""
if self._voice_chat:
self._voice_chat._handle_voice_data(message)
def _handle_user_list_response(self, message: Message) -> None:
"""处理用户列表响应"""
import json
try:
users_data = json.loads(message.payload.decode('utf-8'))
self._online_users.clear()
for user_data in users_data:
user_info = UserInfo.from_dict(user_data)
self._online_users[user_info.user_id] = user_info
logger.info(f"Online users updated: {len(self._online_users)} users")
# 通知回调
for callback in self._user_list_callbacks:
try:
callback(list(self._online_users.values()))
except Exception as e:
logger.error(f"User list callback error: {e}")
except Exception as e:
logger.error(f"Failed to parse user list: {e}")
def _handle_ack_message(self, message: Message) -> None:
"""处理确认消息"""
info = message.payload.decode('utf-8')
logger.debug(f"ACK received: {info}")
def _handle_error_message(self, message: Message) -> None:
"""处理错误消息"""
error = message.payload.decode('utf-8')
logger.error(f"Error from server: {error}")
def _on_file_received(self, sender_id: str, file_name: str, file_path: str) -> None:
"""
处理文件接收完成
Args:
sender_id: 发送者ID
file_name: 文件名
file_path: 保存路径
"""
logger.info(f"P2PClientApp: File received from {sender_id}: {file_name} -> {file_path}")
# 判断是否是图片
image_extensions = {'.png', '.jpg', '.jpeg', '.gif', '.bmp', '.webp'}
ext = os.path.splitext(file_name)[1].lower()
content_type = MessageType.IMAGE if ext in image_extensions else MessageType.FILE_REQUEST
# 保存到聊天历史
chat_msg = ChatMessage(
message_id=f"file_{time.time()}",
sender_id=sender_id,
receiver_id=self._user_info.user_id if self._user_info else "",
content_type=content_type,
content=file_path, # 保存文件路径
timestamp=datetime.now(),
is_read=False,
is_sent=True
)
self._add_to_history(sender_id, chat_msg)
# 通知回调
logger.info(f"P2PClientApp: Notifying {len(self._file_received_callbacks)} callbacks")
for callback in self._file_received_callbacks:
try:
callback(sender_id, file_name, file_path)
except Exception as e:
logger.error(f"File received callback error: {e}")
def add_file_received_callback(self, callback: Callable[[str, str, str], None]) -> None:
"""添加文件接收回调"""
self._file_received_callbacks.append(callback)
def remove_file_received_callback(self, callback: Callable[[str, str, str], None]) -> None:
"""移除文件接收回调"""
if callback in self._file_received_callbacks:
self._file_received_callbacks.remove(callback)
def _on_connection_state_changed(self, state: ConnectionState, reason: Optional[str]) -> None:
"""
处理连接状态变化
Args:
state: 新状态
reason: 变化原因
"""
logger.info(f"Connection state changed: {state.value}" +
(f" ({reason})" if reason else ""))
# 通知回调
for callback in self._state_callbacks:
try:
callback(state, reason)
except Exception as e:
logger.error(f"State callback error: {e}")
# ==================== 聊天历史管理 ====================
def _add_to_history(self, peer_id: str, message: ChatMessage) -> None:
"""
添加消息到聊天历史
Args:
peer_id: 对等端ID
message: 聊天消息
"""
if peer_id not in self._chat_history:
self._chat_history[peer_id] = []
self._chat_history[peer_id].append(message)
# 按时间排序
self._chat_history[peer_id].sort(key=lambda m: m.timestamp)
def get_chat_history(self, peer_id: str) -> List[ChatMessage]:
"""
获取聊天历史
实现聊天历史加载 (需求 3.5, 9.2)
WHEN 显示消息历史 THEN P2P_Client SHALL 按时间顺序展示所有消息记录
Args:
peer_id: 对等端ID
Returns:
聊天消息列表(按时间排序)
"""
return self._chat_history.get(peer_id, [])
def clear_chat_history(self, peer_id: str) -> None:
"""
清除聊天历史
Args:
peer_id: 对等端ID
"""
if peer_id in self._chat_history:
del self._chat_history[peer_id]
# ==================== 用户管理 ====================
async def _request_online_users(self) -> None:
"""请求在线用户列表"""
if not self._user_info:
return
request = Message(
msg_type=MessageType.USER_LIST_REQUEST,
sender_id=self._user_info.user_id,
receiver_id="server",
timestamp=time.time(),
payload=b""
)
await self._connection_manager.send_message("server", request)
async def refresh_online_users(self) -> List[UserInfo]:
"""
刷新在线用户列表
实现获取在线用户 (需求 2.3)
Returns:
在线用户列表
"""
await self._request_online_users()
# 等待响应(简化处理)
await asyncio.sleep(0.5)
return list(self._online_users.values())
def get_online_users(self) -> List[UserInfo]:
"""
获取缓存的在线用户列表
Returns:
在线用户列表
"""
return list(self._online_users.values())
def get_user_info(self, user_id: str) -> Optional[UserInfo]:
"""
获取用户信息
Args:
user_id: 用户ID
Returns:
用户信息如果不存在返回None
"""
return self._online_users.get(user_id)
# ==================== 回调注册 ====================
def add_message_callback(self, callback: Callable[[Message], None]) -> None:
"""
添加消息回调
Args:
callback: 回调函数
"""
self._message_callbacks.append(callback)
def remove_message_callback(self, callback: Callable[[Message], None]) -> None:
"""
移除消息回调
Args:
callback: 回调函数
"""
if callback in self._message_callbacks:
self._message_callbacks.remove(callback)
def add_state_callback(self, callback: Callable[[ConnectionState, Optional[str]], None]) -> None:
"""
添加状态回调
Args:
callback: 回调函数
"""
self._state_callbacks.append(callback)
def remove_state_callback(self, callback: Callable[[ConnectionState, Optional[str]], None]) -> None:
"""
移除状态回调
Args:
callback: 回调函数
"""
if callback in self._state_callbacks:
self._state_callbacks.remove(callback)
def add_user_list_callback(self, callback: Callable[[List[UserInfo]], None]) -> None:
"""
添加用户列表回调
Args:
callback: 回调函数
"""
self._user_list_callbacks.append(callback)
def remove_user_list_callback(self, callback: Callable[[List[UserInfo]], None]) -> None:
"""
移除用户列表回调
Args:
callback: 回调函数
"""
if callback in self._user_list_callbacks:
self._user_list_callbacks.remove(callback)
# ==================== 连接模式 ====================
def get_connection_mode(self, peer_id: str) -> ConnectionMode:
"""
获取与对等端的连接模式
Args:
peer_id: 对等端ID
Returns:
连接模式
"""
return self._connection_manager.get_connection_mode(peer_id)
async def discover_lan_peers(self) -> List:
"""
发现局域网对等端
实现局域网发现 (需求 1.2)
Returns:
发现的对等端列表
"""
return await self._connection_manager.discover_lan_peers()
async def optimize_connection(self, peer_id: str) -> ConnectionMode:
"""
优化与对等端的连接
尝试建立P2P直连 (需求 1.1, 1.2, 1.3)
Args:
peer_id: 对等端ID
Returns:
优化后的连接模式
"""
return await self._connection_manager.auto_select_connection_mode(peer_id)
# ==================== 组件访问 ====================
@property
def connection_manager(self) -> ConnectionManager:
"""获取连接管理器"""
return self._connection_manager
@property
def file_transfer(self) -> FileTransferModule:
"""获取文件传输模块"""
return self._file_transfer
@property
def image_processor(self) -> ImageProcessor:
"""获取图片处理器"""
return self._image_processor
@property
def media_player(self) -> MediaPlayer:
"""获取媒体播放器"""
return self._media_player
@property
def voice_chat(self) -> Optional[VoiceChatModule]:
"""获取语音聊天模块"""
return self._voice_chat
# ==================== 统计信息 ====================
def get_stats(self) -> Dict[str, Any]:
"""
获取应用程序统计信息
Returns:
统计信息字典
"""
conn_stats = self._connection_manager.get_connection_stats()
return {
"user": self._user_info.username if self._user_info else None,
"connection": conn_stats,
"online_users": len(self._online_users),
"chat_sessions": len(self._chat_history),
"total_messages": sum(len(msgs) for msgs in self._chat_history.values()),
"active_transfers": len(self._file_transfer._active_transfers),
"voice_call_active": self._voice_chat.is_in_call if self._voice_chat else False
}
# ==================== 便捷函数 ====================
def create_client(server_host: str = "127.0.0.1",
server_port: int = 8888) -> P2PClientApp:
"""
创建客户端实例
Args:
server_host: 服务器地址
server_port: 服务器端口
Returns:
客户端实例
"""
config = ClientConfig(
server_host=server_host,
server_port=server_port
)
return P2PClientApp(config)
async def quick_connect(username: str, display_name: str = "",
server_host: str = "127.0.0.1",
server_port: int = 8888) -> Optional[P2PClientApp]:
"""
快速连接到服务器
Args:
username: 用户名
display_name: 显示名称
server_host: 服务器地址
server_port: 服务器端口
Returns:
连接成功返回客户端实例否则返回None
"""
import uuid
client = create_client(server_host, server_port)
user_info = UserInfo(
user_id=str(uuid.uuid4()),
username=username,
display_name=display_name or username,
status=UserStatus.ONLINE
)
success = await client.start(user_info)
if success:
return client
else:
return None