You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
MiaCTFer/client/subdomain/oneforall/takeover.py

172 lines
5.8 KiB

#!/usr/bin/python3
# coding=utf-8
"""
OneForAll子域接管模块
:copyright: Copyright (c) 2019, Jing Ling. All rights reserved.
:license: GNU General Public License v3.0, see LICENSE for more details.
"""
import json
import time
from queue import Empty, Queue
from threading import Thread

import fire
from tablib import Dataset
from tqdm import tqdm

import client.subdomain.oneforall.config as config
from client.subdomain.oneforall.config import logger
from client.subdomain.oneforall.common import utils
from client.subdomain.oneforall.common.module import Module
from client.subdomain.oneforall.common.domain import Domain
def get_fingerprint():
    """
    Load the takeover fingerprints from the data storage directory.

    Reads ``fingerprints.json`` located under ``config.data_storage_dir``
    (decoded as UTF-8, silently dropping undecodable bytes) and returns the
    parsed JSON content unchanged.

    :return: the deserialized content of ``fingerprints.json``
    """
    fingerprint_file = config.data_storage_dir.joinpath('fingerprints.json')
    with open(fingerprint_file, encoding='utf-8', errors='ignore') as handle:
        return json.load(handle)
def get_cname(subdomain):
    """
    Look up the CNAME record of a subdomain.

    :param str subdomain: the subdomain to resolve
    :return: text form of the first CNAME answer, or None when the query
             fails or yields no answer
    """
    resolver = utils.dns_resolver()
    try:
        answers = resolver.query(subdomain, 'CNAME')
    except Exception as e:
        # Best-effort lookup: any resolver failure is traced and treated
        # as "no CNAME".
        logger.log('TRACE', e.args)
        return None
    # A subdomain has only one CNAME record, so the first answer is enough.
    first_answer = next(iter(answers), None)
    if first_answer is None:
        return None
    return first_answer.to_text()
def get_maindomain(subdomain):
    """
    Return the registered domain of the given name.

    Thin wrapper that builds a ``Domain`` from ``subdomain`` and returns the
    result of its ``registered()`` method.

    :param str subdomain: the domain name to reduce
    :return: whatever ``Domain(subdomain).registered()`` returns
    """
    domain = Domain(subdomain)
    return domain.registered()
