wangjun_branch
liguanwei 3 months ago
parent a2c5cd8d90
commit 017aa9c35d

@ -9,51 +9,30 @@ import ctypes
import os import os
# 定义 sockaddr 结构体,以便与底层 socket API 交互
class sockaddr(ctypes.Structure): class sockaddr(ctypes.Structure):
_fields_ = [ _fields_ = [("sa_family", ctypes.c_short),
("sa_family", ctypes.c_short), # 地址族,例如 AF_INET 或 AF_INET6 ("__pad1", ctypes.c_ushort),
("__pad1", ctypes.c_ushort), # 填充字段用于对齐 ("ipv4_addr", ctypes.c_byte * 4),
("ipv4_addr", ctypes.c_byte * 4), # IPv4 地址4 字节) ("ipv6_addr", ctypes.c_byte * 16),
("ipv6_addr", ctypes.c_byte * 16), # IPv6 地址16 字节) ("__pad2", ctypes.c_ulong)]
("__pad2", ctypes.c_ulong) # 额外填充字段
]
# 检测是否在 Windows 平台上,并加载相应的 Windows socket 函数
if hasattr(ctypes, 'windll'): if hasattr(ctypes, 'windll'):
WSAStringToAddressA = ctypes.windll.ws2_32.WSAStringToAddressA # 字符串转换为地址 WSAStringToAddressA = ctypes.windll.ws2_32.WSAStringToAddressA
WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA # 地址转换为字符串 WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA
else: else:
def not_windows(): def not_windows():
raise SystemError( raise SystemError(
"Invalid platform. ctypes.windll must be available." "Invalid platform. ctypes.windll must be available."
) )
WSAStringToAddressA = not_windows
WSAAddressToStringA = not_windows
WSAStringToAddressA = not_windows # 非 Windows 平台时的占位符
WSAAddressToStringA = not_windows # 非 Windows 平台时的占位符
def inet_pton(address_family, ip_string): def inet_pton(address_family, ip_string):
""" addr = sockaddr()
Convert an IP address from text to binary format. addr.sa_family = address_family
addr_size = ctypes.c_int(ctypes.sizeof(addr))
Args:
address_family: The address family (AF_INET for IPv4 or AF_INET6 for IPv6).
ip_string: The IP address in string form (e.g., "192.168.1.1").
Returns:
A packed binary representation of the IP address.
Raises:
socket.error: If the conversion fails.
"""
addr = sockaddr() # 创建 sockaddr 实例
addr.sa_family = address_family # 设置地址族
addr_size = ctypes.c_int(ctypes.sizeof(addr)) # 地址结构的大小
# 调用 WSAStringToAddressA 将字符串 IP 转换为二进制地址
if WSAStringToAddressA( if WSAStringToAddressA(
ip_string, ip_string,
address_family, address_family,
@ -61,50 +40,34 @@ def inet_pton(address_family, ip_string):
ctypes.byref(addr), ctypes.byref(addr),
ctypes.byref(addr_size) ctypes.byref(addr_size)
) != 0: ) != 0:
raise socket.error(ctypes.FormatError()) # 转换失败时抛出异常 raise socket.error(ctypes.FormatError())
# 根据地址族返回相应的打包 IP 地址
if address_family == socket.AF_INET: if address_family == socket.AF_INET:
return ctypes.string_at(addr.ipv4_addr, 4) # IPv4 地址 return ctypes.string_at(addr.ipv4_addr, 4)
if address_family == socket.AF_INET6: if address_family == socket.AF_INET6:
return ctypes.string_at(addr.ipv6_addr, 16) # IPv6 地址 return ctypes.string_at(addr.ipv6_addr, 16)
raise socket.error('unknown address family') # 抛出未知地址族的异常 raise socket.error('unknown address family')
def inet_ntop(address_family, packed_ip): def inet_ntop(address_family, packed_ip):
""" addr = sockaddr()
Convert a binary IP address to its text representation. addr.sa_family = address_family
addr_size = ctypes.c_int(ctypes.sizeof(addr))
Args: ip_string = ctypes.create_string_buffer(128)
address_family: The address family (AF_INET for IPv4 or AF_INET6 for IPv6). ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string))
packed_ip: The packed binary IP address.
Returns:
The IP address in string form.
Raises:
socket.error: If the conversion fails.
"""
addr = sockaddr() # 创建 sockaddr 实例
addr.sa_family = address_family # 设置地址族
addr_size = ctypes.c_int(ctypes.sizeof(addr)) # 地址结构的大小
ip_string = ctypes.create_string_buffer(128) # 用于存储 IP 字符串
ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string)) # 字符串大小
# 根据地址族填充 sockaddr 结构
if address_family == socket.AF_INET: if address_family == socket.AF_INET:
if len(packed_ip) != ctypes.sizeof(addr.ipv4_addr): if len(packed_ip) != ctypes.sizeof(addr.ipv4_addr):
raise socket.error('packed IP wrong length for inet_ntoa') # IPv4 地址长度错误 raise socket.error('packed IP wrong length for inet_ntoa')
ctypes.memmove(addr.ipv4_addr, packed_ip, 4) # 复制 IPv4 地址 ctypes.memmove(addr.ipv4_addr, packed_ip, 4)
elif address_family == socket.AF_INET6: elif address_family == socket.AF_INET6:
if len(packed_ip) != ctypes.sizeof(addr.ipv6_addr): if len(packed_ip) != ctypes.sizeof(addr.ipv6_addr):
raise socket.error('packed IP wrong length for inet_ntoa') # IPv6 地址长度错误 raise socket.error('packed IP wrong length for inet_ntoa')
ctypes.memmove(addr.ipv6_addr, packed_ip, 16) # 复制 IPv6 地址 ctypes.memmove(addr.ipv6_addr, packed_ip, 16)
else: else:
raise socket.error('unknown address family') # 抛出未知地址族的异常 raise socket.error('unknown address family')
# 调用 WSAAddressToStringA 将二进制地址转换为字符串
if WSAAddressToStringA( if WSAAddressToStringA(
ctypes.byref(addr), ctypes.byref(addr),
addr_size, addr_size,
@ -112,12 +75,11 @@ def inet_ntop(address_family, packed_ip):
ip_string, ip_string,
ctypes.byref(ip_string_size) ctypes.byref(ip_string_size)
) != 0: ) != 0:
raise socket.error(ctypes.FormatError()) # 转换失败时抛出异常 raise socket.error(ctypes.FormatError())
return ip_string[:ip_string_size.value - 1] # 返回字符串表示形式,移除尾部的空字符
return ip_string[:ip_string_size.value - 1]
# 如果在 Windows 平台,将自定义的函数添加到 socket 库中 # Adding our two functions to the socket library
if os.name == 'nt': if os.name == 'nt':
socket.inet_pton = inet_pton # 添加 inet_pton 函数 socket.inet_pton = inet_pton
socket.inet_ntop = inet_ntop # 添加 inet_ntop 函数 socket.inet_ntop = inet_ntop

Loading…
Cancel
Save