Compare commits

...

10 Commits

@ -7,7 +7,7 @@ import logging
import re import re
import sys import sys
from lib.core.settings import IS_WIN from lib.core.settings import IS_WIN # 导入一个设置用于判断是否在Windows系统上运行
if IS_WIN: if IS_WIN:
import ctypes import ctypes
@ -16,14 +16,15 @@ if IS_WIN:
# Reference: https://gist.github.com/vsajip/758430 # Reference: https://gist.github.com/vsajip/758430
# https://github.com/ipython/ipython/issues/4252 # https://github.com/ipython/ipython/issues/4252
# https://msdn.microsoft.com/en-us/library/windows/desktop/ms686047%28v=vs.85%29.aspx # https://msdn.microsoft.com/en-us/library/windows/desktop/ms686047%28v=vs.85%29.aspx
# 设置Windows API函数SetConsoleTextAttribute的参数和返回值类型
ctypes.windll.kernel32.SetConsoleTextAttribute.argtypes = [ctypes.wintypes.HANDLE, ctypes.wintypes.WORD] ctypes.windll.kernel32.SetConsoleTextAttribute.argtypes = [ctypes.wintypes.HANDLE, ctypes.wintypes.WORD]
ctypes.windll.kernel32.SetConsoleTextAttribute.restype = ctypes.wintypes.BOOL ctypes.windll.kernel32.SetConsoleTextAttribute.restype = ctypes.wintypes.BOOL
def stdoutEncode(data):
    """Return ``data`` unchanged.

    Hook applied to formatted log messages before they are written to the
    stream; this implementation performs no conversion.
    """
    return data
class ColorizingStreamHandler(logging.StreamHandler): class ColorizingStreamHandler(logging.StreamHandler):
# color names to indices # 定义颜色名称到索引的映射
color_map = { color_map = {
'black': 0, 'black': 0,
'red': 1, 'red': 1,
@ -35,7 +36,7 @@ class ColorizingStreamHandler(logging.StreamHandler):
'white': 7, 'white': 7,
} }
# levels to (background, foreground, bold/intense) # 定义日志级别到颜色和样式的映射
level_map = { level_map = {
logging.DEBUG: (None, 'blue', False), logging.DEBUG: (None, 'blue', False),
logging.INFO: (None, 'green', False), logging.INFO: (None, 'green', False),
@ -43,25 +44,30 @@ class ColorizingStreamHandler(logging.StreamHandler):
logging.ERROR: (None, 'red', False), logging.ERROR: (None, 'red', False),
logging.CRITICAL: ('red', 'white', False) logging.CRITICAL: ('red', 'white', False)
} }
csi = '\x1b[' csi = '\x1b[' # ANSI转义序列的前缀
reset = '\x1b[0m' reset = '\x1b[0m' # ANSI重置颜色的转义序列
bold = "\x1b[1m" bold = "\x1b[1m" # ANSI加粗的转义序列
disable_coloring = False disable_coloring = False # 是否禁用颜色
@property @property
def is_tty(self): def is_tty(self):
# 检查流是否是终端
isatty = getattr(self.stream, 'isatty', None) isatty = getattr(self.stream, 'isatty', None)
return isatty and isatty() and not self.disable_coloring return isatty and isatty() and not self.disable_coloring
def emit(self, record): def emit(self, record):
# 发送日志记录
try: try:
message = stdoutEncode(self.format(record)) message = stdoutEncode(self.format(record))
stream = self.stream stream = self.stream
#如果当前流不是TTY直接写入消息
if not self.is_tty: if not self.is_tty:
if message and message[0] == "\r": if message and message[0] == "\r":
message = message[1:] message = message[1:]
stream.write(message) stream.write(message)
#如果是TTY调用output_colorized方法来输出带颜色的消息
else: else:
self.output_colorized(message) self.output_colorized(message)
stream.write(getattr(self, 'terminator', '\n')) stream.write(getattr(self, 'terminator', '\n'))
@ -70,15 +76,19 @@ class ColorizingStreamHandler(logging.StreamHandler):
except (KeyboardInterrupt, SystemExit): except (KeyboardInterrupt, SystemExit):
raise raise
except IOError: except IOError:
#IO错误时什么也不做pass
pass pass
except: except:
#其他异常时调用handleError方法
self.handleError(record) self.handleError(record)
if not IS_WIN: if not IS_WIN:
def output_colorized(self, message): def output_colorized(self, message):
# 如果不是Windows系统直接写入消息
self.stream.write(message) self.stream.write(message)
else: else:
ansi_esc = re.compile(r'\x1b\[((?:\d+)(?:;(?:\d+))*)m') ansi_esc = re.compile(r'\x1b\[((?:\d+)(?:;(?:\d+))*)m')
# 正则表达式用于匹配ANSI转义序列
nt_color_map = { nt_color_map = {
0: 0x00, # black 0: 0x00, # black
@ -92,26 +102,32 @@ class ColorizingStreamHandler(logging.StreamHandler):
} }
def output_colorized(self, message): def output_colorized(self, message):
# 如果是Windows系统解析ANSI转义序列并设置控制台颜色
parts = self.ansi_esc.split(message) parts = self.ansi_esc.split(message)
h = None h = None
fd = getattr(self.stream, 'fileno', None) fd = getattr(self.stream, 'fileno', None)
#文件描述符有效并且是标准输出或标准错误获取对应的Windows句柄
if fd is not None: if fd is not None:
fd = fd() fd = fd()
if fd in (1, 2): # stdout or stderr if fd in (1, 2): # stdout or stderr
h = ctypes.windll.kernel32.GetStdHandle(-10 - fd) h = ctypes.windll.kernel32.GetStdHandle(-10 - fd)
#循环处理分割后的消息部分
while parts: while parts:
text = parts.pop(0) text = parts.pop(0)
#如果部分是文本,写入并刷新流
if text: if text:
self.stream.write(text) self.stream.write(text)
self.stream.flush() self.stream.flush()
#如果还有部分,取出下一个部分作为参数
if parts: if parts:
params = parts.pop(0) params = parts.pop(0)
#如果句柄有效,将参数分割并转换为整数,初始化颜色代码
if h is not None: if h is not None:
params = [int(p) for p in params.split(';')] params = [int(p) for p in params.split(';')]
color = 0 color = 0
@ -131,9 +147,12 @@ class ColorizingStreamHandler(logging.StreamHandler):
ctypes.windll.kernel32.SetConsoleTextAttribute(h, color) ctypes.windll.kernel32.SetConsoleTextAttribute(h, color)
def _reset(self, message): def _reset(self, message):
#重置消息的颜色
if not message.endswith(self.reset): if not message.endswith(self.reset):
# 如果消息不以重置序列结尾,则添加重置序列
reset = self.reset reset = self.reset
elif self.bold in message: # bold elif self.bold in message:
# 如果消息包含加粗,则在重置后加粗
reset = self.reset + self.bold reset = self.reset + self.bold
else: else:
reset = self.reset reset = self.reset
@ -141,19 +160,23 @@ class ColorizingStreamHandler(logging.StreamHandler):
return reset return reset
def colorize(self, message, levelno): def colorize(self, message, levelno):
# 根据日志级别给消息上色
if levelno in self.level_map and self.is_tty: if levelno in self.level_map and self.is_tty:
bg, fg, bold = self.level_map[levelno] bg, fg, bold = self.level_map[levelno]
params = [] params = []
#如果背景色有效,添加背景色参数
if bg in self.color_map: if bg in self.color_map:
params.append(str(self.color_map[bg] + 40)) params.append(str(self.color_map[bg] + 40))
#如果前景色有效,添加前景色参数
if fg in self.color_map: if fg in self.color_map:
params.append(str(self.color_map[fg] + 30)) params.append(str(self.color_map[fg] + 30))
#如果需要加粗,添加加粗参数
if bold: if bold:
params.append('1') params.append('1')
#如果参数和消息都有效,检查消息是否有前缀(空格),并提取出来
if params and message: if params and message:
if message.lstrip() != message: if message.lstrip() != message:
prefix = re.search(r"\s+", message).group(0) prefix = re.search(r"\s+", message).group(0)
@ -167,5 +190,6 @@ class ColorizingStreamHandler(logging.StreamHandler):
return message return message
def format(self, record): def format(self, record):
# 格式化日志记录
message = logging.StreamHandler.format(self, record) message = logging.StreamHandler.format(self, record)
return self.colorize(message, record.levelno) return self.colorize(message, record.levelno)

File diff suppressed because it is too large Load Diff

