You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
sqlmap/src/sqlmap-master/lib/core/testing.py

309 lines
11 KiB

#!/usr/bin/env python
"""
Copyright (c) 2006-2024 sqlmap developers (https://sqlmap.org/)
See the file 'LICENSE' for copying permission
"""
# 导入所需的标准库
import doctest # 用于运行文档测试
import logging # 用于日志记录
import os # 用于操作系统相关功能
import random # 用于生成随机数
import re # 用于正则表达式操作
import socket # 用于网络通信
import sqlite3 # 用于SQLite数据库操作
import sys # 用于系统相关功能
import tempfile # 用于创建临时文件
import threading # 用于多线程操作
import time # 用于时间相关操作
# 导入自定义模块
from extra.vulnserver import vulnserver # 导入漏洞测试服务器
from lib.core.common import clearConsoleLine # 用于清除控制台行
from lib.core.common import dataToStdout # 用于向标准输出写数据
from lib.core.common import randomInt # 用于生成随机整数
from lib.core.common import randomStr # 用于生成随机字符串
from lib.core.common import shellExec # 用于执行shell命令
from lib.core.compat import round # 用于数字四舍五入
from lib.core.convert import encodeBase64 # 用于Base64编码
from lib.core.data import kb # 用于存储全局知识库数据
from lib.core.data import logger # 用于日志记录
from lib.core.data import paths # 用于存储路径信息
from lib.core.data import queries # 用于存储SQL查询
from lib.core.patch import unisonRandom # 用于随机数生成
from lib.core.settings import IS_WIN # 用于判断是否Windows系统
def vulnTest():
"""
运行针对'vulnserver'的漏洞测试
这个函数执行一系列预定义的测试用例来验证sqlmap的功能
"""
# 定义测试用例元组,每个测试用例包含命令行选项和预期检查项
TESTS = (
("-h", ("to see full list of options run with '-hh'",)), # 帮助信息测试
("--dependencies", ("sqlmap requires", "third-party library")), # 依赖检查测试
# ... 更多测试用例 ...
)
retVal = True # 存储测试结果
count = 0 # 测试计数器
# 寻找可用的端口
while True:
address, port = "127.0.0.1", random.randint(10000, 65535)
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if s.connect_ex((address, port)): # 尝试连接端口
break
else:
time.sleep(1)
finally:
s.close()
# 定义运行漏洞服务器的线程函数
def _thread():
vulnserver.init(quiet=True)
vulnserver.run(address=address, port=port)
vulnserver._alive = True
# 启动漏洞服务器线程
thread = threading.Thread(target=_thread)
thread.daemon = True
thread.start()
# 等待服务器启动完成
while vulnserver._alive:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.connect((address, port))
s.sendall(b"GET / HTTP/1.1\r\n\r\n")
result = b""
while True:
current = s.recv(1024)
if not current:
break
else:
result += current
if b"vulnserver" in result:
break
except:
pass
finally:
s.close()
time.sleep(1)
# 检查服务器是否成功启动
if not vulnserver._alive:
logger.error("problem occurred in vulnserver instantiation (address: 'http://%s:%s')" % (address, port))
return False
else:
logger.info("vulnserver running at 'http://%s:%s'..." % (address, port))
# 创建临时配置文件
handle, config = tempfile.mkstemp(suffix=".conf")
os.close(handle)
# 创建临时SQLite数据库
handle, database = tempfile.mkstemp(suffix=".sqlite")
os.close(handle)
# 初始化数据库架构
with sqlite3.connect(database) as conn:
c = conn.cursor()
c.executescript(vulnserver.SCHEMA)
# 创建临时请求文件
handle, request = tempfile.mkstemp(suffix=".req")
os.close(handle)
# 创建临时日志文件
handle, log = tempfile.mkstemp(suffix=".log")
os.close(handle)
# 创建临时多目标文件
handle, multiple = tempfile.mkstemp(suffix=".lst")
os.close(handle)
# 准备HTTP请求内容
content = "POST / HTTP/1.0\nUser-Agent: foobar\nHost: %s:%s\n\nid=1\n" % (address, port)
with open(request, "w+") as f:
f.write(content)
f.flush()
# 准备日志内容
content = '<port>%d</port><request base64="true"><![CDATA[%s]]></request>' % (port, encodeBase64(content, binary=False))
with open(log, "w+") as f:
f.write(content)
f.flush()
# 设置基本URL和测试参数
base = "http://%s:%d/" % (address, port)
url = "%s?id=1" % base
direct = "sqlite3://%s" % database
tmpdir = tempfile.mkdtemp()
# 读取并修改配置文件
with open(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "sqlmap.conf"))) as f:
content = f.read().replace("url =", "url = %s" % url)
with open(config, "w+") as f:
f.write(content)
f.flush()
# 准备多目标测试文件
content = "%s?%s=%d\n%s?%s=%d\n%s&%s=1" % (base, randomStr(), randomInt(), base, randomStr(), randomInt(), url, randomStr())
with open(multiple, "w+") as f:
f.write(content)
f.flush()
# 执行所有测试用例
for options, checks in TESTS:
status = '%d/%d (%d%%) ' % (count, len(TESTS), round(100.0 * count / len(TESTS)))
dataToStdout("\r[%s] [INFO] complete: %s" % (time.strftime("%X"), status))
# Windows系统特殊字符处理
if IS_WIN and "uraj" in options:
options = options.replace(u"\u0161u\u0107uraj", "sucuraj")
checks = [check.replace(u"\u0161u\u0107uraj", "sucuraj") for check in checks]
# 替换测试命令中的占位符
for tag, value in (("<url>", url), ("<base>", base), ("<direct>", direct), ("<tmpdir>", tmpdir),
("<request>", request), ("<log>", log), ("<multiple>", multiple),
("<config>", config), ("<base64>", url.replace("id=1", "id=MZ=%3d"))):
options = options.replace(tag, value)
# 构建完整的测试命令
cmd = "%s \"%s\" %s --batch --non-interactive --debug --time-sec=1" % (
sys.executable if ' ' not in sys.executable else '"%s"' % sys.executable,
os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "sqlmap.py")),
options
)
# 处理临时文件
if "<tmpfile>" in cmd:
handle, tmp = tempfile.mkstemp()
os.close(handle)
cmd = cmd.replace("<tmpfile>", tmp)
# 执行测试命令并检查输出
output = shellExec(cmd)
# 验证测试结果
if not all((check in output if not check.startswith('~') else check[1:] not in output) for check in checks) or "unhandled exception" in output:
dataToStdout("---\n\n$ %s\n" % cmd)
dataToStdout("%s---\n" % output, coloring=False)
retVal = False
count += 1
# 清理并显示最终结果
clearConsoleLine()
if retVal:
logger.info("vuln test final result: PASSED")
else:
logger.error("vuln test final result: FAILED")
return retVal
def smokeTest():
"""
运行程序的基本冒烟测试
验证基本功能是否正常工作
"""
unisonRandom() # 初始化随机数生成器
# 验证错误正则表达式的有效性
with open(paths.ERRORS_XML, "r") as f:
content = f.read()
for regex in re.findall(r'<error regexp="(.+?)"/>', content):
try:
re.compile(regex)
except re.error:
errMsg = "smoke test failed at compiling '%s'" % regex
logger.error(errMsg)
return False
retVal = True
count, length = 0, 0
# 统计需要测试的Python文件数量
for root, _, files in os.walk(paths.SQLMAP_ROOT_PATH):
if any(_ in root for _ in ("thirdparty", "extra", "interbase")):
continue
for filename in files:
if os.path.splitext(filename)[1].lower() == ".py" and filename != "__init__.py":
length += 1
# 对每个Python文件进行测试
for root, _, files in os.walk(paths.SQLMAP_ROOT_PATH):
if any(_ in root for _ in ("thirdparty", "extra", "interbase")):
continue
for filename in files:
if os.path.splitext(filename)[1].lower() == ".py" and filename not in ("__init__.py", "gui.py"):
path = os.path.join(root, os.path.splitext(filename)[0])
path = path.replace(paths.SQLMAP_ROOT_PATH, '.')
path = path.replace(os.sep, '.').lstrip('.')
try:
__import__(path)
module = sys.modules[path]
except Exception as ex:
retVal = False
dataToStdout("\r")
errMsg = "smoke test failed at importing module '%s' (%s):\n%s" % (path, os.path.join(root, filename), ex)
logger.error(errMsg)
else:
logger.setLevel(logging.CRITICAL)
kb.smokeMode = True
# 运行文档测试
(failure_count, _) = doctest.testmod(module)
kb.smokeMode = False
logger.setLevel(logging.INFO)
if failure_count > 0:
retVal = False
count += 1
status = '%d/%d (%d%%) ' % (count, length, round(100.0 * count / length))
dataToStdout("\r[%s] [INFO] complete: %s" % (time.strftime("%X"), status))
# 验证正则表达式的递归函数
def _(node):
for __ in dir(node):
if not __.startswith('_'):
candidate = getattr(node, __)
if isinstance(candidate, str):
if '\\' in candidate:
try:
re.compile(candidate)
except:
errMsg = "smoke test failed at compiling '%s'" % candidate
logger.error(errMsg)
raise
else:
_(candidate)
# 验证所有数据库查询中的正则表达式
for dbms in queries:
try:
_(queries[dbms])
except:
retVal = False
# 显示最终测试结果
clearConsoleLine()
if retVal:
logger.info("smoke test final result: PASSED")
else:
logger.error("smoke test final result: FAILED")
return retVal