liguanwei_change

liguanwei_branch
liguanwei 3 months ago
parent 5472a4aaa3
commit c1bd472c8d

@ -121,6 +121,7 @@ def unwrapmodule(module):
module.socket.socket = _orgsocket module.socket.socket = _orgsocket
module.socket.create_connection = _orgcreateconnection module.socket.create_connection = _orgcreateconnection
class socksocket(socket.socket): class socksocket(socket.socket):
"""socksocket([family[, type[, proto]]]) -> socket object """socksocket([family[, type[, proto]]]) -> socket object
Open a SOCKS enabled socket. The parameters are the same as Open a SOCKS enabled socket. The parameters are the same as
@ -129,11 +130,17 @@ class socksocket(socket.socket):
""" """
def __init__(self, family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0, _sock=None): def __init__(self, family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0, _sock=None):
# 初始化父类 socket
_orgsocket.__init__(self, family, type, proto, _sock) _orgsocket.__init__(self, family, type, proto, _sock)
# 根据默认代理设置代理
if _defaultproxy != None: if _defaultproxy != None:
self.__proxy = _defaultproxy self.__proxy = _defaultproxy
else: else:
# 初始化代理信息,如果没有默认代理则设为空元组
self.__proxy = (None, None, None, None, None, None) self.__proxy = (None, None, None, None, None, None)
# 用于保存代理关联的本地socket信息
self.__proxysockname = None self.__proxysockname = None
self.__proxypeername = None self.__proxypeername = None
@ -142,121 +149,111 @@ class socksocket(socket.socket):
Receive EXACTLY the number of bytes requested from the socket. Receive EXACTLY the number of bytes requested from the socket.
Blocks until the required number of bytes have been received. Blocks until the required number of bytes have been received.
""" """
# 从socket接收指定字节数数据
data = self.recv(count) data = self.recv(count)
# 当接收的数据少于请求的字节数时继续接收
while len(data) < count: while len(data) < count:
d = self.recv(count - len(data)) d = self.recv(count - len(data))
if not d: raise GeneralProxyError((0, "connection closed unexpectedly")) if not d:
data = data + d raise GeneralProxyError((0, "connection closed unexpectedly")) # 如果连接关闭则抛出异常
data = data + d # 累加接收到的数据
return data return data
def setproxy(self, proxytype=None, addr=None, port=None, rdns=True, username=None, password=None): def setproxy(self, proxytype=None, addr=None, port=None, rdns=True, username=None, password=None):
"""setproxy(proxytype, addr[, port[, rdns[, username[, password]]]]) """setproxy(proxytype, addr[, port[, rdns[, username[, password]]]])
Sets the proxy to be used. Sets the proxy to be used.
proxytype - The type of the proxy to be used. Three types
are supported: PROXY_TYPE_SOCKS4 (including socks4a),
PROXY_TYPE_SOCKS5 and PROXY_TYPE_HTTP
addr - The address of the server (IP or DNS).
port - The port of the server. Defaults to 1080 for SOCKS
servers and 8080 for HTTP proxy servers.
rdns - Should DNS queries be preformed on the remote side
(rather than the local side). The default is True.
Note: This has no effect with SOCKS4 servers.
username - Username to authenticate with to the server.
The default is no authentication.
password - Password to authenticate with to the server.
Only relevant when username is also provided.
""" """
# 设置代理类型、地址、端口等参数
self.__proxy = (proxytype, addr, port, rdns, username, password) self.__proxy = (proxytype, addr, port, rdns, username, password)
def __negotiatesocks5(self, destaddr, destport): def __negotiatesocks5(self, destaddr, destport):
"""__negotiatesocks5(self,destaddr,destport) """__negotiatesocks5(self,destaddr,destport)
Negotiates a connection through a SOCKS5 server. Negotiates a connection through a SOCKS5 server.
""" """
# First we'll send the authentication packages we support. # 发送支持的认证方法
if (self.__proxy[4] != None) and (self.__proxy[5] != None): if (self.__proxy[4] != None) and (self.__proxy[5] != None):
# The username/password details were supplied to the # 如果提供了用户名和密码,则支持用户名/密码认证
# setproxy method so we support the USERNAME/PASSWORD self.sendall(struct.pack('BBBB', 0x05, 0x02, 0x00, 0x02)) # 0x02 代表用户名/密码认证
# authentication (in addition to the standard none).
self.sendall(struct.pack('BBBB', 0x05, 0x02, 0x00, 0x02))
else: else:
# No username/password were entered, therefore we # 仅支持无认证连接
# only support connections with no authentication. self.sendall(struct.pack('BBB', 0x05, 0x01, 0x00)) # 0x01 代表无认证
self.sendall(struct.pack('BBB', 0x05, 0x01, 0x00))
# We'll receive the server's response to determine which # 接收服务器的响应以确定选定的身份验证方法
# method was selected
chosenauth = self.__recvall(2) chosenauth = self.__recvall(2)
if chosenauth[0:1] != b'\x05': if chosenauth[0:1] != b'\x05':
self.close() self.close()
raise GeneralProxyError((1, _generalerrors[1])) raise GeneralProxyError((1, _generalerrors[1])) # 无效的协议版本
# Check the chosen authentication method
# 检查所选的身份验证方法
if chosenauth[1:2] == b'\x00': if chosenauth[1:2] == b'\x00':
# No authentication is required # 无需身份验证
pass pass
elif chosenauth[1:2] == b'\x02': elif chosenauth[1:2] == b'\x02':
# Okay, we need to perform a basic username/password # 需要进行用户名/密码身份验证
# authentication. self.sendall(b'\x01' + chr(len(self.__proxy[4])).encode() + self.__proxy[4].encode() + chr(
self.sendall(b'\x01' + chr(len(self.__proxy[4])).encode() + self.__proxy[4].encode() + chr(len(self.__proxy[5])).encode() + self.__proxy[5].encode()) len(self.__proxy[5])).encode() + self.__proxy[5].encode())
authstat = self.__recvall(2) authstat = self.__recvall(2) # 接收身份验证状态
if authstat[0:1] != b'\x01': if authstat[0:1] != b'\x01':
# Bad response
self.close() self.close()
raise GeneralProxyError((1, _generalerrors[1])) raise GeneralProxyError((1, _generalerrors[1])) # 无效的认证响应
if authstat[1:2] != b'\x00': if authstat[1:2] != b'\x00':
# Authentication failed
self.close() self.close()
raise Socks5AuthError((3, _socks5autherrors[3])) raise Socks5AuthError((3, _socks5autherrors[3])) # 身份验证失败
# Authentication succeeded
else: else:
# Reaching here is always bad # 非法的身份验证方法
self.close() self.close()
if chosenauth[1:2] == b'\xff': if chosenauth[1:2] == b'\xff':
raise Socks5AuthError((2, _socks5autherrors[2])) raise Socks5AuthError((2, _socks5autherrors[2])) # 不支持的身份验证
else: else:
raise GeneralProxyError((1, _generalerrors[1])) raise GeneralProxyError((1, _generalerrors[1]))
# Now we can request the actual connection
req = struct.pack('BBB', 0x05, 0x01, 0x00) # 请求实际连接
# If the given destination address is an IP address, we'll req = struct.pack('BBB', 0x05, 0x01, 0x00) # 0x01 表示连接请求
# use the IPv4 address request even if remote resolving was specified.
try: try:
# 尝试将目标地址作为IP地址处理
ipaddr = socket.inet_aton(destaddr) ipaddr = socket.inet_aton(destaddr)
req = req + b'\x01' + ipaddr req = req + b'\x01' + ipaddr # IPv4地址请求
except socket.error: except socket.error:
# Well it's not an IP number, so it's probably a DNS name. # 目标地址不是IP地址可能是DNS名称
if self.__proxy[3]: if self.__proxy[3]: # 检查是否需要远程解析
# Resolve remotely
ipaddr = None ipaddr = None
req = req + chr(0x03).encode() + chr(len(destaddr)).encode() + (destaddr if isinstance(destaddr, bytes) else destaddr.encode()) req = req + chr(0x03).encode() + chr(len(destaddr)).encode() + (
destaddr if isinstance(destaddr, bytes) else destaddr.encode())
else: else:
# Resolve locally # 本地解析
ipaddr = socket.inet_aton(socket.gethostbyname(destaddr)) ipaddr = socket.inet_aton(socket.gethostbyname(destaddr))
req = req + chr(0x01).encode() + ipaddr req = req + chr(0x01).encode() + ipaddr
req = req + struct.pack(">H", destport) req = req + struct.pack(">H", destport) # 添加目标端口
self.sendall(req) self.sendall(req) # 发送请求
# Get the response
# 获取响应
resp = self.__recvall(4) resp = self.__recvall(4)
if resp[0:1] != chr(0x05).encode(): if resp[0:1] != chr(0x05).encode():
self.close() self.close()
raise GeneralProxyError((1, _generalerrors[1])) raise GeneralProxyError((1, _generalerrors[1])) # 协议错误
elif resp[1:2] != chr(0x00).encode(): elif resp[1:2] != chr(0x00).encode():
# Connection failed # 连接失败
self.close() self.close()
if ord(resp[1:2]) <= 8: if ord(resp[1:2]) <= 8:
raise Socks5Error((ord(resp[1:2]), _socks5errors[ord(resp[1:2])])) raise Socks5Error((ord(resp[1:2]), _socks5errors[ord(resp[1:2])]))
else: else:
raise Socks5Error((9, _socks5errors[9])) raise Socks5Error((9, _socks5errors[9]))
# Get the bound address/port
# 解析绑定地址/端口
elif resp[3:4] == chr(0x01).encode(): elif resp[3:4] == chr(0x01).encode():
boundaddr = self.__recvall(4) boundaddr = self.__recvall(4) # IPv4绑定地址
elif resp[3:4] == chr(0x03).encode(): elif resp[3:4] == chr(0x03).encode():
resp = resp + self.recv(1) resp = resp + self.recv(1) # 读取后续字节
boundaddr = self.__recvall(ord(resp[4:5])) boundaddr = self.__recvall(ord(resp[4:5])) # DNS名
else: else:
self.close() self.close()
raise GeneralProxyError((1, _generalerrors[1])) raise GeneralProxyError((1, _generalerrors[1]))
# 获取绑定端口
boundport = struct.unpack(">H", self.__recvall(2))[0] boundport = struct.unpack(">H", self.__recvall(2))[0]
self.__proxysockname = (boundaddr, boundport) self.__proxysockname = (boundaddr, boundport) # 保存绑定的socket信息
if ipaddr != None: if ipaddr != None:
self.__proxypeername = (socket.inet_ntoa(ipaddr), destport) self.__proxypeername = (socket.inet_ntoa(ipaddr), destport) # 保存peer名称
else: else:
self.__proxypeername = (destaddr, destport) self.__proxypeername = (destaddr, destport)
@ -264,135 +261,153 @@ class socksocket(socket.socket):
"""getsockname() -> address info """getsockname() -> address info
Returns the bound IP address and port number at the proxy. Returns the bound IP address and port number at the proxy.
""" """
return self.__proxysockname return self.__proxysockname # 返回代理的socket信息
def getproxypeername(self): def getproxypeername(self):
"""getproxypeername() -> address info """getproxypeername() -> address info
Returns the IP and port number of the proxy. Returns the IP and port number of the proxy.
""" """
return _orgsocket.getpeername(self) return _orgsocket.getpeername(self) # 返回代理的peer信息
def getpeername(self): def getpeername(self):
"""getpeername() -> address info """getpeername() -> address info
Returns the IP address and port number of the destination Returns the IP address and port number of the destination
machine (note: getproxypeername returns the proxy) machine (note: getproxypeername returns the proxy)
""" """
return self.__proxypeername return self.__proxypeername # 返回目标机器的peer名称
def __negotiatesocks4(self, destaddr, destport): def __negotiatesocks4(self, destaddr, destport):
"""__negotiatesocks4(self,destaddr,destport) """__negotiatesocks4(self,destaddr,destport)
Negotiates a connection through a SOCKS4 server. Negotiates a connection through a SOCKS4 server.
""" """
# Check if the destination address provided is an IP address # 检查目标地址是否为IP地址
rmtrslv = False rmtrslv = False
try: try:
ipaddr = socket.inet_aton(destaddr) ipaddr = socket.inet_aton(destaddr)
except socket.error: except socket.error:
# It's a DNS name. Check where it should be resolved. # 目标是DNS名称, 检查是否应进行远程解析
if self.__proxy[3]: if self.__proxy[3]:
ipaddr = struct.pack("BBBB", 0x00, 0x00, 0x00, 0x01) ipaddr = struct.pack("BBBB", 0x00, 0x00, 0x00, 0x01) # 默认IP地址
rmtrslv = True rmtrslv = True
else: else:
ipaddr = socket.inet_aton(socket.gethostbyname(destaddr)) ipaddr = socket.inet_aton(socket.gethostbyname(destaddr)) # 本地解析
# Construct the request packet
req = struct.pack(">BBH", 0x04, 0x01, destport) + ipaddr # 构造请求报文
# The username parameter is considered userid for SOCKS4 req = struct.pack(">BBH", 0x04, 0x01, destport) + ipaddr # SOCKS4 请求
# 用户名参数视为userid
if self.__proxy[4] != None: if self.__proxy[4] != None:
req = req + self.__proxy[4] req = req + self.__proxy[4]
req = req + chr(0x00).encode() req = req + chr(0x00).encode() # 请求结束
# DNS name if remote resolving is required
# NOTE: This is actually an extension to the SOCKS4 protocol # 需要远程解析的情况下使用DNS名称
# called SOCKS4A and may not be supported in all cases.
if rmtrslv: if rmtrslv:
req = req + destaddr + chr(0x00).encode() req = req + destaddr + chr(0x00).encode() # SOCKS4A扩展
self.sendall(req) self.sendall(req) # 发送请求
# Get the response from the server
# 获取服务器响应
resp = self.__recvall(8) resp = self.__recvall(8)
if resp[0:1] != chr(0x00).encode(): if resp[0:1] != chr(0x00).encode():
# Bad data # 出现错误
self.close() self.close()
raise GeneralProxyError((1,_generalerrors[1])) raise GeneralProxyError((1, _generalerrors[1])) # 无效数据
if resp[1:2] != chr(0x5A).encode(): if resp[1:2] != chr(0x5A).encode():
# Server returned an error # 服务器返回错误
self.close() self.close()
if ord(resp[1:2]) in (91, 92, 93): if ord(resp[1:2]) in (91, 92, 93):
self.close() self.close()
raise Socks4Error((ord(resp[1:2]), _socks4errors[ord(resp[1:2]) - 90])) raise Socks4Error((ord(resp[1:2]), _socks4errors[ord(resp[1:2]) - 90])) # 处理错误代码
else: else:
raise Socks4Error((94, _socks4errors[4])) raise Socks4Error((94, _socks4errors[4])) # 一般性错误
# Get the bound address/port
self.__proxysockname = (socket.inet_ntoa(resp[4:]), struct.unpack(">H", resp[2:4])[0]) # 获取绑定的地址和端口
self.__proxysockname = (socket.inet_ntoa(resp[4:]), struct.unpack(">H", resp[2:4])[0]) # 保存绑定信息
if rmtrslv != None: if rmtrslv != None:
self.__proxypeername = (socket.inet_ntoa(ipaddr), destport) self.__proxypeername = (socket.inet_ntoa(ipaddr), destport) # 目标地址
else: else:
self.__proxypeername = (destaddr, destport) self.__proxypeername = (destaddr, destport) # 目标地址
def __negotiatehttp(self, destaddr, destport): def __negotiatehttp(self, destaddr, destport):
"""__negotiatehttp(self,destaddr,destport) """__negotiatehttp(self,destaddr,destport)
Negotiates a connection through an HTTP server. Negotiates a connection through an HTTP server.
""" """
# If we need to resolve locally, we do this now # 本地解析
if not self.__proxy[3]: if not self.__proxy[3]:
addr = socket.gethostbyname(destaddr) addr = socket.gethostbyname(destaddr)
else: else:
addr = destaddr addr = destaddr
self.sendall(("CONNECT " + addr + ":" + str(destport) + " HTTP/1.1\r\n" + "Host: " + destaddr + "\r\n\r\n").encode())
# We read the response until we get the string "\r\n\r\n" # 发送HTTP的CONNECT请求
self.sendall(
("CONNECT " + addr + ":" + str(destport) + " HTTP/1.1\r\n" + "Host: " + destaddr + "\r\n\r\n").encode())
# 读取响应直到遇到"\r\n\r\n"
resp = self.recv(1) resp = self.recv(1)
while resp.find("\r\n\r\n".encode()) == -1: while resp.find("\r\n\r\n".encode()) == -1:
resp = resp + self.recv(1) resp = resp + self.recv(1)
# We just need the first line to check if the connection
# was successful # 仅需第一行检查连接是否成功
statusline = resp.splitlines()[0].split(" ".encode(), 2) statusline = resp.splitlines()[0].split(" ".encode(), 2)
if statusline[0] not in ("HTTP/1.0".encode(), "HTTP/1.1".encode()): if statusline[0] not in ("HTTP/1.0".encode(), "HTTP/1.1".encode()):
self.close() self.close()
raise GeneralProxyError((1, _generalerrors[1])) raise GeneralProxyError((1, _generalerrors[1])) # 无效的HTTP协议
try: try:
statuscode = int(statusline[1]) statuscode = int(statusline[1]) # 获取状态码
except ValueError: except ValueError:
self.close() self.close()
raise GeneralProxyError((1, _generalerrors[1])) raise GeneralProxyError((1, _generalerrors[1])) # 状态码无效
if statuscode != 200: if statuscode != 200:
self.close() self.close()
raise HTTPError((statuscode, statusline[2])) raise HTTPError((statuscode, statusline[2])) # 显示状态码错误
self.__proxysockname = ("0.0.0.0", 0)
self.__proxypeername = (addr, destport) # 连接成功后的代理信息
self.__proxysockname = ("0.0.0.0", 0) # 绑定在0.0.0.0
self.__proxypeername = (addr, destport) # 保存peer信息
def connect(self, destpair): def connect(self, destpair):
"""connect(self, despair) """connect(self, destpair)
Connects to the specified destination through a proxy. Connects to the specified destination through a proxy.
destpar - A tuple of the IP/DNS address and the port number. destpair - A tuple of the IP/DNS address and the port number.
(identical to socket's connect). (identical to socket's connect).
To select the proxy server use setproxy(). To select the proxy server use setproxy().
""" """
# Do a minimal input check first # 校验输入参数
if (not type(destpair) in (list,tuple)) or (len(destpair) < 2) or (type(destpair[0]) != type('')) or (type(destpair[1]) != int): if (not type(destpair) in (list, tuple)) or (len(destpair) < 2) or (type(destpair[0]) != type('')) or (
type(destpair[1]) != int):
raise GeneralProxyError((5, _generalerrors[5])) raise GeneralProxyError((5, _generalerrors[5]))
# 根据代理类型选择相应的握手协议
if self.__proxy[0] == PROXY_TYPE_SOCKS5: if self.__proxy[0] == PROXY_TYPE_SOCKS5:
if self.__proxy[2] != None: if self.__proxy[2] != None:
portnum = self.__proxy[2] portnum = self.__proxy[2]
else: else:
portnum = 1080 portnum = 1080 # 默认端口
_orgsocket.connect(self, (self.__proxy[1], portnum))
self.__negotiatesocks5(destpair[0], destpair[1]) _orgsocket.connect(self, (self.__proxy[1], portnum)) # 连接到代理
self.__negotiatesocks5(destpair[0], destpair[1]) # 握手
elif self.__proxy[0] == PROXY_TYPE_SOCKS4: elif self.__proxy[0] == PROXY_TYPE_SOCKS4:
if self.__proxy[2] != None: if self.__proxy[2] != None:
portnum = self.__proxy[2] portnum = self.__proxy[2]
else: else:
portnum = 1080 portnum = 1080 # 默认端口
_orgsocket.connect(self,(self.__proxy[1], portnum))
self.__negotiatesocks4(destpair[0], destpair[1]) _orgsocket.connect(self, (self.__proxy[1], portnum)) # 连接到代理
self.__negotiatesocks4(destpair[0], destpair[1]) # 握手
elif self.__proxy[0] == PROXY_TYPE_HTTP: elif self.__proxy[0] == PROXY_TYPE_HTTP:
if self.__proxy[2] != None: if self.__proxy[2] != None:
portnum = self.__proxy[2] portnum = self.__proxy[2]
else: else:
portnum = 8080 portnum = 8080 # 默认端口
_orgsocket.connect(self,(self.__proxy[1], portnum))
self.__negotiatehttp(destpair[0], destpair[1]) _orgsocket.connect(self, (self.__proxy[1], portnum)) # 连接到代理
self.__negotiatehttp(destpair[0], destpair[1]) # 握手
elif self.__proxy[0] == None: elif self.__proxy[0] == None:
# 没有代理时直接连接目标
_orgsocket.connect(self, (destpair[0], destpair[1])) _orgsocket.connect(self, (destpair[0], destpair[1]))
else: else:
raise GeneralProxyError((4, _generalerrors[4])) raise GeneralProxyError((4, _generalerrors[4])) # 不支持的代理类型
def create_connection(address, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, def create_connection(address, timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
source_address=None): source_address=None):

