Compare commits

...

10 Commits

@ -7,7 +7,7 @@ import logging
import re
import sys
from lib.core.settings import IS_WIN
from lib.core.settings import IS_WIN # 导入一个设置用于判断是否在Windows系统上运行
if IS_WIN:
import ctypes
@ -16,14 +16,15 @@ if IS_WIN:
# Reference: https://gist.github.com/vsajip/758430
# https://github.com/ipython/ipython/issues/4252
# https://msdn.microsoft.com/en-us/library/windows/desktop/ms686047%28v=vs.85%29.aspx
# 设置Windows API函数SetConsoleTextAttribute的参数和返回值类型
ctypes.windll.kernel32.SetConsoleTextAttribute.argtypes = [ctypes.wintypes.HANDLE, ctypes.wintypes.WORD]
ctypes.windll.kernel32.SetConsoleTextAttribute.restype = ctypes.wintypes.BOOL
def stdoutEncode(data):
    """Return *data* unchanged.

    Pass-through hook that ``ColorizingStreamHandler.emit`` applies to the
    formatted log message before writing it to the stream.
    """
    return data
class ColorizingStreamHandler(logging.StreamHandler):
# color names to indices
# 定义颜色名称到索引的映射
color_map = {
'black': 0,
'red': 1,
@ -35,7 +36,7 @@ class ColorizingStreamHandler(logging.StreamHandler):
'white': 7,
}
# levels to (background, foreground, bold/intense)
# 定义日志级别到颜色和样式的映射
level_map = {
logging.DEBUG: (None, 'blue', False),
logging.INFO: (None, 'green', False),
@ -43,25 +44,30 @@ class ColorizingStreamHandler(logging.StreamHandler):
logging.ERROR: (None, 'red', False),
logging.CRITICAL: ('red', 'white', False)
}
csi = '\x1b['
reset = '\x1b[0m'
bold = "\x1b[1m"
disable_coloring = False
csi = '\x1b[' # ANSI转义序列的前缀
reset = '\x1b[0m' # ANSI重置颜色的转义序列
bold = "\x1b[1m" # ANSI加粗的转义序列
disable_coloring = False # 是否禁用颜色
@property
def is_tty(self):
    """Tell whether colored output should be used for the current stream.

    Returns a falsy value when the stream has no ``isatty`` attribute or is
    not attached to a terminal; otherwise returns ``not disable_coloring``.
    """
    checker = getattr(self.stream, 'isatty', None)
    if not checker:
        # No usable isatty on the stream: hand back the falsy value itself.
        return checker
    attached = checker()
    if not attached:
        return attached
    return not self.disable_coloring
def emit(self, record):
# 发送日志记录
try:
message = stdoutEncode(self.format(record))
stream = self.stream
#如果当前流不是TTY直接写入消息
if not self.is_tty:
if message and message[0] == "\r":
message = message[1:]
stream.write(message)
#如果是TTY调用output_colorized方法来输出带颜色的消息
else:
self.output_colorized(message)
stream.write(getattr(self, 'terminator', '\n'))
@ -70,15 +76,19 @@ class ColorizingStreamHandler(logging.StreamHandler):
except (KeyboardInterrupt, SystemExit):
raise
except IOError:
#IO错误时什么也不做pass
pass
except:
#其他异常时调用handleError方法
self.handleError(record)
if not IS_WIN:
def output_colorized(self, message):
# 如果不是Windows系统直接写入消息
self.stream.write(message)
else:
ansi_esc = re.compile(r'\x1b\[((?:\d+)(?:;(?:\d+))*)m')
# 正则表达式用于匹配ANSI转义序列
nt_color_map = {
0: 0x00, # black
@ -92,26 +102,32 @@ class ColorizingStreamHandler(logging.StreamHandler):
}
def output_colorized(self, message):
# 如果是Windows系统解析ANSI转义序列并设置控制台颜色
parts = self.ansi_esc.split(message)
h = None
fd = getattr(self.stream, 'fileno', None)
#文件描述符有效并且是标准输出或标准错误获取对应的Windows句柄
if fd is not None:
fd = fd()
if fd in (1, 2): # stdout or stderr
h = ctypes.windll.kernel32.GetStdHandle(-10 - fd)
#循环处理分割后的消息部分
while parts:
text = parts.pop(0)
#如果部分是文本,写入并刷新流
if text:
self.stream.write(text)
self.stream.flush()
#如果还有部分,取出下一个部分作为参数
if parts:
params = parts.pop(0)
#如果句柄有效,将参数分割并转换为整数,初始化颜色代码
if h is not None:
params = [int(p) for p in params.split(';')]
color = 0
@ -131,9 +147,12 @@ class ColorizingStreamHandler(logging.StreamHandler):
ctypes.windll.kernel32.SetConsoleTextAttribute(h, color)
def _reset(self, message):
    """Return the escape sequence to use as the reset for *message*.

    A message that already ends with the reset sequence and contains the
    bold sequence gets reset followed by bold; every other message gets the
    plain reset sequence.
    """
    if message.endswith(self.reset) and self.bold in message:
        return self.reset + self.bold
    return self.reset
def colorize(self, message, levelno):
# 根据日志级别给消息上色
if levelno in self.level_map and self.is_tty:
bg, fg, bold = self.level_map[levelno]
params = []
#如果背景色有效,添加背景色参数
if bg in self.color_map:
params.append(str(self.color_map[bg] + 40))
#如果前景色有效,添加前景色参数
if fg in self.color_map:
params.append(str(self.color_map[fg] + 30))
#如果需要加粗,添加加粗参数
if bold:
params.append('1')
#如果参数和消息都有效,检查消息是否有前缀(空格),并提取出来
if params and message:
if message.lstrip() != message:
prefix = re.search(r"\s+", message).group(0)
@ -167,5 +190,6 @@ class ColorizingStreamHandler(logging.StreamHandler):
return message
def format(self, record):
    """Format *record* with the base StreamHandler, then colorize it.

    The formatted text is passed to ``self.colorize`` together with the
    record's numeric level, and the result of that call is returned.
    """
    message = logging.StreamHandler.format(self, record)
    return self.colorize(message, record.levelno)

File diff suppressed because it is too large Load Diff

@ -29,9 +29,12 @@ __license__ = 'MIT'
def _cli_parse(args): # pragma: no coverage
# 导入ArgumentParser模块
from argparse import ArgumentParser
# 创建ArgumentParser对象设置程序名称和用法
parser = ArgumentParser(prog=args[0], usage="%(prog)s [options] package.module:app")
# 添加参数
opt = parser.add_argument
opt("--version", action="store_true", help="show version number.")
opt("-b", "--bind", metavar="ADDRESS", help="bind socket to ADDRESS.")
@ -45,6 +48,7 @@ def _cli_parse(args): # pragma: no coverage
opt("--reload", action="store_true", help="auto-reload on file changes.")
opt('app', help='WSGI app entry point.', nargs='?')
# 解析命令行参数
cli_args = parser.parse_args(args[1:])
return cli_args, parser
@ -179,7 +183,9 @@ def depr(major, minor, cause, fix):
def makelist(data): # This is just too handy
# 判断data是否为元组、列表、集合或字典类型
if isinstance(data, (tuple, list, set, dict)):
# 如果是则返回data的列表形式
return list(data)
elif data:
return [data]
@ -198,18 +204,24 @@ class DictProperty(object):
self.getter, self.key = func, self.key or func.__name__
return self
# Descriptor read access: returns the descriptor itself when obj is None.
def __get__(self, obj, cls):
    """Return the stored value for *obj*, computing it on first access.

    When ``obj`` is None the descriptor itself is returned. Otherwise the
    value is looked up under ``self.key`` in the mapping found on ``obj`` as
    attribute ``self.attr``; a missing entry is filled with
    ``self.getter(obj)`` before being returned.
    """
    if obj is None:
        return self
    name = self.key
    cache = getattr(obj, self.attr)
    if name not in cache:
        cache[name] = self.getter(obj)
    return cache[name]