@ -29,9 +29,12 @@ __license__ = 'MIT'
def _cli_parse(args): # pragma: no coverage def _cli_parse(args): # pragma: no coverage
# 导入ArgumentParser模块
from argparse import ArgumentParser from argparse import ArgumentParser
# 创建ArgumentParser对象设置程序名称和用法
parser = ArgumentParser(prog=args[0], usage="%(prog)s [options] package.module:app") parser = ArgumentParser(prog=args[0], usage="%(prog)s [options] package.module:app")
# 添加参数
opt = parser.add_argument opt = parser.add_argument
opt("--version", action="store_true", help="show version number.") opt("--version", action="store_true", help="show version number.")
opt("-b", "--bind", metavar="ADDRESS", help="bind socket to ADDRESS.") opt("-b", "--bind", metavar="ADDRESS", help="bind socket to ADDRESS.")
@ -45,6 +48,7 @@ def _cli_parse(args): # pragma: no coverage
opt("--reload", action="store_true", help="auto-reload on file changes.") opt("--reload", action="store_true", help="auto-reload on file changes.")
opt('app', help='WSGI app entry point.', nargs='?') opt('app', help='WSGI app entry point.', nargs='?')
# 解析命令行参数
cli_args = parser.parse_args(args[1:]) cli_args = parser.parse_args(args[1:])
return cli_args, parser return cli_args, parser
@ -179,7 +183,9 @@ def depr(major, minor, cause, fix):
def makelist(data): # This is just too handy def makelist(data): # This is just too handy
# 判断data是否为元组、列表、集合或字典类型
if isinstance(data, (tuple, list, set, dict)): if isinstance(data, (tuple, list, set, dict)):
# 如果是则返回data的列表形式
return list(data) return list(data)
elif data: elif data:
return [data] return [data]
@ -198,18 +204,24 @@ class DictProperty(object):
self.getter, self.key = func, self.key or func.__name__ self.getter, self.key = func, self.key or func.__name__
return self return self
# 如果obj为None则返回self
def __get__(self, obj, cls): def __get__(self, obj, cls):
# 获取属性名和存储对象
if obj is None: return self if obj is None: return self
# 如果属性名不在存储对象中则调用getter方法获取值并存储
key, storage = self.key, getattr(obj, self.attr) key, storage = self.key, getattr(obj, self.attr)
if key not in storage: storage[key] = self.getter(obj) if key not in storage: storage[key] = self.getter(obj)
return storage[key] return storage[key]
# 如果属性是只读的则抛出AttributeError异常
def __set__(self, obj, value): def __set__(self, obj, value):
if self.read_only: raise AttributeError("Read-Only property.") if self.read_only: raise AttributeError("Read-Only property.")
getattr(obj, self.attr)[self.key] = value getattr(obj, self.attr)[self.key] = value
def __delete__(self, obj): def __delete__(self, obj):
# 如果属性是只读的则抛出AttributeError异常
if self.read_only: raise AttributeError("Read-Only property.") if self.read_only: raise AttributeError("Read-Only property.")
# 从存储对象中删除对应的值
del getattr(obj, self.attr)[self.key] del getattr(obj, self.attr)[self.key]
@ -737,26 +749,38 @@ class Bottle(object):
self.route('/' + '/'.join(segments), **options) self.route('/' + '/'.join(segments), **options)
def _mount_app(self, prefix, app, **options):
    """Mount the Bottle application ``app`` below ``prefix``.

    Every route of ``app`` is re-registered on this application with
    ``prefix`` prepended to its rule. If ``app`` is already mounted, if
    route options were given, or if ``prefix`` has no trailing slash,
    ``depr`` is called and the mount falls back to ``_mount_wsgi``.
    """
    if app in self._mounts or '_mount.app' in app.config:
        depr(0, 13, "Application mounted multiple times. Falling back to WSGI mount.",
             "Clone application before mounting to a different location.")
        return self._mount_wsgi(prefix, app, **options)

    if options:
        depr(0, 13, "Unsupported mount options. Falling back to WSGI mount.",
             "Do not specify any route options when mounting bottle application.")
        return self._mount_wsgi(prefix, app, **options)

    if not prefix.endswith("/"):
        depr(0, 13, "Prefix must end in '/'. Falling back to WSGI mount.",
             "Consider adding an explicit redirect from '/prefix' to '/prefix/' in the parent application.")
        return self._mount_wsgi(prefix, app, **options)

    self._mounts.append(app)
    # Record on the child where, and on which parent, it is mounted.
    app.config['_mount.prefix'] = prefix
    app.config['_mount.app'] = self
    for route in app.routes:
        # The child's route object is rewritten in place and then shared
        # with this application.
        route.rule = prefix + route.rule.lstrip('/')
        self.add_route(route)
def mount(self, prefix, app, **options): def mount(self, prefix, app, **options):
@ -781,11 +805,15 @@ class Bottle(object):
parent application. parent application.
""" """
# 检查prefix是否以'/'开头
if not prefix.startswith('/'): if not prefix.startswith('/'):
# 如果prefix不以'/'开头则抛出ValueError异常
raise ValueError("Prefix must start with '/'") raise ValueError("Prefix must start with '/'")
# 如果app是Bottle实例则调用_mount_app方法
if isinstance(app, Bottle): if isinstance(app, Bottle):
return self._mount_app(prefix, app, **options) return self._mount_app(prefix, app, **options)
# 否则调用_mount_wsgi方法
else: else:
return self._mount_wsgi(prefix, app, **options) return self._mount_wsgi(prefix, app, **options)
@ -1089,31 +1117,46 @@ class Bottle(object):
def wsgi(self, environ, start_response):
    """ The bottle WSGI-interface. """
    try:
        out = self._cast(self._handle(environ))
        # rfc2616 section 4.3
        # These status codes and HEAD requests get an empty body.
        if response._status_code in (100, 101, 204, 304)\
        or environ['REQUEST_METHOD'] == 'HEAD':
            if hasattr(out, 'close'): out.close()
            out = []
        # Hand any exception info stored in the environ to start_response
        # and remove it from the environ.
        exc_info = environ.get('bottle.exc_info')
        if exc_info is not None:
            del environ['bottle.exc_info']
        start_response(response._wsgi_status_line(), response.headerlist, exc_info)
        return out
    except (KeyboardInterrupt, SystemExit, MemoryError):
        raise
    except Exception as E:
        # Without catchall the exception propagates to the caller.
        if not self.catchall: raise
        err = '<h1>Critical error while processing request: %s</h1>' \
              % html_escape(environ.get('PATH_INFO', '/'))
        # Exception details and traceback are added only in DEBUG mode.
        if DEBUG:
            err += '<h2>Error:</h2>\n<pre>\n%s\n</pre>\n' \
                   '<h2>Traceback:</h2>\n<pre>\n%s\n</pre>\n' \
                   % (html_escape(repr(E)), html_escape(format_exc()))
        environ['wsgi.errors'].write(err)
        environ['wsgi.errors'].flush()
        headers = [('Content-Type', 'text/html; charset=UTF-8')]
        start_response('500 INTERNAL SERVER ERROR', headers, sys.exc_info())
        return [tob(err)]
def __call__(self, environ, start_response): def __call__(self, environ, start_response):

@ -15,7 +15,6 @@
# 02110-1301 USA # 02110-1301 USA
######################### END LICENSE BLOCK ######################### ######################### END LICENSE BLOCK #########################
from .compat import PY2, PY3 from .compat import PY2, PY3
from .universaldetector import UniversalDetector from .universaldetector import UniversalDetector
from .version import __version__, VERSION from .version import __version__, VERSION
@ -25,15 +24,28 @@ def detect(byte_str):
""" """
Detect the encoding of the given byte string. Detect the encoding of the given byte string.
This function uses the UniversalDetector class to determine the encoding
of a given byte string. It creates a new UniversalDetector instance,
feeds the byte string to it, and then returns the detected encoding.
:param byte_str: The byte sequence to examine. :param byte_str: The byte sequence to examine.
:type byte_str: ``bytes`` or ``bytearray`` :type byte_str: ``bytes`` or ``bytearray``
:return: The detected encoding.
""" """
# Check if the input is of the correct type
if not isinstance(byte_str, bytearray): if not isinstance(byte_str, bytearray):
if not isinstance(byte_str, bytes): if not isinstance(byte_str, bytes):
raise TypeError('Expected object of type bytes or bytearray, got: ' raise TypeError('Expected object of type bytes or bytearray, got: '
'{0}'.format(type(byte_str))) '{0}'.format(type(byte_str)))
else: else:
# If the input is of type bytes, convert it to bytearray
byte_str = bytearray(byte_str) byte_str = bytearray(byte_str)
# Create a new UniversalDetector instance
detector = UniversalDetector() detector = UniversalDetector()
# Feed the byte string to the detector
detector.feed(byte_str) detector.feed(byte_str)
# Close the detector and return the detected encoding
return detector.close() return detector.close()

@ -32,10 +32,15 @@ from .mbcssm import BIG5_SM_MODEL
class Big5Prober(MultiByteCharSetProber): class Big5Prober(MultiByteCharSetProber):
def __init__(self):
    """Set up the Big5 state machine and distribution analyzer, then reset."""
    super(Big5Prober, self).__init__()
    self.coding_sm = CodingStateMachine(BIG5_SM_MODEL)
    self.distribution_analyzer = Big5DistributionAnalysis()
    self.reset()
@property @property

@ -30,69 +30,126 @@ from .charsetprober import CharSetProber
class CharSetGroupProber(CharSetProber): class CharSetGroupProber(CharSetProber):
def __init__(self, lang_filter=None):
    """Create an empty prober group.

    :param lang_filter: passed through to the base class initializer.
    """
    super(CharSetGroupProber, self).__init__(lang_filter=lang_filter)
    # Number of probers currently marked active.
    self._active_num = 0
    # Member probers; this initializer leaves the list empty.
    self.probers = []
    # Prober currently considered the best match, if any.
    self._best_guess_prober = None
def reset(self):
    """Reset the group and every member prober, marking each one active."""
    super(CharSetGroupProber, self).reset()
    self._active_num = 0
    for prober in self.probers:
        # Falsy entries in the list are ignored.
        if prober:
            prober.reset()
            prober.active = True
            self._active_num += 1
    self._best_guess_prober = None
