You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

108 lines
3.6 KiB

5 months ago
import socket
import struct
def format_mac_address(mac_bytes):
"""
Format a MAC address from bytes to a human-readable string.
Args:
mac_bytes (bytes): The MAC address in bytes format.
Returns:
str: The MAC address in human-readable format.
"""
mac_str = mac_bytes.hex()
mac_formatted = ':'.join(mac_str[i:i+2] for i in range(0, 12, 2))
return mac_formatted
class AgreementUtil():
@staticmethod
def create_udp_packet(data, dest_port, src_port):
"""
创建UDP数据包
Returns:
bytes: `UDP数据包`.
"""
# UDP header fields
udp_header = struct.pack('!HHHH', src_port, dest_port, 8 + len(data), 0) # Length includes header length
return udp_header + data.encode()
@staticmethod
def create_ip_packet(udp_packet, dest_ip, src_ip):
"""
创建IP数据包
Returns:
bytes: IP数据包
"""
# IP header fields
version = 4
ihl = 5 # Internet Header Length
type_of_service = 0
total_length = 20 + len(udp_packet) # 20 bytes IP header + UDP packet length
identification = 54321
flags = 0 # Don't fragment
fragment_offset = 0
ttl = 64 # Time to live
protocol = 17 # UDP
checksum = 0 # For simplicity, set to 0
source_ip = socket.inet_aton(src_ip)
dest_ip = socket.inet_aton(dest_ip)
# IP数据包头部字段
ip_header = struct.pack('!BBHHHBBH4s4s',
(version << 4) + ihl, type_of_service, total_length,
identification, (flags << 13) + fragment_offset,
ttl, protocol, checksum, source_ip, dest_ip)
return ip_header + udp_packet
@staticmethod
def create_ethernet_frame(ip_packet, dest_mac, src_mac):
"""
创建以太网帧数据包
Returns:
bytes: 以太网帧数据包
"""
# Convert MAC addresses from string format to bytes
dest_mac_bytes = bytes.fromhex(dest_mac.replace(':', ''))
src_mac_bytes = bytes.fromhex(src_mac.replace(':', ''))
# Ethernet type for IP (0x0800)
ethernet_type = 0x0800
# Pack Ethernet frame fields
ethernet_frame = struct.pack('!6s6sH', dest_mac_bytes, src_mac_bytes, ethernet_type)
return ethernet_frame + ip_packet
@staticmethod
def parse_ethernet_frame(frame):
"""
解析以太网帧数据包
Returns: 发送端MAC接收端MAC以太网类型IP数据包
"""
dest_mac, src_mac, ethertype = struct.unpack('!6s6sH', frame[:14])
payload = frame[14:]
return format_mac_address(src_mac), format_mac_address(dest_mac), ethertype, payload
@staticmethod
def parse_ip_packet(packet):
"""
解析 IP 数据包
Returns: 发送端IP接收端IP协议UDP数据包
"""
version_ihl, type_of_service, total_length, identification, flags_fragment_offset, \
ttl, protocol, checksum, source_ip, dest_ip = struct.unpack('!BBHHHBBH4s4s', packet[:20])
payload = packet[20:]
return socket.inet_ntoa(source_ip), socket.inet_ntoa(dest_ip), protocol, payload
@staticmethod
def parse_udp_packet(packet):
"""
解析UDP数据包
Returns:
tuple: 发送主机端口接收主机端口数据
"""
src_port, dest_port, length, checksum = struct.unpack('!HHHH', packet[:8])
data = packet[8:]
return src_port, dest_port, data.decode()