sunninghao_branch
snh 2 months ago
parent 28f0660564
commit 57f5840b20

@ -29,9 +29,12 @@ __license__ = 'MIT'
def _cli_parse(args): # pragma: no coverage
# 导入ArgumentParser模块
from argparse import ArgumentParser
# 创建ArgumentParser对象设置程序名称和用法
parser = ArgumentParser(prog=args[0], usage="%(prog)s [options] package.module:app")
# 添加参数
opt = parser.add_argument
opt("--version", action="store_true", help="show version number.")
opt("-b", "--bind", metavar="ADDRESS", help="bind socket to ADDRESS.")
@ -45,6 +48,7 @@ def _cli_parse(args): # pragma: no coverage
opt("--reload", action="store_true", help="auto-reload on file changes.")
opt('app', help='WSGI app entry point.', nargs='?')
# 解析命令行参数
cli_args = parser.parse_args(args[1:])
return cli_args, parser
@ -179,7 +183,9 @@ def depr(major, minor, cause, fix):
def makelist(data): # This is just too handy
# 判断data是否为元组、列表、集合或字典类型
if isinstance(data, (tuple, list, set, dict)):
# 如果是则返回data的列表形式
return list(data)
elif data:
return [data]
@ -198,18 +204,24 @@ class DictProperty(object):
self.getter, self.key = func, self.key or func.__name__
return self
# Descriptor read access: class-level access returns the descriptor itself, instance access returns the cached value.
def __get__(self, obj, cls):
    """Return the stored value for ``obj``, computing it on first access.

    When accessed on the class (``obj`` is ``None``) the descriptor itself
    is returned. Otherwise the value is looked up under ``self.key`` in the
    mapping found on ``obj`` under the attribute named ``self.attr``; a
    missing entry is filled with ``self.getter(obj)`` before returning.
    """
    if obj is None:
        return self
    name = self.key
    cache = getattr(obj, self.attr)
    if name not in cache:
        # First access for this object: compute once and remember.
        cache[name] = self.getter(obj)
    return cache[name]
# Descriptor write access: read-only properties raise AttributeError instead of storing the value.
def __set__(self, obj, value):
    """Store ``value`` under ``self.key`` in the mapping held by ``obj``.

    Raises:
        AttributeError: if ``self.read_only`` is true.
    """
    if self.read_only:
        raise AttributeError("Read-Only property.")
    storage = getattr(obj, self.attr)
    storage[self.key] = value
def __delete__(self, obj):
    """Remove the entry for ``self.key`` from the mapping held by ``obj``.

    Raises:
        AttributeError: if ``self.read_only`` is true.
    """
    if self.read_only:
        raise AttributeError("Read-Only property.")
    storage = getattr(obj, self.attr)
    del storage[self.key]
@ -737,26 +749,38 @@ class Bottle(object):
self.route('/' + '/'.join(segments), **options)
def _mount_app(self, prefix, app, **options):
    """Mount ``app`` under ``prefix`` by re-registering its routes here.

    Falls back to ``self._mount_wsgi`` (after a ``depr`` notice) when the
    app is already mounted, when any route ``options`` are given, or when
    ``prefix`` does not end in ``'/'``. Otherwise the app is recorded in
    ``self._mounts``, its config receives ``_mount.prefix`` and
    ``_mount.app``, and every route rule is prefixed and added to ``self``.
    """
    # Decide (in the original order) whether a WSGI fallback is required.
    fallback = None
    if app in self._mounts or '_mount.app' in app.config:
        fallback = (
            "Application mounted multiple times. Falling back to WSGI mount.",
            "Clone application before mounting to a different location.")
    elif options:
        fallback = (
            "Unsupported mount options. Falling back to WSGI mount.",
            "Do not specify any route options when mounting bottle application.")
    elif not prefix.endswith("/"):
        fallback = (
            "Prefix must end in '/'. Falling back to WSGI mount.",
            "Consider adding an explicit redirect from '/prefix' to '/prefix/' in the parent application.")

    if fallback is not None:
        cause, fix = fallback
        depr(0, 13, cause, fix)
        return self._mount_wsgi(prefix, app, **options)

    self._mounts.append(app)
    app.config['_mount.prefix'] = prefix
    app.config['_mount.app'] = self
    for mounted_route in app.routes:
        # Rewrite the rule in place so it lives below the mount prefix.
        mounted_route.rule = prefix + mounted_route.rule.lstrip('/')
        self.add_route(mounted_route)