@ -79,12 +79,14 @@ COLORS = dict(
)) ))
) )
# 添加亮色到 COLORS 字典
COLORS.update(dict(("light%s" % color, COLORS[color] + 60) for color in COLORS)) COLORS.update(dict(("light%s" % color, COLORS[color] + 60) for color in COLORS))
# Reference: https://misc.flogisoft.com/bash/tip_colors_and_formatting # 带有参考的颜色定义
COLORS["lightgrey"] = 37 COLORS["lightgrey"] = 37 # 亮灰色
COLORS["darkgrey"] = 90 COLORS["darkgrey"] = 90 # 深灰色
# 定义重置颜色的转义序列
RESET = '\033[0m' RESET = '\033[0m'
@ -104,34 +106,37 @@ def colored(text, color=None, on_color=None, attrs=None):
colored('Hello, World!', 'red', 'on_grey', ['blue', 'blink']) colored('Hello, World!', 'red', 'on_grey', ['blue', 'blink'])
colored('Hello, World!', 'green') colored('Hello, World!', 'green')
""" """
# 检查环境变量,以决定是否支持 ANSI 颜色
if os.getenv('ANSI_COLORS_DISABLED') is None: if os.getenv('ANSI_COLORS_DISABLED') is None:
fmt_str = '\033[%dm%s' fmt_str = '\033[%dm%s' # ANSI 格式化字符串
if color is not None: if color is not None:
text = fmt_str % (COLORS[color], text) text = fmt_str % (COLORS[color], text) # 将文本颜色应用于文本
if on_color is not None: if on_color is not None:
text = fmt_str % (HIGHLIGHTS[on_color], text) text = fmt_str % (HIGHLIGHTS[on_color], text) # 将背景颜色应用于文本
if attrs is not None: if attrs is not None:
for attr in attrs: for attr in attrs:
text = fmt_str % (ATTRIBUTES[attr], text) text = fmt_str % (ATTRIBUTES[attr], text) # 应用文本属性
text += RESET text += RESET # 重置颜色到默认值
return text return text
def cprint(text, color=None, on_color=None, attrs=None, **kwargs): def cprint(text, color=None, on_color=None, attrs=None, **kwargs):
"""Print colorize text. """打印带颜色的文本。
It accepts arguments of print function. 支持 print 函数的所有参数
""" """
print((colored(text, color, on_color, attrs)), **kwargs) # 调用 colored 函数并进行打印
print((colored(text, color, on_color, attrs)), **kwargs)
if __name__ == '__main__': if __name__ == '__main__':
print('Current terminal type: %s' % os.getenv('TERM')) # 用于测试的主模块
print('Current terminal type: %s' % os.getenv('TERM')) # 打印当前终端类型
print('Test basic colors:') print('Test basic colors:')
# 测试基本颜色
cprint('Grey color', 'grey') cprint('Grey color', 'grey')
cprint('Red color', 'red') cprint('Red color', 'red')
cprint('Green color', 'green') cprint('Green color', 'green')
@ -143,6 +148,7 @@ if __name__ == '__main__':
print(('-' * 78)) print(('-' * 78))
print('Test highlights:') print('Test highlights:')
# 测试背景高亮
cprint('On grey color', on_color='on_grey') cprint('On grey color', on_color='on_grey')
cprint('On red color', on_color='on_red') cprint('On red color', on_color='on_red')
cprint('On green color', on_color='on_green') cprint('On green color', on_color='on_green')
@ -154,6 +160,7 @@ if __name__ == '__main__':
print('-' * 78) print('-' * 78)
print('Test attributes:') print('Test attributes:')
# 测试文本属性
cprint('Bold grey color', 'grey', attrs=['bold']) cprint('Bold grey color', 'grey', attrs=['bold'])
cprint('Dark red color', 'red', attrs=['dark']) cprint('Dark red color', 'red', attrs=['dark'])
cprint('Underline green color', 'green', attrs=['underline']) cprint('Underline green color', 'green', attrs=['underline'])
@ -167,7 +174,6 @@ if __name__ == '__main__':
print(('-' * 78)) print(('-' * 78))
print('Test mixing:') print('Test mixing:')
cprint('Underline red on grey color', 'red', 'on_grey', # 测试混合使用颜色和属性
['underline']) cprint('Underline red on grey color', 'red', 'on_grey', ['underline'])
cprint('Reversed green on red color', 'green', 'on_red', ['reverse']) cprint('Reversed green on red color', 'green', 'on_red', ['reverse'])

