You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
MiaCTFer/client/subdomain/oneforall/takeover.py

192 lines
6.8 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/python3
# coding=utf-8
"""
OneForAll子域接管模块
:copyright: Copyright (c) 2019, Jing Ling. All rights reserved.
:license: GNU General Public License v3.0, see LICENSE for more details.
"""
import time
import json
from threading import Thread
from queue import Queue
import fire
from tablib import Dataset
from tqdm import tqdm
import client.subdomain.oneforall.config as config
from client.subdomain.oneforall.config import logger
from client.subdomain.oneforall.common import utils
from client.subdomain.oneforall.common.module import Module
from client.subdomain.oneforall.common.domain import Domain
def get_fingerprint():
"""
获取指纹信息。
这个函数首先构建了一个指向指纹信息文件('fingerprints.json')的路径,
然后打开这个文件,以 UTF-8 编码读取内容并忽略可能出现的编码错误。
接着,使用 JSON 解析器将文件内容加载为 Python 对象,并将其作为指纹信息返回。
"""
path = config.data_storage_dir.joinpath('fingerprints.json')
with open(path, encoding='utf-8', errors='ignore') as file:
fingerprints = json.load(file)
return fingerprints
def get_cname(subdomain):
"""
获取给定子域名的 CNAME规范名称记录。
此函数首先获取一个 DNS 解析器,然后尝试查询给定子域名的 CNAME 记录。
如果在查询过程中出现异常,会记录跟踪信息并返回 None。
如果查询成功,会遍历查询结果并返回第一个 CNAME 记录(假设一个子域只有一个 CNAME 记录)。
"""
resolver = utils.dns_resolver()
try:
answers = resolver.query(subdomain, 'CNAME')
except Exception as e:
logger.log('TRACE', e.args)
return None
for answer in answers:
return answer.to_text() # 一个子域只有一个 CNAME 记录
def get_maindomain(subdomain):
"""
获取给定子域名的主域名。
这个函数接受一个子域名作为参数,然后使用 Domain 类对其进行处理,
调用 registered 方法获取子域名对应的注册域名部分,即主域名,并返回该主域名。
"""
return Domain(subdomain).registered()
class Takeover(Module):
"""
OneForAll多线程子域接管风险检查模块
Example:
python3 takeover.py --target www.example.com --format csv run
python3 takeover.py --target ./subdomains.txt --thread 10 run
Note:
参数format可选格式有'txt', 'rst', 'csv', 'tsv', 'json', 'yaml', 'html',
'jira', 'xls', 'xlsx', 'dbf', 'latex', 'ods'
参数path默认None使用OneForAll结果目录生成路径
:param any target: 单个子域或者每行一个子域的文件路径(必需参数)
:param int thread: 线程数(默认100)
:param str format: 导出格式(默认csv)
:param str path: 导出路径(默认None)
"""
def __init__(self, target, thread=100, path=None, format='csv'):
Module.__init__(self)
self.subdomains = set()
self.module = 'Check'
self.source = 'Takeover'
self.target = target
self.thread = thread
self.path = path
self.format = format
self.fingerprints = None
self.subdomainq = Queue()
self.cnames = list()
self.results = Dataset()
def save(self):
logger.log('DEBUG', '正在保存检查结果')
if self.format == 'txt':
data = str(self.results)
else:
data = self.results.export(self.format)
utils.save_data(self.path, data)
def compare(self, subdomain, cname, responses):
domain_resp = self.get('http://' + subdomain, check=False)
cname_resp = self.get('http://' + cname, check=False)
if domain_resp is None or cname_resp is None:
return
for resp in responses:
if resp in domain_resp.text and resp in cname_resp.text:
logger.log('ALERT', f'{subdomain}存在子域接管风险')
self.results.append([subdomain, cname])
break
def worker(self, subdomain):
cname = get_cname(subdomain)
if cname is None:
return
maindomain = get_maindomain(cname)
for fingerprint in self.fingerprints:
cnames = fingerprint.get('cname')
if maindomain not in cnames:
continue
responses = fingerprint.get('response')
self.compare(subdomain, cname, responses)
def check(self):
while not self.subdomainq.empty(): # 保证域名队列遍历结束后能退出线程
subdomain = self.subdomainq.get() # 从队列中获取域名
self.worker(subdomain)
self.subdomainq.task_done()
def progress(self):
# 设置进度
bar = tqdm()
bar.total = len(self.subdomains)
bar.desc = 'Check Progress'
bar.ncols = 80
while True:
done = bar.total - self.subdomainq.qsize()
bar.n = done
bar.update()
if done == bar.total: # 完成队列中所有子域的检查退出
break
# bar.close()
def run(self):
start = time.time()
logger.log('INFOR', f'开始执行{self.source}模块')
self.subdomains = utils.get_domains(self.target)
self.format = utils.check_format(self.format, len(self.subdomains))
timestamp = utils.get_timestamp()
name = f'takeover_check_result_{timestamp}'
self.path = utils.check_path(self.path, name, self.format)
if self.subdomains:
logger.log('INFOR', f'正在检查子域接管风险')
self.fingerprints = get_fingerprint()
self.results.headers = ['subdomain', 'cname']
# 创建待检查的子域队列
for domain in self.subdomains:
self.subdomainq.put(domain)
# 检查线程
for _ in range(self.thread):
check_thread = Thread(target=self.check, daemon=True)
check_thread.start()
# 进度线程
progress_thread = Thread(target=self.progress, daemon=True)
progress_thread.start()
self.subdomainq.join()
self.save()
else:
logger.log('FATAL', f'获取域名失败')
end = time.time()
elapse = round(end - start, 1)
logger.log('INFOR', f'{self.source}模块耗时{elapse}'
f'发现{len(self.results)}个子域存在接管风险')
logger.log('INFOR', f'子域接管风险检查结果 {self.path}')
logger.log('INFOR', f'结束执行{self.source}模块')
if __name__ == '__main__':
fire.Fire(Takeover)
# takeover = Takeover('www.example.com')
# takeover = Takeover('./subdomains.txt')
# takeover.run()