def mount(self, prefix, app, **options):
@ -781,11 +805,15 @@ class Bottle(object):
parent application.
"""
# 检查prefix是否以'/'开头
if not prefix.startswith('/'):
# 如果prefix不以'/'开头则抛出ValueError异常
raise ValueError("Prefix must start with '/'")
# 如果app是Bottle实例则调用_mount_app方法
if isinstance(app, Bottle):
return self._mount_app(prefix, app, **options)
# 否则调用_mount_wsgi方法
else:
return self._mount_wsgi(prefix, app, **options)
@ -1089,31 +1117,46 @@ class Bottle(object):
def wsgi(self, environ, start_response):
    """ The bottle WSGI-interface.

    Runs ``self._handle`` / ``self._cast`` for the request, calls
    ``start_response`` with the status line and headers of the global
    ``response`` and returns the body iterable. Unexpected exceptions are
    re-raised unless ``self.catchall`` is set, in which case an HTML error
    page is written to ``environ['wsgi.errors']`` and returned with a 500
    status.
    """
    try:
        body = self._cast(self._handle(environ))
        # rfc2616 section 4.3: these responses must not carry a body.
        if response._status_code in (100, 101, 204, 304)\
        or environ['REQUEST_METHOD'] == 'HEAD':
            if hasattr(body, 'close'):
                body.close()
            body = []
        pending_exc = environ.get('bottle.exc_info')
        if pending_exc is not None:
            # Hand the stored exc_info to start_response exactly once.
            del environ['bottle.exc_info']
        start_response(response._wsgi_status_line(), response.headerlist,
                       pending_exc)
        return body
    except (KeyboardInterrupt, SystemExit, MemoryError):
        # Never swallow these, regardless of catchall.
        raise
    except Exception as exc:
        if not self.catchall:
            raise
        request_path = html_escape(environ.get('PATH_INFO', '/'))
        page = '<h1>Critical error while processing request: %s</h1>' \
            % request_path
        if DEBUG:
            page += '<h2>Error:</h2>\n<pre>\n%s\n</pre>\n' \
                    '<h2>Traceback:</h2>\n<pre>\n%s\n</pre>\n' \
                    % (html_escape(repr(exc)), html_escape(format_exc()))
        environ['wsgi.errors'].write(page)
        environ['wsgi.errors'].flush()
        error_headers = [('Content-Type', 'text/html; charset=UTF-8')]
        start_response('500 INTERNAL SERVER ERROR', error_headers,
                       sys.exc_info())
        return [tob(page)]
def __call__(self, environ, start_response):

@ -32,10 +32,15 @@ from .mbcssm import BIG5_SM_MODEL
class Big5Prober(MultiByteCharSetProber):
# 初始化Big5Prober类
def __init__(self):
    """Set up the Big5 state machine and distribution analyser, then reset."""
    super(Big5Prober, self).__init__()
    state_machine = CodingStateMachine(BIG5_SM_MODEL)
    analyzer = Big5DistributionAnalysis()
    self.coding_sm = state_machine
    self.distribution_analyzer = analyzer
    self.reset()
@property

@ -30,69 +30,126 @@ from .charsetprober import CharSetProber
class CharSetGroupProber(CharSetProber):
# 初始化函数,传入语言过滤器
def __init__(self, lang_filter=None):
    """Create an empty prober group.

    ``lang_filter`` is forwarded to the parent initialiser. The group
    starts with no child probers, an active count of zero and no best guess.
    """
    super(CharSetGroupProber, self).__init__(lang_filter=lang_filter)
    self.probers = []
    self._best_guess_prober = None
    self._active_num = 0
# 重置函数
def reset(self):
    """Reset the group and every non-empty child prober.

    Each truthy entry of ``self.probers`` is reset and marked active, and
    ``self._active_num`` ends up as the number of such entries. The best
    guess is cleared.
    """
    super(CharSetGroupProber, self).reset()
    self._active_num = 0
    for child in self.probers:
        if not child:
            # Falsy entries in the prober list are ignored.
            continue
        child.reset()
        child.active = True
        self._active_num += 1
    self._best_guess_prober = None
# 获取字符集名称的属性函数
@property
def charset_name(self):
    """Charset name of the best-guess prober, or ``None`` if there is none.

    When no best guess is recorded yet, ``get_confidence()`` is called
    first so it has a chance to select one.
    """
    if not self._best_guess_prober:
        self.get_confidence()
    best = self._best_guess_prober
    return best.charset_name if best else None
# 获取语言的属性函数
@property
def language(self):
    """Language of the best-guess prober, or ``None`` if there is none.

    When no best guess is recorded yet, ``get_confidence()`` is called
    first so it has a chance to select one.
    """
    if not self._best_guess_prober:
        self.get_confidence()
    best = self._best_guess_prober
    return best.language if best else None
# 接收字节字符串的函数
def feed(self, byte_str):
    """Feed ``byte_str`` to every active child prober and update the group.

    Returns ``self.state`` after processing. The first child that reports
    ``ProbingState.FOUND_IT`` becomes the best guess and the group itself
    switches to ``FOUND_IT``. Children that report ``NOT_ME`` are
    deactivated; once no active child remains the group becomes ``NOT_ME``.
    """
    for prober in self.probers:
        # Skip empty slots and probers that were already ruled out.
        if not prober or not prober.active:
            continue
        state = prober.feed(byte_str)
        # Nothing to act on when the child returns a falsy state.
        if not state:
            continue
        if state == ProbingState.FOUND_IT:
            self._best_guess_prober = prober
            # Record the verdict on the group as well; without this the
            # returned self.state keeps its previous value and the
            # FOUND_IT shortcut in get_confidence() is never taken.
            self._state = ProbingState.FOUND_IT
            return self.state
        elif state == ProbingState.NOT_ME:
            prober.active = False
            self._active_num -= 1
            if self._active_num <= 0:
                # Every child has given up, so the whole group does too.
                self._state = ProbingState.NOT_ME
                return self.state
    return self.state
# 获取置信度的函数
def get_confidence(self):
# 获取当前探测器的状态
state = self.state
# 如果当前探测器的状态是FOUND_IT
if state == ProbingState.FOUND_IT:
# 返回0.99
return 0.99
# 如果当前探测器的状态是NOT_ME
elif state == ProbingState.NOT_ME:
# 返回0.01
return 0.01
# 初始化最佳置信度
best_conf = 0.0
# 重置最佳猜测探测器
self._best_guess_prober = None
# 遍历探测器列表
for prober in self.probers:
# 如果探测器不存在
if not prober:
# 跳过
continue
# 如果探测器不是活动状态
if not prober.active:
self.logger.debug('%s not active', prober.charset_name)
continue

@ -34,32 +34,42 @@ from .enums import ProbingState
class CharSetProber(object):
# 定义一个阈值,当检测到的字符集概率大于这个值时,认为检测成功
SHORTCUT_THRESHOLD = 0.95
def __init__(self, lang_filter=None):
    """Initialise the prober.

    The state starts as ``None`` (it is only set to a probing state by
    ``reset``), ``lang_filter`` is stored as given, and a module-level
    logger is attached as ``self.logger``.
    """
    self.logger = logging.getLogger(__name__)
    self.lang_filter = lang_filter
    self._state = None
def reset(self):
    """Put the prober back into the ``ProbingState.DETECTING`` state."""
    self._state = ProbingState.DETECTING
@property
def charset_name(self):
    """Charset name reported by this prober; this implementation returns ``None``."""
    return None
def feed(self, buf):
    """Accept a buffer of input; this implementation does nothing."""
    pass
@property
def state(self):
    """Current value of ``self._state``."""
    return self._state
def get_confidence(self):
    """Confidence of this prober; this implementation always returns ``0.0``."""
    return 0.0
@staticmethod
def filter_high_byte_only(buf):
    """Collapse every run of bytes in ``0x00-0x7F`` in ``buf`` into one space.

    Bytes above ``0x7F`` are left untouched; the filtered bytes are returned.
    """
    return re.sub(b'([\x00-\x7F])+', b' ', buf)

@ -53,20 +53,29 @@ class CodingStateMachine(object):
encoding from consideration from here on.
"""
def __init__(self, sm):
    """Create a state machine driven by the model ``sm``.

    Byte position and character length start at zero, the current state
    starts as ``None``, a module-level logger is attached, and ``reset()``
    is called last.
    """
    self._model = sm
    self._curr_byte_pos = self._curr_char_len = 0
    self._curr_state = None
    self.logger = logging.getLogger(__name__)
    self.reset()
