sqlmap/src/sqlmap-master/lib/request/direct.py

118 lines
4.4 KiB

#!/usr/bin/env python
"""
Copyright (c) 2006-2024 sqlmap developers (https://sqlmap.org/)
See the file 'LICENSE' for copying permission
"""
# 导入所需的模块
import re
import time
from lib.core.agent import agent
from lib.core.common import Backend
from lib.core.common import calculateDeltaSeconds
from lib.core.common import extractExpectedValue
from lib.core.common import getCurrentThreadData
from lib.core.common import hashDBRetrieve
from lib.core.common import hashDBWrite
from lib.core.common import isListLike
from lib.core.convert import getUnicode
from lib.core.data import conf
from lib.core.data import kb
from lib.core.data import logger
from lib.core.dicts import SQL_STATEMENTS
from lib.core.enums import CUSTOM_LOGGING
from lib.core.enums import DBMS
from lib.core.enums import EXPECTED
from lib.core.enums import TIMEOUT_STATE
from lib.core.settings import UNICODE_ENCODING
from lib.utils.safe2bin import safecharencode
from lib.utils.timeout import timeout
def direct(query, content=True):
"""
直接执行SQL查询的主函数
参数:
query: 要执行的SQL查询语句
content: 是否返回查询内容,默认为True
"""
# 标记是否为SELECT查询
select = True
# 处理查询语句,添加必要的payload
query = agent.payloadDirect(query)
query = agent.adjustLateValues(query)
# 获取当前线程数据
threadData = getCurrentThreadData()
# 针对Oracle数据库的特殊处理:如果是不带FROM的SELECT语句,添加"FROM DUAL"
if Backend.isDbms(DBMS.ORACLE) and query.upper().startswith("SELECT ") and " FROM " not in query.upper():
query = "%s FROM DUAL" % query
# 通过遍历SQL语句字典判断是否为SELECT查询
for sqlTitle, sqlStatements in SQL_STATEMENTS.items():
for sqlStatement in sqlStatements:
if query.lower().startswith(sqlStatement) and sqlTitle != "SQL SELECT statement":
select = False
break
# 如果是SELECT查询,进行相应处理
if select:
# 如果查询不以SELECT开头,添加SELECT
if re.search(r"(?i)\ASELECT ", query) is None:
query = "SELECT %s" % query
# 处理二进制字段
if conf.binaryFields:
for field in conf.binaryFields:
field = field.strip()
if re.search(r"\b%s\b" % re.escape(field), query):
query = re.sub(r"\b%s\b" % re.escape(field), agent.hexConvertField(field), query)
# 记录查询语句到日志
logger.log(CUSTOM_LOGGING.PAYLOAD, query)
# 尝试从缓存中获取查询结果
output = hashDBRetrieve(query, True, True)
start = time.time()
# 执行查询
if not select and re.search(r"(?i)\bEXEC ", query) is None:
# 非SELECT且非EXEC语句的执行
timeout(func=conf.dbmsConnector.execute, args=(query,), duration=conf.timeout, default=None)
elif not (output and ("%soutput" % conf.tablePrefix) not in query and ("%sfile" % conf.tablePrefix) not in query):
# SELECT查询的执行
output, state = timeout(func=conf.dbmsConnector.select, args=(query,), duration=conf.timeout, default=None)
if state == TIMEOUT_STATE.NORMAL:
# 正常执行完成,将结果写入缓存
hashDBWrite(query, output, True)
elif state == TIMEOUT_STATE.TIMEOUT:
# 超时处理:关闭连接并重新连接
conf.dbmsConnector.close()
conf.dbmsConnector.connect()
elif output:
# 如果有缓存结果,显示提示信息
infoMsg = "resumed: %s..." % getUnicode(output, UNICODE_ENCODING)[:20]
logger.info(infoMsg)
# 记录查询执行时间
threadData.lastQueryDuration = calculateDeltaSeconds(start)
# 处理返回结果
if not output:
return output
elif content:
# 如果需要返回内容
if output and isListLike(output):
if len(output[0]) == 1:
# 如果结果只有一列,简化输出格式
output = [_[0] for _ in output]
# 转换为Unicode格式
retVal = getUnicode(output, noneToNull=True)
# 根据配置决定是否进行安全字符编码
return safecharencode(retVal) if kb.safeCharEncode else retVal
else:
# 如果不需要返回内容,提取预期的布尔值
return extractExpectedValue(output, EXPECTED.BOOL)