You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
MiaCTFer/client/subdomain/oneforall/sbudomain_run.py

291 lines
11 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import os
import sys
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + '/../../../')
import pkgutil
import json
import time
import datetime
import socket
from IPy import IP as IPyIp
import re
from client.database import session, SrcTask, SrcAssets, SrcPorts
from client.subdomain.oneforall.oneforall import OneForAll as OneForAll
from client.webinfo.run import SelectIP, Check_Waf
from client.urlscan.url_probe.urlscan_run import UrlProbe
from client.portscan.portscan_run import PortScan
class SubDomain:
"""
一个用于处理子域名相关操作的类。
"""
def __init__(self, host):
"""
初始化方法。
:param host: 传入的主机名,去除了前缀的"*."
"""
self.host = host.replace('*.', '')
def run(self):
"""
运行方法,调用 OneForAll 进行子域名扫描。
会针对当前对象的 host 属性代表的主机名运行 OneForAll 的扫描任务。
"""
OneForAll(self.host).run()
def pars_dns(self):
"""
解析 DNS 结果的方法。
尝试读取和解析子域名扫描结果的 JSON 文件。
:return: 如果成功解析,返回解析后的字典;否则返回 None。
"""
try:
# 尝试获取指定模块中的数据,这里可能是从子域名扫描结果存储路径获取 JSON 数据
data_bytes = pkgutil.get_data('client.subdomain.oneforall', f'results/{self.host}.json')
except Exception as error:
# 如果获取数据失败,打印错误信息
print(f'[-]子域名[{self.host}]扫描-读取结果失败,{error}')
else:
# 如果成功获取数据,将字节数据转换为字符串
data_str = data_bytes.decode('utf-8')
try:
# 尝试将字符串形式的 JSON 数据转换为字典
dns_dict = json.loads(data_str)
except Exception as error:
# 如果解析 JSON 失败,打印错误信息
print(f'[-]子域名[{self.host}]扫描-解析结果失败,{error}')
else:
# 如果解析成功,返回解析后的字典
return dns_dict
def ReadTask():
'''读取任务'''
# 从数据库中查询任务标志为 False 的第一个任务
task_sql = session.query(SrcTask).filter(SrcTask.task_flag == False).first()
# 提交事务,刷新查询缓存
session.commit()
if task_sql:
# 将任务标志设置为 True表示任务正在处理
task_sql.task_flag = True
# 设置任务处理时间为当前时间
task_sql.task_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 再次提交事务
session.commit()
# 刷新任务对象,确保 task_flag 的更改在当前会话中可见
session.refresh(task_sql, ['task_flag'])
# 返回任务对象,如果没有任务则返回 None
return task_sql
def WriteAssets(dns_list, task_name):
# 遍历传入的 DNS 列表
for info in dns_list:
# 获取 IP 地址,如果有多个 IP 以逗号分隔,则取第一个
ip = info.get('content', '')
try:
if ip.find(','):
ip = ip.split(',')[0]
except Exception as e:
# 如果获取 IP 过程中出现错误,打印错误信息
print(f'[-]子域名多IP获取失败:{e}')
# 获取主机名、状态等信息
host = info.get('url', None)
status = info.get('status', None)
if ip and host and status:
# 统计同名主机在数据库中的数量
asset_count = session.query(SrcAssets).filter(SrcAssets.asset_host == host).count()
session.commit()
if not asset_count:
# 根据 IP 确定地区
area = SelectIP(ip)
# 检查主机是否有 WAF并确定 WAF 类型
flag, waf = Check_Waf(host)
# 创建数据库对象,表示一个资产信息
srcasset_sql = SrcAssets(asset_name=task_name, asset_host=host, asset_subdomain=info.get('subdomain'),
asset_title=info.get('title'),
asset_ip=ip, asset_area=area, asset_waf=waf, asset_cdn=False,
asset_banner=info.get('banner'), asset_info='', asset_whois='')
# 将资产信息添加到数据库会话中
session.add(srcasset_sql)
try:
# 提交事务,将资产信息写入数据库
session.commit()
except Exception as error:
# 如果写入过程中出现错误,回滚事务并打印错误信息
session.rollback()
print(f'[-]子域名入库异常{error}')
# 打印入库完成信息
print(f'[+]子域名[{task_name}]入库完成')
def main():
"""
主函数,持续读取任务并根据任务类型执行相应操作。
"""
print(f'[+]子域名扫描启动')
while True:
# 读取任务
task_sql = ReadTask()
if not task_sql:
# 如果没有任务,等待 30 秒后继续查询
time.sleep(30)
else:
# 根据任务的域名格式判断任务类型
if task_sql.task_domain.startswith('*'): # 子域名扫描
# 创建子域名处理对象
subdomain = SubDomain(task_sql.task_domain)
# 运行子域名扫描
subdomain.run()
# 解析子域名扫描结果
dns_list = subdomain.pars_dns()
if dns_list:
# 将扫描结果写入数据库
WriteAssets(dns_list, task_sql.task_name)
elif task_sql.task_domain.find('/')!= -1: # IP 段扫描
# 执行 IP 段扫描任务
ip_main(task_sql.task_domain, task_sql)
elif check_ip(task_sql.task_domain): # IP 扫描
# 执行 IP 扫描任务
ip_scan(task_sql.task_domain, task_sql)
else: # 子域名扫描
# 执行子域名扫描任务
subdomain_scan(task_sql.task_domain, task_sql.task_name)
def subdomain_scan(host, asset_name):
'''host 探测'''
# 获取指定主机名的 IP 地址,如果获取失败则返回 None
ip = domain_ip(host)
if not ip:
return None
# 创建字典表示 IP 和端口信息
ip_dict = {'ip': host, 'port': 80}
# 创建 URL 探测对象
http_info = UrlProbe(ip_dict)
# 运行 URL 探测
info = http_info.run()
if not info:
return None
# 根据 IP 确定地区
area = SelectIP(ip)
# 检查主机是否有 WAF并确定 WAF 类型
flag, waf = Check_Waf(info['host'])
# 更新探测结果中的地区和 WAF 信息
info['area'] = area
info['waf'] = waf
info['subdomain'] = host
info['ip'] = ip
print('[+]Url 探测完成')
# 将探测结果写入数据库
WriteAsset(info, asset_name)
def domain_ip(subdomain):
'''域名转 IP'''
try:
# 使用 socket 模块将域名转换为 IP 地址
ip = socket.gethostbyname(subdomain)
except:
# 如果转换过程中出现错误,返回 None
return None
else:
# 如果转换成功,返回 IP 地址
return ip
def ip_to_string(IP_info):
'''将 IP 段解析为 IP'''
try:
# 使用 IPyIp 进行 IP 段解析,可能是自定义的函数或类
ip_list = IPyIp(IP_info)
except Exception as e:
# 如果解析过程中出现异常,打印异常信息
print(f'IP 段解析异常:{e}')
else:
# 创建一个空列表用于存储解析后的 IP
ip_list1 = []
for ip in ip_list:
# 将 IP 对象转换为字符串并添加到列表中
ip_list1.append(str(ip))
# 移除列表中的前三个和最后一个元素,具体原因可能与特定的 IP 段处理逻辑有关
ip_list1.pop(0)
ip_list1.pop(0)
ip_list1.pop()
# 返回解析后的 IP 列表
return ip_list1
def ip_main(IP_info, task_sql):
'''端口探测 IP 段'''
# 将 IP 段解析为 IP 列表
ip_list = ip_to_string(IP_info)
if not ip_list:
return None
for ip_tmp in ip_list:
# 创建端口扫描对象,传入单个 IP
portscan = PortScan(ip_tmp)
# 运行端口扫描,获取端口信息字典和漏洞列表
port_dict, vulns_list = portscan.run()
if port_dict:
# 将端口扫描结果写入数据库
WritePosts(port_dict, task_sql, ip_tmp)
def check_ip(ip):
'''检测任务是否为 IP 格式'''
# 使用正则表达式判断输入是否为合法的 IP 格式
p = re.compile('^((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[01]?\d\d?)$')
if p.match(ip):
return True
else:
return False
def WritePosts(port_dict, assets_sql, ip):
'''端口扫描入库'''
for info in port_dict:
# 创建端口信息数据库对象
port_sql = SrcPorts(port_name=assets_sql.task_name, port_host='http://' + ip, port_ip=ip,
port_port=port_dict[info]['port'], port_service=port_dict[info]['name'],
port_product=port_dict[info]['product'], port_version=port_dict[info]['version'])
# 将端口信息添加到数据库会话中
session.add(port_sql)
try:
# 提交事务,将端口信息写入数据库
session.commit()
except Exception as error:
# 如果写入过程中出现错误,回滚事务并打印错误信息
session.rollback()
print(f'[-]端口入库异常{error}')
# 打印端口入库完成信息
print(f'[+]端口[{ip}]入库完成')
def WriteAsset(http_info, asset_name):
# 统计同名主机在数据库中的数量
asset_count = session.query(SrcAssets).filter(SrcAssets.asset_host == http_info['host']).count()
if not asset_count:
# 创建资产信息数据库对象
srcasset_sql = SrcAssets(asset_name=asset_name, asset_host=http_info['host'],
asset_subdomain=http_info['subdomain'],
asset_title=http_info['title'],
asset_ip=http_info['ip'], asset_area=http_info['area'], asset_waf=http_info['waf'],
asset_cdn=False,
asset_banner=http_info['banner'], asset_info='', asset_whois='')
# 将资产信息添加到数据库会话中
session.add(srcasset_sql)
try:
# 提交事务,将资产信息写入数据库
session.commit()
except Exception as error:
# 如果写入过程中出现错误,回滚事务并打印错误信息
session.rollback()
print(f'[-]Url 探测-子域名入库异常{error}')
def ip_scan(ip, task_sql):
# 创建端口扫描对象,传入单个 IP
portscan = PortScan(ip)
# 运行端口扫描,获取端口信息字典和漏洞列表
port_dict, vulns_list = portscan.run()
if port_dict:
# 将端口扫描结果写入数据库
WritePosts(port_dict, task_sql, ip)
if __name__ == '__main__':
main()