def reset(self):
    """Set the current state back to ``MachineState.START``."""
    self._curr_state = MachineState.START
def next_state(self, c):
# for each byte we get its class
# if it is first byte, we also get byte length
# 获取当前字节的类别
byte_class = self._model['class_table'][c]
# 如果当前状态为起始状态,则获取当前字符长度
if self._curr_state == MachineState.START:
self._curr_byte_pos = 0
self._curr_char_len = self._model['char_len_table'][byte_class]

@ -22,13 +22,20 @@
import sys
# 判断当前Python版本是否小于3.0
# Interpreter-version flags: exactly one of PY2 / PY3 is True.
PY2 = sys.version_info < (3, 0)
PY3 = not PY2

if PY2:
    # Python 2: text is ``unicode``; both ``str`` and ``unicode`` are strings.
    base_str = (str, unicode)
    text_type = unicode
else:
    # Python 3: text is ``str``; ``bytes`` and ``str`` are the string types.
    base_str = (bytes, str)
    text_type = str

@ -40,62 +40,95 @@ class EscCharSetProber(CharSetProber):
"""
def __init__(self, lang_filter=None):
    """Build the escape-sequence state machines selected by ``lang_filter``.

    A ``CodingStateMachine`` is appended to ``self.coding_sm`` for each
    model whose language flag is set in ``self.lang_filter``; the
    detection results are cleared and ``reset()`` is called last.
    """
    super(EscCharSetProber, self).__init__(lang_filter=lang_filter)
    self.coding_sm = []
    # (language flag, models enabled by that flag), in registration order.
    models_by_language = (
        (LanguageFilter.CHINESE_SIMPLIFIED,
         (HZ_SM_MODEL, ISO2022CN_SM_MODEL)),
        (LanguageFilter.JAPANESE, (ISO2022JP_SM_MODEL,)),
        (LanguageFilter.KOREAN, (ISO2022KR_SM_MODEL,)),
    )
    for flag, models in models_by_language:
        if self.lang_filter & flag:
            for model in models:
                self.coding_sm.append(CodingStateMachine(model))
    self.active_sm_count = None
    self._detected_charset = None
    self._detected_language = None
    self._state = None
    self.reset()
def reset(self):
    """Reset the prober and reactivate all of its state machines.

    Every truthy machine in ``self.coding_sm`` is marked active and reset,
    ``self.active_sm_count`` becomes the length of the machine list, and
    the detected charset and language are cleared.
    """
    super(EscCharSetProber, self).reset()
    for machine in self.coding_sm:
        if machine:
            machine.active = True
            machine.reset()
    self.active_sm_count = len(self.coding_sm)
    self._detected_charset = self._detected_language = None
@property
def charset_name(self):
    """Detected charset as stored in ``self._detected_charset`` (``None`` after a reset)."""
    return self._detected_charset
@property
def language(self):
    """Detected language as stored in ``self._detected_language`` (``None`` after a reset)."""
    return self._detected_language
def get_confidence(self):
    """Return ``0.99`` once a charset has been detected, otherwise ``0.00``."""
    return 0.99 if self._detected_charset else 0.00
def feed(self, byte_str):
    """Run every active state machine over ``byte_str``.

    A machine that reaches ``MachineState.ERROR`` is deactivated; when the
    active count drops to zero the prober becomes ``NOT_ME``. The first
    machine that reaches ``ITS_ME`` makes the prober ``FOUND_IT`` and
    supplies the detected charset and language. Returns ``self.state``.
    """
    for byte in byte_str:
        for machine in self.coding_sm:
            if not (machine and machine.active):
                continue
            outcome = machine.next_state(byte)
            if outcome == MachineState.ERROR:
                machine.active = False
                self.active_sm_count -= 1
                if self.active_sm_count <= 0:
                    # No candidate machine is left.
                    self._state = ProbingState.NOT_ME
                    return self.state
            elif outcome == MachineState.ITS_ME:
                self._state = ProbingState.FOUND_IT
                self._detected_charset = machine.get_coding_state_machine()
                self._detected_language = machine.language
                return self.state
    return self.state

@ -34,59 +34,90 @@ from .mbcssm import EUCJP_SM_MODEL
class EUCJPProber(MultiByteCharSetProber):
    """Prober for EUC-JP that combines context and distribution analysis."""

    def __init__(self):
        super(EUCJPProber, self).__init__()
        self.coding_sm = CodingStateMachine(EUCJP_SM_MODEL)
        self.distribution_analyzer = EUCJPDistributionAnalysis()
        self.context_analyzer = EUCJPContextAnalysis()
        self.reset()

    def reset(self):
        """Reset the inherited state and the context analyser."""
        super(EUCJPProber, self).reset()
        self.context_analyzer.reset()

    @property
    def charset_name(self):
        return "EUC-JP"

    @property
    def language(self):
        return "Japanese"

    def feed(self, byte_str):
        """Feed ``byte_str`` through the state machine and both analysers.

        Returns ``self.state``. Stops early on ``ERROR`` (state becomes
        ``NOT_ME``) or ``ITS_ME`` (state becomes ``FOUND_IT``).
        """
        for i, byte in enumerate(byte_str):
            # PY3K: byte_str is a byte array, so each element is an int, not a byte
            coding_state = self.coding_sm.next_state(byte)
            if coding_state == MachineState.ERROR:
                self.logger.debug('%s %s prober hit error at byte %s',
                                  self.charset_name, self.language, i)
                self._state = ProbingState.NOT_ME
                break
            if coding_state == MachineState.ITS_ME:
                self._state = ProbingState.FOUND_IT
                break
            if coding_state != MachineState.START:
                continue
            char_len = self.coding_sm.get_current_charlen()
            if i == 0:
                # Pair the first byte with the byte kept from the last call.
                self._last_char[1] = byte_str[0]
                window = self._last_char
            else:
                window = byte_str[i - 1:i + 1]
            self.context_analyzer.feed(window, char_len)
            self.distribution_analyzer.feed(window, char_len)

        # Remember the final byte for the next call.
        self._last_char[0] = byte_str[-1]

        if (self.state == ProbingState.DETECTING
                and self.context_analyzer.got_enough_data()
                and self.get_confidence() > self.SHORTCUT_THRESHOLD):
            self._state = ProbingState.FOUND_IT

        return self.state

    def get_confidence(self):
        """Return the larger of the context and distribution confidences."""
        return max(self.context_analyzer.get_confidence(),
                   self.distribution_analyzer.get_confidence())

@ -32,16 +32,23 @@ from .mbcssm import EUCKR_SM_MODEL
class EUCKRProber(MultiByteCharSetProber):
    """Prober for EUC-KR built on the EUC-KR state machine and distribution analyser."""

    @property
    def charset_name(self):
        return "EUC-KR"

    @property
    def language(self):
        return "Korean"

    def __init__(self):
        super(EUCKRProber, self).__init__()
        state_machine = CodingStateMachine(EUCKR_SM_MODEL)
        analyzer = EUCKRDistributionAnalysis()
        self.coding_sm = state_machine
        self.distribution_analyzer = analyzer
        self.reset()

@ -31,16 +31,23 @@ from .chardistribution import EUCTWDistributionAnalysis
from .mbcssm import EUCTW_SM_MODEL
class EUCTWProber(MultiByteCharSetProber):
    """Prober for EUC-TW built on the EUC-TW state machine and distribution analyser."""

    @property
    def charset_name(self):
        return "EUC-TW"

    @property
    def language(self):
        return "Taiwan"

    def __init__(self):
        super(EUCTWProber, self).__init__()
        state_machine = CodingStateMachine(EUCTW_SM_MODEL)
        analyzer = EUCTWDistributionAnalysis()
        self.coding_sm = state_machine
        self.distribution_analyzer = analyzer
        self.reset()

@ -31,16 +31,23 @@ from .chardistribution import GB2312DistributionAnalysis
from .mbcssm import GB2312_SM_MODEL
class GB2312Prober(MultiByteCharSetProber):
    """Prober for GB2312 built on the GB2312 state machine and distribution analyser."""

    @property
    def charset_name(self):
        return "GB2312"

    @property
    def language(self):
        return "Chinese"

    def __init__(self):
        super(GB2312Prober, self).__init__()
        state_machine = CodingStateMachine(GB2312_SM_MODEL)
        analyzer = GB2312DistributionAnalysis()
        self.coding_sm = state_machine
        self.distribution_analyzer = analyzer
        self.reset()

@ -152,17 +152,27 @@ class HebrewProber(CharSetProber):
LOGICAL_HEBREW_NAME = "windows-1255"
def __init__(self):
    """Declare all attributes as ``None`` and then call ``reset()``."""
    super(HebrewProber, self).__init__()
    # Final-letter scores for the logical and visual hypotheses.
    self._final_char_logical_score = self._final_char_visual_score = None
    # The two most recently seen characters.
    self._prev = self._before_prev = None
    # Model probers; left unset here.
    self._logical_prober = self._visual_prober = None
    self.reset()
def reset(self):
# 重置_final_char_logical_score为0
self._final_char_logical_score = 0
# 重置_final_char_visual_score为0
self._final_char_visual_score = 0
# The two last characters seen in the previous buffer,
# mPrev and mBeforePrev are initialized to space in order to simulate

@ -37,17 +37,28 @@ class MultiByteCharSetProber(CharSetProber):
"""
def __init__(self, lang_filter=None):
    """Initialise the base prober with no analyser and no state machine.

    ``lang_filter`` is forwarded to the parent initialiser. Subclasses
    visible in this file assign ``coding_sm`` and ``distribution_analyzer``
    themselves.
    """
    super(MultiByteCharSetProber, self).__init__(lang_filter=lang_filter)
    self.coding_sm = None
    self.distribution_analyzer = None
    # Two-byte window: [last byte of previous buffer, first byte of current].
    self._last_char = [0, 0]
