diff --git a/src/sqlmap-master/thirdparty/socks/socks.py b/src/sqlmap-master/thirdparty/socks/socks.py index 4005cab..af914a4 100644 --- a/src/sqlmap-master/thirdparty/socks/socks.py +++ b/src/sqlmap-master/thirdparty/socks/socks.py @@ -121,6 +121,7 @@ def unwrapmodule(module): module.socket.socket = _orgsocket module.socket.create_connection = _orgcreateconnection + class socksocket(socket.socket): """socksocket([family[, type[, proto]]]) -> socket object Open a SOCKS enabled socket. The parameters are the same as @@ -129,11 +130,17 @@ class socksocket(socket.socket): """ def __init__(self, family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0, _sock=None): + # 初始化父类 socket _orgsocket.__init__(self, family, type, proto, _sock) + + # 根据默认代理设置代理 if _defaultproxy != None: self.__proxy = _defaultproxy else: + # 初始化代理信息,如果没有默认代理则设为空元组 self.__proxy = (None, None, None, None, None, None) + + # 用于保存代理关联的本地socket信息 self.__proxysockname = None self.__proxypeername = None @@ -142,121 +149,111 @@ class socksocket(socket.socket): Receive EXACTLY the number of bytes requested from the socket. Blocks until the required number of bytes have been received. """ + # 从socket接收指定字节数数据 data = self.recv(count) + # 当接收的数据少于请求的字节数时继续接收 while len(data) < count: - d = self.recv(count-len(data)) - if not d: raise GeneralProxyError((0, "connection closed unexpectedly")) - data = data + d + d = self.recv(count - len(data)) + if not d: + raise GeneralProxyError((0, "connection closed unexpectedly")) # 如果连接关闭则抛出异常 + data = data + d # 累加接收到的数据 return data def setproxy(self, proxytype=None, addr=None, port=None, rdns=True, username=None, password=None): """setproxy(proxytype, addr[, port[, rdns[, username[, password]]]]) Sets the proxy to be used. - proxytype - The type of the proxy to be used. Three types - are supported: PROXY_TYPE_SOCKS4 (including socks4a), - PROXY_TYPE_SOCKS5 and PROXY_TYPE_HTTP - addr - The address of the server (IP or DNS). - port - The port of the server. Defaults to 1080 for SOCKS - servers and 8080 for HTTP proxy servers. - rdns - Should DNS queries be preformed on the remote side - (rather than the local side). The default is True. - Note: This has no effect with SOCKS4 servers. - username - Username to authenticate with to the server. - The default is no authentication. - password - Password to authenticate with to the server. - Only relevant when username is also provided. """ + # 设置代理类型、地址、端口等参数 self.__proxy = (proxytype, addr, port, rdns, username, password) def __negotiatesocks5(self, destaddr, destport): """__negotiatesocks5(self,destaddr,destport) Negotiates a connection through a SOCKS5 server. """ - # First we'll send the authentication packages we support. - if (self.__proxy[4]!=None) and (self.__proxy[5]!=None): - # The username/password details were supplied to the - # setproxy method so we support the USERNAME/PASSWORD - # authentication (in addition to the standard none). - self.sendall(struct.pack('BBBB', 0x05, 0x02, 0x00, 0x02)) + # 发送支持的认证方法 + if (self.__proxy[4] != None) and (self.__proxy[5] != None): + # 如果提供了用户名和密码,则支持用户名/密码认证 + self.sendall(struct.pack('BBBB', 0x05, 0x02, 0x00, 0x02)) # 0x02 代表用户名/密码认证 else: - # No username/password were entered, therefore we - # only support connections with no authentication. - self.sendall(struct.pack('BBB', 0x05, 0x01, 0x00)) - # We'll receive the server's response to determine which - # method was selected + # 仅支持无认证连接 + self.sendall(struct.pack('BBB', 0x05, 0x01, 0x00)) # 0x01 代表无认证 + + # 接收服务器的响应以确定选定的身份验证方法 chosenauth = self.__recvall(2) if chosenauth[0:1] != b'\x05': self.close() - raise GeneralProxyError((1, _generalerrors[1])) - # Check the chosen authentication method + raise GeneralProxyError((1, _generalerrors[1])) # 无效的协议版本 + + # 检查所选的身份验证方法 if chosenauth[1:2] == b'\x00': - # No authentication is required + # 无需身份验证 pass elif chosenauth[1:2] == b'\x02': - # Okay, we need to perform a basic username/password - # authentication. - self.sendall(b'\x01' + chr(len(self.__proxy[4])).encode() + self.__proxy[4].encode() + chr(len(self.__proxy[5])).encode() + self.__proxy[5].encode()) - authstat = self.__recvall(2) + # 需要进行用户名/密码身份验证 + self.sendall(b'\x01' + chr(len(self.__proxy[4])).encode() + self.__proxy[4].encode() + chr( + len(self.__proxy[5])).encode() + self.__proxy[5].encode()) + authstat = self.__recvall(2) # 接收身份验证状态 if authstat[0:1] != b'\x01': - # Bad response self.close() - raise GeneralProxyError((1, _generalerrors[1])) + raise GeneralProxyError((1, _generalerrors[1])) # 无效的认证响应 if authstat[1:2] != b'\x00': - # Authentication failed self.close() - raise Socks5AuthError((3, _socks5autherrors[3])) - # Authentication succeeded + raise Socks5AuthError((3, _socks5autherrors[3])) # 身份验证失败 else: - # Reaching here is always bad + # 非法的身份验证方法 self.close() if chosenauth[1:2] == b'\xff': - raise Socks5AuthError((2, _socks5autherrors[2])) + raise Socks5AuthError((2, _socks5autherrors[2])) # 不支持的身份验证 else: raise GeneralProxyError((1, _generalerrors[1])) - # Now we can request the actual connection - req = struct.pack('BBB', 0x05, 0x01, 0x00) - # If the given destination address is an IP address, we'll - # use the IPv4 address request even if remote resolving was specified. + + # 请求实际连接 + req = struct.pack('BBB', 0x05, 0x01, 0x00) # 0x01 表示连接请求 try: + # 尝试将目标地址作为IP地址处理 ipaddr = socket.inet_aton(destaddr) - req = req + b'\x01' + ipaddr + req = req + b'\x01' + ipaddr # IPv4地址请求 except socket.error: - # Well it's not an IP number, so it's probably a DNS name. - if self.__proxy[3]: - # Resolve remotely + # 目标地址不是IP地址,可能是DNS名称 + if self.__proxy[3]: # 检查是否需要远程解析 ipaddr = None - req = req + chr(0x03).encode() + chr(len(destaddr)).encode() + (destaddr if isinstance(destaddr, bytes) else destaddr.encode()) + req = req + chr(0x03).encode() + chr(len(destaddr)).encode() + ( + destaddr if isinstance(destaddr, bytes) else destaddr.encode()) else: - # Resolve locally + # 本地解析 ipaddr = socket.inet_aton(socket.gethostbyname(destaddr)) req = req + chr(0x01).encode() + ipaddr - req = req + struct.pack(">H", destport) - self.sendall(req) - # Get the response + req = req + struct.pack(">H", destport) # 添加目标端口 + self.sendall(req) # 发送请求 + + # 获取响应 resp = self.__recvall(4) if resp[0:1] != chr(0x05).encode(): self.close() - raise GeneralProxyError((1, _generalerrors[1])) + raise GeneralProxyError((1, _generalerrors[1])) # 协议错误 elif resp[1:2] != chr(0x00).encode(): - # Connection failed + # 连接失败 self.close() - if ord(resp[1:2])<=8: + if ord(resp[1:2]) <= 8: raise Socks5Error((ord(resp[1:2]), _socks5errors[ord(resp[1:2])])) else: raise Socks5Error((9, _socks5errors[9])) - # Get the bound address/port + + # 解析绑定地址/端口 elif resp[3:4] == chr(0x01).encode(): - boundaddr = self.__recvall(4) + boundaddr = self.__recvall(4) # IPv4绑定地址 elif resp[3:4] == chr(0x03).encode(): - resp = resp + self.recv(1) - boundaddr = self.__recvall(ord(resp[4:5])) + resp = resp + self.recv(1) # 读取后续字节 + boundaddr = self.__recvall(ord(resp[4:5])) # DNS名 else: self.close() - raise GeneralProxyError((1,_generalerrors[1])) + raise GeneralProxyError((1, _generalerrors[1])) + + # 获取绑定端口 boundport = struct.unpack(">H", self.__recvall(2))[0] - self.__proxysockname = (boundaddr, boundport) + self.__proxysockname = (boundaddr, boundport) # 保存绑定的socket信息 if ipaddr != None: - self.__proxypeername = (socket.inet_ntoa(ipaddr), destport) + self.__proxypeername = (socket.inet_ntoa(ipaddr), destport) # 保存peer名称 else: self.__proxypeername = (destaddr, destport) @@ -264,135 +261,153 @@ class socksocket(socket.socket): """getsockname() -> address info Returns the bound IP address and port number at the proxy. """ - return self.__proxysockname + return self.__proxysockname # 返回代理的socket信息 def getproxypeername(self): """getproxypeername() -> address info Returns the IP and port number of the proxy. """ - return _orgsocket.getpeername(self) + return _orgsocket.getpeername(self) # 返回代理的peer信息 def getpeername(self): """getpeername() -> address info Returns the IP address and port number of the destination machine (note: getproxypeername returns the proxy) """ - return self.__proxypeername + return self.__proxypeername # 返回目标机器的peer名称 - def __negotiatesocks4(self,destaddr,destport): + def __negotiatesocks4(self, destaddr, destport): """__negotiatesocks4(self,destaddr,destport) Negotiates a connection through a SOCKS4 server. """ - # Check if the destination address provided is an IP address + # 检查目标地址是否为IP地址 rmtrslv = False try: ipaddr = socket.inet_aton(destaddr) except socket.error: - # It's a DNS name. Check where it should be resolved. + # 目标是DNS名称, 检查是否应进行远程解析 if self.__proxy[3]: - ipaddr = struct.pack("BBBB", 0x00, 0x00, 0x00, 0x01) + ipaddr = struct.pack("BBBB", 0x00, 0x00, 0x00, 0x01) # 默认IP地址 rmtrslv = True else: - ipaddr = socket.inet_aton(socket.gethostbyname(destaddr)) - # Construct the request packet - req = struct.pack(">BBH", 0x04, 0x01, destport) + ipaddr - # The username parameter is considered userid for SOCKS4 + ipaddr = socket.inet_aton(socket.gethostbyname(destaddr)) # 本地解析 + + # 构造请求报文 + req = struct.pack(">BBH", 0x04, 0x01, destport) + ipaddr # SOCKS4 请求 + # 用户名参数视为userid if self.__proxy[4] != None: req = req + self.__proxy[4] - req = req + chr(0x00).encode() - # DNS name if remote resolving is required - # NOTE: This is actually an extension to the SOCKS4 protocol - # called SOCKS4A and may not be supported in all cases. + req = req + chr(0x00).encode() # 请求结束 + + # 需要远程解析的情况下使用DNS名称 if rmtrslv: - req = req + destaddr + chr(0x00).encode() - self.sendall(req) - # Get the response from the server + req = req + destaddr + chr(0x00).encode() # SOCKS4A扩展 + self.sendall(req) # 发送请求 + + # 获取服务器响应 resp = self.__recvall(8) if resp[0:1] != chr(0x00).encode(): - # Bad data + # 出现错误 self.close() - raise GeneralProxyError((1,_generalerrors[1])) + raise GeneralProxyError((1, _generalerrors[1])) # 无效数据 if resp[1:2] != chr(0x5A).encode(): - # Server returned an error + # 服务器返回错误 self.close() if ord(resp[1:2]) in (91, 92, 93): self.close() - raise Socks4Error((ord(resp[1:2]), _socks4errors[ord(resp[1:2]) - 90])) + raise Socks4Error((ord(resp[1:2]), _socks4errors[ord(resp[1:2]) - 90])) # 处理错误代码 else: - raise Socks4Error((94, _socks4errors[4])) - # Get the bound address/port - self.__proxysockname = (socket.inet_ntoa(resp[4:]), struct.unpack(">H", resp[2:4])[0]) + raise Socks4Error((94, _socks4errors[4])) # 一般性错误 + + # 获取绑定的地址和端口 + self.__proxysockname = (socket.inet_ntoa(resp[4:]), struct.unpack(">H", resp[2:4])[0]) # 保存绑定信息 if rmtrslv != None: - self.__proxypeername = (socket.inet_ntoa(ipaddr), destport) + self.__proxypeername = (socket.inet_ntoa(ipaddr), destport) # 目标地址 else: - self.__proxypeername = (destaddr, destport) + self.__proxypeername = (destaddr, destport) # 目标地址 def __negotiatehttp(self, destaddr, destport): """__negotiatehttp(self,destaddr,destport) Negotiates a connection through an HTTP server. """ - # If we need to resolve locally, we do this now + # 本地解析 if not self.__proxy[3]: addr = socket.gethostbyname(destaddr) else: addr = destaddr - self.sendall(("CONNECT " + addr + ":" + str(destport) + " HTTP/1.1\r\n" + "Host: " + destaddr + "\r\n\r\n").encode()) - # We read the response until we get the string "\r\n\r\n" + + # 发送HTTP的CONNECT请求 + self.sendall( + ("CONNECT " + addr + ":" + str(destport) + " HTTP/1.1\r\n" + "Host: " + destaddr + "\r\n\r\n").encode()) + + # 读取响应直到遇到"\r\n\r\n" resp = self.recv(1) while resp.find("\r\n\r\n".encode()) == -1: resp = resp + self.recv(1) - # We just need the first line to check if the connection - # was successful + + # 仅需第一行检查连接是否成功 statusline = resp.splitlines()[0].split(" ".encode(), 2) if statusline[0] not in ("HTTP/1.0".encode(), "HTTP/1.1".encode()): self.close() - raise GeneralProxyError((1, _generalerrors[1])) + raise GeneralProxyError((1, _generalerrors[1])) # 无效的HTTP协议 try: - statuscode = int(statusline[1]) + statuscode = int(statusline[1]) # 获取状态码 except ValueError: self.close() - raise GeneralProxyError((1, _generalerrors[1])) + raise GeneralProxyError((1, _generalerrors[1])) # 状态码无效 if statuscode != 200: self.close() - raise HTTPError((statuscode, statusline[2])) - self.__proxysockname = ("0.0.0.0", 0) - self.__proxypeername = (addr, destport) + raise HTTPError((statuscode, statusline[2])) # 显示状态码错误 + + # 连接成功后的代理信息 + self.__proxysockname = ("0.0.0.0", 0) # 绑定在0.0.0.0 + self.__proxypeername = (addr, destport) # 保存peer信息 def connect(self, destpair): - """connect(self, despair) + """connect(self, destpair) Connects to the specified destination through a proxy. - destpar - A tuple of the IP/DNS address and the port number. + destpair - A tuple of the IP/DNS address and the port number. (identical to socket's connect). To select the proxy server use setproxy(). """ - # Do a minimal input check first - if (not type(destpair) in (list,tuple)) or (len(destpair) < 2) or (type(destpair[0]) != type('')) or (type(destpair[1]) != int): + # 校验输入参数 + if (not type(destpair) in (list, tuple)) or (len(destpair) < 2) or (type(destpair[0]) != type('')) or ( + type(destpair[1]) != int): raise GeneralProxyError((5, _generalerrors[5])) + + # 根据代理类型选择相应的握手协议 if self.__proxy[0] == PROXY_TYPE_SOCKS5: if self.__proxy[2] != None: portnum = self.__proxy[2] else: - portnum = 1080 - _orgsocket.connect(self, (self.__proxy[1], portnum)) - self.__negotiatesocks5(destpair[0], destpair[1]) + portnum = 1080 # 默认端口 + + _orgsocket.connect(self, (self.__proxy[1], portnum)) # 连接到代理 + self.__negotiatesocks5(destpair[0], destpair[1]) # 握手 + elif self.__proxy[0] == PROXY_TYPE_SOCKS4: if self.__proxy[2] != None: portnum = self.__proxy[2] else: - portnum = 1080 - _orgsocket.connect(self,(self.__proxy[1], portnum)) - self.__negotiatesocks4(destpair[0], destpair[1]) + portnum = 1080 # 默认端口 + + _orgsocket.connect(self, (self.__proxy[1], portnum)) # 连接到代理 + self.__negotiatesocks4(destpair[0], destpair[1]) # 握手 + elif self.__proxy[0] == PROXY_TYPE_HTTP: if self.__proxy[2] != None: portnum = self.__proxy[2] else: - portnum = 8080 - _orgsocket.connect(self,(self.__proxy[1], portnum)) - self.__negotiatehttp(destpair[0], destpair[1]) + portnum = 8080 # 默认端口 + + _orgsocket.connect(self, (self.__proxy[1], portnum)) # 连接到代理 + self.__negotiatehttp(destpair[0], destpair[1]) # 握手 + elif self.__proxy[0] == None: + # 没有代理时直接连接目标 _orgsocket.connect(self, (destpair[0], destpair[1])) else: - raise GeneralProxyError((4, _generalerrors[4])) + raise GeneralProxyError((4, _generalerrors[4])) # 不支持的代理类型 def create_connection(address, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, source_address=None): diff --git a/src/sqlmap-master/thirdparty/termcolor/termcolor.py b/src/sqlmap-master/thirdparty/termcolor/termcolor.py index ddea6dd..2ba380b 100644 --- a/src/sqlmap-master/thirdparty/termcolor/termcolor.py +++ b/src/sqlmap-master/thirdparty/termcolor/termcolor.py @@ -79,12 +79,14 @@ COLORS = dict( )) ) +# 添加亮色到 COLORS 字典 COLORS.update(dict(("light%s" % color, COLORS[color] + 60) for color in COLORS)) -# Reference: https://misc.flogisoft.com/bash/tip_colors_and_formatting -COLORS["lightgrey"] = 37 -COLORS["darkgrey"] = 90 +# 带有参考的颜色定义 +COLORS["lightgrey"] = 37 # 亮灰色 +COLORS["darkgrey"] = 90 # 深灰色 +# 定义重置颜色的转义序列 RESET = '\033[0m' @@ -104,34 +106,37 @@ def colored(text, color=None, on_color=None, attrs=None): colored('Hello, World!', 'red', 'on_grey', ['blue', 'blink']) colored('Hello, World!', 'green') """ + # 检查环境变量,以决定是否支持 ANSI 颜色 if os.getenv('ANSI_COLORS_DISABLED') is None: - fmt_str = '\033[%dm%s' + fmt_str = '\033[%dm%s' # ANSI 格式化字符串 if color is not None: - text = fmt_str % (COLORS[color], text) + text = fmt_str % (COLORS[color], text) # 将文本颜色应用于文本 if on_color is not None: - text = fmt_str % (HIGHLIGHTS[on_color], text) + text = fmt_str % (HIGHLIGHTS[on_color], text) # 将背景颜色应用于文本 if attrs is not None: for attr in attrs: - text = fmt_str % (ATTRIBUTES[attr], text) + text = fmt_str % (ATTRIBUTES[attr], text) # 应用文本属性 - text += RESET + text += RESET # 重置颜色到默认值 return text def cprint(text, color=None, on_color=None, attrs=None, **kwargs): - """Print colorize text. + """打印带颜色的文本。 - It accepts arguments of print function. + 支持 print 函数的所有参数。 """ - - print((colored(text, color, on_color, attrs)), **kwargs) + print((colored(text, color, on_color, attrs)), **kwargs) # 调用 colored 函数并进行打印 if __name__ == '__main__': - print('Current terminal type: %s' % os.getenv('TERM')) + # 用于测试的主模块 + print('Current terminal type: %s' % os.getenv('TERM')) # 打印当前终端类型 print('Test basic colors:') + + # 测试基本颜色 cprint('Grey color', 'grey') cprint('Red color', 'red') cprint('Green color', 'green') @@ -143,6 +148,7 @@ if __name__ == '__main__': print(('-' * 78)) print('Test highlights:') + # 测试背景高亮 cprint('On grey color', on_color='on_grey') cprint('On red color', on_color='on_red') cprint('On green color', on_color='on_green') @@ -154,6 +160,7 @@ if __name__ == '__main__': print('-' * 78) print('Test attributes:') + # 测试文本属性 cprint('Bold grey color', 'grey', attrs=['bold']) cprint('Dark red color', 'red', attrs=['dark']) cprint('Underline green color', 'green', attrs=['underline']) @@ -161,13 +168,12 @@ if __name__ == '__main__': cprint('Reversed blue color', 'blue', attrs=['reverse']) cprint('Concealed Magenta color', 'magenta', attrs=['concealed']) cprint('Bold underline reverse cyan color', 'cyan', - attrs=['bold', 'underline', 'reverse']) + attrs=['bold', 'underline', 'reverse']) cprint('Dark blink concealed white color', 'white', - attrs=['dark', 'blink', 'concealed']) + attrs=['dark', 'blink', 'concealed']) print(('-' * 78)) print('Test mixing:') - cprint('Underline red on grey color', 'red', 'on_grey', - ['underline']) - cprint('Reversed green on red color', 'green', 'on_red', ['reverse']) - + # 测试混合使用颜色和属性 + cprint('Underline red on grey color', 'red', 'on_grey', ['underline']) + cprint('Reversed green on red color', 'green', 'on_red', ['reverse']) \ No newline at end of file diff --git a/src/sqlmap-master/thirdparty/wininetpton/win_inet_pton.py b/src/sqlmap-master/thirdparty/wininetpton/win_inet_pton.py index 50ae621..cfe5b3b 100644 --- a/src/sqlmap-master/thirdparty/wininetpton/win_inet_pton.py +++ b/src/sqlmap-master/thirdparty/wininetpton/win_inet_pton.py @@ -9,30 +9,51 @@ import ctypes import os +# 定义 sockaddr 结构体,以便与底层 socket API 交互 class sockaddr(ctypes.Structure): - _fields_ = [("sa_family", ctypes.c_short), - ("__pad1", ctypes.c_ushort), - ("ipv4_addr", ctypes.c_byte * 4), - ("ipv6_addr", ctypes.c_byte * 16), - ("__pad2", ctypes.c_ulong)] + _fields_ = [ + ("sa_family", ctypes.c_short), # 地址族,例如 AF_INET 或 AF_INET6 + ("__pad1", ctypes.c_ushort), # 填充字段用于对齐 + ("ipv4_addr", ctypes.c_byte * 4), # IPv4 地址(4 字节) + ("ipv6_addr", ctypes.c_byte * 16), # IPv6 地址(16 字节) + ("__pad2", ctypes.c_ulong) # 额外填充字段 + ] + +# 检测是否在 Windows 平台上,并加载相应的 Windows socket 函数 if hasattr(ctypes, 'windll'): - WSAStringToAddressA = ctypes.windll.ws2_32.WSAStringToAddressA - WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA + WSAStringToAddressA = ctypes.windll.ws2_32.WSAStringToAddressA # 字符串转换为地址 + WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA # 地址转换为字符串 else: def not_windows(): raise SystemError( "Invalid platform. ctypes.windll must be available." ) - WSAStringToAddressA = not_windows - WSAAddressToStringA = not_windows + + + WSAStringToAddressA = not_windows # 非 Windows 平台时的占位符 + WSAAddressToStringA = not_windows # 非 Windows 平台时的占位符 def inet_pton(address_family, ip_string): - addr = sockaddr() - addr.sa_family = address_family - addr_size = ctypes.c_int(ctypes.sizeof(addr)) + """ + Convert an IP address from text to binary format. + + Args: + address_family: The address family (AF_INET for IPv4 or AF_INET6 for IPv6). + ip_string: The IP address in string form (e.g., "192.168.1.1"). + + Returns: + A packed binary representation of the IP address. + Raises: + socket.error: If the conversion fails. + """ + addr = sockaddr() # 创建 sockaddr 实例 + addr.sa_family = address_family # 设置地址族 + addr_size = ctypes.c_int(ctypes.sizeof(addr)) # 地址结构的大小 + + # 调用 WSAStringToAddressA 将字符串 IP 转换为二进制地址 if WSAStringToAddressA( ip_string, address_family, @@ -40,34 +61,50 @@ def inet_pton(address_family, ip_string): ctypes.byref(addr), ctypes.byref(addr_size) ) != 0: - raise socket.error(ctypes.FormatError()) + raise socket.error(ctypes.FormatError()) # 转换失败时抛出异常 + # 根据地址族返回相应的打包 IP 地址 if address_family == socket.AF_INET: - return ctypes.string_at(addr.ipv4_addr, 4) + return ctypes.string_at(addr.ipv4_addr, 4) # IPv4 地址 if address_family == socket.AF_INET6: - return ctypes.string_at(addr.ipv6_addr, 16) + return ctypes.string_at(addr.ipv6_addr, 16) # IPv6 地址 - raise socket.error('unknown address family') + raise socket.error('unknown address family') # 抛出未知地址族的异常 def inet_ntop(address_family, packed_ip): - addr = sockaddr() - addr.sa_family = address_family - addr_size = ctypes.c_int(ctypes.sizeof(addr)) - ip_string = ctypes.create_string_buffer(128) - ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string)) + """ + Convert a binary IP address to its text representation. + + Args: + address_family: The address family (AF_INET for IPv4 or AF_INET6 for IPv6). + packed_ip: The packed binary IP address. + + Returns: + The IP address in string form. + Raises: + socket.error: If the conversion fails. + """ + addr = sockaddr() # 创建 sockaddr 实例 + addr.sa_family = address_family # 设置地址族 + addr_size = ctypes.c_int(ctypes.sizeof(addr)) # 地址结构的大小 + ip_string = ctypes.create_string_buffer(128) # 用于存储 IP 字符串 + ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string)) # 字符串大小 + + # 根据地址族填充 sockaddr 结构 if address_family == socket.AF_INET: if len(packed_ip) != ctypes.sizeof(addr.ipv4_addr): - raise socket.error('packed IP wrong length for inet_ntoa') - ctypes.memmove(addr.ipv4_addr, packed_ip, 4) + raise socket.error('packed IP wrong length for inet_ntoa') # IPv4 地址长度错误 + ctypes.memmove(addr.ipv4_addr, packed_ip, 4) # 复制 IPv4 地址 elif address_family == socket.AF_INET6: if len(packed_ip) != ctypes.sizeof(addr.ipv6_addr): - raise socket.error('packed IP wrong length for inet_ntoa') - ctypes.memmove(addr.ipv6_addr, packed_ip, 16) + raise socket.error('packed IP wrong length for inet_ntoa') # IPv6 地址长度错误 + ctypes.memmove(addr.ipv6_addr, packed_ip, 16) # 复制 IPv6 地址 else: - raise socket.error('unknown address family') + raise socket.error('unknown address family') # 抛出未知地址族的异常 + # 调用 WSAAddressToStringA 将二进制地址转换为字符串 if WSAAddressToStringA( ctypes.byref(addr), addr_size, @@ -75,11 +112,12 @@ def inet_ntop(address_family, packed_ip): ip_string, ctypes.byref(ip_string_size) ) != 0: - raise socket.error(ctypes.FormatError()) + raise socket.error(ctypes.FormatError()) # 转换失败时抛出异常 + + return ip_string[:ip_string_size.value - 1] # 返回字符串表示形式,移除尾部的空字符 - return ip_string[:ip_string_size.value - 1] -# Adding our two functions to the socket library +# 如果在 Windows 平台,将自定义的函数添加到 socket 库中 if os.name == 'nt': - socket.inet_pton = inet_pton - socket.inet_ntop = inet_ntop + socket.inet_pton = inet_pton # 添加 inet_pton 函数 + socket.inet_ntop = inet_ntop # 添加 inet_ntop 函数 \ No newline at end of file