# Descriptor write access: raises AttributeError when the property is read-only.
def __set__(self, obj, value):
    """Store *value* under ``self.key`` in the mapping held by *obj*.

    Raises:
        AttributeError: if the property is flagged ``read_only``.
    """
    if self.read_only:
        raise AttributeError("Read-Only property.")
    storage = getattr(obj, self.attr)
    storage[self.key] = value
def __delete__(self, obj):
    """Remove the entry stored under ``self.key`` from *obj*'s mapping.

    Raises:
        AttributeError: if the property is flagged ``read_only``.
    """
    if self.read_only:
        raise AttributeError("Read-Only property.")
    storage = getattr(obj, self.attr)
    del storage[self.key]
@ -737,26 +749,38 @@ class Bottle(object):
self.route('/' + '/'.join(segments), **options)
def _mount_app(self, prefix, app, **options):
    """Merge the routes of *app* into this application under *prefix*.

    Falls back to ``self._mount_wsgi`` (after a deprecation notice) when the
    app is already mounted, when route options are given, or when *prefix*
    does not end in '/'. Otherwise the app is recorded in ``self._mounts``,
    its config receives '_mount.prefix' and '_mount.app', and each of its
    routes is re-rooted below *prefix* and added via ``self.add_route``.
    """
    # Pick the first applicable reason for falling back, if any.
    if app in self._mounts or '_mount.app' in app.config:
        fallback = ("Application mounted multiple times. Falling back to WSGI mount.",
                    "Clone application before mounting to a different location.")
    elif options:
        fallback = ("Unsupported mount options. Falling back to WSGI mount.",
                    "Do not specify any route options when mounting bottle application.")
    elif not prefix.endswith("/"):
        fallback = ("Prefix must end in '/'. Falling back to WSGI mount.",
                    "Consider adding an explicit redirect from '/prefix' to '/prefix/' in the parent application.")
    else:
        fallback = None

    if fallback is not None:
        cause, fix = fallback
        depr(0, 13, cause, fix)
        return self._mount_wsgi(prefix, app, **options)

    self._mounts.append(app)
    app.config['_mount.prefix'] = prefix
    app.config['_mount.app'] = self
    for mounted_route in app.routes:
        # Strip leading slashes so the prefix's trailing '/' is the only separator.
        mounted_route.rule = prefix + mounted_route.rule.lstrip('/')
        self.add_route(mounted_route)
