import socket import struct def format_mac_address(mac_bytes): """ Format a MAC address from bytes to a human-readable string. Args: mac_bytes (bytes): The MAC address in bytes format. Returns: str: The MAC address in human-readable format. """ mac_str = mac_bytes.hex() mac_formatted = ':'.join(mac_str[i:i+2] for i in range(0, 12, 2)) return mac_formatted class AgreementUtil(): @staticmethod def create_udp_packet(data, dest_port, src_port): """ 创建UDP数据包 Returns: bytes: `UDP数据包`. """ # UDP header fields udp_header = struct.pack('!HHHH', src_port, dest_port, 8 + len(data), 0) # Length includes header length return udp_header + data.encode() @staticmethod def create_ip_packet(udp_packet, dest_ip, src_ip): """ 创建IP数据包 Returns: bytes: IP数据包 """ # IP header fields version = 4 ihl = 5 # Internet Header Length type_of_service = 0 total_length = 20 + len(udp_packet) # 20 bytes IP header + UDP packet length identification = 54321 flags = 0 # Don't fragment fragment_offset = 0 ttl = 64 # Time to live protocol = 17 # UDP checksum = 0 # For simplicity, set to 0 source_ip = socket.inet_aton(src_ip) dest_ip = socket.inet_aton(dest_ip) # IP数据包头部字段 ip_header = struct.pack('!BBHHHBBH4s4s', (version << 4) + ihl, type_of_service, total_length, identification, (flags << 13) + fragment_offset, ttl, protocol, checksum, source_ip, dest_ip) return ip_header + udp_packet @staticmethod def create_ethernet_frame(ip_packet, dest_mac, src_mac): """ 创建以太网帧数据包 Returns: bytes: 以太网帧数据包 """ # Convert MAC addresses from string format to bytes dest_mac_bytes = bytes.fromhex(dest_mac.replace(':', '')) src_mac_bytes = bytes.fromhex(src_mac.replace(':', '')) # Ethernet type for IP (0x0800) ethernet_type = 0x0800 # Pack Ethernet frame fields ethernet_frame = struct.pack('!6s6sH', dest_mac_bytes, src_mac_bytes, ethernet_type) return ethernet_frame + ip_packet @staticmethod def parse_ethernet_frame(frame): """ 解析以太网帧数据包 Returns: 发送端MAC,接收端MAC,以太网类型,IP数据包 """ dest_mac, src_mac, ethertype = struct.unpack('!6s6sH', frame[:14]) payload = frame[14:] return format_mac_address(src_mac), format_mac_address(dest_mac), ethertype, payload @staticmethod def parse_ip_packet(packet): """ 解析 IP 数据包 Returns: 发送端IP,接收端IP,协议,UDP数据包 """ version_ihl, type_of_service, total_length, identification, flags_fragment_offset, \ ttl, protocol, checksum, source_ip, dest_ip = struct.unpack('!BBHHHBBH4s4s', packet[:20]) payload = packet[20:] return socket.inet_ntoa(source_ip), socket.inet_ntoa(dest_ip), protocol, payload @staticmethod def parse_udp_packet(packet): """ 解析UDP数据包 Returns: tuple: 发送主机端口,接收主机端口,数据 """ src_port, dest_port, length, checksum = struct.unpack('!HHHH', packet[:8]) data = packet[8:] return src_port, dest_port, data.decode()