You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
sqlmap/src/sqlmap-master/lib/utils/har.py

258 lines
9.6 KiB

#!/usr/bin/env python
"""
Copyright (c) 2006-2024 sqlmap developers (https://sqlmap.org/)
See the file 'LICENSE' for copying permission
"""
# 导入所需的Python标准库
import base64 # 用于Base64编码解码
import datetime # 处理日期和时间
import io # 处理流式IO操作
import re # 正则表达式支持
import time # 时间相关功能
# 导入自定义和第三方库
from lib.core.bigarray import BigArray # 用于处理大型数组
from lib.core.convert import getBytes # 字符串转字节函数
from lib.core.convert import getText # 字节转字符串函数
from lib.core.settings import VERSION # 获取版本信息
from thirdparty.six.moves import BaseHTTPServer as _BaseHTTPServer # HTTP服务器基类
from thirdparty.six.moves import http_client as _http_client # HTTP客户端
# HAR(HTTP Archive)格式参考文档
# Reference: https://dvcs.w3.org/hg/webperf/raw-file/tip/specs/HAR/Overview.html
# http://www.softwareishard.com/har/viewer/
class HTTPCollectorFactory(object):
"""HTTP收集器工厂类,用于创建HTTP收集器实例"""
def __init__(self, harFile=False):
self.harFile = harFile
def create(self):
"""创建并返回一个新的HTTP收集器实例"""
return HTTPCollector()
class HTTPCollector(object):
"""HTTP收集器类,用于收集HTTP请求和响应信息"""
def __init__(self):
self.messages = BigArray() # 存储请求-响应对
self.extendedArguments = {} # 存储扩展参数
def setExtendedArguments(self, arguments):
"""设置扩展参数"""
self.extendedArguments = arguments
def collectRequest(self, requestMessage, responseMessage, startTime=None, endTime=None):
"""收集一对请求-响应消息"""
self.messages.append(RawPair(requestMessage, responseMessage,
startTime=startTime, endTime=endTime,
extendedArguments=self.extendedArguments))
def obtain(self):
"""获取HAR格式的日志数据"""
return {"log": {
"version": "1.2",
"creator": {"name": "sqlmap", "version": VERSION},
"entries": [pair.toEntry().toDict() for pair in self.messages],
}}
class RawPair(object):
"""原始请求-响应对类"""
def __init__(self, request, response, startTime=None, endTime=None, extendedArguments=None):
self.request = getBytes(request) # 请求数据
self.response = getBytes(response) # 响应数据
self.startTime = startTime # 开始时间
self.endTime = endTime # 结束时间
self.extendedArguments = extendedArguments or {} # 扩展参数
def toEntry(self):
"""转换为Entry对象"""
return Entry(request=Request.parse(self.request), response=Response.parse(self.response),
startTime=self.startTime, endTime=self.endTime,
extendedArguments=self.extendedArguments)
class Entry(object):
"""HAR条目类,表示一个完整的请求-响应交互"""
def __init__(self, request, response, startTime, endTime, extendedArguments):
self.request = request # 请求对象
self.response = response # 响应对象
self.startTime = startTime or 0 # 开始时间
self.endTime = endTime or 0 # 结束时间
self.extendedArguments = extendedArguments # 扩展参数
def toDict(self):
"""转换为字典格式"""
out = {
"request": self.request.toDict(),
"response": self.response.toDict(),
"cache": {}, # 缓存信息
"timings": { # 时间统计
"send": -1,
"wait": -1,
"receive": -1,
},
"time": int(1000 * (self.endTime - self.startTime)), # 总耗时(毫秒)
"startedDateTime": "%s%s" % (datetime.datetime.fromtimestamp(self.startTime).isoformat(), time.strftime("%z")) if self.startTime else None
}
out.update(self.extendedArguments)
return out
class Request(object):
"""HTTP请求类"""
def __init__(self, method, path, httpVersion, headers, postBody=None, raw=None, comment=None):
self.method = method # 请求方法(GET/POST等)
self.path = path # 请求路径
self.httpVersion = httpVersion # HTTP版本
self.headers = headers or {} # 请求头
self.postBody = postBody # POST请求体
self.comment = comment.strip() if comment else comment # 注释
self.raw = raw # 原始请求数据
@classmethod
def parse(cls, raw):
"""解析原始请求数据"""
request = HTTPRequest(raw)
return cls(method=request.command,
path=request.path,
httpVersion=request.request_version,
headers=request.headers,
postBody=request.rfile.read(),
comment=request.comment,
raw=raw)
@property
def url(self):
"""构建完整URL"""
host = self.headers.get("Host", "unknown")
return "http://%s%s" % (host, self.path)
def toDict(self):
"""转换为字典格式"""
out = {
"httpVersion": self.httpVersion,
"method": self.method,
"url": self.url,
"headers": [dict(name=key.capitalize(), value=value) for key, value in self.headers.items()],
"cookies": [],
"queryString": [],
"headersSize": -1,
"bodySize": -1,
"comment": getText(self.comment),
}
if self.postBody:
contentType = self.headers.get("Content-Type")
out["postData"] = {
"mimeType": contentType,
"text": getText(self.postBody).rstrip("\r\n"),
}
return out
class Response(object):
"""HTTP响应类"""
extract_status = re.compile(b'\\((\\d{3}) (.*)\\)') # 用于提取状态码的正则表达式
def __init__(self, httpVersion, status, statusText, headers, content, raw=None, comment=None):
self.raw = raw # 原始响应数据
self.httpVersion = httpVersion # HTTP版本
self.status = status # 状态码
self.statusText = statusText # 状态描述
self.headers = headers # 响应头
self.content = content # 响应内容
self.comment = comment.strip() if comment else comment # 注释
@classmethod
def parse(cls, raw):
"""解析原始响应数据"""
altered = raw
comment = b""
# 处理特殊格式的响应
if altered.startswith(b"HTTP response [") or altered.startswith(b"HTTP redirect ["):
stream = io.BytesIO(raw)
first_line = stream.readline()
parts = cls.extract_status.search(first_line)
status_line = "HTTP/1.0 %s %s" % (getText(parts.group(1)), getText(parts.group(2)))
remain = stream.read()
altered = getBytes(status_line) + b"\r\n" + remain
comment = first_line
response = _http_client.HTTPResponse(FakeSocket(altered))
response.begin()
try:
content = response.read()
except _http_client.IncompleteRead:
content = raw[raw.find(b"\r\n\r\n") + 4:].rstrip(b"\r\n")
return cls(httpVersion="HTTP/1.1" if response.version == 11 else "HTTP/1.0",
status=response.status,
statusText=response.reason,
headers=response.msg,
content=content,
comment=comment,
raw=raw)
def toDict(self):
"""转换为字典格式"""
content = {
"mimeType": self.headers.get("Content-Type"),
"text": self.content,
"size": len(self.content or "")
}
# 检测是否为二进制内容
binary = set([b'\0', b'\1'])
if any(c in binary for c in self.content):
content["encoding"] = "base64"
content["text"] = getText(base64.b64encode(self.content))
else:
content["text"] = getText(content["text"])
return {
"httpVersion": self.httpVersion,
"status": self.status,
"statusText": self.statusText,
"headers": [dict(name=key.capitalize(), value=value) for key, value in self.headers.items() if key.lower() != "uri"],
"cookies": [],
"content": content,
"headersSize": -1,
"bodySize": -1,
"redirectURL": "",
"comment": getText(self.comment),
}
class FakeSocket(object):
"""模拟Socket类,用于HTTP响应解析
原始来源: https://stackoverflow.com/questions/24728088/python-parse-http-response-string
"""
def __init__(self, response_text):
self._file = io.BytesIO(response_text)
def makefile(self, *args, **kwargs):
return self._file
class HTTPRequest(_BaseHTTPServer.BaseHTTPRequestHandler):
"""HTTP请求处理类
原始来源: https://stackoverflow.com/questions/4685217/parse-raw-http-headers
"""
def __init__(self, request_text):
self.comment = None
self.rfile = io.BytesIO(request_text)
self.raw_requestline = self.rfile.readline()
# 处理特殊格式的请求
if self.raw_requestline.startswith(b"HTTP request ["):
self.comment = self.raw_requestline
self.raw_requestline = self.rfile.readline()
self.error_code = self.error_message = None
self.parse_request()
def send_error(self, code, message):
"""记录错误信息"""
self.error_code = code
self.error_message = message