def reset(self):
    """Reset the prober, its state machine and its distribution analyser.

    Components that are not set (falsy) are skipped; the two-byte window
    ``self._last_char`` is replaced with a fresh ``[0, 0]``.
    """
    super(MultiByteCharSetProber, self).reset()
    for component in (self.coding_sm, self.distribution_analyzer):
        if component:
            component.reset()
    self._last_char = [0, 0]
@property
@ -59,33 +70,45 @@ class MultiByteCharSetProber(CharSetProber):
raise NotImplementedError
def feed(self, byte_str):
    """Feed ``byte_str`` through the state machine and distribution analyser.

    Returns ``self.state``. Stops early on ``ERROR`` (state becomes
    ``NOT_ME``) or ``ITS_ME`` (state becomes ``FOUND_IT``). While still
    detecting, a sufficiently confident analyser also yields ``FOUND_IT``.
    """
    for i, byte in enumerate(byte_str):
        coding_state = self.coding_sm.next_state(byte)
        if coding_state == MachineState.ERROR:
            self.logger.debug('%s %s prober hit error at byte %s',
                              self.charset_name, self.language, i)
            self._state = ProbingState.NOT_ME
            break
        if coding_state == MachineState.ITS_ME:
            self._state = ProbingState.FOUND_IT
            break
        if coding_state != MachineState.START:
            continue
        char_len = self.coding_sm.get_current_charlen()
        if i == 0:
            # Pair the first byte with the byte kept from the last call.
            self._last_char[1] = byte_str[0]
            window = self._last_char
        else:
            window = byte_str[i - 1:i + 1]
        self.distribution_analyzer.feed(window, char_len)

    # Remember the final byte for the next call.
    self._last_char[0] = byte_str[-1]

    if (self.state == ProbingState.DETECTING
            and self.distribution_analyzer.got_enough_data()
            and self.get_confidence() > self.SHORTCUT_THRESHOLD):
        self._state = ProbingState.FOUND_IT

    return self.state