def mount(self, prefix, app, **options):
@ -781,11 +805,15 @@ class Bottle(object):
parent application.
"""
# 检查prefix是否以'/'开头
if not prefix.startswith('/'):
# 如果prefix不以'/'开头则抛出ValueError异常
raise ValueError("Prefix must start with '/'")
# 如果app是Bottle实例则调用_mount_app方法
if isinstance(app, Bottle):
return self._mount_app(prefix, app, **options)
# 否则调用_mount_wsgi方法
else:
return self._mount_wsgi(prefix, app, **options)
@ -1089,31 +1117,46 @@ class Bottle(object):
def wsgi(self, environ, start_response):
    """ The bottle WSGI-interface.

    Runs the request through ``self._handle`` and ``self._cast``, starts the
    response and returns the body iterable. If that raises an ordinary
    exception and ``self.catchall`` is true, an HTML error page is written to
    ``environ['wsgi.errors']`` and returned with a 500 status; otherwise the
    exception propagates.
    """
    try:
        body = self._cast(self._handle(environ))
        # rfc2616 section 4.3: these responses are sent without a body.
        bodiless = (response._status_code in (100, 101, 204, 304)
                    or environ['REQUEST_METHOD'] == 'HEAD')
        if bodiless:
            if hasattr(body, 'close'):
                body.close()
            body = []
        # Hand over exception info left in the environ, removing it from there.
        pending_exc = environ.get('bottle.exc_info')
        if pending_exc is not None:
            del environ['bottle.exc_info']
        start_response(response._wsgi_status_line(), response.headerlist, pending_exc)
        return body
    except (KeyboardInterrupt, SystemExit, MemoryError):
        # Never convert these into an error page.
        raise
    except Exception as E:
        if not self.catchall:
            raise
        page = '<h1>Critical error while processing request: %s</h1>' \
            % html_escape(environ.get('PATH_INFO', '/'))
        if DEBUG:
            # Error details and traceback are only included in debug mode.
            page += '<h2>Error:</h2>\n<pre>\n%s\n</pre>\n' \
                    '<h2>Traceback:</h2>\n<pre>\n%s\n</pre>\n' \
                    % (html_escape(repr(E)), html_escape(format_exc()))
        error_stream = environ['wsgi.errors']
        error_stream.write(page)
        error_stream.flush()
        headers = [('Content-Type', 'text/html; charset=UTF-8')]
        start_response('500 INTERNAL SERVER ERROR', headers, sys.exc_info())
        return [tob(page)]
def __call__(self, environ, start_response):

@ -15,7 +15,6 @@
# 02110-1301 USA
######################### END LICENSE BLOCK #########################
from .compat import PY2, PY3
from .universaldetector import UniversalDetector
from .version import __version__, VERSION
def detect(byte_str):
    """
    Detect the encoding of the given byte string.

    A new UniversalDetector is created, fed the whole input once, and then
    closed; the value returned by ``close()`` is the result.

    :param byte_str:     The byte sequence to examine.
    :type byte_str:      ``bytes`` or ``bytearray``
    :return:             The detection result returned by
                         ``UniversalDetector.close()``.
    :raises TypeError:   If ``byte_str`` is neither ``bytes`` nor
                         ``bytearray``.
    """
    if not isinstance(byte_str, (bytes, bytearray)):
        raise TypeError('Expected object of type bytes or bytearray, got: '
                        '{0}'.format(type(byte_str)))
    if not isinstance(byte_str, bytearray):
        # The detector is always fed a bytearray.
        byte_str = bytearray(byte_str)
    detector = UniversalDetector()
    detector.feed(byte_str)
    return detector.close()

@ -32,10 +32,15 @@ from .mbcssm import BIG5_SM_MODEL
class Big5Prober(MultiByteCharSetProber):
# 初始化Big5Prober类
def __init__(self):
    """Create the Big5 distribution analyzer and state machine, then reset."""
    super(Big5Prober, self).__init__()
    self.distribution_analyzer = Big5DistributionAnalysis()
    self.coding_sm = CodingStateMachine(BIG5_SM_MODEL)
    self.reset()
@property

@ -30,69 +30,126 @@ from .charsetprober import CharSetProber
class CharSetGroupProber(CharSetProber):
# 初始化函数,传入语言过滤器
def __init__(self, lang_filter=None):
    """Initialise an empty prober group.

    :param lang_filter: forwarded unchanged to the parent initialiser.
    """
    super(CharSetGroupProber, self).__init__(lang_filter=lang_filter)
    # Child probers, the one currently considered the best guess, and the
    # count of children still active.
    self.probers = []
    self._best_guess_prober = None
    self._active_num = 0
# 重置函数
def reset(self):
    """Reset the group and every child prober, marking each child active."""
    super(CharSetGroupProber, self).reset()
    self._active_num = 0
    for member in self.probers:
        if not member:
            # Falsy placeholders in the list are ignored.
            continue
        member.reset()
        member.active = True
        self._active_num += 1
    self._best_guess_prober = None
# 获取字符集名称的属性函数
@property
def charset_name(self):
    """Charset name of the best-guess child prober, or None if there is none.

    When no best guess is recorded yet, ``get_confidence()`` is called first
    so that it has a chance to select one.
    """
    best = self._best_guess_prober
    if not best:
        self.get_confidence()
        best = self._best_guess_prober
    return best.charset_name if best else None
# 获取语言的属性函数
@property
def language(self):
    """Language of the best-guess child prober, or None if there is none.

    When no best guess is recorded yet, ``get_confidence()`` is called first
    so that it has a chance to select one.
    """
    best = self._best_guess_prober
    if not best:
        self.get_confidence()
        best = self._best_guess_prober
    return best.language if best else None
# 接收字节字符串的函数
def feed(self, byte_str):
    """Feed *byte_str* to every active child prober and return the group state.

    A child reporting FOUND_IT becomes the best guess and the group itself
    switches to FOUND_IT, so the returned state reflects the match. A child
    reporting NOT_ME is deactivated; once no active children remain the
    group switches to NOT_ME.
    """
    for prober in self.probers:
        if not prober or not prober.active:
            continue
        state = prober.feed(byte_str)
        if not state:
            # Falsy state from the child: nothing to act on for this prober.
            continue
        if state == ProbingState.FOUND_IT:
            self._best_guess_prober = prober
            # Record the match on the group too; otherwise self.state would
            # still report the previous state to the caller.
            self._state = ProbingState.FOUND_IT
            return self.state
        elif state == ProbingState.NOT_ME:
            prober.active = False
            self._active_num -= 1
            if self._active_num <= 0:
                self._state = ProbingState.NOT_ME
                return self.state
    return self.state
# 获取置信度的函数
def get_confidence(self):
# 获取当前探测器的状态
state = self.state
# 如果当前探测器的状态是FOUND_IT
if state == ProbingState.FOUND_IT:
# 返回0.99
return 0.99
# 如果当前探测器的状态是NOT_ME
elif state == ProbingState.NOT_ME:
# 返回0.01
return 0.01
# 初始化最佳置信度
best_conf = 0.0
# 重置最佳猜测探测器
self._best_guess_prober = None
# 遍历探测器列表
for prober in self.probers:
# 如果探测器不存在
if not prober:
# 跳过
continue
# 如果探测器不是活动状态
if not prober.active:
self.logger.debug('%s not active', prober.charset_name)
continue

@ -34,32 +34,42 @@ from .enums import ProbingState
class CharSetProber(object):
# 定义一个阈值,当检测到的字符集概率大于这个值时,认为检测成功
SHORTCUT_THRESHOLD = 0.95
def __init__(self, lang_filter=None):
    """Store the language filter and set up a module-named logger.

    The state starts out as None; ``reset()`` is what sets it to DETECTING.
    """
    self.logger = logging.getLogger(__name__)
    self.lang_filter = lang_filter
    self._state = None
def reset(self):
    """Put the prober back into the DETECTING state."""
    self._state = ProbingState.DETECTING
@property
def charset_name(self):
    """Charset name; this base implementation always yields None."""
    return None
def feed(self, buf):
    """Accept a buffer; this base implementation does nothing with it."""
    pass
@property
def state(self):
    """Current probing state as stored in ``_state``."""
    return self._state
def get_confidence(self):
    """Confidence of the detection; this base implementation returns 0.0."""
    return 0.0
@staticmethod
def filter_high_byte_only(buf):
    """Collapse every run of bytes in 0x00-0x7F within *buf* to one space.

    Bytes above 0x7F are left untouched.
    """
    return re.sub(b'([\x00-\x7F])+', b' ', buf)

@ -53,20 +53,29 @@ class CodingStateMachine(object):
encoding from consideration from here on.
"""
def __init__(self, sm):
    """Bind the state machine to model *sm* and reset it.

    :param sm: the model; stored as ``_model``.
    """
    self._model = sm
    # Character length and byte position both start at zero.
    self._curr_char_len = 0
    self._curr_byte_pos = 0
    self._curr_state = None
    self.logger = logging.getLogger(__name__)
    self.reset()
def reset(self):
    """Return the machine to ``MachineState.START``."""
    self._curr_state = MachineState.START
def next_state(self, c):
# for each byte we get its class
# if it is first byte, we also get byte length
# 获取当前字节的类别
byte_class = self._model['class_table'][c]
# 如果当前状态为起始状态,则获取当前字符长度
if self._curr_state == MachineState.START:
self._curr_byte_pos = 0
self._curr_char_len = self._model['char_len_table'][byte_class]

@ -22,13 +22,20 @@
import sys
# 判断当前Python版本是否小于3.0
# Interpreter-generation flags: exactly one of PY2 / PY3 is True.
PY2 = sys.version_info < (3, 0)
PY3 = not PY2

if PY2:
    # Python 2: string types are str and unicode; text is unicode.
    base_str = (str, unicode)
    text_type = unicode
else:
    # Python 3: string types are bytes and str; text is str.
    base_str = (bytes, str)
    text_type = str