@property
def charset_name(self):
    """Charset name of the best-guess prober, or ``None`` if there is none."""
    if not self._best_guess_prober:
        # get_confidence() is called here to (re)select a best-guess prober.
        self.get_confidence()
        if not self._best_guess_prober:
            return None
    return self._best_guess_prober.charset_name
@property
def language(self):
    """Language of the best-guess prober, or ``None`` if there is none."""
    if not self._best_guess_prober:
        # get_confidence() is called here to (re)select a best-guess prober.
        self.get_confidence()
        if not self._best_guess_prober:
            return None
    return self._best_guess_prober.language
def feed(self, byte_str):
    """Feed ``byte_str`` to every active member prober.

    :param byte_str: the bytes to analyze.
    :return: the state of the group (``self.state``) after feeding.
    """
    for prober in self.probers:
        if not prober:
            continue
        if not prober.active:
            continue
        state = prober.feed(byte_str)
        # A falsy state means this prober has nothing new to report.
        if not state:
            continue
        if state == ProbingState.FOUND_IT:
            self._best_guess_prober = prober
            # Record the match on the group itself; otherwise the returned
            # state would not report FOUND_IT and get_confidence() would
            # never take its FOUND_IT shortcut.
            self._state = ProbingState.FOUND_IT
            return self.state
        elif state == ProbingState.NOT_ME:
            prober.active = False
            self._active_num -= 1
            # Once every member has ruled itself out, so has the group.
            if self._active_num <= 0:
                self._state = ProbingState.NOT_ME
                return self.state
    return self.state
# 获取置信度的函数
def get_confidence(self): def get_confidence(self):
# 获取当前探测器的状态
state = self.state state = self.state
# 如果当前探测器的状态是FOUND_IT
if state == ProbingState.FOUND_IT: if state == ProbingState.FOUND_IT:
# 返回0.99
return 0.99 return 0.99
# 如果当前探测器的状态是NOT_ME
elif state == ProbingState.NOT_ME: elif state == ProbingState.NOT_ME:
# 返回0.01
return 0.01 return 0.01
# 初始化最佳置信度
best_conf = 0.0 best_conf = 0.0
# 重置最佳猜测探测器
self._best_guess_prober = None self._best_guess_prober = None
# 遍历探测器列表
for prober in self.probers: for prober in self.probers:
# 如果探测器不存在
if not prober: if not prober:
# 跳过
continue continue
# 如果探测器不是活动状态
if not prober.active: if not prober.active:
self.logger.debug('%s not active', prober.charset_name) self.logger.debug('%s not active', prober.charset_name)
continue continue

@ -34,32 +34,42 @@ from .enums import ProbingState
class CharSetProber(object):

    # Confidence threshold that subclasses compare get_confidence() against
    # to accept a result early.
    SHORTCUT_THRESHOLD = 0.95

    def __init__(self, lang_filter=None):
        """Store ``lang_filter`` and create the logger.

        The state stays ``None`` until :meth:`reset` is called.
        """
        self._state = None
        self.lang_filter = lang_filter
        self.logger = logging.getLogger(__name__)

    def reset(self):
        """Put the prober back into the DETECTING state."""
        self._state = ProbingState.DETECTING

    @property
    def charset_name(self):
        """Name of the detected charset; the base class returns ``None``."""
        return None

    def feed(self, buf):
        """Accept a buffer of input; the base class does nothing with it."""
        pass

    @property
    def state(self):
        """Current probing state."""
        return self._state

    def get_confidence(self):
        """Confidence in the detected charset; the base class returns 0.0."""
        return 0.0

    @staticmethod
    def filter_high_byte_only(buf):
        """Replace each run of bytes in 0x00-0x7F with a single space."""
        buf = re.sub(b'([\x00-\x7F])+', b' ', buf)
        return buf

@ -53,20 +53,29 @@ class CodingStateMachine(object):
encoding from consideration from here on. encoding from consideration from here on.
""" """
def __init__(self, sm):
    """Store the state-machine model ``sm`` and reset to the start state."""
    self._model = sm
    self._curr_byte_pos = 0
    self._curr_char_len = 0
    self._curr_state = None
    self.logger = logging.getLogger(__name__)
    self.reset()
def reset(self):
    """Return the machine to its start state."""
    self._curr_state = MachineState.START
def next_state(self, c): def next_state(self, c):
# for each byte we get its class # for each byte we get its class
# if it is first byte, we also get byte length # if it is first byte, we also get byte length
# 获取当前字节的类别
byte_class = self._model['class_table'][c] byte_class = self._model['class_table'][c]
# 如果当前状态为起始状态,则获取当前字符长度
if self._curr_state == MachineState.START: if self._curr_state == MachineState.START:
self._curr_byte_pos = 0 self._curr_byte_pos = 0
self._curr_char_len = self._model['char_len_table'][byte_class] self._curr_char_len = self._model['char_len_table'][byte_class]

@ -22,13 +22,20 @@
import sys import sys
# Interpreter-version flags and string-type aliases.
if sys.version_info < (3, 0):
    # Python 2
    PY2 = True
    PY3 = False
    base_str = (str, unicode)
    text_type = unicode
else:
    # Python 3
    PY2 = False
    PY3 = True
    base_str = (bytes, str)
    text_type = str

@ -40,62 +40,95 @@ class EscCharSetProber(CharSetProber):
""" """
def __init__(self, lang_filter=None):
    """Create one coding state machine per encoding enabled by ``lang_filter``.

    :param lang_filter: bit mask tested against ``LanguageFilter`` members.
    """
    super(EscCharSetProber, self).__init__(lang_filter=lang_filter)
    self.coding_sm = []
    # NOTE(review): if the base class stores ``lang_filter`` as given, the
    # default ``None`` would fail on ``&`` below -- confirm that callers
    # always pass a filter.
    if self.lang_filter & LanguageFilter.CHINESE_SIMPLIFIED:
        self.coding_sm.append(CodingStateMachine(HZ_SM_MODEL))
        self.coding_sm.append(CodingStateMachine(ISO2022CN_SM_MODEL))
    if self.lang_filter & LanguageFilter.JAPANESE:
        self.coding_sm.append(CodingStateMachine(ISO2022JP_SM_MODEL))
    if self.lang_filter & LanguageFilter.KOREAN:
        self.coding_sm.append(CodingStateMachine(ISO2022KR_SM_MODEL))
    # These are given their working values by reset() below.
    self.active_sm_count = None
    self._detected_charset = None
    self._detected_language = None
    self._state = None
    self.reset()
def reset(self):
    """Reactivate and reset every state machine and clear the detection result."""
    super(EscCharSetProber, self).reset()
    for coding_sm in self.coding_sm:
        if not coding_sm:
            continue
        coding_sm.active = True
        coding_sm.reset()
    self.active_sm_count = len(self.coding_sm)
    self._detected_charset = None
    self._detected_language = None
@property
def charset_name(self):
    """Detected charset, or ``None`` when nothing has been detected."""
    return self._detected_charset
@property
def language(self):
    """Detected language, or ``None`` when nothing has been detected."""
    return self._detected_language
def get_confidence(self):
    """Return 0.99 once a charset has been detected, otherwise 0.00."""
    if self._detected_charset:
        return 0.99
    else:
        return 0.00
def feed(self, byte_str):
    """Run every byte of ``byte_str`` through each active state machine.

    :return: ``self.state`` after processing, returned as soon as a
        machine matches or the last active machine fails.
    """
    for c in byte_str:
        for coding_sm in self.coding_sm:
            if not coding_sm or not coding_sm.active:
                continue
            coding_state = coding_sm.next_state(c)
            if coding_state == MachineState.ERROR:
                # This machine is out; with none left the prober gives up.
                coding_sm.active = False
                self.active_sm_count -= 1
                if self.active_sm_count <= 0:
                    self._state = ProbingState.NOT_ME
                    return self.state
            elif coding_state == MachineState.ITS_ME:
                # Record which machine matched, and its language.
                self._state = ProbingState.FOUND_IT
                self._detected_charset = coding_sm.get_coding_state_machine()
                self._detected_language = coding_sm.language
                return self.state

    return self.state