def get_confidence(self):
    """Return the confidence reported by ``self.distribution_analyzer``."""
    return self.distribution_analyzer.get_confidence()

@ -39,16 +39,20 @@ from .euctwprober import EUCTWProber
class MBCSGroupProber(CharSetGroupProber):
    """Group prober that runs the multi-byte charset probers together."""

    def __init__(self, lang_filter=None):
        """Create one instance of each multi-byte prober and reset the group.

        ``lang_filter`` is forwarded to the parent initialiser.
        """
        super(MBCSGroupProber, self).__init__(lang_filter=lang_filter)
        # Each prober is listed exactly once. The previous text carried the
        # list twice (stale and annotated copies) without a separating
        # comma, which is a syntax error.
        self.probers = [
            UTF8Prober(),    # UTF-8
            SJISProber(),    # Shift_JIS
            EUCJPProber(),   # EUC-JP
            GB2312Prober(),  # GB2312
            EUCKRProber(),   # EUC-KR
            CP949Prober(),   # CP949
            Big5Prober(),    # Big5
            EUCTWProber()    # EUC-TW
        ]
        self.reset()

@ -31,13 +31,19 @@ from .enums import CharacterCategory, ProbingState, SequenceLikelihood
class SingleByteCharSetProber(CharSetProber):
# 定义样本大小
SAMPLE_SIZE = 64
# 定义相对阈值
SB_ENOUGH_REL_THRESHOLD = 1024 # 0.25 * SAMPLE_SIZE^2
# 定义正向阈值
POSITIVE_SHORTCUT_THRESHOLD = 0.95
# 定义负向阈值
NEGATIVE_SHORTCUT_THRESHOLD = 0.05
def __init__(self, model, reversed=False, name_prober=None):
# 调用父类构造函数
super(SingleByteCharSetProber, self).__init__()
# 设置模型
self._model = model
# TRUE if we need to reverse every pair in the model lookup
self._reversed = reversed
@ -51,6 +57,7 @@ class SingleByteCharSetProber(CharSetProber):
self.reset()
def reset(self):
# 重置函数
super(SingleByteCharSetProber, self).reset()
# char order of last character
self._last_order = 255
@ -69,16 +76,20 @@ class SingleByteCharSetProber(CharSetProber):
@property
def language(self):
    """Language of this prober.

    Taken from ``self._name_prober`` when one is set, otherwise from the
    ``'language'`` entry of the model (``None`` if the model has none).
    """
    name_prober = self._name_prober
    if not name_prober:
        return self._model.get('language')
    return name_prober.language
