# P2P Network Communication - Data Models """ 数据模型和枚举类型定义 包含消息类型、用户状态、传输状态等核心数据结构 """ import hashlib import json from dataclasses import dataclass, field, asdict from datetime import datetime from enum import Enum from typing import Optional class MessageType(Enum): """消息类型枚举""" TEXT = "text" FILE_REQUEST = "file_request" FILE_CHUNK = "file_chunk" FILE_COMPLETE = "file_complete" IMAGE = "image" AUDIO_STREAM = "audio_stream" VIDEO_STREAM = "video_stream" VOICE_CALL_REQUEST = "voice_call_request" VOICE_CALL_ACCEPT = "voice_call_accept" VOICE_CALL_REJECT = "voice_call_reject" VOICE_CALL_END = "voice_call_end" VOICE_DATA = "voice_data" HEARTBEAT = "heartbeat" USER_REGISTER = "user_register" USER_UNREGISTER = "user_unregister" USER_LIST_REQUEST = "user_list_request" USER_LIST_RESPONSE = "user_list_response" ACK = "ack" ERROR = "error" class UserStatus(Enum): """用户状态枚举""" ONLINE = "online" OFFLINE = "offline" BUSY = "busy" AWAY = "away" class TransferStatus(Enum): """传输状态枚举""" PENDING = "pending" IN_PROGRESS = "in_progress" COMPLETED = "completed" FAILED = "failed" CANCELLED = "cancelled" PAUSED = "paused" class ConnectionMode(Enum): """连接模式枚举""" P2P = "p2p" # 局域网直连 RELAY = "relay" # 服务器中转 UNKNOWN = "unknown" # 未知 class NetworkQuality(Enum): """网络质量枚举""" EXCELLENT = "excellent" # 延迟 < 50ms GOOD = "good" # 延迟 50-100ms FAIR = "fair" # 延迟 100-200ms POOR = "poor" # 延迟 200-300ms BAD = "bad" # 延迟 > 300ms @dataclass class Message: """消息数据结构""" msg_type: MessageType sender_id: str receiver_id: str timestamp: float payload: bytes checksum: str = field(default="") message_id: str = field(default="") def __post_init__(self): """初始化后计算校验和和消息ID""" if not self.checksum: self.checksum = self._calculate_checksum() if not self.message_id: self.message_id = self._generate_message_id() def _calculate_checksum(self) -> str: """计算消息校验和""" data = f"{self.msg_type.value}{self.sender_id}{self.receiver_id}{self.timestamp}" data_bytes = data.encode('utf-8') + self.payload return hashlib.md5(data_bytes).hexdigest() def _generate_message_id(self) -> str: """生成消息ID""" data = f"{self.sender_id}{self.receiver_id}{self.timestamp}{self.checksum}" return hashlib.sha256(data.encode('utf-8')).hexdigest()[:32] def verify_checksum(self) -> bool: """验证消息校验和""" return self.checksum == self._calculate_checksum() def to_dict(self) -> dict: """转换为字典""" return { "msg_type": self.msg_type.value, "sender_id": self.sender_id, "receiver_id": self.receiver_id, "timestamp": self.timestamp, "payload": self.payload.hex(), # bytes转hex字符串 "checksum": self.checksum, "message_id": self.message_id, } @classmethod def from_dict(cls, data: dict) -> "Message": """从字典创建Message对象""" return cls( msg_type=MessageType(data["msg_type"]), sender_id=data["sender_id"], receiver_id=data["receiver_id"], timestamp=data["timestamp"], payload=bytes.fromhex(data["payload"]), checksum=data.get("checksum", ""), message_id=data.get("message_id", ""), ) def serialize(self) -> bytes: """序列化消息为字节流""" return json.dumps(self.to_dict()).encode('utf-8') @classmethod def deserialize(cls, data: bytes) -> "Message": """反序列化字节流为消息""" return cls.from_dict(json.loads(data.decode('utf-8'))) @dataclass class UserInfo: """用户信息""" user_id: str username: str display_name: str status: UserStatus = UserStatus.OFFLINE ip_address: str = "" port: int = 0 last_seen: Optional[datetime] = None public_key: bytes = field(default_factory=bytes) def to_dict(self) -> dict: """转换为字典""" return { "user_id": self.user_id, "username": self.username, "display_name": self.display_name, "status": self.status.value, "ip_address": self.ip_address, "port": self.port, "last_seen": self.last_seen.isoformat() if self.last_seen else None, "public_key": self.public_key.hex() if self.public_key else "", } @classmethod def from_dict(cls, data: dict) -> "UserInfo": """从字典创建UserInfo对象""" last_seen = None if data.get("last_seen"): last_seen = datetime.fromisoformat(data["last_seen"]) public_key = bytes() if data.get("public_key"): public_key = bytes.fromhex(data["public_key"]) return cls( user_id=data["user_id"], username=data["username"], display_name=data["display_name"], status=UserStatus(data.get("status", "offline")), ip_address=data.get("ip_address", ""), port=data.get("port", 0), last_seen=last_seen, public_key=public_key, ) def serialize(self) -> bytes: """序列化为字节流""" return json.dumps(self.to_dict()).encode('utf-8') @classmethod def deserialize(cls, data: bytes) -> "UserInfo": """反序列化字节流""" return cls.from_dict(json.loads(data.decode('utf-8'))) @dataclass class ChatMessage: """聊天消息记录""" message_id: str sender_id: str receiver_id: str content_type: MessageType content: str # 文本内容或文件路径 timestamp: datetime = field(default_factory=datetime.now) is_read: bool = False is_sent: bool = False def to_dict(self) -> dict: """转换为字典""" return { "message_id": self.message_id, "sender_id": self.sender_id, "receiver_id": self.receiver_id, "content_type": self.content_type.value, "content": self.content, "timestamp": self.timestamp.isoformat(), "is_read": self.is_read, "is_sent": self.is_sent, } @classmethod def from_dict(cls, data: dict) -> "ChatMessage": """从字典创建ChatMessage对象""" return cls( message_id=data["message_id"], sender_id=data["sender_id"], receiver_id=data["receiver_id"], content_type=MessageType(data["content_type"]), content=data["content"], timestamp=datetime.fromisoformat(data["timestamp"]), is_read=data.get("is_read", False), is_sent=data.get("is_sent", False), ) def serialize(self) -> bytes: """序列化为字节流""" return json.dumps(self.to_dict()).encode('utf-8') @classmethod def deserialize(cls, data: bytes) -> "ChatMessage": """反序列化字节流""" return cls.from_dict(json.loads(data.decode('utf-8'))) @dataclass class FileChunk: """文件块数据结构""" file_id: str chunk_index: int total_chunks: int data: bytes checksum: str = field(default="") def __post_init__(self): """初始化后计算校验和""" if not self.checksum: self.checksum = hashlib.md5(self.data).hexdigest() def verify_checksum(self) -> bool: """验证数据块校验和""" return self.checksum == hashlib.md5(self.data).hexdigest() def to_dict(self) -> dict: """转换为字典""" return { "file_id": self.file_id, "chunk_index": self.chunk_index, "total_chunks": self.total_chunks, "data": self.data.hex(), "checksum": self.checksum, } @classmethod def from_dict(cls, data: dict) -> "FileChunk": """从字典创建FileChunk对象""" return cls( file_id=data["file_id"], chunk_index=data["chunk_index"], total_chunks=data["total_chunks"], data=bytes.fromhex(data["data"]), checksum=data.get("checksum", ""), ) @dataclass class TransferProgress: """传输进度信息""" file_id: str file_name: str total_size: int transferred_size: int speed: float = 0.0 # bytes per second eta: float = 0.0 # estimated time remaining in seconds @property def progress_percent(self) -> float: """获取进度百分比""" if self.total_size == 0: return 0.0 return (self.transferred_size / self.total_size) * 100 def to_dict(self) -> dict: """转换为字典""" return { "file_id": self.file_id, "file_name": self.file_name, "total_size": self.total_size, "transferred_size": self.transferred_size, "speed": self.speed, "eta": self.eta, "progress_percent": self.progress_percent, } @dataclass class FileTransferRecord: """文件传输记录""" transfer_id: str file_name: str file_size: int file_hash: str sender_id: str receiver_id: str status: TransferStatus = TransferStatus.PENDING progress: float = 0.0 start_time: datetime = field(default_factory=datetime.now) end_time: Optional[datetime] = None def to_dict(self) -> dict: """转换为字典""" return { "transfer_id": self.transfer_id, "file_name": self.file_name, "file_size": self.file_size, "file_hash": self.file_hash, "sender_id": self.sender_id, "receiver_id": self.receiver_id, "status": self.status.value, "progress": self.progress, "start_time": self.start_time.isoformat(), "end_time": self.end_time.isoformat() if self.end_time else None, } @classmethod def from_dict(cls, data: dict) -> "FileTransferRecord": """从字典创建FileTransferRecord对象""" end_time = None if data.get("end_time"): end_time = datetime.fromisoformat(data["end_time"]) return cls( transfer_id=data["transfer_id"], file_name=data["file_name"], file_size=data["file_size"], file_hash=data["file_hash"], sender_id=data["sender_id"], receiver_id=data["receiver_id"], status=TransferStatus(data.get("status", "pending")), progress=data.get("progress", 0.0), start_time=datetime.fromisoformat(data["start_time"]), end_time=end_time, ) @dataclass class PeerInfo: """对等端信息(用于局域网发现)""" peer_id: str username: str ip_address: str port: int discovered_at: datetime = field(default_factory=datetime.now) def to_dict(self) -> dict: """转换为字典""" return { "peer_id": self.peer_id, "username": self.username, "ip_address": self.ip_address, "port": self.port, "discovered_at": self.discovered_at.isoformat(), } @classmethod def from_dict(cls, data: dict) -> "PeerInfo": """从字典创建PeerInfo对象""" return cls( peer_id=data["peer_id"], username=data["username"], ip_address=data["ip_address"], port=data["port"], discovered_at=datetime.fromisoformat(data["discovered_at"]), )