@ -34,59 +34,90 @@ from .mbcssm import EUCJP_SM_MODEL
class EUCJPProber(MultiByteCharSetProber):
    """Prober for EUC-JP that combines a coding state machine with
    distribution and context analysis."""

    def __init__(self):
        """Create the state machine and both analyzers, then reset."""
        super(EUCJPProber, self).__init__()
        self.coding_sm = CodingStateMachine(EUCJP_SM_MODEL)
        self.distribution_analyzer = EUCJPDistributionAnalysis()
        self.context_analyzer = EUCJPContextAnalysis()
        self.reset()

    def reset(self):
        """Reset the base prober state and the context analyzer."""
        super(EUCJPProber, self).reset()
        self.context_analyzer.reset()

    @property
    def charset_name(self):
        """Always ``"EUC-JP"``."""
        return "EUC-JP"

    @property
    def language(self):
        """Always ``"Japanese"``."""
        return "Japanese"

    def feed(self, byte_str):
        """Process ``byte_str`` and return the resulting probing state."""
        for i, byte in enumerate(byte_str):
            # PY3K: byte_str is a byte array, so each item is an int, not a byte
            coding_state = self.coding_sm.next_state(byte)
            if coding_state == MachineState.ERROR:
                self.logger.debug('%s %s prober hit error at byte %s',
                                  self.charset_name, self.language, i)
                self._state = ProbingState.NOT_ME
                break
            elif coding_state == MachineState.ITS_ME:
                self._state = ProbingState.FOUND_IT
                break
            elif coding_state == MachineState.START:
                # The machine is back at START: feed both analyzers.
                char_len = self.coding_sm.get_current_charlen()
                if i == 0:
                    # Pair the first byte of this buffer with the byte kept
                    # in ``_last_char[0]`` at the end of the previous call.
                    self._last_char[1] = byte_str[0]
                    self.context_analyzer.feed(self._last_char, char_len)
                    self.distribution_analyzer.feed(self._last_char, char_len)
                else:
                    self.context_analyzer.feed(byte_str[i - 1:i + 1],
                                               char_len)
                    self.distribution_analyzer.feed(byte_str[i - 1:i + 1],
                                                    char_len)

        # Keep the final byte for the next call.
        self._last_char[0] = byte_str[-1]

        if self.state == ProbingState.DETECTING:
            # Accept early once there is enough data and high confidence.
            if (self.context_analyzer.got_enough_data() and
               (self.get_confidence() > self.SHORTCUT_THRESHOLD)):
                self._state = ProbingState.FOUND_IT

        return self.state

    def get_confidence(self):
        """Return the larger of the context and distribution confidences."""
        context_conf = self.context_analyzer.get_confidence()
        distrib_conf = self.distribution_analyzer.get_confidence()
        return max(context_conf, distrib_conf)

@ -32,16 +32,23 @@ from .mbcssm import EUCKR_SM_MODEL
class EUCKRProber(MultiByteCharSetProber): class EUCKRProber(MultiByteCharSetProber):
# 初始化EUCKRProber类
def __init__(self): def __init__(self):
# 调用父类MultiByteCharSetProber的初始化方法
super(EUCKRProber, self).__init__() super(EUCKRProber, self).__init__()
# 初始化编码状态机
self.coding_sm = CodingStateMachine(EUCKR_SM_MODEL) self.coding_sm = CodingStateMachine(EUCKR_SM_MODEL)
# 初始化分布分析器
self.distribution_analyzer = EUCKRDistributionAnalysis() self.distribution_analyzer = EUCKRDistributionAnalysis()
# 重置
self.reset() self.reset()
# 获取字符集名称
@property @property
def charset_name(self): def charset_name(self):
return "EUC-KR" return "EUC-KR"
# 获取语言
@property @property
def language(self): def language(self):
return "Korean" return "Korean"

@ -31,16 +31,23 @@ from .chardistribution import EUCTWDistributionAnalysis
from .mbcssm import EUCTW_SM_MODEL from .mbcssm import EUCTW_SM_MODEL
class EUCTWProber(MultiByteCharSetProber): class EUCTWProber(MultiByteCharSetProber):
# 初始化EUCTWProber类
def __init__(self): def __init__(self):
# 调用父类MultiByteCharSetProber的初始化方法
super(EUCTWProber, self).__init__() super(EUCTWProber, self).__init__()
# 初始化编码状态机
self.coding_sm = CodingStateMachine(EUCTW_SM_MODEL) self.coding_sm = CodingStateMachine(EUCTW_SM_MODEL)
# 初始化分布分析器
self.distribution_analyzer = EUCTWDistributionAnalysis() self.distribution_analyzer = EUCTWDistributionAnalysis()
# 重置
self.reset() self.reset()
# 获取字符集名称
@property @property
def charset_name(self): def charset_name(self):
return "EUC-TW" return "EUC-TW"
# 获取语言
@property @property
def language(self): def language(self):
return "Taiwan" return "Taiwan"

@ -31,16 +31,23 @@ from .chardistribution import GB2312DistributionAnalysis
from .mbcssm import GB2312_SM_MODEL from .mbcssm import GB2312_SM_MODEL
class GB2312Prober(MultiByteCharSetProber): class GB2312Prober(MultiByteCharSetProber):
# 初始化GB2312Prober类
def __init__(self): def __init__(self):
# 调用父类MultiByteCharSetProber的初始化方法
super(GB2312Prober, self).__init__() super(GB2312Prober, self).__init__()
# 初始化GB2312编码状态机
self.coding_sm = CodingStateMachine(GB2312_SM_MODEL) self.coding_sm = CodingStateMachine(GB2312_SM_MODEL)
# 初始化GB2312分布分析器
self.distribution_analyzer = GB2312DistributionAnalysis() self.distribution_analyzer = GB2312DistributionAnalysis()
# 重置
self.reset() self.reset()
# 获取字符集名称
@property @property
def charset_name(self): def charset_name(self):
return "GB2312" return "GB2312"
# 获取语言
@property @property
def language(self): def language(self):
return "Chinese" return "Chinese"

@ -152,17 +152,27 @@ class HebrewProber(CharSetProber):
LOGICAL_HEBREW_NAME = "windows-1255" LOGICAL_HEBREW_NAME = "windows-1255"
def __init__(self): def __init__(self):
# 初始化HebrewProber类
super(HebrewProber, self).__init__() super(HebrewProber, self).__init__()
# 初始化_final_char_logical_score为None
self._final_char_logical_score = None self._final_char_logical_score = None
# 初始化_final_char_visual_score为None
self._final_char_visual_score = None self._final_char_visual_score = None
# 初始化_prev为None
self._prev = None self._prev = None
# 初始化_before_prev为None
self._before_prev = None self._before_prev = None
# 初始化_logical_prober为None
self._logical_prober = None self._logical_prober = None
# 初始化_visual_prober为None
self._visual_prober = None self._visual_prober = None
# 调用reset方法
self.reset() self.reset()
def reset(self): def reset(self):
# 重置_final_char_logical_score为0
self._final_char_logical_score = 0 self._final_char_logical_score = 0
# 重置_final_char_visual_score为0
self._final_char_visual_score = 0 self._final_char_visual_score = 0
# The two last characters seen in the previous buffer, # The two last characters seen in the previous buffer,
# mPrev and mBeforePrev are initialized to space in order to simulate # mPrev and mBeforePrev are initialized to space in order to simulate

@ -37,17 +37,28 @@ class MultiByteCharSetProber(CharSetProber):
""" """
def __init__(self, lang_filter=None): def __init__(self, lang_filter=None):
# 初始化函数传入参数lang_filter
super(MultiByteCharSetProber, self).__init__(lang_filter=lang_filter) super(MultiByteCharSetProber, self).__init__(lang_filter=lang_filter)
# 调用父类的初始化函数
self.distribution_analyzer = None self.distribution_analyzer = None
# 初始化分布分析器
self.coding_sm = None self.coding_sm = None
# 初始化编码状态机
self._last_char = [0, 0] self._last_char = [0, 0]
# 初始化最后一个字符
def reset(self): def reset(self):
# 重置函数
super(MultiByteCharSetProber, self).reset() super(MultiByteCharSetProber, self).reset()
# 调用父类的重置函数
if self.coding_sm: if self.coding_sm:
# 如果编码状态机存在
self.coding_sm.reset() self.coding_sm.reset()
# 重置编码状态机
if self.distribution_analyzer: if self.distribution_analyzer:
# 如果分布分析器存在
self.distribution_analyzer.reset() self.distribution_analyzer.reset()
# 重置分布分析器
self._last_char = [0, 0] self._last_char = [0, 0]
@property @property
@ -59,33 +70,45 @@ class MultiByteCharSetProber(CharSetProber):
raise NotImplementedError raise NotImplementedError
def feed(self, byte_str): def feed(self, byte_str):
# 遍历byte_str中的每个字节
for i in range(len(byte_str)): for i in range(len(byte_str)):
# 获取当前字节的编码状态
coding_state = self.coding_sm.next_state(byte_str[i]) coding_state = self.coding_sm.next_state(byte_str[i])
# 如果编码状态为错误则记录错误信息并将状态设置为NOT_ME
if coding_state == MachineState.ERROR: if coding_state == MachineState.ERROR:
self.logger.debug('%s %s prober hit error at byte %s', self.logger.debug('%s %s prober hit error at byte %s',
self.charset_name, self.language, i) self.charset_name, self.language, i)
self._state = ProbingState.NOT_ME self._state = ProbingState.NOT_ME
break break
# 如果编码状态为确定则将状态设置为FOUND_IT
elif coding_state == MachineState.ITS_ME: elif coding_state == MachineState.ITS_ME:
self._state = ProbingState.FOUND_IT self._state = ProbingState.FOUND_IT
break break
# 如果编码状态为开始,则获取当前字符长度
elif coding_state == MachineState.START: elif coding_state == MachineState.START:
char_len = self.coding_sm.get_current_charlen() char_len = self.coding_sm.get_current_charlen()
# 如果是第一个字节则将当前字节和上一个字节作为参数传入feed方法
if i == 0: if i == 0:
self._last_char[1] = byte_str[0] self._last_char[1] = byte_str[0]
self.distribution_analyzer.feed(self._last_char, char_len) self.distribution_analyzer.feed(self._last_char, char_len)
# 否则将当前字节和上一个字节作为参数传入feed方法
else: else:
self.distribution_analyzer.feed(byte_str[i - 1:i + 1], self.distribution_analyzer.feed(byte_str[i - 1:i + 1],
char_len) char_len)
# 将最后一个字节赋值给_last_char[0]
self._last_char[0] = byte_str[-1] self._last_char[0] = byte_str[-1]
# 如果状态为DETECTING则判断是否已经获取足够的数据并且置信度是否大于SHORTCUT_THRESHOLD
if self.state == ProbingState.DETECTING: if self.state == ProbingState.DETECTING:
if (self.distribution_analyzer.got_enough_data() and if (self.distribution_analyzer.got_enough_data() and
(self.get_confidence() > self.SHORTCUT_THRESHOLD)): (self.get_confidence() > self.SHORTCUT_THRESHOLD)):
# 如果满足条件则将状态设置为FOUND_IT
self._state = ProbingState.FOUND_IT self._state = ProbingState.FOUND_IT
# 返回状态
return self.state return self.state
def get_confidence(self): def get_confidence(self):
# 获取置信度
return self.distribution_analyzer.get_confidence() return self.distribution_analyzer.get_confidence()