@ -40,62 +40,95 @@ class EscCharSetProber(CharSetProber):
"""
def __init__(self, lang_filter=None):
    """Create the escape-sequence state machines selected by *lang_filter*.

    One CodingStateMachine is created per model of each language whose flag
    is set in the filter, in the order simplified Chinese, Japanese, Korean.
    """
    super(EscCharSetProber, self).__init__(lang_filter=lang_filter)
    models_by_flag = (
        (LanguageFilter.CHINESE_SIMPLIFIED, (HZ_SM_MODEL, ISO2022CN_SM_MODEL)),
        (LanguageFilter.JAPANESE, (ISO2022JP_SM_MODEL,)),
        (LanguageFilter.KOREAN, (ISO2022KR_SM_MODEL,)),
    )
    self.coding_sm = []
    for flag, models in models_by_flag:
        if self.lang_filter & flag:
            for model in models:
                self.coding_sm.append(CodingStateMachine(model))
    # Placeholders; reset() assigns the working values.
    self.active_sm_count = None
    self._detected_charset = None
    self._detected_language = None
    self._state = None
    self.reset()
def reset(self):
    """Reactivate and reset every state machine and clear the detection."""
    super(EscCharSetProber, self).reset()
    for machine in self.coding_sm:
        if machine:
            machine.active = True
            machine.reset()
    # The count is the full list length, regardless of falsy entries.
    self.active_sm_count = len(self.coding_sm)
    self._detected_charset = None
    self._detected_language = None
@property
def charset_name(self):
    """Charset recorded by ``feed``; None until one has been detected."""
    return self._detected_charset
@property
def language(self):
    """Language recorded by ``feed``; None until one has been detected."""
    return self._detected_language
def get_confidence(self):
    """Return 0.99 once a charset has been detected, otherwise 0.00."""
    return 0.99 if self._detected_charset else 0.00
def feed(self, byte_str):
    """Run each byte of *byte_str* through all active state machines.

    Returns the prober state: FOUND_IT as soon as one machine reports
    ITS_ME (recording its charset and language), NOT_ME once every machine
    has hit an error, otherwise the unchanged current state.
    """
    for byte in byte_str:
        for machine in self.coding_sm:
            if not (machine and machine.active):
                continue
            outcome = machine.next_state(byte)
            if outcome == MachineState.ITS_ME:
                self._state = ProbingState.FOUND_IT
                self._detected_charset = machine.get_coding_state_machine()
                self._detected_language = machine.language
                return self.state
            if outcome == MachineState.ERROR:
                # This machine is out of the running.
                machine.active = False
                self.active_sm_count -= 1
                if self.active_sm_count <= 0:
                    self._state = ProbingState.NOT_ME
                    return self.state
    return self.state

@ -34,59 +34,90 @@ from .mbcssm import EUCJP_SM_MODEL
class EUCJPProber(MultiByteCharSetProber):
    """Prober for EUC-JP that combines context and distribution analysis."""

    def __init__(self):
        """Create the state machine and both analyzers, then reset."""
        super(EUCJPProber, self).__init__()
        self.coding_sm = CodingStateMachine(EUCJP_SM_MODEL)
        self.distribution_analyzer = EUCJPDistributionAnalysis()
        self.context_analyzer = EUCJPContextAnalysis()
        self.reset()

    def reset(self):
        """Reset the inherited state and the context analyzer."""
        super(EUCJPProber, self).reset()
        self.context_analyzer.reset()

    @property
    def charset_name(self):
        """Name of the charset this prober detects."""
        return "EUC-JP"

    @property
    def language(self):
        """Language associated with this prober."""
        return "Japanese"

    def feed(self, byte_str):
        """Feed *byte_str* to the state machine and analyzers; return the state."""
        for pos, byte in enumerate(byte_str):
            # PY3K: byte_str is a byte array, so each item is an int, not a byte
            coding_state = self.coding_sm.next_state(byte)
            if coding_state == MachineState.ERROR:
                self.logger.debug('%s %s prober hit error at byte %s',
                                  self.charset_name, self.language, pos)
                self._state = ProbingState.NOT_ME
                break
            if coding_state == MachineState.ITS_ME:
                self._state = ProbingState.FOUND_IT
                break
            if coding_state == MachineState.START:
                char_len = self.coding_sm.get_current_charlen()
                if pos == 0:
                    # Pair the first byte with the byte carried over from
                    # the previous call.
                    self._last_char[1] = byte_str[0]
                    pair = self._last_char
                else:
                    pair = byte_str[pos - 1:pos + 1]
                self.context_analyzer.feed(pair, char_len)
                self.distribution_analyzer.feed(pair, char_len)

        # Remember the final byte for the next call.
        self._last_char[0] = byte_str[-1]

        if (self.state == ProbingState.DETECTING
                and self.context_analyzer.got_enough_data()
                and self.get_confidence() > self.SHORTCUT_THRESHOLD):
            self._state = ProbingState.FOUND_IT

        return self.state

    def get_confidence(self):
        """Return the larger of the context and distribution confidences."""
        context_conf = self.context_analyzer.get_confidence()
        distrib_conf = self.distribution_analyzer.get_confidence()
        return max(context_conf, distrib_conf)

@ -32,16 +32,23 @@ from .mbcssm import EUCKR_SM_MODEL
class EUCKRProber(MultiByteCharSetProber):
    """Multi-byte prober configured with the EUC-KR model and analyzer."""

    def __init__(self):
        """Create the EUC-KR distribution analyzer and state machine, then reset."""
        super(EUCKRProber, self).__init__()
        self.distribution_analyzer = EUCKRDistributionAnalysis()
        self.coding_sm = CodingStateMachine(EUCKR_SM_MODEL)
        self.reset()

    @property
    def charset_name(self):
        """Name of the charset this prober detects."""
        return "EUC-KR"

    @property
    def language(self):
        """Language associated with this prober."""
        return "Korean"

@ -31,16 +31,23 @@ from .chardistribution import EUCTWDistributionAnalysis
from .mbcssm import EUCTW_SM_MODEL
class EUCTWProber(MultiByteCharSetProber):
    """Multi-byte prober configured with the EUC-TW model and analyzer."""

    def __init__(self):
        """Create the EUC-TW distribution analyzer and state machine, then reset."""
        super(EUCTWProber, self).__init__()
        self.distribution_analyzer = EUCTWDistributionAnalysis()
        self.coding_sm = CodingStateMachine(EUCTW_SM_MODEL)
        self.reset()

    @property
    def charset_name(self):
        """Name of the charset this prober detects."""
        return "EUC-TW"

    @property
    def language(self):
        """Language label reported by this prober."""
        return "Taiwan"

@ -31,16 +31,23 @@ from .chardistribution import GB2312DistributionAnalysis
from .mbcssm import GB2312_SM_MODEL
class GB2312Prober(MultiByteCharSetProber):
    """Multi-byte prober configured with the GB2312 model and analyzer."""

    def __init__(self):
        """Create the GB2312 distribution analyzer and state machine, then reset."""
        super(GB2312Prober, self).__init__()
        self.distribution_analyzer = GB2312DistributionAnalysis()
        self.coding_sm = CodingStateMachine(GB2312_SM_MODEL)
        self.reset()

    @property
    def charset_name(self):
        """Name of the charset this prober detects."""
        return "GB2312"

    @property
    def language(self):
        """Language associated with this prober."""
        return "Chinese"

@ -152,17 +152,27 @@ class HebrewProber(CharSetProber):
LOGICAL_HEBREW_NAME = "windows-1255"
def __init__(self):
    """Declare all instance attributes as None, then call ``reset()``."""
    super(HebrewProber, self).__init__()
    # Final-letter scores for the logical and visual hypotheses.
    self._final_char_logical_score = None
    self._final_char_visual_score = None
    # The two most recently seen characters.
    self._prev = None
    self._before_prev = None
    # Companion probers; left as None here.
    self._logical_prober = None
    self._visual_prober = None
    self.reset()
def reset(self):
# 重置_final_char_logical_score为0
self._final_char_logical_score = 0
# 重置_final_char_visual_score为0
self._final_char_visual_score = 0
# The two last characters seen in the previous buffer,
# mPrev and mBeforePrev are initialized to space in order to simulate

@ -37,17 +37,28 @@ class MultiByteCharSetProber(CharSetProber):
"""
def __init__(self, lang_filter=None):
    """Initialise the shared multi-byte prober attributes.

    :param lang_filter: forwarded unchanged to the parent initialiser.

    The state machine and distribution analyzer start as None; subclasses
    assign their own. ``_last_char`` is a two-item buffer used by ``feed``.
    """
    super(MultiByteCharSetProber, self).__init__(lang_filter=lang_filter)
    self.coding_sm = None
    self.distribution_analyzer = None
    self._last_char = [0, 0]