def feed(self, byte_str):
# 如果_model中的keep_english_letter为False则过滤掉国际字符
if not self._model['keep_english_letter']:
byte_str = self.filter_international_words(byte_str)
# 如果byte_str为空则返回状态
if not byte_str:
return self.state
# 获取字符到顺序的映射
char_to_order_map = self._model['char_to_order_map']
for i, c in enumerate(byte_str):
# XXX: Order is in range 1-64, so one would think we want 0-63 here,
@ -122,11 +133,17 @@ class SingleByteCharSetProber(CharSetProber):
return self.state
def get_confidence(self):
    """Return the confidence derived from the sequence counters.

    With no sequences counted the result is ``0.01``. Otherwise the share
    of positive sequences is divided by the model's
    ``typical_positive_ratio``, scaled by ``_freq_char / _total_char`` and
    capped at ``0.99`` once it reaches ``1.0``.
    """
    if not self._total_seqs > 0:
        return 0.01
    positive = self._seq_counters[SequenceLikelihood.POSITIVE]
    confidence = ((1.0 * positive) /
                  self._total_seqs / self._model['typical_positive_ratio'])
    confidence = confidence * self._freq_char / self._total_char
    return 0.99 if confidence >= 1.0 else confidence

@ -34,59 +34,94 @@ from .enums import ProbingState, MachineState
class SJISProber(MultiByteCharSetProber):
    """Prober for Shift_JIS that combines context and distribution analysis."""

    def __init__(self):
        super(SJISProber, self).__init__()
        self.coding_sm = CodingStateMachine(SJIS_SM_MODEL)
        self.distribution_analyzer = SJISDistributionAnalysis()
        self.context_analyzer = SJISContextAnalysis()
        self.reset()

    def reset(self):
        """Reset the inherited state and the context analyser."""
        super(SJISProber, self).reset()
        self.context_analyzer.reset()

    @property
    def charset_name(self):
        # The name is delegated to the context analyser.
        return self.context_analyzer.charset_name

    @property
    def language(self):
        return "Japanese"

    def feed(self, byte_str):
        """Feed ``byte_str`` through the state machine and both analysers.

        Returns ``self.state``. Stops early on ``ERROR`` (state becomes
        ``NOT_ME``) or ``ITS_ME`` (state becomes ``FOUND_IT``).
        """
        for i, byte in enumerate(byte_str):
            coding_state = self.coding_sm.next_state(byte)
            if coding_state == MachineState.ERROR:
                self.logger.debug('%s %s prober hit error at byte %s',
                                  self.charset_name, self.language, i)
                self._state = ProbingState.NOT_ME
                break
            if coding_state == MachineState.ITS_ME:
                self._state = ProbingState.FOUND_IT
                break
            if coding_state != MachineState.START:
                continue
            char_len = self.coding_sm.get_current_charlen()
            if i == 0:
                # Pair the first byte with the byte kept from the last call.
                self._last_char[1] = byte_str[0]
                context_window = self._last_char[2 - char_len:]
                distribution_window = self._last_char
            else:
                start = i + 1 - char_len
                context_window = byte_str[start:start + 2]
                distribution_window = byte_str[i - 1:i + 1]
            self.context_analyzer.feed(context_window, char_len)
            self.distribution_analyzer.feed(distribution_window, char_len)

        # Remember the final byte for the next call.
        self._last_char[0] = byte_str[-1]

        if (self.state == ProbingState.DETECTING
                and self.context_analyzer.got_enough_data()
                and self.get_confidence() > self.SHORTCUT_THRESHOLD):
            self._state = ProbingState.FOUND_IT

        return self.state

    def get_confidence(self):
        """Return the larger of the context and distribution confidences."""
        return max(self.context_analyzer.get_confidence(),
                   self.distribution_analyzer.get_confidence())