@ -39,16 +39,20 @@ from .euctwprober import EUCTWProber
class MBCSGroupProber(CharSetGroupProber): class MBCSGroupProber(CharSetGroupProber):
# 初始化MBCSGroupProber类继承自CharSetGroupProber类
def __init__(self, lang_filter=None): def __init__(self, lang_filter=None):
# 调用父类CharSetGroupProber的初始化方法
super(MBCSGroupProber, self).__init__(lang_filter=lang_filter) super(MBCSGroupProber, self).__init__(lang_filter=lang_filter)
# 定义一个包含多种字符集探测器的列表
self.probers = [ self.probers = [
UTF8Prober(), UTF8Prober(), # UTF-8字符集探测器
SJISProber(), SJISProber(), # Shift_JIS字符集探测器
EUCJPProber(), EUCJPProber(), # EUC-JP字符集探测器
GB2312Prober(), GB2312Prober(), # GB2312字符集探测器
EUCKRProber(), EUCKRProber(), # EUCKR字符集探测器
CP949Prober(), CP949Prober(), # CP949字符集探测器
Big5Prober(), Big5Prober(), # Big5字符集探测器
EUCTWProber() EUCTWProber() # EUCTW字符集探测器
] ]
# 重置探测器
self.reset() self.reset()

@ -31,13 +31,19 @@ from .enums import CharacterCategory, ProbingState, SequenceLikelihood
class SingleByteCharSetProber(CharSetProber): class SingleByteCharSetProber(CharSetProber):
# 定义样本大小
SAMPLE_SIZE = 64 SAMPLE_SIZE = 64
# 定义相对阈值
SB_ENOUGH_REL_THRESHOLD = 1024 # 0.25 * SAMPLE_SIZE^2 SB_ENOUGH_REL_THRESHOLD = 1024 # 0.25 * SAMPLE_SIZE^2
# 定义正向阈值
POSITIVE_SHORTCUT_THRESHOLD = 0.95 POSITIVE_SHORTCUT_THRESHOLD = 0.95
# 定义负向阈值
NEGATIVE_SHORTCUT_THRESHOLD = 0.05 NEGATIVE_SHORTCUT_THRESHOLD = 0.05
def __init__(self, model, reversed=False, name_prober=None): def __init__(self, model, reversed=False, name_prober=None):
# 调用父类构造函数
super(SingleByteCharSetProber, self).__init__() super(SingleByteCharSetProber, self).__init__()
# 设置模型
self._model = model self._model = model
# TRUE if we need to reverse every pair in the model lookup # TRUE if we need to reverse every pair in the model lookup
self._reversed = reversed self._reversed = reversed
@ -51,6 +57,7 @@ class SingleByteCharSetProber(CharSetProber):
self.reset() self.reset()
def reset(self): def reset(self):
# 重置函数
super(SingleByteCharSetProber, self).reset() super(SingleByteCharSetProber, self).reset()
# char order of last character # char order of last character
self._last_order = 255 self._last_order = 255
@ -69,16 +76,20 @@ class SingleByteCharSetProber(CharSetProber):
@property @property
def language(self): def language(self):
# 如果_name_prober存在则返回_name_prober的语言否则返回_model中的语言
if self._name_prober: if self._name_prober:
return self._name_prober.language return self._name_prober.language
else: else:
return self._model.get('language') return self._model.get('language')
def feed(self, byte_str): def feed(self, byte_str):
# 如果_model中的keep_english_letter为False则过滤掉国际字符
if not self._model['keep_english_letter']: if not self._model['keep_english_letter']:
byte_str = self.filter_international_words(byte_str) byte_str = self.filter_international_words(byte_str)
# 如果byte_str为空则返回状态
if not byte_str: if not byte_str:
return self.state return self.state
# 获取字符到顺序的映射
char_to_order_map = self._model['char_to_order_map'] char_to_order_map = self._model['char_to_order_map']
for i, c in enumerate(byte_str): for i, c in enumerate(byte_str):
# XXX: Order is in range 1-64, so one would think we want 0-63 here, # XXX: Order is in range 1-64, so one would think we want 0-63 here,
@ -122,11 +133,17 @@ class SingleByteCharSetProber(CharSetProber):
return self.state return self.state
def get_confidence(self): def get_confidence(self):
# 初始化r为0.01
r = 0.01 r = 0.01
# 如果总序列数大于0
if self._total_seqs > 0: if self._total_seqs > 0:
# 计算r的值
r = ((1.0 * self._seq_counters[SequenceLikelihood.POSITIVE]) / r = ((1.0 * self._seq_counters[SequenceLikelihood.POSITIVE]) /
self._total_seqs / self._model['typical_positive_ratio']) self._total_seqs / self._model['typical_positive_ratio'])
# 乘以字符频率和总字符数
r = r * self._freq_char / self._total_char r = r * self._freq_char / self._total_char
# 如果r大于等于1.0则将r设置为0.99
if r >= 1.0: if r >= 1.0:
r = 0.99 r = 0.99
# 返回r的值
return r return r

@ -34,59 +34,94 @@ from .enums import ProbingState, MachineState
class SJISProber(MultiByteCharSetProber): class SJISProber(MultiByteCharSetProber):
# 初始化函数
def __init__(self): def __init__(self):
# 调用父类的初始化函数
super(SJISProber, self).__init__() super(SJISProber, self).__init__()
# 初始化编码状态机
self.coding_sm = CodingStateMachine(SJIS_SM_MODEL) self.coding_sm = CodingStateMachine(SJIS_SM_MODEL)
# 初始化分布分析器
self.distribution_analyzer = SJISDistributionAnalysis() self.distribution_analyzer = SJISDistributionAnalysis()
# 初始化上下文分析器
self.context_analyzer = SJISContextAnalysis() self.context_analyzer = SJISContextAnalysis()
# 重置分析器
self.reset() self.reset()
# 重置函数
def reset(self): def reset(self):
# 调用父类的重置函数
super(SJISProber, self).reset() super(SJISProber, self).reset()
# 重置上下文分析器
self.context_analyzer.reset() self.context_analyzer.reset()
@property @property
def charset_name(self): def charset_name(self):
# 返回字符集名称
return self.context_analyzer.charset_name return self.context_analyzer.charset_name
@property @property
def language(self): def language(self):
# 返回语言
return "Japanese" return "Japanese"
def feed(self, byte_str): def feed(self, byte_str):
# 遍历字节字符串
for i in range(len(byte_str)): for i in range(len(byte_str)):
# 获取下一个状态
coding_state = self.coding_sm.next_state(byte_str[i]) coding_state = self.coding_sm.next_state(byte_str[i])
# 如果状态为错误
if coding_state == MachineState.ERROR: if coding_state == MachineState.ERROR:
# 记录错误日志
self.logger.debug('%s %s prober hit error at byte %s', self.logger.debug('%s %s prober hit error at byte %s',
self.charset_name, self.language, i) self.charset_name, self.language, i)
# 设置状态为不是该字符集
self._state = ProbingState.NOT_ME self._state = ProbingState.NOT_ME
break break
# 如果状态为确定
elif coding_state == MachineState.ITS_ME: elif coding_state == MachineState.ITS_ME:
# 设置状态为确定
self._state = ProbingState.FOUND_IT self._state = ProbingState.FOUND_IT
break break
# 如果状态为开始
elif coding_state == MachineState.START: elif coding_state == MachineState.START:
# 获取当前字符长度
char_len = self.coding_sm.get_current_charlen() char_len = self.coding_sm.get_current_charlen()
# 如果是第一个字符
if i == 0: if i == 0:
# 更新最后一个字符
self._last_char[1] = byte_str[0] self._last_char[1] = byte_str[0]
# 向上下文分析器输入字符
self.context_analyzer.feed(self._last_char[2 - char_len:], self.context_analyzer.feed(self._last_char[2 - char_len:],
char_len) char_len)
# 向分布分析器输入字符
self.distribution_analyzer.feed(self._last_char, char_len) self.distribution_analyzer.feed(self._last_char, char_len)
else: else:
# 向上下文分析器输入字符
self.context_analyzer.feed(byte_str[i + 1 - char_len:i + 3 self.context_analyzer.feed(byte_str[i + 1 - char_len:i + 3
- char_len], char_len) - char_len], char_len)
# 向分布分析器输入字符
self.distribution_analyzer.feed(byte_str[i - 1:i + 1], self.distribution_analyzer.feed(byte_str[i - 1:i + 1],
char_len) char_len)
# 更新最后一个字符
self._last_char[0] = byte_str[-1] self._last_char[0] = byte_str[-1]
# 如果状态为检测中
if self.state == ProbingState.DETECTING: if self.state == ProbingState.DETECTING:
# 如果上下文分析器有足够的数据,并且置信度大于阈值
if (self.context_analyzer.got_enough_data() and if (self.context_analyzer.got_enough_data() and
(self.get_confidence() > self.SHORTCUT_THRESHOLD)): (self.get_confidence() > self.SHORTCUT_THRESHOLD)):
# 设置状态为确定
self._state = ProbingState.FOUND_IT self._state = ProbingState.FOUND_IT
# 返回状态
return self.state return self.state
# 获取置信度
def get_confidence(self): def get_confidence(self):
# 获取上下文分析器的置信度
context_conf = self.context_analyzer.get_confidence() context_conf = self.context_analyzer.get_confidence()
# 获取分布分析器的置信度
distrib_conf = self.distribution_analyzer.get_confidence() distrib_conf = self.distribution_analyzer.get_confidence()
# 返回上下文置信度和分布置信度中的最大值
return max(context_conf, distrib_conf) return max(context_conf, distrib_conf)

