添加注释

pull/3/head
wang 3 months ago
parent fa9039fd5e
commit bfffbfc9dd

@ -913,6 +913,7 @@ def checkFalsePositives(injection):
retVal = True retVal = True
# 如果注入数据中所有元素都在PAYLOAD.TECHNIQUE.BOOLEAN、PAYLOAD.TECHNIQUE.TIME、PAYLOAD.TECHNIQUE.STACKED中或者注入数据长度为1且PAYLOAD.TECHNIQUE.UNION在注入数据中且"Generic"在注入数据[PAYLOAD.TECHNIQUE.UNION].title中
if all(_ in (PAYLOAD.TECHNIQUE.BOOLEAN, PAYLOAD.TECHNIQUE.TIME, PAYLOAD.TECHNIQUE.STACKED) for _ in injection.data) or (len(injection.data) == 1 and PAYLOAD.TECHNIQUE.UNION in injection.data and "Generic" in injection.data[PAYLOAD.TECHNIQUE.UNION].title): if all(_ in (PAYLOAD.TECHNIQUE.BOOLEAN, PAYLOAD.TECHNIQUE.TIME, PAYLOAD.TECHNIQUE.STACKED) for _ in injection.data) or (len(injection.data) == 1 and PAYLOAD.TECHNIQUE.UNION in injection.data and "Generic" in injection.data[PAYLOAD.TECHNIQUE.UNION].title):
pushValue(kb.injection) pushValue(kb.injection)
@ -920,11 +921,13 @@ def checkFalsePositives(injection):
infoMsg += "parameter '%s' is a false positive" % injection.parameter infoMsg += "parameter '%s' is a false positive" % injection.parameter
logger.info(infoMsg) logger.info(infoMsg)
# 定义一个函数,返回一个随机整数
def _(): def _():
return int(randomInt(2)) + 1 return int(randomInt(2)) + 1
kb.injection = injection kb.injection = injection
# 遍历配置级别
for level in xrange(conf.level): for level in xrange(conf.level):
while True: while True:
randInt1, randInt2, randInt3 = (_() for j in xrange(3)) randInt1, randInt2, randInt3 = (_() for j in xrange(3))
@ -932,38 +935,48 @@ def checkFalsePositives(injection):
randInt1 = min(randInt1, randInt2, randInt3) randInt1 = min(randInt1, randInt2, randInt3)
randInt3 = max(randInt1, randInt2, randInt3) randInt3 = max(randInt1, randInt2, randInt3)
# 如果配置字符串存在且配置字符串在任意一个Unicode(_)中
if conf.string and any(conf.string in getUnicode(_) for _ in (randInt1, randInt2, randInt3)): if conf.string and any(conf.string in getUnicode(_) for _ in (randInt1, randInt2, randInt3)):
continue continue
# 如果配置非字符串存在且配置非字符串在任意一个Unicode(_)中
if conf.notString and any(conf.notString in getUnicode(_) for _ in (randInt1, randInt2, randInt3)): if conf.notString and any(conf.notString in getUnicode(_) for _ in (randInt1, randInt2, randInt3)):
continue continue
# 如果randInt3 > randInt2 > randInt1则跳出循环
if randInt3 > randInt2 > randInt1: if randInt3 > randInt2 > randInt1:
break break
# 如果不满足布尔表达式则retVal为False跳出循环
if not checkBooleanExpression("%d%s%d" % (randInt1, INFERENCE_EQUALS_CHAR, randInt1)): if not checkBooleanExpression("%d%s%d" % (randInt1, INFERENCE_EQUALS_CHAR, randInt1)):
retVal = False retVal = False
break break
# 如果PAYLOAD.TECHNIQUE.BOOLEAN不在注入数据中则检查布尔表达式
if PAYLOAD.TECHNIQUE.BOOLEAN not in injection.data: if PAYLOAD.TECHNIQUE.BOOLEAN not in injection.data:
checkBooleanExpression("%d%s%d" % (randInt1, INFERENCE_EQUALS_CHAR, randInt2)) # just in case if DBMS hasn't properly recovered from previous delayed request checkBooleanExpression("%d%s%d" % (randInt1, INFERENCE_EQUALS_CHAR, randInt2)) # just in case if DBMS hasn't properly recovered from previous delayed request
# 如果满足布尔表达式则retVal为False跳出循环
if checkBooleanExpression("%d%s%d" % (randInt1, INFERENCE_EQUALS_CHAR, randInt3)): # this must not be evaluated to True if checkBooleanExpression("%d%s%d" % (randInt1, INFERENCE_EQUALS_CHAR, randInt3)): # this must not be evaluated to True
retVal = False retVal = False
break break
# 如果满足布尔表达式则retVal为False跳出循环
elif checkBooleanExpression("%d%s%d" % (randInt3, INFERENCE_EQUALS_CHAR, randInt2)): # this must not be evaluated to True elif checkBooleanExpression("%d%s%d" % (randInt3, INFERENCE_EQUALS_CHAR, randInt2)): # this must not be evaluated to True
retVal = False retVal = False
break break
# 如果不满足布尔表达式则retVal为False跳出循环
elif not checkBooleanExpression("%d%s%d" % (randInt2, INFERENCE_EQUALS_CHAR, randInt2)): # this must be evaluated to True elif not checkBooleanExpression("%d%s%d" % (randInt2, INFERENCE_EQUALS_CHAR, randInt2)): # this must be evaluated to True
retVal = False retVal = False
break break
# 如果满足布尔表达式则retVal为False跳出循环
elif checkBooleanExpression("%d %d" % (randInt3, randInt2)): # this must not be evaluated to True (invalid statement) elif checkBooleanExpression("%d %d" % (randInt3, randInt2)): # this must not be evaluated to True (invalid statement)
retVal = False retVal = False
break break
# 如果retVal为False则记录警告信息
if not retVal: if not retVal:
warnMsg = "false positive or unexploitable injection point detected" warnMsg = "false positive or unexploitable injection point detected"
logger.warning(warnMsg) logger.warning(warnMsg)
@ -978,22 +991,28 @@ def checkSuhosinPatch(injection):
Checks for existence of Suhosin-patch (and alike) protection mechanism(s) Checks for existence of Suhosin-patch (and alike) protection mechanism(s)
""" """
# 如果注入点在GET或URI中
if injection.place in (PLACE.GET, PLACE.URI): if injection.place in (PLACE.GET, PLACE.URI):
debugMsg = "checking for parameter length " debugMsg = "checking for parameter length "
debugMsg += "constraining mechanisms" debugMsg += "constraining mechanisms"
logger.debug(debugMsg) logger.debug(debugMsg)
# 将当前的注入点保存到栈中
pushValue(kb.injection) pushValue(kb.injection)
# 设置当前的注入点
kb.injection = injection kb.injection = injection
# 生成一个随机数
randInt = randomInt() randInt = randomInt()
# 检查参数长度是否被限制
if not checkBooleanExpression("%d=%s%d" % (randInt, ' ' * SUHOSIN_MAX_VALUE_LENGTH, randInt)): if not checkBooleanExpression("%d=%s%d" % (randInt, ' ' * SUHOSIN_MAX_VALUE_LENGTH, randInt)):
warnMsg = "parameter length constraining " warnMsg = "parameter length constraining "
warnMsg += "mechanism detected (e.g. Suhosin patch). " warnMsg += "mechanism detected (e.g. Suhosin patch). "
warnMsg += "Potential problems in enumeration phase can be expected" warnMsg += "Potential problems in enumeration phase can be expected"
logger.warning(warnMsg) logger.warning(warnMsg)
# 恢复之前的注入点
kb.injection = popValue() kb.injection = popValue()
@stackedmethod @stackedmethod
@ -1001,9 +1020,12 @@ def checkFilteredChars(injection):
debugMsg = "checking for filtered characters" debugMsg = "checking for filtered characters"
logger.debug(debugMsg) logger.debug(debugMsg)
# 将当前的注入点保存到栈中
pushValue(kb.injection) pushValue(kb.injection)
# 设置当前的注入点
kb.injection = injection kb.injection = injection
# 生成一个随机数
randInt = randomInt() randInt = randomInt()
# all other techniques are already using parentheses in tests # all other techniques are already using parentheses in tests
@ -1025,17 +1047,23 @@ def checkFilteredChars(injection):
kb.injection = popValue() kb.injection = popValue()
# 定义一个函数用于检查SQL注入
def heuristicCheckSqlInjection(place, parameter): def heuristicCheckSqlInjection(place, parameter):
# 如果配置文件中设置了跳过启发式测试则返回None
if conf.skipHeuristics: if conf.skipHeuristics:
return None return None
# 获取原始值
origValue = conf.paramDict[place][parameter] origValue = conf.paramDict[place][parameter]
# 获取参数类型
paramType = conf.method if conf.method not in (None, HTTPMETHOD.GET, HTTPMETHOD.POST) else place paramType = conf.method if conf.method not in (None, HTTPMETHOD.GET, HTTPMETHOD.POST) else place
# 定义前缀、后缀和随机字符串
prefix = "" prefix = ""
suffix = "" suffix = ""
randStr = "" randStr = ""
# 如果配置文件中设置了前缀或后缀,则获取前缀和后缀
if conf.prefix or conf.suffix: if conf.prefix or conf.suffix:
if conf.prefix: if conf.prefix:
prefix = conf.prefix prefix = conf.prefix
@ -1043,52 +1071,82 @@ def heuristicCheckSqlInjection(place, parameter):
if conf.suffix: if conf.suffix:
suffix = conf.suffix suffix = conf.suffix
# 生成一个包含一个单引号和一个双引号的随机字符串
while randStr.count('\'') != 1 or randStr.count('\"') != 1: while randStr.count('\'') != 1 or randStr.count('\"') != 1:
randStr = randomStr(length=10, alphabet=HEURISTIC_CHECK_ALPHABET) randStr = randomStr(length=10, alphabet=HEURISTIC_CHECK_ALPHABET)
# 设置启发式模式为True
kb.heuristicMode = True kb.heuristicMode = True
# 构造payload
payload = "%s%s%s" % (prefix, randStr, suffix) payload = "%s%s%s" % (prefix, randStr, suffix)
# 使用agent.payload方法构造payload
payload = agent.payload(place, parameter, newValue=payload) payload = agent.payload(place, parameter, newValue=payload)
# 发送请求并获取页面和响应码
page, _, code = Request.queryPage(payload, place, content=True, raise404=False) page, _, code = Request.queryPage(payload, place, content=True, raise404=False)
# 将页面和响应码保存到kb中
kb.heuristicPage = page kb.heuristicPage = page
kb.heuristicCode = code kb.heuristicCode = code
# 设置启发式模式为False
kb.heuristicMode = False kb.heuristicMode = False
# 解析页面中的文件路径
parseFilePaths(page) parseFilePaths(page)
# 判断是否是数据库错误
result = wasLastResponseDBMSError() result = wasLastResponseDBMSError()
# 构造提示信息
infoMsg = "heuristic (basic) test shows that %sparameter '%s' might " % ("%s " % paramType if paramType != parameter else "", parameter) infoMsg = "heuristic (basic) test shows that %sparameter '%s' might " % ("%s " % paramType if paramType != parameter else "", parameter)
# 判断page中是否包含FORMAT_EXCEPTION_STRINGS中的任意一个字符串
def _(page): def _(page):
return any(_ in (page or "") for _ in FORMAT_EXCEPTION_STRINGS) return any(_ in (page or "") for _ in FORMAT_EXCEPTION_STRINGS)
# 判断page中是否包含FORMAT_EXCEPTION_STRINGS中的任意一个字符串且kb.originalPage中不包含
casting = _(page) and not _(kb.originalPage) casting = _(page) and not _(kb.originalPage)
# 如果没有进行类型转换且result为空kb.dynamicParameter为真origValue为数字且kb.heavilyDynamic为假
if not casting and not result and kb.dynamicParameter and origValue.isdigit() and not kb.heavilyDynamic: if not casting and not result and kb.dynamicParameter and origValue.isdigit() and not kb.heavilyDynamic:
# 生成一个随机整数
randInt = int(randomInt()) randInt = int(randomInt())
# 生成payload
payload = "%s%s%s" % (prefix, "%d-%d" % (int(origValue) + randInt, randInt), suffix) payload = "%s%s%s" % (prefix, "%d-%d" % (int(origValue) + randInt, randInt), suffix)
# 使用agent.payload生成payload
payload = agent.payload(place, parameter, newValue=payload, where=PAYLOAD.WHERE.REPLACE) payload = agent.payload(place, parameter, newValue=payload, where=PAYLOAD.WHERE.REPLACE)
# 使用Request.queryPage查询页面
result = Request.queryPage(payload, place, raise404=False) result = Request.queryPage(payload, place, raise404=False)
# 如果result为空
if not result: if not result:
# 生成一个随机字符串
randStr = randomStr() randStr = randomStr()
# 生成payload
payload = "%s%s%s" % (prefix, "%s.%d%s" % (origValue, random.randint(1, 9), randStr), suffix) payload = "%s%s%s" % (prefix, "%s.%d%s" % (origValue, random.randint(1, 9), randStr), suffix)
# 使用agent.payload生成payload
payload = agent.payload(place, parameter, newValue=payload, where=PAYLOAD.WHERE.REPLACE) payload = agent.payload(place, parameter, newValue=payload, where=PAYLOAD.WHERE.REPLACE)
# 使用Request.queryPage查询页面
casting = Request.queryPage(payload, place, raise404=False) casting = Request.queryPage(payload, place, raise404=False)
# 根据casting和result的值设置kb.heuristicTest的值
kb.heuristicTest = HEURISTIC_TEST.CASTED if casting else HEURISTIC_TEST.NEGATIVE if not result else HEURISTIC_TEST.POSITIVE kb.heuristicTest = HEURISTIC_TEST.CASTED if casting else HEURISTIC_TEST.NEGATIVE if not result else HEURISTIC_TEST.POSITIVE
# 如果kb.heavilyDynamic为真
if kb.heavilyDynamic: if kb.heavilyDynamic:
# 输出debug信息
debugMsg = "heuristic check stopped because of heavy dynamicity" debugMsg = "heuristic check stopped because of heavy dynamicity"
logger.debug(debugMsg) logger.debug(debugMsg)
# 返回kb.heuristicTest的值
return kb.heuristicTest return kb.heuristicTest
# 如果casting为真
if casting: if casting:
# 输出错误信息
errMsg = "possible %s casting detected (e.g. '" % ("integer" if origValue.isdigit() else "type") errMsg = "possible %s casting detected (e.g. '" % ("integer" if origValue.isdigit() else "type")
# 获取url的后缀
platform = conf.url.split('.')[-1].lower() platform = conf.url.split('.')[-1].lower()
# 根据后缀,输出不同的错误信息
if platform == WEB_PLATFORM.ASP: if platform == WEB_PLATFORM.ASP:
errMsg += "%s=CInt(request.querystring(\"%s\"))" % (parameter, parameter) errMsg += "%s=CInt(request.querystring(\"%s\"))" % (parameter, parameter)
elif platform == WEB_PLATFORM.ASPX: elif platform == WEB_PLATFORM.ASPX:
@ -1101,45 +1159,68 @@ def heuristicCheckSqlInjection(place, parameter):
errMsg += "') at the back-end web application" errMsg += "') at the back-end web application"
logger.error(errMsg) logger.error(errMsg)
# 如果kb.ignoreCasted为空
if kb.ignoreCasted is None: if kb.ignoreCasted is None:
# 输出提示信息
message = "do you want to skip those kind of cases (and save scanning time)? %s " % ("[Y/n]" if conf.multipleTargets else "[y/N]") message = "do you want to skip those kind of cases (and save scanning time)? %s " % ("[Y/n]" if conf.multipleTargets else "[y/N]")
# 读取用户输入设置kb.ignoreCasted的值
kb.ignoreCasted = readInput(message, default='Y' if conf.multipleTargets else 'N', boolean=True) kb.ignoreCasted = readInput(message, default='Y' if conf.multipleTargets else 'N', boolean=True)
# 如果result为真
elif result: elif result:
# 输出信息
infoMsg += "be injectable" infoMsg += "be injectable"
# 如果Backend.getErrorParsedDBMSes()不为空
if Backend.getErrorParsedDBMSes(): if Backend.getErrorParsedDBMSes():
# 输出信息
infoMsg += " (possible DBMS: '%s')" % Format.getErrorParsedDBMSes() infoMsg += " (possible DBMS: '%s')" % Format.getErrorParsedDBMSes()
logger.info(infoMsg) logger.info(infoMsg)
# 如果以上条件都不满足
else: else:
# 输出警告信息
infoMsg += "not be injectable" infoMsg += "not be injectable"
logger.warning(infoMsg) logger.warning(infoMsg)
# 设置kb.heuristicMode为真
kb.heuristicMode = True kb.heuristicMode = True
# 设置kb.disableHtmlDecoding为真
kb.disableHtmlDecoding = True kb.disableHtmlDecoding = True
# 生成两个随机字符串
randStr1, randStr2 = randomStr(NON_SQLI_CHECK_PREFIX_SUFFIX_LENGTH), randomStr(NON_SQLI_CHECK_PREFIX_SUFFIX_LENGTH) randStr1, randStr2 = randomStr(NON_SQLI_CHECK_PREFIX_SUFFIX_LENGTH), randomStr(NON_SQLI_CHECK_PREFIX_SUFFIX_LENGTH)
# 生成value
value = "%s%s%s" % (randStr1, DUMMY_NON_SQLI_CHECK_APPENDIX, randStr2) value = "%s%s%s" % (randStr1, DUMMY_NON_SQLI_CHECK_APPENDIX, randStr2)
# 生成payload
payload = "%s%s%s" % (prefix, "'%s" % value, suffix) payload = "%s%s%s" % (prefix, "'%s" % value, suffix)
# 使用agent.payload生成payload
payload = agent.payload(place, parameter, newValue=payload) payload = agent.payload(place, parameter, newValue=payload)
# 使用Request.queryPage查询页面
page, _, _ = Request.queryPage(payload, place, content=True, raise404=False) page, _, _ = Request.queryPage(payload, place, content=True, raise404=False)
# 获取paramType
paramType = conf.method if conf.method not in (None, HTTPMETHOD.GET, HTTPMETHOD.POST) else place paramType = conf.method if conf.method not in (None, HTTPMETHOD.GET, HTTPMETHOD.POST) else place
# Reference: https://bugs.python.org/issue18183
if value.upper() in (page or "").upper(): if value.upper() in (page or "").upper():
# 如果value的大写字母在page的大写字母中则执行以下代码
infoMsg = "heuristic (XSS) test shows that %sparameter '%s' might be vulnerable to cross-site scripting (XSS) attacks" % ("%s " % paramType if paramType != parameter else "", parameter) infoMsg = "heuristic (XSS) test shows that %sparameter '%s' might be vulnerable to cross-site scripting (XSS) attacks" % ("%s " % paramType if paramType != parameter else "", parameter)
# 输出信息表示参数可能存在XSS攻击
logger.info(infoMsg) logger.info(infoMsg)
if conf.beep: if conf.beep:
# 如果配置文件中设置了beep则执行beep函数
beep() beep()
for match in re.finditer(FI_ERROR_REGEX, page or ""): for match in re.finditer(FI_ERROR_REGEX, page or ""):
# 在page中查找FI_ERROR_REGEX如果找到则执行以下代码
if randStr1.lower() in match.group(0).lower(): if randStr1.lower() in match.group(0).lower():
# 如果randStr1的小写字母在match.group(0)的小写字母中,则执行以下代码
infoMsg = "heuristic (FI) test shows that %sparameter '%s' might be vulnerable to file inclusion (FI) attacks" % ("%s " % paramType if paramType != parameter else "", parameter) infoMsg = "heuristic (FI) test shows that %sparameter '%s' might be vulnerable to file inclusion (FI) attacks" % ("%s " % paramType if paramType != parameter else "", parameter)
# 输出信息表示参数可能存在FI攻击
logger.info(infoMsg) logger.info(infoMsg)
if conf.beep: if conf.beep:
# 如果配置文件中设置了beep则执行beep函数
beep() beep()
break break
@ -1157,6 +1238,7 @@ def checkDynParam(place, parameter, value):
""" """
if kb.choices.redirect: if kb.choices.redirect:
# 如果kb.choices.redirect为True则返回None
return None return None
kb.matchRatio = None kb.matchRatio = None
@ -1166,11 +1248,14 @@ def checkDynParam(place, parameter, value):
paramType = conf.method if conf.method not in (None, HTTPMETHOD.GET, HTTPMETHOD.POST) else place paramType = conf.method if conf.method not in (None, HTTPMETHOD.GET, HTTPMETHOD.POST) else place
infoMsg = "testing if %sparameter '%s' is dynamic" % ("%s " % paramType if paramType != parameter else "", parameter) infoMsg = "testing if %sparameter '%s' is dynamic" % ("%s " % paramType if paramType != parameter else "", parameter)
# 输出信息,表示正在测试参数是否动态
logger.info(infoMsg) logger.info(infoMsg)
try: try:
payload = agent.payload(place, parameter, value, getUnicode(randInt)) payload = agent.payload(place, parameter, value, getUnicode(randInt))
# 生成payload
dynResult = Request.queryPage(payload, place, raise404=False) dynResult = Request.queryPage(payload, place, raise404=False)
# 发送payload获取结果
except SqlmapConnectionException: except SqlmapConnectionException:
pass pass
@ -1184,40 +1269,51 @@ def checkDynamicContent(firstPage, secondPage):
This function checks for the dynamic content in the provided pages This function checks for the dynamic content in the provided pages
""" """
# 如果没有网络连接,则跳过动态内容检查
if kb.nullConnection: if kb.nullConnection:
debugMsg = "dynamic content checking skipped " debugMsg = "dynamic content checking skipped "
debugMsg += "because NULL connection used" debugMsg += "because NULL connection used"
logger.debug(debugMsg) logger.debug(debugMsg)
return return
# 如果没有提供页面内容,则无法检查动态内容
if any(page is None for page in (firstPage, secondPage)): if any(page is None for page in (firstPage, secondPage)):
warnMsg = "can't check dynamic content " warnMsg = "can't check dynamic content "
warnMsg += "because of lack of page content" warnMsg += "because of lack of page content"
logger.critical(warnMsg) logger.critical(warnMsg)
return return
# 如果提供了页面内容,并且页面内容长度超过最大长度,则无法计算相似度
if firstPage and secondPage and any(len(_) > MAX_DIFFLIB_SEQUENCE_LENGTH for _ in (firstPage, secondPage)): if firstPage and secondPage and any(len(_) > MAX_DIFFLIB_SEQUENCE_LENGTH for _ in (firstPage, secondPage)):
ratio = None ratio = None
else: else:
try: try:
# 获取当前线程的数据
seqMatcher = getCurrentThreadData().seqMatcher seqMatcher = getCurrentThreadData().seqMatcher
# 设置第一个序列
seqMatcher.set_seq1(firstPage) seqMatcher.set_seq1(firstPage)
# 设置第二个序列
seqMatcher.set_seq2(secondPage) seqMatcher.set_seq2(secondPage)
# 计算相似度
ratio = seqMatcher.quick_ratio() ratio = seqMatcher.quick_ratio()
except MemoryError: except MemoryError:
ratio = None ratio = None
# 如果无法计算相似度,则跳过动态内容检查
if ratio is None: if ratio is None:
kb.skipSeqMatcher = True kb.skipSeqMatcher = True
# In case of an intolerable difference turn on dynamicity removal engine # In case of an intolerable difference turn on dynamicity removal engine
elif ratio <= UPPER_RATIO_BOUND: elif ratio <= UPPER_RATIO_BOUND:
# 如果比率小于等于上限比率则调用findDynamicContent函数
findDynamicContent(firstPage, secondPage) findDynamicContent(firstPage, secondPage)
count = 0 count = 0
# 当Request.queryPage()返回False时循环执行
while not Request.queryPage(): while not Request.queryPage():
count += 1 count += 1
# 如果重试次数超过配置的最大重试次数则输出警告信息并将textOnly设置为True
if count > conf.retries: if count > conf.retries:
warnMsg = "target URL content appears to be too dynamic. " warnMsg = "target URL content appears to be too dynamic. "
warnMsg += "Switching to '--text-only' " warnMsg += "Switching to '--text-only' "
@ -1226,12 +1322,14 @@ def checkDynamicContent(firstPage, secondPage):
conf.textOnly = True conf.textOnly = True
return return
# 输出警告信息表示目标URL内容过于动态sqlmap将重试请求
warnMsg = "target URL content appears to be heavily dynamic. " warnMsg = "target URL content appears to be heavily dynamic. "
warnMsg += "sqlmap is going to retry the request(s)" warnMsg += "sqlmap is going to retry the request(s)"
singleTimeLogMessage(warnMsg, logging.CRITICAL) singleTimeLogMessage(warnMsg, logging.CRITICAL)
kb.heavilyDynamic = True kb.heavilyDynamic = True
# 重新查询页面内容
secondPage, _, _ = Request.queryPage(content=True) secondPage, _, _ = Request.queryPage(content=True)
findDynamicContent(firstPage, secondPage) findDynamicContent(firstPage, secondPage)
@ -1249,17 +1347,22 @@ def checkStability():
infoMsg = "testing if the target URL content is stable" infoMsg = "testing if the target URL content is stable"
logger.info(infoMsg) logger.info(infoMsg)
# 获取原始页面内容
firstPage = kb.originalPage # set inside checkConnection() firstPage = kb.originalPage # set inside checkConnection()
# 计算延迟时间
delay = MAX_STABILITY_DELAY - (time.time() - (kb.originalPageTime or 0)) delay = MAX_STABILITY_DELAY - (time.time() - (kb.originalPageTime or 0))
delay = max(0, min(MAX_STABILITY_DELAY, delay)) delay = max(0, min(MAX_STABILITY_DELAY, delay))
time.sleep(delay) time.sleep(delay)
# 重新查询页面内容
secondPage, _, _ = Request.queryPage(content=True, noteResponseTime=False, raise404=False) secondPage, _, _ = Request.queryPage(content=True, noteResponseTime=False, raise404=False)
# 如果存在重定向则返回None
if kb.choices.redirect: if kb.choices.redirect:
return None return None
# 比较两个页面内容是否相同
kb.pageStable = (firstPage == secondPage) kb.pageStable = (firstPage == secondPage)
if kb.pageStable: if kb.pageStable:
@ -1281,6 +1384,7 @@ def checkStability():
warnMsg += "'Page comparison'" warnMsg += "'Page comparison'"
logger.warning(warnMsg) logger.warning(warnMsg)
# 提示用户如何继续
message = "how do you want to proceed? [(C)ontinue/(s)tring/(r)egex/(q)uit] " message = "how do you want to proceed? [(C)ontinue/(s)tring/(r)egex/(q)uit] "
choice = readInput(message, default='C').upper() choice = readInput(message, default='C').upper()
@ -1288,6 +1392,7 @@ def checkStability():
raise SqlmapUserQuitException raise SqlmapUserQuitException
elif choice == 'S': elif choice == 'S':
# 如果用户选择字符串匹配则调用showStaticWords函数
showStaticWords(firstPage, secondPage) showStaticWords(firstPage, secondPage)
message = "please enter value for parameter 'string': " message = "please enter value for parameter 'string': "
@ -1307,6 +1412,7 @@ def checkStability():
raise SqlmapNoneDataException(errMsg) raise SqlmapNoneDataException(errMsg)
elif choice == 'R': elif choice == 'R':
# 如果用户选择正则表达式匹配,则提示用户输入正则表达式
message = "please enter value for parameter 'regex': " message = "please enter value for parameter 'regex': "
regex = readInput(message) regex = readInput(message)
@ -1324,6 +1430,7 @@ def checkStability():
raise SqlmapNoneDataException(errMsg) raise SqlmapNoneDataException(errMsg)
else: else:
# 如果用户选择继续则调用checkDynamicContent函数
checkDynamicContent(firstPage, secondPage) checkDynamicContent(firstPage, secondPage)
return kb.pageStable return kb.pageStable
@ -1334,12 +1441,15 @@ def checkWaf():
Reference: http://seclists.org/nmap-dev/2011/q2/att-1005/http-waf-detect.nse Reference: http://seclists.org/nmap-dev/2011/q2/att-1005/http-waf-detect.nse
""" """
# 如果配置中存在字符串匹配、不匹配、正则表达式匹配、假数据、离线模式或跳过WAF检测则返回None
if any((conf.string, conf.notString, conf.regexp, conf.dummy, conf.offline, conf.skipWaf)): if any((conf.string, conf.notString, conf.regexp, conf.dummy, conf.offline, conf.skipWaf)):
return None return None
# 如果原始HTTP状态码为404则返回None
if kb.originalCode == _http_client.NOT_FOUND: if kb.originalCode == _http_client.NOT_FOUND:
return None return None
# 从哈希数据库中获取WAF检测结果
_ = hashDBRetrieve(HASHDB_KEYS.CHECK_WAF_RESULT, True) _ = hashDBRetrieve(HASHDB_KEYS.CHECK_WAF_RESULT, True)
if _ is not None: if _ is not None:
if _: if _:
@ -1348,6 +1458,7 @@ def checkWaf():
logger.critical(warnMsg) logger.critical(warnMsg)
return _ return _
# 如果原始页面内容为空则返回None
if not kb.originalPage: if not kb.originalPage:
return None return None
@ -1356,36 +1467,44 @@ def checkWaf():
logger.info(infoMsg) logger.info(infoMsg)
retVal = False retVal = False
# 生成随机payload
payload = "%d %s" % (randomInt(), IPS_WAF_CHECK_PAYLOAD) payload = "%d %s" % (randomInt(), IPS_WAF_CHECK_PAYLOAD)
place = PLACE.GET place = PLACE.GET
# 如果URI参数存在则将payload添加到URI参数中
if PLACE.URI in conf.parameters: if PLACE.URI in conf.parameters:
value = "%s=%s" % (randomStr(), agent.addPayloadDelimiters(payload)) value = "%s=%s" % (randomStr(), agent.addPayloadDelimiters(payload))
else: else:
value = "" if not conf.parameters.get(PLACE.GET) else conf.parameters[PLACE.GET] + DEFAULT_GET_POST_DELIMITER value = "" if not conf.parameters.get(PLACE.GET) else conf.parameters[PLACE.GET] + DEFAULT_GET_POST_DELIMITER
value += "%s=%s" % (randomStr(), agent.addPayloadDelimiters(payload)) value += "%s=%s" % (randomStr(), agent.addPayloadDelimiters(payload))
# 保存当前状态
pushValue(kb.choices.redirect) pushValue(kb.choices.redirect)
pushValue(kb.resendPostOnRedirect) pushValue(kb.resendPostOnRedirect)
pushValue(conf.timeout) pushValue(conf.timeout)
# 设置重定向为True重发POST请求为False超时时间为IPS_WAF_CHECK_TIMEOUT
kb.choices.redirect = REDIRECTION.YES kb.choices.redirect = REDIRECTION.YES
kb.resendPostOnRedirect = False kb.resendPostOnRedirect = False
conf.timeout = IPS_WAF_CHECK_TIMEOUT conf.timeout = IPS_WAF_CHECK_TIMEOUT
try: try:
# 查询页面内容,并比较比率
retVal = (Request.queryPage(place=place, value=value, getRatioValue=True, noteResponseTime=False, silent=True, raise404=False, disableTampering=True)[1] or 0) < IPS_WAF_CHECK_RATIO retVal = (Request.queryPage(place=place, value=value, getRatioValue=True, noteResponseTime=False, silent=True, raise404=False, disableTampering=True)[1] or 0) < IPS_WAF_CHECK_RATIO
except SqlmapConnectionException: except SqlmapConnectionException:
retVal = True retVal = True
finally: finally:
kb.matchRatio = None kb.matchRatio = None
# 恢复之前的状态
conf.timeout = popValue() conf.timeout = popValue()
kb.resendPostOnRedirect = popValue() kb.resendPostOnRedirect = popValue()
kb.choices.redirect = popValue() kb.choices.redirect = popValue()
# 将WAF检测结果写入哈希数据库
hashDBWrite(HASHDB_KEYS.CHECK_WAF_RESULT, retVal, True) hashDBWrite(HASHDB_KEYS.CHECK_WAF_RESULT, retVal, True)
# 如果检测到WAF则输出警告信息并提示用户是否继续
if retVal: if retVal:
if not kb.identifiedWafs: if not kb.identifiedWafs:
warnMsg = "heuristics detected that the target " warnMsg = "heuristics detected that the target "
@ -1411,9 +1530,11 @@ def checkNullConnection():
Reference: http://www.wisec.it/sectou.php?id=472f952d79293 Reference: http://www.wisec.it/sectou.php?id=472f952d79293
""" """
# 如果存在POST数据则返回False
if conf.data: if conf.data:
return False return False
# 从哈希数据库中获取NULL连接检测结果
_ = hashDBRetrieve(HASHDB_KEYS.CHECK_NULL_CONNECTION_RESULT, True) _ = hashDBRetrieve(HASHDB_KEYS.CHECK_NULL_CONNECTION_RESULT, True)
if _ is not None: if _ is not None:
kb.nullConnection = _ kb.nullConnection = _
@ -1426,10 +1547,12 @@ def checkNullConnection():
infoMsg = "testing NULL connection to the target URL" infoMsg = "testing NULL connection to the target URL"
logger.info(infoMsg) logger.info(infoMsg)
# 保存当前状态
pushValue(kb.pageCompress) pushValue(kb.pageCompress)
kb.pageCompress = False kb.pageCompress = False
try: try:
# 使用HEAD方法测试NULL连接
page, headers, _ = Request.getPage(method=HTTPMETHOD.HEAD, raise404=False) page, headers, _ = Request.getPage(method=HTTPMETHOD.HEAD, raise404=False)
if not page and HTTP_HEADER.CONTENT_LENGTH in (headers or {}): if not page and HTTP_HEADER.CONTENT_LENGTH in (headers or {}):
@ -1438,6 +1561,7 @@ def checkNullConnection():
infoMsg = "NULL connection is supported with HEAD method ('Content-Length')" infoMsg = "NULL connection is supported with HEAD method ('Content-Length')"
logger.info(infoMsg) logger.info(infoMsg)
else: else:
# 使用GET方法测试NULL连接
page, headers, _ = Request.getPage(auxHeaders={HTTP_HEADER.RANGE: "bytes=-1"}) page, headers, _ = Request.getPage(auxHeaders={HTTP_HEADER.RANGE: "bytes=-1"})
if page and len(page) == 1 and HTTP_HEADER.CONTENT_RANGE in (headers or {}): if page and len(page) == 1 and HTTP_HEADER.CONTENT_RANGE in (headers or {}):
@ -1446,6 +1570,7 @@ def checkNullConnection():
infoMsg = "NULL connection is supported with GET method ('Range')" infoMsg = "NULL connection is supported with GET method ('Range')"
logger.info(infoMsg) logger.info(infoMsg)
else: else:
# 使用skip-read方法测试NULL连接
_, headers, _ = Request.getPage(skipRead=True) _, headers, _ = Request.getPage(skipRead=True)
if HTTP_HEADER.CONTENT_LENGTH in (headers or {}): if HTTP_HEADER.CONTENT_LENGTH in (headers or {}):
@ -1465,36 +1590,48 @@ def checkNullConnection():
return kb.nullConnection in getPublicTypeMembers(NULLCONNECTION, True) return kb.nullConnection in getPublicTypeMembers(NULLCONNECTION, True)
def checkConnection(suppressOutput=False): def checkConnection(suppressOutput=False):
# 获取当前线程数据
threadData = getCurrentThreadData() threadData = getCurrentThreadData()
# 检查主机名是否为IP地址
if not re.search(r"\A\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\Z", conf.hostname): if not re.search(r"\A\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\Z", conf.hostname):
# 如果没有代理、Tor、Dummy或离线模式
if not any((conf.proxy, conf.tor, conf.dummy, conf.offline)): if not any((conf.proxy, conf.tor, conf.dummy, conf.offline)):
try: try:
# 解析主机名
debugMsg = "resolving hostname '%s'" % conf.hostname debugMsg = "resolving hostname '%s'" % conf.hostname
logger.debug(debugMsg) logger.debug(debugMsg)
socket.getaddrinfo(conf.hostname, None) socket.getaddrinfo(conf.hostname, None)
except socket.gaierror: except socket.gaierror:
# 主机名不存在
errMsg = "host '%s' does not exist" % conf.hostname errMsg = "host '%s' does not exist" % conf.hostname
raise SqlmapConnectionException(errMsg) raise SqlmapConnectionException(errMsg)
except socket.error as ex: except socket.error as ex:
# 解析主机名时出现问题
errMsg = "problem occurred while " errMsg = "problem occurred while "
errMsg += "resolving a host name '%s' ('%s')" % (conf.hostname, getSafeExString(ex)) errMsg += "resolving a host name '%s' ('%s')" % (conf.hostname, getSafeExString(ex))
raise SqlmapConnectionException(errMsg) raise SqlmapConnectionException(errMsg)
except UnicodeError as ex: except UnicodeError as ex:
# 处理主机名时出现问题
errMsg = "problem occurred while " errMsg = "problem occurred while "
errMsg += "handling a host name '%s' ('%s')" % (conf.hostname, getSafeExString(ex)) errMsg += "handling a host name '%s' ('%s')" % (conf.hostname, getSafeExString(ex))
raise SqlmapDataException(errMsg) raise SqlmapDataException(errMsg)
# 如果没有抑制输出并且不是Dummy或离线模式
if not suppressOutput and not conf.dummy and not conf.offline: if not suppressOutput and not conf.dummy and not conf.offline:
# 测试连接到目标URL
infoMsg = "testing connection to the target URL" infoMsg = "testing connection to the target URL"
logger.info(infoMsg) logger.info(infoMsg)
try: try:
# 获取页面内容
kb.originalPageTime = time.time() kb.originalPageTime = time.time()
page, headers, _ = Request.queryPage(content=True, noteResponseTime=False) page, headers, _ = Request.queryPage(content=True, noteResponseTime=False)
# 获取原始响应
rawResponse = "%s%s" % (listToStrValue(headers.headers if headers else ""), page) rawResponse = "%s%s" % (listToStrValue(headers.headers if headers else ""), page)
# 如果提供了字符串,检查字符串是否在页面内容中
if conf.string: if conf.string:
infoMsg = "testing if the provided string is within the " infoMsg = "testing if the provided string is within the "
infoMsg += "target URL page content" infoMsg += "target URL page content"
@ -1506,6 +1643,7 @@ def checkConnection(suppressOutput=False):
warnMsg += "URL raw response, sqlmap will carry on anyway" warnMsg += "URL raw response, sqlmap will carry on anyway"
logger.warning(warnMsg) logger.warning(warnMsg)
# 如果提供了正则表达式,检查正则表达式是否匹配页面内容
if conf.regexp: if conf.regexp:
infoMsg = "testing if the provided regular expression matches within " infoMsg = "testing if the provided regular expression matches within "
infoMsg += "the target URL page content" infoMsg += "the target URL page content"
@ -1519,19 +1657,25 @@ def checkConnection(suppressOutput=False):
kb.errorIsNone = False kb.errorIsNone = False
# 如果服务器头包含不兼容的服务器,关闭预连接机制
if any(_ in (kb.serverHeader or "") for _ in PRECONNECT_INCOMPATIBLE_SERVERS): if any(_ in (kb.serverHeader or "") for _ in PRECONNECT_INCOMPATIBLE_SERVERS):
singleTimeWarnMessage("turning off pre-connect mechanism because of incompatible server ('%s')" % kb.serverHeader) singleTimeWarnMessage("turning off pre-connect mechanism because of incompatible server ('%s')" % kb.serverHeader)
conf.disablePrecon = True conf.disablePrecon = True
# 如果没有原始页面并且最后一个响应是HTTP错误
if not kb.originalPage and wasLastResponseHTTPError(): if not kb.originalPage and wasLastResponseHTTPError():
# 如果最后一个请求的HTTP错误代码不在忽略代码中
if getLastRequestHTTPError() not in (conf.ignoreCode or []): if getLastRequestHTTPError() not in (conf.ignoreCode or []):
errMsg = "unable to retrieve page content" errMsg = "unable to retrieve page content"
raise SqlmapConnectionException(errMsg) raise SqlmapConnectionException(errMsg)
# 如果最后一个响应是DBMS错误
elif wasLastResponseDBMSError(): elif wasLastResponseDBMSError():
warnMsg = "there is a DBMS error found in the HTTP response body " warnMsg = "there is a DBMS error found in the HTTP response body "
warnMsg += "which could interfere with the results of the tests" warnMsg += "which could interfere with the results of the tests"
logger.warning(warnMsg) logger.warning(warnMsg)
# 如果最后一个响应是HTTP错误
elif wasLastResponseHTTPError(): elif wasLastResponseHTTPError():
# 如果最后一个请求的HTTP错误代码不在忽略代码中
if getLastRequestHTTPError() not in (conf.ignoreCode or []): if getLastRequestHTTPError() not in (conf.ignoreCode or []):
warnMsg = "the web server responded with an HTTP error code (%d) " % getLastRequestHTTPError() warnMsg = "the web server responded with an HTTP error code (%d) " % getLastRequestHTTPError()
warnMsg += "which could interfere with the results of the tests" warnMsg += "which could interfere with the results of the tests"
@ -1539,14 +1683,20 @@ def checkConnection(suppressOutput=False):
else: else:
kb.errorIsNone = True kb.errorIsNone = True
# 如果重定向为是并且最后一个重定向URL和最后一个请求UID相同
if kb.choices.redirect == REDIRECTION.YES and threadData.lastRedirectURL and threadData.lastRedirectURL[0] == threadData.lastRequestUID: if kb.choices.redirect == REDIRECTION.YES and threadData.lastRedirectURL and threadData.lastRedirectURL[0] == threadData.lastRequestUID:
# 如果最后一个重定向URL以https://开头并且主机名在最后一个重定向URL中
if (threadData.lastRedirectURL[1] or "").startswith("https://") and conf.hostname in getUnicode(threadData.lastRedirectURL[1]): if (threadData.lastRedirectURL[1] or "").startswith("https://") and conf.hostname in getUnicode(threadData.lastRedirectURL[1]):
# 将URL改为https://
conf.url = re.sub(r"https?://", "https://", conf.url) conf.url = re.sub(r"https?://", "https://", conf.url)
# 获取端口号
match = re.search(r":(\d+)", threadData.lastRedirectURL[1]) match = re.search(r":(\d+)", threadData.lastRedirectURL[1])
port = match.group(1) if match else 443 port = match.group(1) if match else 443
# 将URL中的端口号改为最后一个重定向URL中的端口号
conf.url = re.sub(r":\d+(/|\Z)", r":%s\g<1>" % port, conf.url) conf.url = re.sub(r":\d+(/|\Z)", r":%s\g<1>" % port, conf.url)
except SqlmapConnectionException as ex: except SqlmapConnectionException as ex:
# 如果提供了IPv6地址检查连接
if conf.ipv6: if conf.ipv6:
warnMsg = "check connection to a provided " warnMsg = "check connection to a provided "
warnMsg += "IPv6 address with a tool like ping6 " warnMsg += "IPv6 address with a tool like ping6 "
@ -1555,13 +1705,16 @@ def checkConnection(suppressOutput=False):
warnMsg += "any addressing issues" warnMsg += "any addressing issues"
singleTimeWarnMessage(warnMsg) singleTimeWarnMessage(warnMsg)
# 如果HTTP错误代码为404
if any(code in kb.httpErrorCodes for code in (_http_client.NOT_FOUND, )): if any(code in kb.httpErrorCodes for code in (_http_client.NOT_FOUND, )):
errMsg = getSafeExString(ex) errMsg = getSafeExString(ex)
logger.critical(errMsg) logger.critical(errMsg)
# 如果有多个目标
if conf.multipleTargets: if conf.multipleTargets:
return False return False
# 提示用户是否退出
msg = "it is not recommended to continue in this kind of cases. Do you want to quit and make sure that everything is set up properly? [Y/n] " msg = "it is not recommended to continue in this kind of cases. Do you want to quit and make sure that everything is set up properly? [Y/n] "
if readInput(msg, default='Y', boolean=True): if readInput(msg, default='Y', boolean=True):
raise SqlmapSilentQuitException raise SqlmapSilentQuitException
@ -1570,12 +1723,16 @@ def checkConnection(suppressOutput=False):
else: else:
raise raise
finally: finally:
# 将原始页面和页面模板设置为最后一个页面和最后一个代码
kb.originalPage = kb.pageTemplate = threadData.lastPage kb.originalPage = kb.pageTemplate = threadData.lastPage
kb.originalCode = threadData.lastCode kb.originalCode = threadData.lastCode
# 如果提供了cookie并且没有声明cookie也没有在httpHeaders中声明cookie也没有设置dropSetCookie
if conf.cj and not conf.cookie and not any(_[0] == HTTP_HEADER.COOKIE for _ in conf.httpHeaders) and not conf.dropSetCookie: if conf.cj and not conf.cookie and not any(_[0] == HTTP_HEADER.COOKIE for _ in conf.httpHeaders) and not conf.dropSetCookie:
# 获取cookie
candidate = DEFAULT_COOKIE_DELIMITER.join("%s=%s" % (_.name, _.value) for _ in conf.cj) candidate = DEFAULT_COOKIE_DELIMITER.join("%s=%s" % (_.name, _.value) for _ in conf.cj)
# 提示用户是否使用这些cookie
message = "you have not declared cookie(s), while " message = "you have not declared cookie(s), while "
message += "server wants to set its own ('%s'). " % re.sub(r"(=[^=;]{10}[^=;])[^=;]+([^=;]{10})", r"\g<1>...\g<2>", candidate) message += "server wants to set its own ('%s'). " % re.sub(r"(=[^=;]{10}[^=;])[^=;]+([^=;]{10})", r"\g<1>...\g<2>", candidate)
message += "Do you want to use those [Y/n] " message += "Do you want to use those [Y/n] "
@ -1585,8 +1742,11 @@ def checkConnection(suppressOutput=False):
return True return True
# 检查网络连接
def checkInternet(): def checkInternet():
# 获取页面内容
content = Request.getPage(url=CHECK_INTERNET_ADDRESS, checking=True)[0] content = Request.getPage(url=CHECK_INTERNET_ADDRESS, checking=True)[0]
# 判断页面内容是否包含指定的值
return CHECK_INTERNET_VALUE in (content or "") return CHECK_INTERNET_VALUE in (content or "")
def setVerbosity(): # Cross-referenced function def setVerbosity(): # Cross-referenced function

Loading…
Cancel
Save