You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
MiaCTFer/client-1/urlscan/xray/app.py

88 lines
3.2 KiB

from flask import Flask, request
from urllib.parse import urlparse
from client.database import session, SrcVul
app = Flask(__name__)
NUM_SCAN = 1
@app.route('/webhook', methods=['POST'])
def xray_webhook():
try:
# 尝试将请求中的 JSON 数据解析为字典
vuln = request.json
except:
# 如果解析失败,不进行任何操作
pass
else:
# 如果成功解析为 JSON 数据
if 'create_time' in vuln:
# 获取漏洞插件名称和漏洞分类,并拼接成字符串
plugin = vuln.get('plugin', '') + ' ' + vuln.get('vuln_class', '')
# 获取漏洞所在的 URL
url = vuln['detail'].get('url')
# 获取漏洞的 payload
payload = vuln['detail'].get('payload', '')
# 获取漏洞参数
param = str(vuln['detail'].get('param', ''))
# 获取漏洞的请求信息
raw = vuln['detail'].get('request', '')
if param:
# 如果有参数,将参数和请求信息拼接
raws = param + '\n\n' + raw
else:
# 如果没有参数,直接使用请求信息
raws = raw
# 打印新漏洞信息
print(f'Xray 新漏洞:[{plugin}]-{url}')
# 将漏洞信息写入漏洞数据库
WriteVul(plugin, url, payload, raws, scan_name='xray')
else:
# 如果请求中没有 'create_time',检查是否有 'num_found_urls'
if 'num_found_urls' in vuln:
# 获取已发现的 URL 数量
num_found_urls = vuln.get('num_found_urls', 1)
# 获取已扫描的 URL 数量
num_scanned_urls = vuln.get('num_scanned_urls', 1)
# 计算未扫描的 URL 数量
pending = int(num_found_urls) - int(num_scanned_urls)
global NUM_SCAN
# 更新全局变量 NUM_SCAN
NUM_SCAN = pending
# 打印当前队列中的 URL 数量
print(f'Xray 当前队列[{NUM_SCAN}]')
finally:
# 无论如何都返回 "ok"
return "ok"
def WriteVul(plugin, url, payload, raw, scan_name):
'''漏洞入库'''
try:
# 解析 URL 获取主机名
host = urlparse(url).hostname
except Exception as e:
# 如果解析失败,打印错误信息并将主机名设为空字符串
print(f'Xray 解析 url 格式失败:{url}')
host = ''
else:
# 创建漏洞对象
vul_sql = SrcVul(vul_subdomain=host, vul_plugin=plugin, vul_url=url, vul_payload=payload, vul_raw=raw,
vul_scan_name=scan_name)
# 将漏洞对象添加到数据库会话中
session.add(vul_sql)
try:
# 提交事务,将漏洞信息写入数据库
session.commit()
except Exception as e:
# 如果写入失败,打印错误信息
print(f'xray 漏洞入库失败:{e}')
else:
# 如果写入成功,打印成功信息
print(f'Xray 漏洞入库成功:{url}')
def main():
app.run(port=8899)
if __name__ == '__main__':
main()