def reset(self):
    """Reset the prober, its state machine and its analyzer when present."""
    super(MultiByteCharSetProber, self).reset()
    # Either collaborator may still be None (or otherwise falsy).
    for part in (self.coding_sm, self.distribution_analyzer):
        if part:
            part.reset()
    self._last_char = [0, 0]
@property
@ -59,33 +70,45 @@ class MultiByteCharSetProber(CharSetProber):
raise NotImplementedError
def feed(self, byte_str):
    """Feed *byte_str* to the state machine and analyzer; return the state."""
    for pos, byte in enumerate(byte_str):
        coding_state = self.coding_sm.next_state(byte)
        if coding_state == MachineState.ERROR:
            self.logger.debug('%s %s prober hit error at byte %s',
                              self.charset_name, self.language, pos)
            self._state = ProbingState.NOT_ME
            break
        if coding_state == MachineState.ITS_ME:
            self._state = ProbingState.FOUND_IT
            break
        if coding_state == MachineState.START:
            char_len = self.coding_sm.get_current_charlen()
            if pos == 0:
                # Pair the first byte with the byte carried over from the
                # previous call.
                self._last_char[1] = byte_str[0]
                pair = self._last_char
            else:
                pair = byte_str[pos - 1:pos + 1]
            self.distribution_analyzer.feed(pair, char_len)

    # Remember the final byte for the next call.
    self._last_char[0] = byte_str[-1]

    if (self.state == ProbingState.DETECTING
            and self.distribution_analyzer.got_enough_data()
            and self.get_confidence() > self.SHORTCUT_THRESHOLD):
        self._state = ProbingState.FOUND_IT

    return self.state
def get_confidence(self):
    """Return the confidence reported by the distribution analyzer."""
    analyzer = self.distribution_analyzer
    return analyzer.get_confidence()

@ -39,16 +39,20 @@ from .euctwprober import EUCTWProber
class MBCSGroupProber(CharSetGroupProber):
    """Group prober holding one instance of each multi-byte charset prober."""

    def __init__(self, lang_filter=None):
        """Create the child probers and reset the group.

        :param lang_filter: forwarded unchanged to the parent initialiser.
        """
        super(MBCSGroupProber, self).__init__(lang_filter=lang_filter)
        # Exactly one prober per supported multi-byte encoding.
        self.probers = [
            UTF8Prober(),
            SJISProber(),
            EUCJPProber(),
            GB2312Prober(),
            EUCKRProber(),
            CP949Prober(),
            Big5Prober(),
            EUCTWProber(),
        ]
        self.reset()

@ -31,13 +31,19 @@ from .enums import CharacterCategory, ProbingState, SequenceLikelihood
class SingleByteCharSetProber(CharSetProber):
# 定义样本大小
SAMPLE_SIZE = 64
# 定义相对阈值
SB_ENOUGH_REL_THRESHOLD = 1024 # 0.25 * SAMPLE_SIZE^2
# 定义正向阈值
POSITIVE_SHORTCUT_THRESHOLD = 0.95
# 定义负向阈值
NEGATIVE_SHORTCUT_THRESHOLD = 0.05
def __init__(self, model, reversed=False, name_prober=None):
# 调用父类构造函数
super(SingleByteCharSetProber, self).__init__()
# 设置模型
self._model = model
# TRUE if we need to reverse every pair in the model lookup
self._reversed = reversed
@ -51,6 +57,7 @@ class SingleByteCharSetProber(CharSetProber):
self.reset()
def reset(self):
# 重置函数
super(SingleByteCharSetProber, self).reset()
# char order of last character
self._last_order = 255
@ -69,16 +76,20 @@ class SingleByteCharSetProber(CharSetProber):
@property
def language(self):
    """Language of the name prober when one is set, else the model's.

    Without a name prober the value comes from ``self._model.get('language')``
    and is therefore None when the model has no such key.
    """
    delegate = self._name_prober
    if delegate:
        return delegate.language
    return self._model.get('language')
def feed(self, byte_str):
# 如果_model中的keep_english_letter为False则过滤掉国际字符
if not self._model['keep_english_letter']:
byte_str = self.filter_international_words(byte_str)
# 如果byte_str为空则返回状态
if not byte_str:
return self.state
# 获取字符到顺序的映射
char_to_order_map = self._model['char_to_order_map']
for i, c in enumerate(byte_str):
# XXX: Order is in range 1-64, so one would think we want 0-63 here,
@ -122,11 +133,17 @@ class SingleByteCharSetProber(CharSetProber):
return self.state
def get_confidence(self):
    """Return the confidence derived from the sequence statistics.

    With no sequences counted the result is 0.01. Otherwise it is the share
    of positive sequences divided by the model's typical positive ratio,
    scaled by ``_freq_char / _total_char``; values of 1.0 or more are
    reported as 0.99.
    """
    if not self._total_seqs > 0:
        return 0.01
    positive = 1.0 * self._seq_counters[SequenceLikelihood.POSITIVE]
    confidence = positive / self._total_seqs / self._model['typical_positive_ratio']
    confidence = confidence * self._freq_char / self._total_char
    if confidence >= 1.0:
        confidence = 0.99
    return confidence