class Takeover(Module):
    """
    OneForAll multi-threaded subdomain takeover risk checking module

    Example:
        python3 takeover.py --target www.example.com  --format csv run
        python3 takeover.py --target ./subdomains.txt --thread 10 run

    Note:
        Valid values for format are 'txt', 'rst', 'csv', 'tsv', 'json',
        'yaml', 'html', 'jira', 'xls', 'xlsx', 'dbf', 'latex', 'ods'
        When path is None (the default) the output path is generated from
        the OneForAll result directory

    :param any target: a single subdomain, or the path of a file holding one
                       subdomain per line (required)
    :param int thread: number of threads (default 100)
    :param str format: export format (default csv)
    :param str path: export path (default None)
    """

    def __init__(self, target, thread=100, path=None, format='csv'):
        super().__init__()
        self.subdomains = set()
        self.module = 'Check'
        self.source = 'Takeover'
        self.target = target
        self.thread = thread
        self.path = path
        self.format = format
        self.fingerprints = None
        self.subdomainq = Queue()
        self.cnames = []
        self.results = Dataset()

    def save(self):
        """Export the collected results in ``self.format`` to ``self.path``."""
        logger.log('DEBUG', '正在保存检查结果')
        if self.format == 'txt':
            data = str(self.results)
        else:
            data = self.results.export(self.format)
        utils.save_data(self.path, data)

    def compare(self, subdomain, cname, responses):
        """
        Record a takeover risk when a fingerprint string is served by both
        the subdomain and its CNAME target.

        Both hosts are requested over plain HTTP through ``self.get``; when
        either request yields None nothing is recorded.

        :param str subdomain: the subdomain being checked
        :param str cname: the CNAME target of the subdomain
        :param responses: iterable of fingerprint strings to search for
        """
        domain_resp = self.get('http://' + subdomain, check=False)
        cname_resp = self.get('http://' + cname, check=False)
        if domain_resp is None or cname_resp is None:
            return
        # Read the bodies once instead of once per fingerprint string.
        domain_text = domain_resp.text
        cname_text = cname_resp.text
        if any(resp in domain_text and resp in cname_text
               for resp in responses):
            logger.log('ALERT', f'{subdomain}存在子域接管风险')
            self.results.append([subdomain, cname])

    def worker(self, subdomain):
        """
        Check one subdomain against every loaded fingerprint.

        The subdomain is skipped when it has no CNAME record. Otherwise the
        registered domain of the CNAME target is matched against each
        fingerprint's 'cname' entry, and matching fingerprints are handed to
        ``compare`` together with their 'response' entry.

        :param str subdomain: the subdomain to check
        """
        cname = get_cname(subdomain)
        if cname is None:
            return
        maindomain = get_maindomain(cname)
        for fingerprint in self.fingerprints:
            # A fingerprint lacking either key is treated as "no match"
            # instead of raising TypeError on None.
            cnames = fingerprint.get('cname') or ()
            if maindomain not in cnames:
                continue
            responses = fingerprint.get('response') or ()
            self.compare(subdomain, cname, responses)

    def check(self):
        """
        Thread body: drain the subdomain queue, checking each entry.

        Returns once the queue is empty. Every dequeued item is always
        acknowledged with ``task_done()``, even when its check raises, so
        that ``Queue.join()`` in ``run`` cannot block forever.
        """
        while True:
            # get_nowait() avoids the empty()/get() race in which another
            # thread takes the last item between the two calls.
            try:
                subdomain = self.subdomainq.get_nowait()
            except Empty:
                return
            try:
                self.worker(subdomain)
            except Exception as e:
                # One failing subdomain must not kill the thread or stall
                # the whole run.
                logger.log('DEBUG',
                           f'Takeover check failed for {subdomain}: {e!r}')
            finally:
                self.subdomainq.task_done()

    def progress(self):
        """
        Thread body: display a progress bar until the queue has been drained.

        Progress is derived from the queue size, i.e. it counts subdomains
        that have been taken from the queue.
        """
        total = len(self.subdomains)
        # Set up the progress bar
        bar = tqdm()
        bar.total = total
        bar.desc = 'Check Progress'
        bar.ncols = 80
        while True:
            done = total - self.subdomainq.qsize()
            bar.n = done
            # refresh() redraws without touching the counter; update() would
            # add 1 on top of the value just assigned.
            bar.refresh()
            if done >= total:  # every subdomain has left the queue
                break
            # Poll at a modest rate instead of spinning a CPU core.
            time.sleep(0.1)
        bar.close()

    def run(self):
        """
        Entry point: load the targets, check them concurrently, save results.

        Resolves the target into subdomains, normalizes the export format
        and path, runs the check threads together with a progress thread,
        waits for the queue to be fully processed and then saves and logs
        the outcome.
        """
        start = time.time()
        logger.log('INFOR', f'开始执行{self.source}模块')
        self.subdomains = utils.get_domains(self.target)
        self.format = utils.check_format(self.format, len(self.subdomains))
        timestamp = utils.get_timestamp()
        name = f'takeover_check_result_{timestamp}'
        self.path = utils.check_path(self.path, name, self.format)
        if self.subdomains:
            logger.log('INFOR', '正在检查子域接管风险')
            self.fingerprints = get_fingerprint()
            self.results.headers = ['subdomain', 'cname']
            # Build the queue of subdomains to check
            for domain in self.subdomains:
                self.subdomainq.put(domain)
            # Check threads: never more than there are subdomains, and at
            # least one so that join() below cannot wait forever.
            thread_count = max(1, min(self.thread, len(self.subdomains)))
            for _ in range(thread_count):
                check_thread = Thread(target=self.check, daemon=True)
                check_thread.start()
            # Progress thread
            progress_thread = Thread(target=self.progress, daemon=True)
            progress_thread.start()

            self.subdomainq.join()
            # Let the bar draw its final state before further log output.
            progress_thread.join()
            self.save()
        else:
            logger.log('FATAL', '获取域名失败')
        end = time.time()
        elapse = round(end - start, 1)
        logger.log('INFOR', f'{self.source}模块耗时{elapse}'
                            f'发现{len(self.results)}个子域存在接管风险')
        logger.log('INFOR', f'子域接管风险检查结果 {self.path}')
        logger.log('INFOR', f'结束执行{self.source}模块')
if __name__ == '__main__':
    # Expose Takeover (constructor arguments and methods) as a command line
    # interface. Programmatic use looks like:
    #     Takeover('www.example.com').run()
    #     Takeover('./subdomains.txt').run()
    fire.Fire(Takeover)