From 5ba37f1381ae467a84757fc3ae888f5a8b49a29c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=A8=E5=8D=9A=E6=96=87?= <15549487+FX_YBW@user.noreply.gitee.com> Date: Sun, 28 Dec 2025 23:58:37 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E9=AA=8C=E6=8A=A5=E5=91=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 实现报告.md | 691 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 691 insertions(+) create mode 100644 实现报告.md diff --git a/实现报告.md b/实现报告.md new file mode 100644 index 0000000..76633ef --- /dev/null +++ b/实现报告.md @@ -0,0 +1,691 @@ +# P2P Chat 点对点通信应用 - 实现报告 + +## 一、项目概述 + +本项目是一个基于 Python 的 P2P 网络通信应用程序,实现了文本消息、文件传输、图片分享和语音通话等功能。项目采用客户端-服务器混合架构,支持局域网直连(P2P)和服务器中转两种通信模式。 + +### 技术栈 +- **后端框架**: Python asyncio 异步编程 +- **GUI框架**: PyQt6 +- **数据库**: MySQL (aiomysql异步驱动) +- **音频处理**: PyAudio + Opus编解码 +- **加密**: AES-256-GCM + TLS/SSL +- **网络协议**: TCP (可靠传输) + UDP (语音数据) + +--- + +## 二、实现思路 + +### 2.1 整体架构设计 + +项目采用分层架构设计,主要分为以下几层: + +``` +┌─────────────────────────────────────────────────────────┐ +│ GUI 层 (PyQt6) │ +│ MainWindow / ChatWidget / ContactListWidget / ... │ +├─────────────────────────────────────────────────────────┤ +│ 应用层 (P2PClientApp) │ +│ 集成所有客户端组件,实现完整的消息流程 │ +├─────────────────────────────────────────────────────────┤ +│ 功能模块层 │ +│ ConnectionManager / FileTransfer / VoiceChat / ... │ +├─────────────────────────────────────────────────────────┤ +│ 共享层 (shared) │ +│ Models / MessageHandler / Security │ +├─────────────────────────────────────────────────────────┤ +│ 服务器层 (server) │ +│ RelayServer / Database / Repositories │ +└─────────────────────────────────────────────────────────┘ +``` + +### 2.2 核心设计思想 + +1. **异步编程**: 全面采用 Python asyncio 实现非阻塞 I/O,提高并发性能 +2. **回调机制**: 通过回调函数实现模块间解耦,便于扩展和维护 +3. **状态机模式**: 连接状态、通话状态等采用状态机管理 +4. **分块传输**: 大文件采用分块传输,支持断点续传 +5. **混合通信**: 自动选择 P2P 直连或服务器中转 + +--- + +## 三、核心功能实现 + +### 3.1 网络连接管理 + +#### 实现思路 +连接管理器负责管理所有网络连接,包括服务器连接和 P2P 直连。采用状态机模式管理连接状态,支持自动重连。 + +#### 核心代码 + +```python +# client/connection_manager.py + +class ConnectionState(Enum): + """连接状态枚举""" + DISCONNECTED = "disconnected" + CONNECTING = "connecting" + CONNECTED = "connected" + RECONNECTING = "reconnecting" + ERROR = "error" + +class ConnectionManager: + """连接管理器 - 负责管理网络连接、自动选择通信模式、心跳机制和重连逻辑""" + + async def connect_to_server(self, user_info: UserInfo) -> bool: + """连接到中转服务器""" + self._set_state(ConnectionState.CONNECTING, "Connecting to server") + + try: + # 建立TCP连接 + self._server_reader, self._server_writer = await asyncio.wait_for( + asyncio.open_connection( + self.config.server_host, + self.config.server_port + ), + timeout=self.config.connection_timeout + ) + + # 发送注册消息 + register_msg = Message( + msg_type=MessageType.USER_REGISTER, + sender_id=self._user_id, + receiver_id="server", + timestamp=time.time(), + payload=user_info.serialize() + ) + await self._send_to_server(register_msg) + + # 等待注册响应... + self._server_connected = True + self._set_state(ConnectionState.CONNECTED, "Connected to server") + + # 启动心跳任务和消息接收任务 + self._heartbeat_task = asyncio.create_task(self._heartbeat_loop()) + self._receive_task = asyncio.create_task(self._receive_loop()) + + return True + except Exception as e: + self._set_state(ConnectionState.ERROR, str(e)) + return False +``` + +#### 局域网发现机制 + +```python +# client/connection_manager.py + +async def discover_lan_peers(self) -> List[PeerInfo]: + """发现局域网内的其他客户端 - 使用UDP广播""" + discovered = [] + + # 创建UDP广播socket + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + + # 构建发现请求 + discovery_data = { + "magic": self.LAN_DISCOVERY_MAGIC.decode('utf-8'), + "user_id": self._user_id, + "username": self._user_info.username, + "port": self._p2p_listen_port + } + + # 发送广播 + await loop.sock_sendto(sock, json.dumps(discovery_data).encode(), + ('', self.config.lan_broadcast_port)) + + # 等待响应并收集发现的对等端... + return discovered +``` + +### 3.2 消息处理系统 + +#### 实现思路 +消息处理器负责消息的序列化、反序列化、校验和路由。采用统一的消息格式,支持多种消息类型。 + +#### 消息格式设计 + +```python +# shared/models.py + +class MessageType(Enum): + """消息类型枚举""" + TEXT = "text" # 文本消息 + FILE_REQUEST = "file_request" # 文件请求 + FILE_CHUNK = "file_chunk" # 文件块 + FILE_COMPLETE = "file_complete" # 文件完成 + IMAGE = "image" # 图片 + VOICE_CALL_REQUEST = "voice_call_request" # 语音通话请求 + VOICE_CALL_ACCEPT = "voice_call_accept" # 接听 + VOICE_CALL_REJECT = "voice_call_reject" # 拒绝 + VOICE_CALL_END = "voice_call_end" # 结束 + VOICE_DATA = "voice_data" # 语音数据 + HEARTBEAT = "heartbeat" # 心跳 + USER_REGISTER = "user_register" # 用户注册 + # ... + +@dataclass +class Message: + """消息数据结构""" + msg_type: MessageType + sender_id: str + receiver_id: str + timestamp: float + payload: bytes + checksum: str = field(default="") + message_id: str = field(default="") + + def _calculate_checksum(self) -> str: + """计算消息校验和 - 使用MD5""" + data = f"{self.msg_type.value}{self.sender_id}{self.receiver_id}{self.timestamp}" + data_bytes = data.encode('utf-8') + self.payload + return hashlib.md5(data_bytes).hexdigest() +``` + +#### 消息序列化协议 + +```python +# shared/message_handler.py + +class MessageHandler: + """消息处理器""" + + # 消息头格式: 4字节长度 + 4字节版本 + HEADER_FORMAT = "!II" # Network byte order + HEADER_SIZE = struct.calcsize(HEADER_FORMAT) + + def serialize(self, message: Message) -> bytes: + """序列化消息为字节流""" + # 格式: [4字节长度][4字节版本][JSON数据] + json_data = json.dumps(message.to_dict()).encode('utf-8') + header = struct.pack(self.HEADER_FORMAT, len(json_data), self.PROTOCOL_VERSION) + return header + json_data + + def deserialize(self, data: bytes) -> Message: + """反序列化字节流为消息""" + # 解析消息头 + header = data[:self.HEADER_SIZE] + payload_length, version = struct.unpack(self.HEADER_FORMAT, header) + + # 解析JSON数据 + json_data = data[self.HEADER_SIZE:self.HEADER_SIZE + payload_length] + return Message.from_dict(json.loads(json_data.decode('utf-8'))) +``` + +### 3.3 文件传输模块 + +#### 实现思路 +文件传输采用分块传输机制,每块 64KB,支持断点续传和完整性校验。 + +#### 核心实现 + +```python +# client/file_transfer.py + +class FileTransferModule: + """文件传输模块 - 负责文件分块传输、断点续传和完整性校验""" + + CHUNK_SIZE = 64 * 1024 # 64KB per chunk + + async def send_file(self, peer_id: str, file_path: str, + progress_callback: Optional[ProgressCallback] = None) -> bool: + """发送文件""" + # 获取文件信息 + file_name = os.path.basename(file_path) + file_size = os.path.getsize(file_path) + file_hash = self.calculate_file_hash(file_path) # SHA256 + total_chunks = self._get_total_chunks(file_size) + + # 发送文件请求消息 + file_request_payload = json.dumps({ + "file_id": file_id, + "file_name": file_name, + "file_size": file_size, + "file_hash": file_hash, + "total_chunks": total_chunks + }).encode('utf-8') + + await self._send_message(peer_id, request_msg) + + # 发送所有数据块 + for chunk_index in range(total_chunks): + chunk_data = self._read_chunk(file_path, chunk_index) + chunk_hash = self.calculate_chunk_hash(chunk_data) # MD5 + + chunk_payload = json.dumps({ + "file_id": file_id, + "chunk_index": chunk_index, + "total_chunks": total_chunks, + "checksum": chunk_hash, + "data": chunk_data.hex() + }).encode('utf-8') + + await self._send_message(peer_id, chunk_msg) + + # 更新进度 + state.completed_chunks.append(chunk_index) + progress = self._calculate_progress(state, start_time) + self._notify_progress(file_id, progress) + + # 发送完成消息 + await self._send_message(peer_id, complete_msg) + return True + + def calculate_file_hash(self, file_path: str, algorithm: str = "sha256") -> str: + """计算文件哈希值 - 用于完整性校验""" + hasher = hashlib.sha256() + with open(file_path, 'rb') as f: + while True: + data = f.read(self.CHUNK_SIZE) + if not data: + break + hasher.update(data) + return hasher.hexdigest() +``` + +#### 断点续传实现 + +```python +# client/file_transfer.py + +@dataclass +class TransferState: + """传输状态(用于断点续传)""" + file_id: str + file_path: str + file_name: str + file_size: int + file_hash: str + total_chunks: int + completed_chunks: List[int] = field(default_factory=list) + status: TransferStatus = TransferStatus.PENDING + + def to_dict(self) -> dict: + """转换为字典(用于持久化)""" + return { + "file_id": self.file_id, + "completed_chunks": self.completed_chunks, + "status": self.status.value, + # ... + } + +class FileTransferModule: + def _save_transfer_state(self, state: TransferState) -> None: + """保存传输状态到文件 - 用于断点续传""" + state_file = self._state_dir / f"{state.file_id}.json" + with open(state_file, 'w') as f: + json.dump(state.to_dict(), f) + + async def resume_transfer(self, file_id: str) -> bool: + """恢复中断的传输""" + state = self._active_transfers[file_id] + # 从上次中断的位置继续发送... +``` + +### 3.4 语音通话模块 + +#### 实现思路 +语音通话采用 UDP 传输音频数据,使用 Opus 编解码器压缩音频,通过抖动缓冲区平滑网络抖动。 + +#### 音频采集与播放 + +```python +# client/voice_chat.py + +class AudioCapture: + """音频采集器 - 使用PyAudio""" + + def start(self) -> None: + """开始音频采集""" + import pyaudio + + self._pyaudio = pyaudio.PyAudio() + self._stream = self._pyaudio.open( + format=pyaudio.paInt16, + channels=self._config.channels, + rate=self._config.sample_rate, + input=True, + frames_per_buffer=self._config.chunk_size + ) + + # 启动采集线程 + self._capture_thread = threading.Thread(target=self._capture_loop, daemon=True) + self._capture_thread.start() + + def _capture_loop(self) -> None: + """音频采集循环""" + while self._is_capturing: + data = self._stream.read(self._config.chunk_size, exception_on_overflow=False) + self._audio_queue.append(data) + +class JitterBuffer: + """抖动缓冲区 - 用于平滑网络抖动""" + + def __init__(self, target_delay: float = 0.06, max_delay: float = 0.2): + self._buffer: deque = deque() + self._target_delay = target_delay # 60ms + + def push(self, sequence: int, data: bytes, timestamp: float) -> None: + """将音频数据包放入缓冲区 - 按序列号排序""" + # 丢弃过期的包 + if sequence <= self._last_played_seq: + return + # 按序列号排序插入... + + def pop(self) -> Optional[bytes]: + """从缓冲区取出下一个音频数据包""" + if not self._buffer: + return None + # 检查是否有足够的缓冲... + seq, data, _ = self._buffer.popleft() + return data +``` + +#### Opus 编解码 + +```python +# client/voice_chat.py + +class AudioEncoder: + """音频编码器 - 使用Opus编码器进行音频压缩""" + + def __init__(self, config: AudioConfig, bitrate: int = 24000): + import opuslib + self._encoder = opuslib.Encoder( + config.sample_rate, + config.channels, + self.APPLICATION_VOIP + ) + self._encoder.bitrate = bitrate # 24kbps + + def encode(self, pcm_data: bytes) -> bytes: + """编码PCM音频数据""" + return self._encoder.encode(pcm_data, self._config.chunk_size) + + def set_bitrate(self, bitrate: int) -> None: + """设置编码比特率 - 用于自适应调整""" + self._encoder.bitrate = bitrate +``` + +#### 通话控制 + +```python +# client/voice_chat.py + +class VoiceChatModule: + """语音聊天模块""" + + async def start_call(self, peer_id: str) -> bool: + """发起语音通话""" + # 初始化UDP + if not await self._init_udp_socket(): + return False + + # 发送通话请求 + call_data = { + "caller_id": self._user_id, + "caller_name": self._username, + "udp_port": self._udp_port + } + + message = Message( + msg_type=MessageType.VOICE_CALL_REQUEST, + sender_id=self._user_id, + receiver_id=peer_id, + payload=json.dumps(call_data).encode('utf-8') + ) + + await self._send_message_callback(peer_id, message) + self._set_state(CallState.CALLING) + return True + + async def accept_call(self, peer_id: str) -> bool: + """接听语音通话""" + # 发送接听响应 + message = Message( + msg_type=MessageType.VOICE_CALL_ACCEPT, + # ... + ) + await self._send_message_callback(peer_id, message) + + # 开始音频会话 + await self._start_audio_session() + return True +``` + +### 3.5 中转服务器 + +#### 实现思路 +中转服务器负责用户注册、消息转发和离线消息缓存。采用异步 I/O 处理大量并发连接。 + +#### 核心实现 + +```python +# server/relay_server.py + +class RelayServer: + """中转服务器""" + + def __init__(self, config: ServerConfig): + self._connections: Dict[str, ClientConnection] = {} # user_id -> connection + self._users: Dict[str, UserInfo] = {} + self._offline_messages: Dict[str, deque] = {} # 离线消息缓存 + + async def start(self) -> None: + """启动服务器""" + self._server = await asyncio.start_server( + self._handle_client, + self.host, + self.port + ) + + # 启动心跳检查任务 + self._heartbeat_task = asyncio.create_task(self._heartbeat_checker()) + + async with self._server: + await self._server.serve_forever() + + async def _handle_client(self, reader: StreamReader, writer: StreamWriter) -> None: + """处理客户端连接""" + while self._running: + message = await self._read_message(reader) + if message is None: + break + + if message.msg_type == MessageType.USER_REGISTER: + await self._process_register(message, temp_conn) + else: + await self._process_message(message, user_id) + + async def _relay_message_async(self, message: Message) -> bool: + """转发消息到目标客户端""" + receiver_id = message.receiver_id + + if self.is_user_online(receiver_id): + # 在线,直接转发 + conn = self._connections[receiver_id] + return await self._send_message(conn.writer, message) + else: + # 离线,缓存消息 + self.cache_offline_message(receiver_id, message) + return True + + def cache_offline_message(self, user_id: str, message: Message) -> None: + """缓存离线消息""" + if user_id not in self._offline_messages: + self._offline_messages[user_id] = deque(maxlen=1000) + self._offline_messages[user_id].append(message) + + async def _deliver_offline_messages(self, user_id: str) -> None: + """用户上线时投递离线消息""" + if user_id not in self._offline_messages: + return + messages = self._offline_messages.pop(user_id) + for message in messages: + await self._send_message(self._connections[user_id].writer, message) +``` + +### 3.6 安全模块 + +#### 实现思路 +采用 AES-256-GCM 加密算法保护数据传输和本地存储,支持 TLS/SSL 连接。 + +#### AES-256-GCM 加密 + +```python +# shared/security.py + +class AESCipher: + """AES-256-GCM 加密器""" + + KEY_SIZE = 32 # 256 bits + IV_SIZE = 12 # 96 bits + TAG_SIZE = 16 # 128 bits + + def encrypt(self, plaintext: bytes) -> EncryptedData: + """加密数据""" + iv = self.generate_iv() + + cipher = Cipher( + algorithms.AES(self._key), + modes.GCM(iv), + backend=default_backend() + ) + encryptor = cipher.encryptor() + ciphertext = encryptor.update(plaintext) + encryptor.finalize() + + return EncryptedData( + ciphertext=ciphertext, + iv=iv, + tag=encryptor.tag + ) + + @staticmethod + def derive_key_from_password(password: str, salt: bytes = None) -> Tuple[bytes, bytes]: + """从密码派生密钥 (PBKDF2)""" + if salt is None: + salt = secrets.token_bytes(16) + + kdf = PBKDF2HMAC( + algorithm=hashes.SHA256(), + length=32, + salt=salt, + iterations=100000 + ) + key = kdf.derive(password.encode('utf-8')) + return key, salt +``` + +--- + +## 四、功能介绍 + +### 4.1 文本消息通信 +- 支持实时文本消息发送和接收 +- 消息带有校验和,确保完整性 +- 支持离线消息缓存,用户上线后自动投递 + +### 4.2 文件传输 +- 支持任意类型文件传输 +- 64KB 分块传输,支持大文件 +- SHA256 完整性校验 +- 断点续传功能 +- 实时进度显示 + +### 4.3 图片传输与显示 +- 支持常见图片格式 (PNG, JPG, GIF, BMP) +- 可选图片压缩 +- 缩略图预览 + +### 4.4 语音通话 +- 实时语音通话 +- Opus 编解码,低带宽高质量 +- 抖动缓冲区,平滑网络抖动 +- 静音功能 +- 自适应比特率调整 + +### 4.5 局域网自动发现 +- UDP 广播发现局域网内用户 +- 自动选择 P2P 直连或服务器中转 +- P2P 直连延迟更低 + +### 4.6 安全特性 +- AES-256-GCM 加密传输 +- TLS/SSL 连接支持 +- 本地数据加密存储 +- PBKDF2 密钥派生 + +--- + +## 五、心得体会 + +### 5.1 异步编程的重要性 + +在网络应用开发中,异步编程是提高性能的关键。Python 的 asyncio 提供了优雅的异步编程模型,使得处理大量并发连接变得简单。通过 `async/await` 语法,代码既保持了可读性,又实现了高效的非阻塞 I/O。 + +```python +# 异步编程让并发处理变得简洁 +async def handle_multiple_clients(): + tasks = [handle_client(client) for client in clients] + await asyncio.gather(*tasks) +``` + +### 5.2 模块化设计的价值 + +项目采用了清晰的模块化设计,每个模块职责单一: +- `ConnectionManager` 专注于连接管理 +- `FileTransferModule` 专注于文件传输 +- `VoiceChatModule` 专注于语音通话 + +这种设计使得代码易于维护和扩展,也便于单元测试。 + +### 5.3 回调机制的灵活性 + +通过回调机制实现模块间通信,降低了耦合度: + +```python +# 注册回调 +self.client.add_message_callback(self._on_message) +self.client.add_state_callback(self._on_state_change) + +# 模块内部触发回调 +for callback in self._message_callbacks: + callback(message) +``` + +### 5.4 网络编程的挑战 + +实现语音通话时,网络抖动是一个主要挑战。通过实现抖动缓冲区(JitterBuffer),可以有效平滑网络延迟变化,保证音频播放的连续性。 + +### 5.5 安全性考虑 + +在设计之初就考虑安全性是非常重要的。项目采用了: +- 消息校验和防止篡改 +- AES-256-GCM 加密保护数据 +- PBKDF2 安全地从密码派生密钥 + +### 5.6 用户体验优化 + +GUI 设计中,将网络操作放在独立线程中执行,避免阻塞 UI 线程,保证界面响应流畅: + +```python +class AsyncWorker(QThread): + """异步工作线程,用于运行 asyncio 事件循环""" + + def run(self): + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) + # 在独立线程中运行异步任务... +``` + +### 5.7 总结 + +通过这个项目的开发,深入理解了: +1. 网络编程的核心概念(TCP/UDP、异步I/O、协议设计) +2. 实时通信的技术挑战(延迟、抖动、丢包) +3. 安全编程的最佳实践(加密、密钥管理) +4. GUI 应用的多线程设计 +5. 软件工程的模块化思想 + +这些经验对于未来开发更复杂的分布式系统和实时通信应用都有很大帮助。