@ -34,59 +34,94 @@ from .enums import ProbingState, MachineState
class SJISProber(MultiByteCharSetProber):
    """Prober for Shift_JIS that combines context and distribution analysis."""

    def __init__(self):
        """Create the state machine and both analyzers, then reset."""
        super(SJISProber, self).__init__()
        self.coding_sm = CodingStateMachine(SJIS_SM_MODEL)
        self.distribution_analyzer = SJISDistributionAnalysis()
        self.context_analyzer = SJISContextAnalysis()
        self.reset()

    def reset(self):
        """Reset the inherited state and the context analyzer."""
        super(SJISProber, self).reset()
        self.context_analyzer.reset()

    @property
    def charset_name(self):
        """Charset name as reported by the context analyzer."""
        return self.context_analyzer.charset_name

    @property
    def language(self):
        """Language associated with this prober."""
        return "Japanese"

    def feed(self, byte_str):
        """Feed *byte_str* to the state machine and analyzers; return the state."""
        for pos, byte in enumerate(byte_str):
            coding_state = self.coding_sm.next_state(byte)
            if coding_state == MachineState.ERROR:
                self.logger.debug('%s %s prober hit error at byte %s',
                                  self.charset_name, self.language, pos)
                self._state = ProbingState.NOT_ME
                break
            if coding_state == MachineState.ITS_ME:
                self._state = ProbingState.FOUND_IT
                break
            if coding_state == MachineState.START:
                char_len = self.coding_sm.get_current_charlen()
                if pos == 0:
                    # Pair the first byte with the byte carried over from
                    # the previous call.
                    self._last_char[1] = byte_str[0]
                    context_input = self._last_char[2 - char_len:]
                    distribution_input = self._last_char
                else:
                    # The context window starts char_len - 1 bytes before
                    # the current position and spans two bytes.
                    start = pos + 1 - char_len
                    context_input = byte_str[start:pos + 3 - char_len]
                    distribution_input = byte_str[pos - 1:pos + 1]
                self.context_analyzer.feed(context_input, char_len)
                self.distribution_analyzer.feed(distribution_input, char_len)

        # Remember the final byte for the next call.
        self._last_char[0] = byte_str[-1]

        if (self.state == ProbingState.DETECTING
                and self.context_analyzer.got_enough_data()
                and self.get_confidence() > self.SHORTCUT_THRESHOLD):
            self._state = ProbingState.FOUND_IT

        return self.state

    def get_confidence(self):
        """Return the larger of the context and distribution confidences."""
        context_conf = self.context_analyzer.get_confidence()
        distrib_conf = self.distribution_analyzer.get_confidence()
        return max(context_conf, distrib_conf)

@ -79,16 +79,27 @@ class UniversalDetector(object):
'iso-8859-13': 'Windows-1257'}
def __init__(self, lang_filter=LanguageFilter.ALL):
    """Declare the detector's attributes, then call ``reset()``.

    :param lang_filter: stored as ``self.lang_filter``; defaults to
                        ``LanguageFilter.ALL``.
    """
    self.lang_filter = lang_filter
    self.logger = logging.getLogger(__name__)
    # Probers: none for escape sequences yet, and an empty prober list.
    self._esc_charset_prober = None
    self._charset_probers = []
    # Per-document state; reset() assigns the working values.
    self.result = None
    self.done = None
    self._got_data = None
    self._input_state = None
    self._last_char = None
    self._has_win_bytes = None
    self.reset()