@ -79,16 +79,27 @@ class UniversalDetector(object):
'iso-8859-13': 'Windows-1257'} 'iso-8859-13': 'Windows-1257'}
def __init__(self, lang_filter=LanguageFilter.ALL): def __init__(self, lang_filter=LanguageFilter.ALL):
# 初始化语言过滤器
self._esc_charset_prober = None self._esc_charset_prober = None
# 初始化字符集探测器
self._charset_probers = [] self._charset_probers = []
# 初始化结果
self.result = None self.result = None
# 初始化完成标志
self.done = None self.done = None
# 初始化是否获取数据标志
self._got_data = None self._got_data = None
# 初始化输入状态
self._input_state = None self._input_state = None
# 初始化最后一个字符
self._last_char = None self._last_char = None
# 设置语言过滤器
self.lang_filter = lang_filter self.lang_filter = lang_filter
# 获取日志记录器
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
# 初始化是否包含Windows字节标志
self._has_win_bytes = None self._has_win_bytes = None
# 重置
self.reset() self.reset()
def reset(self): def reset(self):
@ -97,14 +108,22 @@ class UniversalDetector(object):
initial states. This is called by ``__init__``, so you only need to initial states. This is called by ``__init__``, so you only need to
call this directly in between analyses of different documents. call this directly in between analyses of different documents.
""" """
# 重置结果
self.result = {'encoding': None, 'confidence': 0.0, 'language': None} self.result = {'encoding': None, 'confidence': 0.0, 'language': None}
# 重置完成标志
self.done = False self.done = False
# 重置是否接收到数据标志
self._got_data = False self._got_data = False
# 重置是否有win字节标志
self._has_win_bytes = False self._has_win_bytes = False
# 重置输入状态
self._input_state = InputState.PURE_ASCII self._input_state = InputState.PURE_ASCII
# 重置最后一个字符
self._last_char = b'' self._last_char = b''
# 如果有esc字符集探测器重置它
if self._esc_charset_prober: if self._esc_charset_prober:
self._esc_charset_prober.reset() self._esc_charset_prober.reset()
# 重置所有字符集探测器
for prober in self._charset_probers: for prober in self._charset_probers:
prober.reset() prober.reset()

@ -33,50 +33,75 @@ from .mbcssm import UTF8_SM_MODEL
class UTF8Prober(CharSetProber): class UTF8Prober(CharSetProber):
# 定义一个常量表示一个字符的初始概率为0.5
ONE_CHAR_PROB = 0.5 ONE_CHAR_PROB = 0.5
# 初始化函数
def __init__(self): def __init__(self):
# 调用父类的初始化函数
super(UTF8Prober, self).__init__() super(UTF8Prober, self).__init__()
# 初始化编码状态机
self.coding_sm = CodingStateMachine(UTF8_SM_MODEL) self.coding_sm = CodingStateMachine(UTF8_SM_MODEL)
# 初始化多字节字符数量
self._num_mb_chars = None self._num_mb_chars = None
# 调用重置函数
self.reset() self.reset()
# 重置函数
def reset(self): def reset(self):
# 调用父类的重置函数
super(UTF8Prober, self).reset() super(UTF8Prober, self).reset()
# 重置编码状态机
self.coding_sm.reset() self.coding_sm.reset()
# 重置多字节字符数量
self._num_mb_chars = 0 self._num_mb_chars = 0
# 获取字符集名称的属性
@property @property
def charset_name(self): def charset_name(self):
# 返回字符集名称
return "utf-8" return "utf-8"
# 获取语言名称的属性
@property @property
def language(self): def language(self):
# 返回语言名称
return "" return ""
def feed(self, byte_str): def feed(self, byte_str):
# 遍历byte_str中的每个字符
for c in byte_str: for c in byte_str:
# 获取下一个状态
coding_state = self.coding_sm.next_state(c) coding_state = self.coding_sm.next_state(c)
# 如果状态为ERROR则将状态设置为NOT_ME并跳出循环
if coding_state == MachineState.ERROR: if coding_state == MachineState.ERROR:
self._state = ProbingState.NOT_ME self._state = ProbingState.NOT_ME
break break
# 如果状态为ITS_ME则将状态设置为FOUND_IT并跳出循环
elif coding_state == MachineState.ITS_ME: elif coding_state == MachineState.ITS_ME:
self._state = ProbingState.FOUND_IT self._state = ProbingState.FOUND_IT
break break
# 如果状态为START且当前字符长度大于等于2则将_num_mb_chars加1
elif coding_state == MachineState.START: elif coding_state == MachineState.START:
if self.coding_sm.get_current_charlen() >= 2: if self.coding_sm.get_current_charlen() >= 2:
self._num_mb_chars += 1 self._num_mb_chars += 1
# 如果状态为DETECTING且置信度大于SHORTCUT_THRESHOLD则将状态设置为FOUND_IT
if self.state == ProbingState.DETECTING: if self.state == ProbingState.DETECTING:
if self.get_confidence() > self.SHORTCUT_THRESHOLD: if self.get_confidence() > self.SHORTCUT_THRESHOLD:
self._state = ProbingState.FOUND_IT self._state = ProbingState.FOUND_IT
# 返回状态
return self.state return self.state
def get_confidence(self): def get_confidence(self):
# 初始化 unlike 为 0.99
unlike = 0.99 unlike = 0.99
# 如果_num_mb_chars 小于 6则 unlike 乘以 ONE_CHAR_PROB 的 _num_mb_chars 次方
if self._num_mb_chars < 6: if self._num_mb_chars < 6:
unlike *= self.ONE_CHAR_PROB ** self._num_mb_chars unlike *= self.ONE_CHAR_PROB ** self._num_mb_chars
# 返回 1.0 减去 unlike
return 1.0 - unlike return 1.0 - unlike
# 否则返回 unlike
else: else:
return unlike return unlike