@ -9,30 +9,51 @@ import ctypes
import os import os
# 定义 sockaddr 结构体,以便与底层 socket API 交互
class sockaddr(ctypes.Structure): class sockaddr(ctypes.Structure):
_fields_ = [("sa_family", ctypes.c_short), _fields_ = [
("__pad1", ctypes.c_ushort), ("sa_family", ctypes.c_short), # 地址族,例如 AF_INET 或 AF_INET6
("ipv4_addr", ctypes.c_byte * 4), ("__pad1", ctypes.c_ushort), # 填充字段用于对齐
("ipv6_addr", ctypes.c_byte * 16), ("ipv4_addr", ctypes.c_byte * 4), # IPv4 地址4 字节)
("__pad2", ctypes.c_ulong)] ("ipv6_addr", ctypes.c_byte * 16), # IPv6 地址16 字节)
("__pad2", ctypes.c_ulong) # 额外填充字段
]
# 检测是否在 Windows 平台上,并加载相应的 Windows socket 函数
if hasattr(ctypes, 'windll'): if hasattr(ctypes, 'windll'):
WSAStringToAddressA = ctypes.windll.ws2_32.WSAStringToAddressA WSAStringToAddressA = ctypes.windll.ws2_32.WSAStringToAddressA # 字符串转换为地址
WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA # 地址转换为字符串
else: else:
def not_windows(): def not_windows():
raise SystemError( raise SystemError(
"Invalid platform. ctypes.windll must be available." "Invalid platform. ctypes.windll must be available."
) )
WSAStringToAddressA = not_windows
WSAAddressToStringA = not_windows
WSAStringToAddressA = not_windows # 非 Windows 平台时的占位符
WSAAddressToStringA = not_windows # 非 Windows 平台时的占位符
def inet_pton(address_family, ip_string): def inet_pton(address_family, ip_string):
addr = sockaddr() """
addr.sa_family = address_family Convert an IP address from text to binary format.
addr_size = ctypes.c_int(ctypes.sizeof(addr))
Args:
address_family: The address family (AF_INET for IPv4 or AF_INET6 for IPv6).
ip_string: The IP address in string form (e.g., "192.168.1.1").
Returns:
A packed binary representation of the IP address.
Raises:
socket.error: If the conversion fails.
"""
addr = sockaddr() # 创建 sockaddr 实例
addr.sa_family = address_family # 设置地址族
addr_size = ctypes.c_int(ctypes.sizeof(addr)) # 地址结构的大小
# 调用 WSAStringToAddressA 将字符串 IP 转换为二进制地址
if WSAStringToAddressA( if WSAStringToAddressA(
ip_string, ip_string,
address_family, address_family,
@ -40,34 +61,50 @@ def inet_pton(address_family, ip_string):
ctypes.byref(addr), ctypes.byref(addr),
ctypes.byref(addr_size) ctypes.byref(addr_size)
) != 0: ) != 0:
raise socket.error(ctypes.FormatError()) raise socket.error(ctypes.FormatError()) # 转换失败时抛出异常
# 根据地址族返回相应的打包 IP 地址
if address_family == socket.AF_INET: if address_family == socket.AF_INET:
return ctypes.string_at(addr.ipv4_addr, 4) return ctypes.string_at(addr.ipv4_addr, 4) # IPv4 地址
if address_family == socket.AF_INET6: if address_family == socket.AF_INET6:
return ctypes.string_at(addr.ipv6_addr, 16) return ctypes.string_at(addr.ipv6_addr, 16) # IPv6 地址
raise socket.error('unknown address family') raise socket.error('unknown address family') # 抛出未知地址族的异常
def inet_ntop(address_family, packed_ip): def inet_ntop(address_family, packed_ip):
addr = sockaddr() """
addr.sa_family = address_family Convert a binary IP address to its text representation.
addr_size = ctypes.c_int(ctypes.sizeof(addr))
ip_string = ctypes.create_string_buffer(128) Args:
ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string)) address_family: The address family (AF_INET for IPv4 or AF_INET6 for IPv6).
packed_ip: The packed binary IP address.
Returns:
The IP address in string form.
Raises:
socket.error: If the conversion fails.
"""
addr = sockaddr() # 创建 sockaddr 实例
addr.sa_family = address_family # 设置地址族
addr_size = ctypes.c_int(ctypes.sizeof(addr)) # 地址结构的大小
ip_string = ctypes.create_string_buffer(128) # 用于存储 IP 字符串
ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string)) # 字符串大小
# 根据地址族填充 sockaddr 结构
if address_family == socket.AF_INET: if address_family == socket.AF_INET:
if len(packed_ip) != ctypes.sizeof(addr.ipv4_addr): if len(packed_ip) != ctypes.sizeof(addr.ipv4_addr):
raise socket.error('packed IP wrong length for inet_ntoa') raise socket.error('packed IP wrong length for inet_ntoa') # IPv4 地址长度错误
ctypes.memmove(addr.ipv4_addr, packed_ip, 4) ctypes.memmove(addr.ipv4_addr, packed_ip, 4) # 复制 IPv4 地址
elif address_family == socket.AF_INET6: elif address_family == socket.AF_INET6:
if len(packed_ip) != ctypes.sizeof(addr.ipv6_addr): if len(packed_ip) != ctypes.sizeof(addr.ipv6_addr):
raise socket.error('packed IP wrong length for inet_ntoa') raise socket.error('packed IP wrong length for inet_ntoa') # IPv6 地址长度错误
ctypes.memmove(addr.ipv6_addr, packed_ip, 16) ctypes.memmove(addr.ipv6_addr, packed_ip, 16) # 复制 IPv6 地址
else: else:
raise socket.error('unknown address family') raise socket.error('unknown address family') # 抛出未知地址族的异常
# 调用 WSAAddressToStringA 将二进制地址转换为字符串
if WSAAddressToStringA( if WSAAddressToStringA(
ctypes.byref(addr), ctypes.byref(addr),
addr_size, addr_size,
@ -75,11 +112,12 @@ def inet_ntop(address_family, packed_ip):
ip_string, ip_string,
ctypes.byref(ip_string_size) ctypes.byref(ip_string_size)
) != 0: ) != 0:
raise socket.error(ctypes.FormatError()) raise socket.error(ctypes.FormatError()) # 转换失败时抛出异常
return ip_string[:ip_string_size.value - 1] # 返回字符串表示形式,移除尾部的空字符
return ip_string[:ip_string_size.value - 1]
# Adding our two functions to the socket library # 如果在 Windows 平台,将自定义的函数添加到 socket 库中
if os.name == 'nt': if os.name == 'nt':
socket.inet_pton = inet_pton socket.inet_pton = inet_pton # 添加 inet_pton 函数
socket.inet_ntop = inet_ntop socket.inet_ntop = inet_ntop # 添加 inet_ntop 函数
Loading…
Cancel
Save