You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

108 lines
3.6 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import socket
import struct
def format_mac_address(mac_bytes):
"""
Format a MAC address from bytes to a human-readable string.
Args:
mac_bytes (bytes): The MAC address in bytes format.
Returns:
str: The MAC address in human-readable format.
"""
mac_str = mac_bytes.hex()
mac_formatted = ':'.join(mac_str[i:i+2] for i in range(0, 12, 2))
return mac_formatted
class AgreementUtil():
@staticmethod
def create_udp_packet(data, dest_port, src_port):
"""
创建UDP数据包
Returns:
bytes: `UDP数据包`.
"""
# UDP header fields
udp_header = struct.pack('!HHHH', src_port, dest_port, 8 + len(data), 0) # Length includes header length
return udp_header + data.encode()
@staticmethod
def create_ip_packet(udp_packet, dest_ip, src_ip):
"""
创建IP数据包
Returns:
bytes: IP数据包
"""
# IP header fields
version = 4
ihl = 5 # Internet Header Length
type_of_service = 0
total_length = 20 + len(udp_packet) # 20 bytes IP header + UDP packet length
identification = 54321
flags = 0 # Don't fragment
fragment_offset = 0
ttl = 64 # Time to live
protocol = 17 # UDP
checksum = 0 # For simplicity, set to 0
source_ip = socket.inet_aton(src_ip)
dest_ip = socket.inet_aton(dest_ip)
# IP数据包头部字段
ip_header = struct.pack('!BBHHHBBH4s4s',
(version << 4) + ihl, type_of_service, total_length,
identification, (flags << 13) + fragment_offset,
ttl, protocol, checksum, source_ip, dest_ip)
return ip_header + udp_packet
@staticmethod
def create_ethernet_frame(ip_packet, dest_mac, src_mac):
"""
创建以太网帧数据包
Returns:
bytes: 以太网帧数据包
"""
# Convert MAC addresses from string format to bytes
dest_mac_bytes = bytes.fromhex(dest_mac.replace(':', ''))
src_mac_bytes = bytes.fromhex(src_mac.replace(':', ''))
# Ethernet type for IP (0x0800)
ethernet_type = 0x0800
# Pack Ethernet frame fields
ethernet_frame = struct.pack('!6s6sH', dest_mac_bytes, src_mac_bytes, ethernet_type)
return ethernet_frame + ip_packet
@staticmethod
def parse_ethernet_frame(frame):
"""
解析以太网帧数据包
Returns: 发送端MAC接收端MAC以太网类型IP数据包
"""
dest_mac, src_mac, ethertype = struct.unpack('!6s6sH', frame[:14])
payload = frame[14:]
return format_mac_address(src_mac), format_mac_address(dest_mac), ethertype, payload
@staticmethod
def parse_ip_packet(packet):
"""
解析 IP 数据包
Returns: 发送端IP接收端IP协议UDP数据包
"""
version_ihl, type_of_service, total_length, identification, flags_fragment_offset, \
ttl, protocol, checksum, source_ip, dest_ip = struct.unpack('!BBHHHBBH4s4s', packet[:20])
payload = packet[20:]
return socket.inet_ntoa(source_ip), socket.inet_ntoa(dest_ip), protocol, payload
@staticmethod
def parse_udp_packet(packet):
"""
解析UDP数据包
Returns:
tuple: 发送主机端口,接收主机端口,数据
"""
src_port, dest_port, length, checksum = struct.unpack('!HHHH', packet[:8])
data = packet[8:]
return src_port, dest_port, data.decode()