|
|
#!/usr/bin/python3
|
|
|
# coding=utf-8
|
|
|
|
|
|
"""
|
|
|
OneForAll子域接管模块
|
|
|
|
|
|
:copyright: Copyright (c) 2019, Jing Ling. All rights reserved.
|
|
|
:license: GNU General Public License v3.0, see LICENSE for more details.
|
|
|
"""
|
|
|
import time
|
|
|
import json
|
|
|
from threading import Thread
|
|
|
from queue import Queue
|
|
|
|
|
|
import fire
|
|
|
from tablib import Dataset
|
|
|
from tqdm import tqdm
|
|
|
|
|
|
import client.subdomain.oneforall.config as config
|
|
|
from client.subdomain.oneforall.config import logger
|
|
|
from client.subdomain.oneforall.common import utils
|
|
|
from client.subdomain.oneforall.common.module import Module
|
|
|
from client.subdomain.oneforall.common.domain import Domain
|
|
|
|
|
|
|
|
|
def get_fingerprint():
|
|
|
"""
|
|
|
获取指纹信息。
|
|
|
|
|
|
这个函数首先构建了一个指向指纹信息文件('fingerprints.json')的路径,
|
|
|
然后打开这个文件,以 UTF-8 编码读取内容并忽略可能出现的编码错误。
|
|
|
接着,使用 JSON 解析器将文件内容加载为 Python 对象,并将其作为指纹信息返回。
|
|
|
"""
|
|
|
path = config.data_storage_dir.joinpath('fingerprints.json')
|
|
|
with open(path, encoding='utf-8', errors='ignore') as file:
|
|
|
fingerprints = json.load(file)
|
|
|
return fingerprints
|
|
|
|
|
|
|
|
|
def get_cname(subdomain):
|
|
|
"""
|
|
|
获取给定子域名的 CNAME(规范名称)记录。
|
|
|
|
|
|
此函数首先获取一个 DNS 解析器,然后尝试查询给定子域名的 CNAME 记录。
|
|
|
如果在查询过程中出现异常,会记录跟踪信息并返回 None。
|
|
|
如果查询成功,会遍历查询结果并返回第一个 CNAME 记录(假设一个子域只有一个 CNAME 记录)。
|
|
|
"""
|
|
|
resolver = utils.dns_resolver()
|
|
|
try:
|
|
|
answers = resolver.query(subdomain, 'CNAME')
|
|
|
except Exception as e:
|
|
|
logger.log('TRACE', e.args)
|
|
|
return None
|
|
|
for answer in answers:
|
|
|
return answer.to_text() # 一个子域只有一个 CNAME 记录
|
|
|
|
|
|
|
|
|
def get_maindomain(subdomain):
|
|
|
"""
|
|
|
获取给定子域名的主域名。
|
|
|
|
|
|
这个函数接受一个子域名作为参数,然后使用 Domain 类对其进行处理,
|
|
|
调用 registered 方法获取子域名对应的注册域名部分,即主域名,并返回该主域名。
|
|
|
"""
|
|
|
return Domain(subdomain).registered()
|
|
|
|
|
|
|
|
|
class Takeover(Module):
|
|
|
"""
|
|
|
OneForAll多线程子域接管风险检查模块
|
|
|
|
|
|
Example:
|
|
|
python3 takeover.py --target www.example.com --format csv run
|
|
|
python3 takeover.py --target ./subdomains.txt --thread 10 run
|
|
|
|
|
|
Note:
|
|
|
参数format可选格式有'txt', 'rst', 'csv', 'tsv', 'json', 'yaml', 'html',
|
|
|
'jira', 'xls', 'xlsx', 'dbf', 'latex', 'ods'
|
|
|
参数path默认None使用OneForAll结果目录生成路径
|
|
|
|
|
|
:param any target: 单个子域或者每行一个子域的文件路径(必需参数)
|
|
|
:param int thread: 线程数(默认100)
|
|
|
:param str format: 导出格式(默认csv)
|
|
|
:param str path: 导出路径(默认None)
|
|
|
"""
|
|
|
def __init__(self, target, thread=100, path=None, format='csv'):
|
|
|
Module.__init__(self)
|
|
|
self.subdomains = set()
|
|
|
self.module = 'Check'
|
|
|
self.source = 'Takeover'
|
|
|
self.target = target
|
|
|
self.thread = thread
|
|
|
self.path = path
|
|
|
self.format = format
|
|
|
self.fingerprints = None
|
|
|
self.subdomainq = Queue()
|
|
|
self.cnames = list()
|
|
|
self.results = Dataset()
|
|
|
|
|
|
def save(self):
|
|
|
logger.log('DEBUG', '正在保存检查结果')
|
|
|
if self.format == 'txt':
|
|
|
data = str(self.results)
|
|
|
else:
|
|
|
data = self.results.export(self.format)
|
|
|
utils.save_data(self.path, data)
|
|
|
|
|
|
def compare(self, subdomain, cname, responses):
|
|
|
domain_resp = self.get('http://' + subdomain, check=False)
|
|
|
cname_resp = self.get('http://' + cname, check=False)
|
|
|
if domain_resp is None or cname_resp is None:
|
|
|
return
|
|
|
|
|
|
for resp in responses:
|
|
|
if resp in domain_resp.text and resp in cname_resp.text:
|
|
|
logger.log('ALERT', f'{subdomain}存在子域接管风险')
|
|
|
self.results.append([subdomain, cname])
|
|
|
break
|
|
|
|
|
|
def worker(self, subdomain):
|
|
|
cname = get_cname(subdomain)
|
|
|
if cname is None:
|
|
|
return
|
|
|
maindomain = get_maindomain(cname)
|
|
|
for fingerprint in self.fingerprints:
|
|
|
cnames = fingerprint.get('cname')
|
|
|
if maindomain not in cnames:
|
|
|
continue
|
|
|
responses = fingerprint.get('response')
|
|
|
self.compare(subdomain, cname, responses)
|
|
|
|
|
|
def check(self):
|
|
|
while not self.subdomainq.empty(): # 保证域名队列遍历结束后能退出线程
|
|
|
subdomain = self.subdomainq.get() # 从队列中获取域名
|
|
|
self.worker(subdomain)
|
|
|
self.subdomainq.task_done()
|
|
|
|
|
|
def progress(self):
|
|
|
# 设置进度
|
|
|
bar = tqdm()
|
|
|
bar.total = len(self.subdomains)
|
|
|
bar.desc = 'Check Progress'
|
|
|
bar.ncols = 80
|
|
|
while True:
|
|
|
done = bar.total - self.subdomainq.qsize()
|
|
|
bar.n = done
|
|
|
bar.update()
|
|
|
if done == bar.total: # 完成队列中所有子域的检查退出
|
|
|
break
|
|
|
# bar.close()
|
|
|
|
|
|
def run(self):
|
|
|
start = time.time()
|
|
|
logger.log('INFOR', f'开始执行{self.source}模块')
|
|
|
self.subdomains = utils.get_domains(self.target)
|
|
|
self.format = utils.check_format(self.format, len(self.subdomains))
|
|
|
timestamp = utils.get_timestamp()
|
|
|
name = f'takeover_check_result_{timestamp}'
|
|
|
self.path = utils.check_path(self.path, name, self.format)
|
|
|
if self.subdomains:
|
|
|
logger.log('INFOR', f'正在检查子域接管风险')
|
|
|
self.fingerprints = get_fingerprint()
|
|
|
self.results.headers = ['subdomain', 'cname']
|
|
|
# 创建待检查的子域队列
|
|
|
for domain in self.subdomains:
|
|
|
self.subdomainq.put(domain)
|
|
|
# 检查线程
|
|
|
for _ in range(self.thread):
|
|
|
check_thread = Thread(target=self.check, daemon=True)
|
|
|
check_thread.start()
|
|
|
# 进度线程
|
|
|
progress_thread = Thread(target=self.progress, daemon=True)
|
|
|
progress_thread.start()
|
|
|
|
|
|
self.subdomainq.join()
|
|
|
self.save()
|
|
|
else:
|
|
|
logger.log('FATAL', f'获取域名失败')
|
|
|
end = time.time()
|
|
|
elapse = round(end - start, 1)
|
|
|
logger.log('INFOR', f'{self.source}模块耗时{elapse}秒'
|
|
|
f'发现{len(self.results)}个子域存在接管风险')
|
|
|
logger.log('INFOR', f'子域接管风险检查结果 {self.path}')
|
|
|
logger.log('INFOR', f'结束执行{self.source}模块')
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
fire.Fire(Takeover)
|
|
|
# takeover = Takeover('www.example.com')
|
|
|
# takeover = Takeover('./subdomains.txt')
|
|
|
# takeover.run()
|