@ -79,16 +79,27 @@ class UniversalDetector(object):
'iso-8859-13': 'Windows-1257'}
def __init__(self, lang_filter=LanguageFilter.ALL):
    """Create a detector restricted to the languages in ``lang_filter``.

    All working attributes are declared here as ``None`` (the prober list
    as an empty list) and receive their real starting values from the
    ``reset()`` call at the end.
    """
    self.lang_filter = lang_filter
    self.logger = logging.getLogger(__name__)
    # Probers: none exist until they are needed.
    self._esc_charset_prober = None
    self._charset_probers = []
    # Per-document state, filled in by reset().
    self.result = None
    self.done = None
    self._got_data = None
    self._input_state = None
    self._last_char = None
    self._has_win_bytes = None
    self.reset()
def reset(self):
@ -97,14 +108,22 @@ class UniversalDetector(object):
initial states. This is called by ``__init__``, so you only need to
call this directly in between analyses of different documents.
"""
# 重置结果
self.result = {'encoding': None, 'confidence': 0.0, 'language': None}
# 重置完成标志
self.done = False
# 重置是否接收到数据标志
self._got_data = False
# 重置是否有win字节标志
self._has_win_bytes = False
# 重置输入状态
self._input_state = InputState.PURE_ASCII
# 重置最后一个字符
self._last_char = b''
# 如果有esc字符集探测器重置它
if self._esc_charset_prober:
self._esc_charset_prober.reset()
# 重置所有字符集探测器
for prober in self._charset_probers:
prober.reset()

@ -33,50 +33,75 @@ from .mbcssm import UTF8_SM_MODEL
class UTF8Prober(CharSetProber):
    """Prober for UTF-8 driven by the UTF-8 coding state machine."""

    # Factor applied to the "unlike" score per multi-byte character seen.
    ONE_CHAR_PROB = 0.5

    def __init__(self):
        super(UTF8Prober, self).__init__()
        self.coding_sm = CodingStateMachine(UTF8_SM_MODEL)
        self._num_mb_chars = None
        self.reset()

    def reset(self):
        """Reset the prober, its state machine and the multi-byte counter."""
        super(UTF8Prober, self).reset()
        self.coding_sm.reset()
        self._num_mb_chars = 0

    @property
    def charset_name(self):
        return "utf-8"

    @property
    def language(self):
        return ""

    def feed(self, byte_str):
        """Feed ``byte_str`` through the state machine and return ``self.state``.

        ``ERROR`` makes the prober ``NOT_ME`` and ``ITS_ME`` makes it
        ``FOUND_IT``; both stop processing. Each completed character of
        two or more bytes increments the multi-byte counter.
        """
        machine = self.coding_sm
        for byte in byte_str:
            outcome = machine.next_state(byte)
            if outcome == MachineState.ERROR:
                self._state = ProbingState.NOT_ME
                break
            if outcome == MachineState.ITS_ME:
                self._state = ProbingState.FOUND_IT
                break
            if (outcome == MachineState.START
                    and machine.get_current_charlen() >= 2):
                self._num_mb_chars += 1

        if (self.state == ProbingState.DETECTING
                and self.get_confidence() > self.SHORTCUT_THRESHOLD):
            self._state = ProbingState.FOUND_IT

        return self.state

    def get_confidence(self):
        """Return the confidence based on the number of multi-byte characters.

        With six or more the result is ``0.99``; below that it is
        ``1.0 - 0.99 * ONE_CHAR_PROB ** count``.
        """
        if self._num_mb_chars >= 6:
            return 0.99
        return 1.0 - 0.99 * self.ONE_CHAR_PROB ** self._num_mb_chars

@ -67,30 +67,47 @@ __all__ = ['AmbiguityError', 'CheckboxControl', 'Control',
'TextareaControl', 'XHTMLCompatibleFormParser']
try:
    import logging
    import inspect
except ImportError:
    def debug(msg, *args, **kwds):
        """No-op fallback used when logging or inspect cannot be imported."""
        pass
else:
    _logger = logging.getLogger("ClientForm")
    # While True, debug() returns immediately without inspecting the stack.
    OPTIMIZATION_HACK = True

    def debug(msg, *args, **kwds):
        """Log ``msg % args`` at DEBUG level, prefixed with the caller's name.

        Does nothing while ``OPTIMIZATION_HACK`` is true. ``kwds`` are
        passed through to ``_logger.debug``.
        """
        if OPTIMIZATION_HACK:
            return

        # Frame 1 is the function that called debug(); item 3 is its name.
        caller_name = inspect.stack()[1][3]
        # '%%s' survives the formatting below as a '%s' slot for the name.
        extended_msg = '%%s %s' % msg
        extended_args = (caller_name,)+args
        # The result is not bound: the old ``debug = ...`` assignment only
        # created an unused local shadowing this function's name.
        _logger.debug(extended_msg, *extended_args, **kwds)

    def _show_debug_messages():
        """Enable debug output and send it to ``sys.stdout``."""
        global OPTIMIZATION_HACK
        OPTIMIZATION_HACK = False
        _logger.setLevel(logging.DEBUG)
        handler = logging.StreamHandler(sys.stdout)
        handler.setLevel(logging.DEBUG)
        _logger.addHandler(handler)
try:
@ -114,13 +131,17 @@ except ImportError:
import sys, re, random
if sys.version_info >= (3, 0):
    # On Python 3.0 and later, make the name xrange refer to range.
    xrange = range
# monkeypatch to fix http://www.python.org/sf/803422 :-(
# (replaces sgmllib's character-reference pattern with the regex below)
sgmllib.charref = re.compile("&#(x?[0-9a-fA-F]+)[^0-9a-fA-F]")
# HTMLParser.HTMLParser is recent, so live without it if it's not available
# (also, sgmllib.SGMLParser is much more tolerant of bad HTML)
# HTMLParser.HTMLParser是最近的如果不可用则没有它
# 另外sgmllib.SGMLParser对不良HTML的容忍度更高
try:
import HTMLParser
except ImportError:
@ -131,9 +152,11 @@ else:
try:
    import warnings
except ImportError:
    # Without the warnings module, deprecation notices are silently dropped.
    def deprecation(message, stack_offset=0):
        pass
else:
    # Emit a DeprecationWarning; stacklevel is 3 plus the caller-supplied
    # stack_offset, which selects the frame the warning is attributed to.
    def deprecation(message, stack_offset=0):
        warnings.warn(message, DeprecationWarning, stacklevel=3+stack_offset)
@ -224,29 +247,39 @@ string.
return '&'.join(l)
def unescape(data, entities, encoding=DEFAULT_ENCODING):
    """Replace entity and character references in ``data``.

    ``data`` is returned unchanged when it is ``None`` or contains no
    ``"&"``. Numeric references (``&#...;``) are resolved by
    ``unescape_charref``; other references are looked up in ``entities``
    by their full ``&name;`` text and left as-is when unknown. When
    ``data`` is a ``six.string_types`` instance no decoding is applied to
    replacements.
    """
    if data is None or "&" not in data:
        return data

    if isinstance(data, six.string_types):
        encoding = None

    def replace_entities(match, entities=entities, encoding=encoding):
        entity = match.group()
        if entity[1] == "#":
            return unescape_charref(entity[2:-1], encoding)

        replacement = entities.get(entity)
        if replacement is None:
            # Unknown entity: keep the original text.
            return entity
        if encoding is not None and hasattr(replacement, "decode"):
            try:
                return replacement.decode(encoding)
            except UnicodeError:
                return entity
        return replacement

    return re.sub(r"&#?[A-Za-z0-9]+?;", replace_entities, data)
def unescape_charref(data, encoding):
@ -646,31 +679,47 @@ class _AbstractFormParser:
self._textarea = None
def start_label(self, attrs):
    """Begin a new label from the ``(key, value)`` pairs in ``attrs``.

    An open label is closed first. The new label dict holds the unescaped
    attributes plus ``"__text"`` (empty) and ``"__taken"``; it is appended
    to ``self.labels`` only when it has a non-empty ``for`` attribute, and
    always becomes ``self._current_label``.
    """
    debug("%s", attrs)
    if self._current_label:
        self.end_label()
    label = {}
    for name, raw_value in attrs:
        label[name] = self.unescape_attr_if_required(raw_value)
    has_target = bool(label.get("for"))  # empty id is invalid
    label["__text"] = ""
    label["__taken"] = has_target
    if has_target:
        self.labels.append(label)
    self._current_label = label
def end_label(self):
    """Close the current label, if any, and drop its ``"__taken"`` marker."""
    debug("")
    finished = self._current_label
    if finished is None:
        # something is ugly in the HTML, but we're ignoring it
        return
    self._current_label = None
    # if it is staying around, it is True in all cases
    del finished["__taken"]
def _add_label(self, d):
#debug("%s", d)
# 如果当前标签存在且__taken属性为False则将__taken属性设置为True并将当前标签添加到d的__label属性中
if self._current_label is not None:
if not self._current_label["__taken"]:
self._current_label["__taken"] = True
@ -743,12 +792,16 @@ class _AbstractFormParser:
controls.append((type, name, d))
def do_isindex(self, attrs):
    """Record an ``isindex`` control built from the pairs in ``attrs``.

    The unescaped attributes are passed to ``self._add_label`` and then
    appended to the control list of the current form (item 2 of
    ``self._current_form``).
    """
    debug("%s", attrs)
    attributes = {}
    for name, raw_value in attrs:
        attributes[name] = self.unescape_attr_if_required(raw_value)
    form_controls = self._current_form[2]
    self._add_label(attributes)
    # isindex doesn't have type or name HTML attributes
    form_controls.append(("isindex", None, attributes))

Loading…
Cancel
Save