def reset(self):
@ -97,14 +108,22 @@ class UniversalDetector(object):
initial states. This is called by ``__init__``, so you only need to
call this directly in between analyses of different documents.
"""
# 重置结果
self.result = {'encoding': None, 'confidence': 0.0, 'language': None}
# 重置完成标志
self.done = False
# 重置是否接收到数据标志
self._got_data = False
# 重置是否有win字节标志
self._has_win_bytes = False
# 重置输入状态
self._input_state = InputState.PURE_ASCII
# 重置最后一个字符
self._last_char = b''
# 如果有esc字符集探测器重置它
if self._esc_charset_prober:
self._esc_charset_prober.reset()
# 重置所有字符集探测器
for prober in self._charset_probers:
prober.reset()

@ -33,50 +33,75 @@ from .mbcssm import UTF8_SM_MODEL
class UTF8Prober(CharSetProber):
    """Prober for UTF-8 driven by a coding state machine."""

    # Factor applied per multi-byte character in get_confidence().
    ONE_CHAR_PROB = 0.5

    def __init__(self):
        """Create the UTF-8 state machine and reset the prober."""
        super(UTF8Prober, self).__init__()
        self.coding_sm = CodingStateMachine(UTF8_SM_MODEL)
        self._num_mb_chars = None
        self.reset()

    def reset(self):
        """Reset the state, the state machine and the multi-byte counter."""
        super(UTF8Prober, self).reset()
        self.coding_sm.reset()
        self._num_mb_chars = 0

    @property
    def charset_name(self):
        """Name of the charset this prober detects."""
        return "utf-8"

    @property
    def language(self):
        """Language of this prober: always the empty string."""
        return ""

    def feed(self, byte_str):
        """Run *byte_str* through the state machine and return the state."""
        for byte in byte_str:
            coding_state = self.coding_sm.next_state(byte)
            if coding_state == MachineState.ERROR:
                self._state = ProbingState.NOT_ME
                break
            if coding_state == MachineState.ITS_ME:
                self._state = ProbingState.FOUND_IT
                break
            if (coding_state == MachineState.START
                    and self.coding_sm.get_current_charlen() >= 2):
                # A complete character of two or more bytes was seen.
                self._num_mb_chars += 1

        if (self.state == ProbingState.DETECTING
                and self.get_confidence() > self.SHORTCUT_THRESHOLD):
            self._state = ProbingState.FOUND_IT

        return self.state

    def get_confidence(self):
        """Return the confidence based on the number of multi-byte characters.

        With fewer than six such characters the result is
        ``1.0 - 0.99 * ONE_CHAR_PROB ** count``; otherwise it is 0.99.
        """
        unlike = 0.99
        if self._num_mb_chars >= 6:
            return unlike
        unlike *= self.ONE_CHAR_PROB ** self._num_mb_chars
        return 1.0 - unlike

@ -67,30 +67,47 @@ __all__ = ['AmbiguityError', 'CheckboxControl', 'Control',
'TextareaControl', 'XHTMLCompatibleFormParser']
try:
    import logging
    import inspect
except ImportError:
    def debug(msg, *args, **kwds):
        """No-op fallback used when logging or inspect cannot be imported."""
        pass
else:
    _logger = logging.getLogger("ClientForm")
    # While True, debug() returns immediately without inspecting the stack.
    OPTIMIZATION_HACK = True

    def debug(msg, *args, **kwds):
        """Log *msg* at DEBUG level, prefixed with the calling function's name.

        Does nothing while OPTIMIZATION_HACK is true. ``args`` are the
        %-format arguments for *msg*; ``kwds`` are passed on to the logger.
        """
        if OPTIMIZATION_HACK:
            return

        caller_name = inspect.stack()[1][3]
        # '%%s' leaves a '%s' slot in front of msg for the caller's name.
        extended_msg = '%%s %s' % msg
        extended_args = (caller_name,)+args
        _logger.debug(extended_msg, *extended_args, **kwds)

    def _show_debug_messages():
        """Enable debug() and send its output to sys.stdout at DEBUG level."""
        global OPTIMIZATION_HACK
        OPTIMIZATION_HACK = False
        _logger.setLevel(logging.DEBUG)
        handler = logging.StreamHandler(sys.stdout)
        handler.setLevel(logging.DEBUG)
        _logger.addHandler(handler)
try:
@ -114,13 +131,17 @@ except ImportError:
import sys, re, random
if sys.version_info >= (3, 0):
# 如果Python版本大于等于3.0则将xrange替换为range
xrange = range
# monkeypatch to fix http://www.python.org/sf/803422 :-(
# 修补monkeypatch以修复http://www.python.org/sf/803422 :-(
sgmllib.charref = re.compile("&#(x?[0-9a-fA-F]+)[^0-9a-fA-F]")
# HTMLParser.HTMLParser is recent, so live without it if it's not available
# (also, sgmllib.SGMLParser is much more tolerant of bad HTML)
# HTMLParser.HTMLParser是最近的如果不可用则没有它
# 另外sgmllib.SGMLParser对不良HTML的容忍度更高
try:
import HTMLParser
except ImportError:
@ -131,9 +152,11 @@ else:
try:
    import warnings
except ImportError:
    # No warnings module available: deprecation notices are dropped.
    def deprecation(message, stack_offset=0):
        """Accept a deprecation notice and discard it."""
        return None
else:
    def deprecation(message, stack_offset=0):
        """Issue *message* as a DeprecationWarning.

        The warning is raised with ``stacklevel`` 3 plus *stack_offset*.
        """
        depth = 3 + stack_offset
        warnings.warn(message, DeprecationWarning, stacklevel=depth)
@ -224,29 +247,39 @@ string.
return '&'.join(l)
def unescape(data, entities, encoding=DEFAULT_ENCODING):
    """Replace ``&...;`` references found in *data*.

    *data* is returned unchanged when it is None or contains no "&".
    *entities* is looked up with the full matched text (including the
    leading "&" and trailing ";").  References starting with "&#" are
    delegated to ``unescape_charref``.  When *data* is an instance of
    ``six.string_types`` the *encoding* is ignored (treated as None).
    Unknown references, and replacements that fail to decode, are left
    as they appear in *data*.
    """
    if data is None or "&" not in data:
        return data

    # String input: replacement values are not decoded.
    if isinstance(data, six.string_types):
        encoding = None

    def replace_entities(match, entities=entities, encoding=encoding):
        reference = match.group()
        if reference[1] == "#":
            # Strip the "&#" prefix and ";" suffix before handing over.
            return unescape_charref(reference[2:-1], encoding)

        replacement = entities.get(reference)
        if replacement is None:
            # Not a known entity: keep the original text.
            return reference
        if hasattr(replacement, "decode") and encoding is not None:
            try:
                return replacement.decode(encoding)
            except UnicodeError:
                return reference
        return replacement

    return re.sub(r"&#?[A-Za-z0-9]+?;", replace_entities, data)
def unescape_charref(data, encoding):
@ -646,31 +679,47 @@ class _AbstractFormParser:
self._textarea = None
def start_label(self, attrs):
    """Handle an opening label tag.

    Any label still open is closed first via ``end_label``.  A dict is
    built from *attrs* (values passed through
    ``unescape_attr_if_required``), given an empty ``"__text"`` entry and
    a ``"__taken"`` flag, and stored as ``self._current_label``.  It is
    appended to ``self.labels`` only when it has a non-empty ``for``
    attribute.
    """
    debug("%s", attrs)
    if self._current_label:
        self.end_label()
    label = dict(
        (name, self.unescape_attr_if_required(value)) for name, value in attrs)
    has_target = bool(label.get("for"))  # empty id is invalid
    label["__text"] = ""
    label["__taken"] = has_target
    if has_target:
        self.labels.append(label)
    self._current_label = label
def end_label(self):
    """Handle a closing label tag.

    Clears ``self._current_label`` and removes the ``"__taken"`` entry
    from the label that was open.  Does nothing if no label is open.
    """
    debug("")
    open_label = self._current_label
    # If no label is open, something is ugly in the HTML; ignore it.
    if open_label is not None:
        self._current_label = None
        # if it is staying around, it is True in all cases
        del open_label["__taken"]
def _add_label(self, d):
#debug("%s", d)
# 如果当前标签存在且__taken属性为False则将__taken属性设置为True并将当前标签添加到d的__label属性中
if self._current_label is not None:
if not self._current_label["__taken"]:
self._current_label["__taken"] = True
@ -743,12 +792,16 @@ class _AbstractFormParser:
controls.append((type, name, d))
def do_isindex(self, attrs):
    """Handle an isindex tag by recording it as a control of the current form.

    The attribute dict (values passed through
    ``unescape_attr_if_required``) is offered to ``_add_label`` and then
    appended to the form's control list as ``("isindex", None, dict)``.
    """
    debug("%s", attrs)
    attributes = dict(
        (name, self.unescape_attr_if_required(value)) for name, value in attrs)
    # Item 2 of the current form is where controls are collected.
    form_controls = self._current_form[2]
    self._add_label(attributes)
    # isindex doesn't have type or name HTML attributes
    form_controls.append(("isindex", None, attributes))

@ -64,14 +64,16 @@ class Magic:
return magic_file(self.cookie, filename)
def __del__(self):
    """Close the libmagic cookie held by this object, if any."""
    # during shutdown magic_close may have been cleared already
    if not (self.cookie and magic_close):
        return
    magic_close(self.cookie)
    self.cookie = None
# Module-level holders for the default and MIME magic objects
_magic_mime = None
_magic = None
# 获取默认和MIME magic对象的函数
def _get_magic_mime():
global _magic_mime
if not _magic_mime:
@ -90,6 +92,7 @@ def _get_magic_type(mime):
else:
return _get_magic()
# Public functions for identifying files and buffers
def from_file(filename, mime=False):
    """Identify *filename* using the magic object selected by *mime*.

    The object is obtained from ``_get_magic_type(mime)`` and its
    ``from_file`` result is returned as-is.
    """
    return _get_magic_type(mime).from_file(filename)
@ -98,6 +101,7 @@ def from_buffer(buffer, mime=False):
m = _get_magic_type(mime)
return m.from_buffer(buffer)
# 使用 ctypes 导入 libmagic 库
try:
libmagic = None
@ -106,7 +110,7 @@ try:
from ctypes import c_char_p, c_int, c_size_t, c_void_p
# Let's try to find magic or magic1
# 尝试找到 libmagic 库
dll = ctypes.util.find_library('magic') or ctypes.util.find_library('magic1')
# This is necessary because find_library returns None if it doesn't find the library
@ -116,6 +120,7 @@ try:
except WindowsError:
pass
# 如果没有找到,尝试平台特定的路径
if not libmagic or not libmagic._name:
platform_to_lib = {'darwin': ['/opt/local/lib/libmagic.dylib',
'/usr/local/lib/libmagic.dylib',
@ -126,11 +131,13 @@ try:
libmagic = ctypes.CDLL(dll)
except OSError:
pass
# 如果仍然没有找到,抛出 ImportError
if not libmagic or not libmagic._name:
# It is better to raise an ImportError since we are importing magic module
raise ImportError('failed to find libmagic. Check your installation')
# 定义 magic_t 类型和错误检查函数
magic_t = ctypes.c_void_p
def errorcheck(result, func, args):
@ -145,6 +152,7 @@ try:
return None
return filename.encode(sys.getfilesystemencoding())
# 使用 ctypes 定义 libmagic 函数
magic_open = libmagic.magic_open
magic_open.restype = magic_t
magic_open.argtypes = [c_int]
@ -198,28 +206,31 @@ try:
magic_compile.restype = c_int
magic_compile.argtypes = [magic_t, c_char_p]
# 如果 libmagic 无法导入,定义回退函数
except (ImportError, OSError):
from_file = from_buffer = lambda *args, **kwargs: MAGIC_UNKNOWN_FILETYPE
MAGIC_NONE = 0x000000 # No flags
MAGIC_DEBUG = 0x000001 # Turn on debugging
MAGIC_SYMLINK = 0x000002 # Follow symlinks
MAGIC_COMPRESS = 0x000004 # Check inside compressed files
MAGIC_DEVICES = 0x000008 # Look at the contents of devices
MAGIC_MIME = 0x000010 # Return a mime string
MAGIC_MIME_ENCODING = 0x000400 # Return the MIME encoding
MAGIC_CONTINUE = 0x000020 # Return all matches
MAGIC_CHECK = 0x000040 # Print warnings to stderr
MAGIC_PRESERVE_ATIME = 0x000080 # Restore access time on exit
MAGIC_RAW = 0x000100 # Don't translate unprintable chars
MAGIC_ERROR = 0x000200 # Handle ENOENT etc as real errors
MAGIC_NO_CHECK_COMPRESS = 0x001000 # Don't check for compressed files
MAGIC_NO_CHECK_TAR = 0x002000 # Don't check for tar files
MAGIC_NO_CHECK_SOFT = 0x004000 # Don't check magic entries
MAGIC_NO_CHECK_APPTYPE = 0x008000 # Don't check application type
MAGIC_NO_CHECK_ELF = 0x010000 # Don't check for elf details
MAGIC_NO_CHECK_ASCII = 0x020000 # Don't check for ascii files
MAGIC_NO_CHECK_TROFF = 0x040000 # Don't check ascii/troff
MAGIC_NO_CHECK_FORTRAN = 0x080000 # Don't check ascii/fortran
MAGIC_NO_CHECK_TOKENS = 0x100000 # Don't check ascii/tokens
# 定义 libmagic 标志常量
MAGIC_NONE = 0x000000 # 无标志
MAGIC_DEBUG = 0x000001 # 打开调试
MAGIC_SYMLINK = 0x000002 # 跟随符号链接
MAGIC_COMPRESS = 0x000004 # 检查压缩文件内部
MAGIC_DEVICES = 0x000008 # 查看设备内容
MAGIC_MIME = 0x000010 # 返回 MIME 字符串
MAGIC_MIME_ENCODING = 0x000400 # 返回 MIME 编码
MAGIC_CONTINUE = 0x000020 # 返回所有匹配项
MAGIC_CHECK = 0x000040 # 打印警告到标准错误
MAGIC_PRESERVE_ATIME = 0x000080 # 退出时恢复访问时间
MAGIC_RAW = 0x000100 # 不转换不可打印字符
MAGIC_ERROR = 0x000200 # 将 ENOENT 等视为真实错误
MAGIC_NO_CHECK_COMPRESS = 0x001000 # 不检查压缩文件
MAGIC_NO_CHECK_TAR = 0x002000 # 不检查 tar 文件
MAGIC_NO_CHECK_SOFT = 0x004000 # 不检查 magic 条目
MAGIC_NO_CHECK_APPTYPE = 0x008000 # 不检查应用程序类型
MAGIC_NO_CHECK_ELF = 0x010000 # 不检查 elf 详细信息
MAGIC_NO_CHECK_ASCII = 0x020000 # 不检查 ascii 文件
MAGIC_NO_CHECK_TROFF = 0x040000 # 不检查 ascii/troff
MAGIC_NO_CHECK_FORTRAN = 0x080000 # 不检查 ascii/fortran
MAGIC_NO_CHECK_TOKENS = 0x100000 # 不检查 ascii/tokens
MAGIC_UNKNOWN_FILETYPE = b"unknown"

@ -8,14 +8,15 @@ import socket
import ctypes
import os
# Structure used to hold socket address information
class sockaddr(ctypes.Structure):
    """ctypes structure holding a socket address.

    ``_fields_`` is defined exactly once: ctypes does not allow it to be
    assigned a second time on the same Structure subclass.
    """
    _fields_ = [
        ("sa_family", ctypes.c_short),      # address family, e.g. AF_INET or AF_INET6
        ("__pad1", ctypes.c_ushort),        # padding
        ("ipv4_addr", ctypes.c_byte * 4),   # IPv4 address, 4 bytes
        ("ipv6_addr", ctypes.c_byte * 16),  # IPv6 address, 16 bytes
        ("__pad2", ctypes.c_ulong),         # padding
    ]
# 根据操作系统的不同,导入不同的库
if hasattr(ctypes, 'windll'):
WSAStringToAddressA = ctypes.windll.ws2_32.WSAStringToAddressA
WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA
@ -27,12 +28,13 @@ else:
WSAStringToAddressA = not_windows
WSAAddressToStringA = not_windows
# inet_pton函数将IP字符串转换为二进制格式
def inet_pton(address_family, ip_string):
addr = sockaddr()
addr.sa_family = address_family
addr_size = ctypes.c_int(ctypes.sizeof(addr))
addr = sockaddr() # 创建sockaddr实例
addr.sa_family = address_family # 设置地址族
addr_size = ctypes.c_int(ctypes.sizeof(addr)) # 获取地址结构体大小
# 使用WSAStringToAddressA函数将IP字符串转换为地址结构体
if WSAStringToAddressA(
ip_string,
address_family,
@ -42,6 +44,7 @@ def inet_pton(address_family, ip_string):
) != 0:
raise socket.error(ctypes.FormatError())
# 根据地址族返回对应的二进制IP地址
if address_family == socket.AF_INET:
return ctypes.string_at(addr.ipv4_addr, 4)
if address_family == socket.AF_INET6:
@ -49,14 +52,15 @@ def inet_pton(address_family, ip_string):
raise socket.error('unknown address family')
# inet_ntop函数将二进制格式的IP地址转换为字符串
def inet_ntop(address_family, packed_ip):
addr = sockaddr()
addr.sa_family = address_family
addr_size = ctypes.c_int(ctypes.sizeof(addr))
ip_string = ctypes.create_string_buffer(128)
ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string))
addr = sockaddr() # 创建sockaddr实例
addr.sa_family = address_family # 设置地址族
addr_size = ctypes.c_int(ctypes.sizeof(addr)) # 获取地址结构体大小
ip_string = ctypes.create_string_buffer(128) # 创建字符串缓冲区
ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string)) # 获取字符串缓冲区大小
# 根据地址族将二进制IP地址复制到地址结构体中
if address_family == socket.AF_INET:
if len(packed_ip) != ctypes.sizeof(addr.ipv4_addr):
raise socket.error('packed IP wrong length for inet_ntoa')
@ -68,6 +72,7 @@ def inet_ntop(address_family, packed_ip):
else:
raise socket.error('unknown address family')
# 使用WSAAddressToStringA函数将地址结构体转换为IP字符串
if WSAAddressToStringA(
ctypes.byref(addr),
addr_size,
@ -79,7 +84,7 @@ def inet_ntop(address_family, packed_ip):
return ip_string[:ip_string_size.value - 1]
# Adding our two functions to the socket library
# 如果当前操作系统是Windows将自定义的inet_pton和inet_ntop函数添加到socket库中
if os.name == 'nt':
    # Install the ctypes-based implementations on the socket module
    # (each assigned once).
    socket.inet_pton = inet_pton
    socket.inet_ntop = inet_ntop
Loading…
Cancel
Save