@ -5,200 +5,211 @@ Copyright (c) 2006-2024 sqlmap developers (https://sqlmap.org/)
See the file 'LICENSE' for copying permission
import ntpath
import re
from lib.core.common import Backend
from lib.core.common import hashDBWrite
from lib.core.common import isStackingAvailable
from lib.core.common import normalizePath
from lib.core.common import ntToPosixSlashes
from lib.core.common import posixToNtSlashes
from lib.core.common import readInput
from lib.core.common import singleTimeDebugMessage
from lib.core.common import unArrayizeValue
from lib.core.data import conf
from lib.core.data import kb
from lib.core.data import logger
from lib.core.data import queries
from lib.core.enums import DBMS
from lib.core.enums import HASHDB_KEYS
from lib.core.enums import OS
from lib.core.exception import SqlmapNoneDataException
from lib.request import inject
# 导入必要的模块
import ntpath # 导入 ntpath 模块,用于处理 Windows 路径
import re # 导入 re 模块,用于正则表达式
from lib.core.common import Backend # 导入 Backend 类,用于访问后端数据库信息
from lib.core.common import hashDBWrite # 导入 hashDBWrite 函数,用于写入哈希数据库
from lib.core.common import isStackingAvailable # 导入 isStackingAvailable 函数,用于检查是否支持堆叠查询
from lib.core.common import normalizePath # 导入 normalizePath 函数,用于规范化路径
from lib.core.common import ntToPosixSlashes # 导入 ntToPosixSlashes 函数,用于将 Windows 路径转换为 POSIX 路径
from lib.core.common import posixToNtSlashes # 导入 posixToNtSlashes 函数,用于将 POSIX 路径转换为 Windows 路径
from lib.core.common import readInput # 导入 readInput 函数,用于读取用户输入
from lib.core.common import singleTimeDebugMessage # 导入 singleTimeDebugMessage 函数,用于输出单次调试信息
from lib.core.common import unArrayizeValue # 导入 unArrayizeValue 函数,用于将数组值转换为单个值
from lib.core.data import conf # 导入 conf 对象,用于访问全局配置信息
from lib.core.data import kb # 导入 kb 对象,用于访问全局知识库
from lib.core.data import logger # 导入 logger 对象,用于输出日志
from lib.core.data import queries # 导入 queries 字典,存储数据库查询语句
from lib.core.enums import DBMS # 导入 DBMS 枚举,定义数据库管理系统类型
from lib.core.enums import HASHDB_KEYS # 导入 HASHDB_KEYS 枚举,定义哈希数据库键值
from lib.core.enums import OS # 导入 OS 枚举,定义操作系统类型
from lib.core.exception import SqlmapNoneDataException # 导入 SqlmapNoneDataException 异常类
from lib.request import inject # 导入 inject 函数,用于执行 SQL 注入请求
# 定义 Miscellaneous 类,用于实现杂项功能
class Miscellaneous(object):
This class defines miscellaneous functionalities for plugins.
# 初始化 Miscellaneous 类
def __init__(self):
# 定义 getRemoteTempPath 方法,用于获取远程临时路径
def getRemoteTempPath(self):
if not conf.tmpPath and Backend.isDbms(DBMS.MSSQL):
debugMsg = "identifying Microsoft SQL Server error log directory "
if not conf.tmpPath and Backend.isDbms(DBMS.MSSQL): # 如果没有设置临时路径且数据库是 MSSQL
debugMsg = "identifying Microsoft SQL Server error log directory " # 输出调试信息
debugMsg += "that sqlmap will use to store temporary files with "
debugMsg += "commands' output"
_ = unArrayizeValue(inject.getValue("SELECT SERVERPROPERTY('ErrorLogFileName')", safeCharEncode=False))
_ = unArrayizeValue(inject.getValue("SELECT SERVERPROPERTY('ErrorLogFileName')", safeCharEncode=False)) # 获取错误日志文件路径
if _:
conf.tmpPath = ntpath.dirname(_)
if _: # 如果获取到路径
conf.tmpPath = ntpath.dirname(_) # 设置临时路径为错误日志文件所在的目录
if not conf.tmpPath:
if Backend.isOs(OS.WINDOWS):
if conf.direct:
conf.tmpPath = "%TEMP%"
if not conf.tmpPath: # 如果没有设置临时路径
if Backend.isOs(OS.WINDOWS): # 如果操作系统是 Windows
if conf.direct: # 如果是直接连接
conf.tmpPath = "%TEMP%" # 设置临时路径为 %TEMP%
self.checkDbmsOs(detailed=True) # 检测数据库操作系统
if Backend.getOsVersion() in ("2000", "NT"):
conf.tmpPath = "C:/WINNT/Temp"
elif Backend.isOs("XP"):
conf.tmpPath = "C:/Documents and Settings/All Users/Application Data/Temp"
conf.tmpPath = "C:/Windows/Temp"
conf.tmpPath = "/tmp"
if Backend.getOsVersion() in ("2000", "NT"): # 如果是 Windows 2000 或 NT
conf.tmpPath = "C:/WINNT/Temp" # 设置临时路径
elif Backend.isOs("XP"): # 如果是 Windows XP
conf.tmpPath = "C:/Documents and Settings/All Users/Application Data/Temp" # 设置临时路径
else: # 如果是其他 Windows 版本
conf.tmpPath = "C:/Windows/Temp" # 设置临时路径
else: # 如果操作系统不是 Windows
conf.tmpPath = "/tmp" # 设置临时路径为 /tmp
if re.search(r"\A[\w]:[\/\\]+", conf.tmpPath, re.I):
if re.search(r"\A[\w]:[\/\$$+", conf.tmpPath, re.I): # 如果临时路径是 Windows 格式
Backend.setOs(OS.WINDOWS) # 设置操作系统为 Windows
conf.tmpPath = normalizePath(conf.tmpPath)
conf.tmpPath = ntToPosixSlashes(conf.tmpPath)
conf.tmpPath = normalizePath(conf.tmpPath) # 规范化临时路径
conf.tmpPath = ntToPosixSlashes(conf.tmpPath) # 将临时路径转换为 POSIX 格式
singleTimeDebugMessage("going to use '%s' as temporary files directory" % conf.tmpPath)
singleTimeDebugMessage("going to use '%s' as temporary files directory" % conf.tmpPath) # 输出调试信息
hashDBWrite(HASHDB_KEYS.CONF_TMP_PATH, conf.tmpPath)
hashDBWrite(HASHDB_KEYS.CONF_TMP_PATH, conf.tmpPath) # 写入哈希数据库
return conf.tmpPath
return conf.tmpPath # 返回临时路径
# 定义 getVersionFromBanner 方法,用于从 banner 中获取数据库版本
def getVersionFromBanner(self):
if "dbmsVersion" in kb.bannerFp:
if "dbmsVersion" in kb.bannerFp: # 如果 banner 中已经有版本信息
return # 直接返回
infoMsg = "detecting back-end DBMS version from its banner"
infoMsg = "detecting back-end DBMS version from its banner" # 输出信息
query = queries[Backend.getIdentifiedDbms()].banner.query
query = queries[Backend.getIdentifiedDbms()].banner.query # 获取 banner 查询语句
if conf.direct:
query = "SELECT %s" % query
if conf.direct: # 如果是直接连接
query = "SELECT %s" % query # 添加 SELECT 关键字
kb.bannerFp["dbmsVersion"] = unArrayizeValue(inject.getValue(query)) or ""
kb.bannerFp["dbmsVersion"] = unArrayizeValue(inject.getValue(query)) or "" # 获取 banner 信息
match = re.search(r"\d[\d.-]*", kb.bannerFp["dbmsVersion"])
if match:
kb.bannerFp["dbmsVersion"] = match.group(0)
match = re.search(r"\d[\d.-]*", kb.bannerFp["dbmsVersion"]) # 使用正则表达式匹配版本号
if match: # 如果匹配成功
kb.bannerFp["dbmsVersion"] = match.group(0) # 设置版本号
# 定义 delRemoteFile 方法,用于删除远程文件
def delRemoteFile(self, filename):
if not filename:
if not filename: # 如果文件名为空
return # 直接返回
self.checkDbmsOs() # 检测数据库操作系统
if Backend.isOs(OS.WINDOWS):
filename = posixToNtSlashes(filename)
cmd = "del /F /Q %s" % filename
cmd = "rm -f %s" % filename
if Backend.isOs(OS.WINDOWS): # 如果操作系统是 Windows
filename = posixToNtSlashes(filename) # 将路径转换为 Windows 格式
cmd = "del /F /Q %s" % filename # 构建删除命令
else: # 如果操作系统不是 Windows
cmd = "rm -f %s" % filename # 构建删除命令
self.execCmd(cmd, silent=True)
self.execCmd(cmd, silent=True) # 执行删除命令
# 定义 createSupportTbl 方法,用于创建支持表
def createSupportTbl(self, tblName, tblField, tblType):
inject.goStacked("DROP TABLE %s" % tblName, silent=True)
inject.goStacked("DROP TABLE %s" % tblName, silent=True) # 删除表(如果存在)
if Backend.isDbms(DBMS.MSSQL) and tblName == self.cmdTblName:
inject.goStacked("CREATE TABLE %s(id INT PRIMARY KEY IDENTITY, %s %s)" % (tblName, tblField, tblType))
inject.goStacked("CREATE TABLE %s(%s %s)" % (tblName, tblField, tblType))
if Backend.isDbms(DBMS.MSSQL) and tblName == self.cmdTblName: # 如果是 MSSQL 并且表名是命令表
inject.goStacked("CREATE TABLE %s(id INT PRIMARY KEY IDENTITY, %s %s)" % (tblName, tblField, tblType)) # 创建表,包含自增 id
else: # 如果不是 MSSQL 或表名不是命令表
inject.goStacked("CREATE TABLE %s(%s %s)" % (tblName, tblField, tblType)) # 创建表
# 定义 cleanup 方法,用于清理文件系统和数据库
def cleanup(self, onlyFileTbl=False, udfDict=None, web=False):
Cleanup file system and database from sqlmap create files, tables
and functions
if web and self.webBackdoorFilePath:
logger.info("cleaning up the web files uploaded")
if web and self.webBackdoorFilePath: # 如果是 web 模式且有 web 后门文件路径
logger.info("cleaning up the web files uploaded") # 输出信息
self.delRemoteFile(self.webStagerFilePath) # 删除 web stager 文件
self.delRemoteFile(self.webBackdoorFilePath) # 删除 web 后门文件
if (not isStackingAvailable() or kb.udfFail) and not conf.direct:
if (not isStackingAvailable() or kb.udfFail) and not conf.direct: # 如果不支持堆叠查询或 udf失败且不是直接连接
return # 直接返回
if any((conf.osCmd, conf.osShell)) and Backend.isDbms(DBMS.PGSQL) and kb.copyExecTest:
if any((conf.osCmd, conf.osShell)) and Backend.isDbms(DBMS.PGSQL) and kb.copyExecTest: # 如果执行系统命令/shell 且是 PostgreSQL 且 copyExecTest 为 True
return # 直接返回
if Backend.isOs(OS.WINDOWS):
libtype = "dynamic-link library"
if Backend.isOs(OS.WINDOWS): # 如果操作系统是 Windows
libtype = "dynamic-link library" # 设置库类型为动态链接库
elif Backend.isOs(OS.LINUX):
libtype = "shared object"
elif Backend.isOs(OS.LINUX): # 如果操作系统是 Linux
libtype = "shared object" # 设置库类型为共享对象
libtype = "shared library"
else: # 如果是其他操作系统
libtype = "shared library" # 设置库类型为共享库
if onlyFileTbl:
logger.debug("cleaning up the database management system")
logger.info("cleaning up the database management system")
if onlyFileTbl: # 如果只清理文件表
logger.debug("cleaning up the database management system") # 输出调试信息
else: # 如果清理所有
logger.info("cleaning up the database management system") # 输出信息
logger.debug("removing support tables")
inject.goStacked("DROP TABLE %s" % self.fileTblName, silent=True)
inject.goStacked("DROP TABLE %shex" % self.fileTblName, silent=True)
logger.debug("removing support tables") # 输出调试信息
inject.goStacked("DROP TABLE %s" % self.fileTblName, silent=True) # 删除文件表
inject.goStacked("DROP TABLE %shex" % self.fileTblName, silent=True) # 删除文件表 (hex)
if not onlyFileTbl:
inject.goStacked("DROP TABLE %s" % self.cmdTblName, silent=True)
if not onlyFileTbl: # 如果不是只清理文件表
inject.goStacked("DROP TABLE %s" % self.cmdTblName, silent=True) # 删除命令表
if Backend.isDbms(DBMS.MSSQL):
udfDict = {"master..new_xp_cmdshell": {}}
if Backend.isDbms(DBMS.MSSQL): # 如果是 MSSQL
udfDict = {"master..new_xp_cmdshell": {}} # 设置要删除的 udf (xp_cmdshell)
if udfDict is None:
udfDict = getattr(self, "sysUdfs", {})
if udfDict is None: # 如果 udfDict 为空
udfDict = getattr(self, "sysUdfs", {}) # 获取系统 udf
for udf, inpRet in udfDict.items():
message = "do you want to remove UDF '%s'? [Y/n] " % udf
for udf, inpRet in udfDict.items(): # 遍历 udf
message = "do you want to remove UDF '%s'? [Y/n] " % udf # 输出询问信息
if readInput(message, default='Y', boolean=True):
dropStr = "DROP FUNCTION %s" % udf
if readInput(message, default='Y', boolean=True): # 获取用户输入
dropStr = "DROP FUNCTION %s" % udf # 构建删除 udf 命令
if Backend.isDbms(DBMS.PGSQL):
inp = ", ".join(i for i in inpRet["input"])
dropStr += "(%s)" % inp
if Backend.isDbms(DBMS.PGSQL): # 如果是 PostgreSQL
inp = ", ".join(i for i in inpRet["input"]) # 获取输入参数
dropStr += "(%s)" % inp # 添加输入参数到删除命令
logger.debug("removing UDF '%s'" % udf)
inject.goStacked(dropStr, silent=True)
logger.debug("removing UDF '%s'" % udf) # 输出调试信息
inject.goStacked(dropStr, silent=True) # 删除 udf
logger.info("database management system cleanup finished")
logger.info("database management system cleanup finished") # 输出信息
warnMsg = "remember that UDF %s files " % libtype
warnMsg = "remember that UDF %s files " % libtype # 构建警告信息
if conf.osPwn:
warnMsg += "and Metasploit related files in the temporary "
if conf.osPwn: # 如果开启 osPwn 功能
warnMsg += "and Metasploit related files in the temporary " # 添加 Metasploit 相关文件信息
warnMsg += "folder "
warnMsg += "saved on the file system can only be deleted "
warnMsg += "saved on the file system can only be deleted " # 添加手动删除信息
warnMsg += "manually"
logger.warning(warnMsg) # 输出警告信息
# 定义 likeOrExact 方法,用于选择 LIKE 或精确匹配
def likeOrExact(self, what):
message = "do you want sqlmap to consider provided %s(s):\n" % what
message += "[1] as LIKE %s names (default)\n" % what
message = "do you want sqlmap to consider provided %s(s):\
" % what # 构建询问信息
message += "[1] as LIKE %s names (default)\
" % what
message += "[2] as exact %s names" % what
choice = readInput(message, default='1')
choice = readInput(message, default='1') # 获取用户选择
if not choice or choice == '1':
if not choice or choice == '1': # 如果选择 LIKE 匹配
choice = '1'
condParam = " LIKE '%%%s%%'"
elif choice == '2':
condParam = "='%s'"
errMsg = "invalid value"
raise SqlmapNoneDataException(errMsg)
return choice, condParam
condParam = " LIKE '%%%s%%'" # 设置 LIKE 条件
elif choice == '2': # 如果选择精确匹配
condParam = "='%s'" # 设置精确匹配条件
else: # 如果输入非法
errMsg = "invalid value" # 输出错误信息
raise SqlmapNoneDataException(errMsg) # 抛出异常
return choice, condParam # 返回选择和条件