@ -67,30 +67,47 @@ __all__ = ['AmbiguityError', 'CheckboxControl', 'Control',
'TextareaControl', 'XHTMLCompatibleFormParser'] 'TextareaControl', 'XHTMLCompatibleFormParser']
try: try:
# 尝试导入logging和inspect模块
import logging import logging
import inspect import inspect
except ImportError: except ImportError:
# 如果导入失败定义一个空的debug函数
def debug(msg, *args, **kwds): def debug(msg, *args, **kwds):
pass pass
else: else:
# 如果导入成功定义一个_logger对象
_logger = logging.getLogger("ClientForm") _logger = logging.getLogger("ClientForm")
# 定义一个优化hack变量
OPTIMIZATION_HACK = True OPTIMIZATION_HACK = True
# 定义一个debug函数
def debug(msg, *args, **kwds): def debug(msg, *args, **kwds):
# 如果优化hack为True则返回
if OPTIMIZATION_HACK: if OPTIMIZATION_HACK:
return return
# 获取调用者的函数名
caller_name = inspect.stack()[1][3] caller_name = inspect.stack()[1][3]
# 定义一个扩展的消息
extended_msg = '%%s %s' % msg extended_msg = '%%s %s' % msg
# 定义一个扩展的参数
extended_args = (caller_name,)+args extended_args = (caller_name,)+args
# 调用_logger对象的debug方法
debug = _logger.debug(extended_msg, *extended_args, **kwds) debug = _logger.debug(extended_msg, *extended_args, **kwds)
# 定义一个_show_debug_messages函数
def _show_debug_messages(): def _show_debug_messages():
# 定义一个全局变量OPTIMIZATION_HACK
global OPTIMIZATION_HACK global OPTIMIZATION_HACK
# 将优化hack设置为False
OPTIMIZATION_HACK = False OPTIMIZATION_HACK = False
# 将_logger对象的日志级别设置为DEBUG
_logger.setLevel(logging.DEBUG) _logger.setLevel(logging.DEBUG)
# 定义一个StreamHandler对象
handler = logging.StreamHandler(sys.stdout) handler = logging.StreamHandler(sys.stdout)
# 将StreamHandler对象的日志级别设置为DEBUG
handler.setLevel(logging.DEBUG) handler.setLevel(logging.DEBUG)
# 将StreamHandler对象添加到_logger对象中
_logger.addHandler(handler) _logger.addHandler(handler)
try: try:
@ -114,13 +131,17 @@ except ImportError:
import sys, re, random import sys, re, random
if sys.version_info >= (3, 0): if sys.version_info >= (3, 0):
# 如果Python版本大于等于3.0则将xrange替换为range
xrange = range xrange = range
# monkeypatch to fix http://www.python.org/sf/803422 :-( # monkeypatch to fix http://www.python.org/sf/803422 :-(
# 修补monkeypatch以修复http://www.python.org/sf/803422 :-(
sgmllib.charref = re.compile("&#(x?[0-9a-fA-F]+)[^0-9a-fA-F]") sgmllib.charref = re.compile("&#(x?[0-9a-fA-F]+)[^0-9a-fA-F]")
# HTMLParser.HTMLParser is recent, so live without it if it's not available # HTMLParser.HTMLParser is recent, so live without it if it's not available
# (also, sgmllib.SGMLParser is much more tolerant of bad HTML) # (also, sgmllib.SGMLParser is much more tolerant of bad HTML)
# HTMLParser.HTMLParser是最近的如果不可用则没有它
# 另外sgmllib.SGMLParser对不良HTML的容忍度更高
try: try:
import HTMLParser import HTMLParser
except ImportError: except ImportError:
@ -131,9 +152,11 @@ else:
try: try:
import warnings import warnings
except ImportError: except ImportError:
# 如果没有导入warnings模块则定义一个空函数
def deprecation(message, stack_offset=0): def deprecation(message, stack_offset=0):
pass pass
else: else:
# 如果成功导入warnings模块则定义一个警告函数
def deprecation(message, stack_offset=0): def deprecation(message, stack_offset=0):
warnings.warn(message, DeprecationWarning, stacklevel=3+stack_offset) warnings.warn(message, DeprecationWarning, stacklevel=3+stack_offset)
@ -224,29 +247,39 @@ string.
return '&'.join(l) return '&'.join(l)
def unescape(data, entities, encoding=DEFAULT_ENCODING): def unescape(data, entities, encoding=DEFAULT_ENCODING):
# 如果data为None或者data中不包含"&"则直接返回data
if data is None or "&" not in data: if data is None or "&" not in data:
return data return data
# 如果data是字符串类型则将encoding设置为None
if isinstance(data, six.string_types): if isinstance(data, six.string_types):
encoding = None encoding = None
# 定义一个函数,用于替换实体
def replace_entities(match, entities=entities, encoding=encoding): def replace_entities(match, entities=entities, encoding=encoding):
# 获取匹配到的实体
ent = match.group() ent = match.group()
# 如果实体以"#"开头则调用unescape_charref函数进行替换
if ent[1] == "#": if ent[1] == "#":
return unescape_charref(ent[2:-1], encoding) return unescape_charref(ent[2:-1], encoding)
# 从entities中获取实体的替换值
repl = entities.get(ent) repl = entities.get(ent)
# 如果替换值存在并且encoding不为None则尝试将替换值解码为字符串
if repl is not None: if repl is not None:
if hasattr(repl, "decode") and encoding is not None: if hasattr(repl, "decode") and encoding is not None:
try: try:
repl = repl.decode(encoding) repl = repl.decode(encoding)
except UnicodeError: except UnicodeError:
repl = ent repl = ent
# 如果替换值不存在,则将替换值设置为实体本身
else: else:
repl = ent repl = ent
# 返回替换值
return repl return repl
# 使用正则表达式替换data中的实体
return re.sub(r"&#?[A-Za-z0-9]+?;", replace_entities, data) return re.sub(r"&#?[A-Za-z0-9]+?;", replace_entities, data)
def unescape_charref(data, encoding): def unescape_charref(data, encoding):
@ -646,31 +679,47 @@ class _AbstractFormParser:
self._textarea = None self._textarea = None
def start_label(self, attrs): def start_label(self, attrs):
# 打印attrs
debug("%s", attrs) debug("%s", attrs)
# 如果当前标签存在,则结束标签
if self._current_label: if self._current_label:
self.end_label() self.end_label()
# 创建一个空字典
d = {} d = {}
# 遍历attrs
for key, val in attrs: for key, val in attrs:
# 如果val需要转义则进行转义
d[key] = self.unescape_attr_if_required(val) d[key] = self.unescape_attr_if_required(val)
# 如果存在for属性则taken为True
taken = bool(d.get("for")) # empty id is invalid taken = bool(d.get("for")) # empty id is invalid
# 添加__text属性值为空字符串
d["__text"] = "" d["__text"] = ""
# 添加__taken属性值为taken
d["__taken"] = taken d["__taken"] = taken
# 如果taken为True则将d添加到labels列表中
if taken: if taken:
self.labels.append(d) self.labels.append(d)
# 将当前标签设置为d
self._current_label = d self._current_label = d
def end_label(self): def end_label(self):
# 打印空字符串
debug("") debug("")
# 获取当前标签
label = self._current_label label = self._current_label
# 如果当前标签不存在,则返回
if label is None: if label is None:
# something is ugly in the HTML, but we're ignoring it # something is ugly in the HTML, but we're ignoring it
return return
# 将当前标签设置为None
self._current_label = None self._current_label = None
# 如果当前标签存在则删除__taken属性
# if it is staying around, it is True in all cases # if it is staying around, it is True in all cases
del label["__taken"] del label["__taken"]
def _add_label(self, d): def _add_label(self, d):
#debug("%s", d) #debug("%s", d)
# 如果当前标签存在且__taken属性为False则将__taken属性设置为True并将当前标签添加到d的__label属性中
if self._current_label is not None: if self._current_label is not None:
if not self._current_label["__taken"]: if not self._current_label["__taken"]:
self._current_label["__taken"] = True self._current_label["__taken"] = True
@ -743,12 +792,16 @@ class _AbstractFormParser:
controls.append((type, name, d)) controls.append((type, name, d))
def do_isindex(self, attrs): def do_isindex(self, attrs):
# 打印传入的属性
debug("%s", attrs) debug("%s", attrs)
d = {} d = {}
# 遍历属性,将属性名和属性值存入字典
for key, val in attrs: for key, val in attrs:
d[key] = self.unescape_attr_if_required(val) d[key] = self.unescape_attr_if_required(val)
# 获取当前表单的控件
controls = self._current_form[2] controls = self._current_form[2]
# 添加标签
self._add_label(d) self._add_label(d)
# isindex doesn't have type or name HTML attributes # isindex doesn't have type or name HTML attributes
controls.append(("isindex", None, d)) controls.append(("isindex", None, d))

@ -64,14 +64,16 @@ class Magic:
return magic_file(self.cookie, filename) return magic_file(self.cookie, filename)
def __del__(self): def __del__(self):
# during shutdown magic_close may have been cleared already # 析构函数,确保在对象被垃圾回收时关闭 libmagic cookie
if self.cookie and magic_close: if self.cookie and magic_close:
magic_close(self.cookie) magic_close(self.cookie)
self.cookie = None self.cookie = None
# 全局变量用于保存默认和MIME magic对象
_magic_mime = None _magic_mime = None
_magic = None _magic = None
# 获取默认和MIME magic对象的函数
def _get_magic_mime(): def _get_magic_mime():
global _magic_mime global _magic_mime
if not _magic_mime: if not _magic_mime:
@ -90,6 +92,7 @@ def _get_magic_type(mime):
else: else:
return _get_magic() return _get_magic()
# 公共函数,用于识别文件和缓冲区
def from_file(filename, mime=False): def from_file(filename, mime=False):
m = _get_magic_type(mime) m = _get_magic_type(mime)
return m.from_file(filename) return m.from_file(filename)
@ -98,6 +101,7 @@ def from_buffer(buffer, mime=False):
m = _get_magic_type(mime) m = _get_magic_type(mime)
return m.from_buffer(buffer) return m.from_buffer(buffer)
# 使用 ctypes 导入 libmagic 库
try: try:
libmagic = None libmagic = None
@ -106,7 +110,7 @@ try:
from ctypes import c_char_p, c_int, c_size_t, c_void_p from ctypes import c_char_p, c_int, c_size_t, c_void_p
# Let's try to find magic or magic1 # 尝试找到 libmagic 库
dll = ctypes.util.find_library('magic') or ctypes.util.find_library('magic1') dll = ctypes.util.find_library('magic') or ctypes.util.find_library('magic1')
# This is necessary because find_library returns None if it doesn't find the library # This is necessary because find_library returns None if it doesn't find the library
@ -116,6 +120,7 @@ try:
except WindowsError: except WindowsError:
pass pass
# 如果没有找到,尝试平台特定的路径
if not libmagic or not libmagic._name: if not libmagic or not libmagic._name:
platform_to_lib = {'darwin': ['/opt/local/lib/libmagic.dylib', platform_to_lib = {'darwin': ['/opt/local/lib/libmagic.dylib',
'/usr/local/lib/libmagic.dylib', '/usr/local/lib/libmagic.dylib',
@ -127,10 +132,12 @@ try:
except OSError: except OSError:
pass pass
# 如果仍然没有找到,抛出 ImportError
if not libmagic or not libmagic._name: if not libmagic or not libmagic._name:
# It is better to raise an ImportError since we are importing magic module # It is better to raise an ImportError since we are importing magic module
raise ImportError('failed to find libmagic. Check your installation') raise ImportError('failed to find libmagic. Check your installation')
# 定义 magic_t 类型和错误检查函数
magic_t = ctypes.c_void_p magic_t = ctypes.c_void_p
def errorcheck(result, func, args): def errorcheck(result, func, args):
@ -145,6 +152,7 @@ try:
return None return None
return filename.encode(sys.getfilesystemencoding()) return filename.encode(sys.getfilesystemencoding())
# 使用 ctypes 定义 libmagic 函数
magic_open = libmagic.magic_open magic_open = libmagic.magic_open
magic_open.restype = magic_t magic_open.restype = magic_t
magic_open.argtypes = [c_int] magic_open.argtypes = [c_int]
@ -198,28 +206,31 @@ try:
magic_compile.restype = c_int magic_compile.restype = c_int
magic_compile.argtypes = [magic_t, c_char_p] magic_compile.argtypes = [magic_t, c_char_p]
# 如果 libmagic 无法导入,定义回退函数
except (ImportError, OSError): except (ImportError, OSError):
from_file = from_buffer = lambda *args, **kwargs: MAGIC_UNKNOWN_FILETYPE from_file = from_buffer = lambda *args, **kwargs: MAGIC_UNKNOWN_FILETYPE
MAGIC_NONE = 0x000000 # No flags
MAGIC_DEBUG = 0x000001 # Turn on debugging # 定义 libmagic 标志常量
MAGIC_SYMLINK = 0x000002 # Follow symlinks MAGIC_NONE = 0x000000 # 无标志
MAGIC_COMPRESS = 0x000004 # Check inside compressed files MAGIC_DEBUG = 0x000001 # 打开调试
MAGIC_DEVICES = 0x000008 # Look at the contents of devices MAGIC_SYMLINK = 0x000002 # 跟随符号链接
MAGIC_MIME = 0x000010 # Return a mime string MAGIC_COMPRESS = 0x000004 # 检查压缩文件内部
MAGIC_MIME_ENCODING = 0x000400 # Return the MIME encoding MAGIC_DEVICES = 0x000008 # 查看设备内容
MAGIC_CONTINUE = 0x000020 # Return all matches MAGIC_MIME = 0x000010 # 返回 MIME 字符串
MAGIC_CHECK = 0x000040 # Print warnings to stderr MAGIC_MIME_ENCODING = 0x000400 # 返回 MIME 编码
MAGIC_PRESERVE_ATIME = 0x000080 # Restore access time on exit MAGIC_CONTINUE = 0x000020 # 返回所有匹配项
MAGIC_RAW = 0x000100 # Don't translate unprintable chars MAGIC_CHECK = 0x000040 # 打印警告到标准错误
MAGIC_ERROR = 0x000200 # Handle ENOENT etc as real errors MAGIC_PRESERVE_ATIME = 0x000080 # 退出时恢复访问时间
MAGIC_NO_CHECK_COMPRESS = 0x001000 # Don't check for compressed files MAGIC_RAW = 0x000100 # 不转换不可打印字符
MAGIC_NO_CHECK_TAR = 0x002000 # Don't check for tar files MAGIC_ERROR = 0x000200 # 将 ENOENT 等视为真实错误
MAGIC_NO_CHECK_SOFT = 0x004000 # Don't check magic entries MAGIC_NO_CHECK_COMPRESS = 0x001000 # 不检查压缩文件
MAGIC_NO_CHECK_APPTYPE = 0x008000 # Don't check application type MAGIC_NO_CHECK_TAR = 0x002000 # 不检查 tar 文件
MAGIC_NO_CHECK_ELF = 0x010000 # Don't check for elf details MAGIC_NO_CHECK_SOFT = 0x004000 # 不检查 magic 条目
MAGIC_NO_CHECK_ASCII = 0x020000 # Don't check for ascii files MAGIC_NO_CHECK_APPTYPE = 0x008000 # 不检查应用程序类型
MAGIC_NO_CHECK_TROFF = 0x040000 # Don't check ascii/troff MAGIC_NO_CHECK_ELF = 0x010000 # 不检查 elf 详细信息
MAGIC_NO_CHECK_FORTRAN = 0x080000 # Don't check ascii/fortran MAGIC_NO_CHECK_ASCII = 0x020000 # 不检查 ascii 文件
MAGIC_NO_CHECK_TOKENS = 0x100000 # Don't check ascii/tokens MAGIC_NO_CHECK_TROFF = 0x040000 # 不检查 ascii/troff
MAGIC_NO_CHECK_FORTRAN = 0x080000 # 不检查 ascii/fortran
MAGIC_NO_CHECK_TOKENS = 0x100000 # 不检查 ascii/tokens
MAGIC_UNKNOWN_FILETYPE = b"unknown" MAGIC_UNKNOWN_FILETYPE = b"unknown"

@ -8,14 +8,15 @@ import socket
import ctypes import ctypes
import os import os
# 定义一个结构体用于存储socket地址信息
class sockaddr(ctypes.Structure): class sockaddr(ctypes.Structure):
_fields_ = [("sa_family", ctypes.c_short), _fields_ = [("sa_family", ctypes.c_short), # 地址族例如AF_INET或AF_INET6
("__pad1", ctypes.c_ushort), ("__pad1", ctypes.c_ushort), # 填充字段
("ipv4_addr", ctypes.c_byte * 4), ("ipv4_addr", ctypes.c_byte * 4), # IPv4地址4个字节
("ipv6_addr", ctypes.c_byte * 16), ("ipv6_addr", ctypes.c_byte * 16),# IPv6地址16个字节
("__pad2", ctypes.c_ulong)] ("__pad2", ctypes.c_ulong)] # 填充字段
# 根据操作系统的不同,导入不同的库
if hasattr(ctypes, 'windll'): if hasattr(ctypes, 'windll'):
WSAStringToAddressA = ctypes.windll.ws2_32.WSAStringToAddressA WSAStringToAddressA = ctypes.windll.ws2_32.WSAStringToAddressA
WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA
@ -27,12 +28,13 @@ else:
WSAStringToAddressA = not_windows WSAStringToAddressA = not_windows
WSAAddressToStringA = not_windows WSAAddressToStringA = not_windows
# inet_pton函数将IP字符串转换为二进制格式
def inet_pton(address_family, ip_string): def inet_pton(address_family, ip_string):
addr = sockaddr() addr = sockaddr() # 创建sockaddr实例
addr.sa_family = address_family addr.sa_family = address_family # 设置地址族
addr_size = ctypes.c_int(ctypes.sizeof(addr)) addr_size = ctypes.c_int(ctypes.sizeof(addr)) # 获取地址结构体大小
# 使用WSAStringToAddressA函数将IP字符串转换为地址结构体
if WSAStringToAddressA( if WSAStringToAddressA(
ip_string, ip_string,
address_family, address_family,
@ -42,6 +44,7 @@ def inet_pton(address_family, ip_string):
) != 0: ) != 0:
raise socket.error(ctypes.FormatError()) raise socket.error(ctypes.FormatError())
# 根据地址族返回对应的二进制IP地址
if address_family == socket.AF_INET: if address_family == socket.AF_INET:
return ctypes.string_at(addr.ipv4_addr, 4) return ctypes.string_at(addr.ipv4_addr, 4)
if address_family == socket.AF_INET6: if address_family == socket.AF_INET6:
@ -49,14 +52,15 @@ def inet_pton(address_family, ip_string):
raise socket.error('unknown address family') raise socket.error('unknown address family')
# inet_ntop函数将二进制格式的IP地址转换为字符串
def inet_ntop(address_family, packed_ip): def inet_ntop(address_family, packed_ip):
addr = sockaddr() addr = sockaddr() # 创建sockaddr实例
addr.sa_family = address_family addr.sa_family = address_family # 设置地址族
addr_size = ctypes.c_int(ctypes.sizeof(addr)) addr_size = ctypes.c_int(ctypes.sizeof(addr)) # 获取地址结构体大小
ip_string = ctypes.create_string_buffer(128) ip_string = ctypes.create_string_buffer(128) # 创建字符串缓冲区
ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string)) ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string)) # 获取字符串缓冲区大小
# 根据地址族将二进制IP地址复制到地址结构体中
if address_family == socket.AF_INET: if address_family == socket.AF_INET:
if len(packed_ip) != ctypes.sizeof(addr.ipv4_addr): if len(packed_ip) != ctypes.sizeof(addr.ipv4_addr):
raise socket.error('packed IP wrong length for inet_ntoa') raise socket.error('packed IP wrong length for inet_ntoa')
@ -68,6 +72,7 @@ def inet_ntop(address_family, packed_ip):
else: else:
raise socket.error('unknown address family') raise socket.error('unknown address family')
# 使用WSAAddressToStringA函数将地址结构体转换为IP字符串
if WSAAddressToStringA( if WSAAddressToStringA(
ctypes.byref(addr), ctypes.byref(addr),
addr_size, addr_size,
@ -79,7 +84,7 @@ def inet_ntop(address_family, packed_ip):
return ip_string[:ip_string_size.value - 1] return ip_string[:ip_string_size.value - 1]
# Adding our two functions to the socket library # 如果当前操作系统是Windows将自定义的inet_pton和inet_ntop函数添加到socket库中
if os.name == 'nt': if os.name == 'nt':
socket.inet_pton = inet_pton socket.inet_pton = inet_pton
socket.inet_ntop = inet_ntop socket.inet_ntop = inet_ntop
Loading…
Cancel
Save