#!/usr/bin/python3 # coding=utf-8 """ OneForAll子域接管模块 :copyright: Copyright (c) 2019, Jing Ling. All rights reserved. :license: GNU General Public License v3.0, see LICENSE for more details. """ import time import json from threading import Thread from queue import Queue import fire from tablib import Dataset from tqdm import tqdm import client.subdomain.oneforall.config as config from client.subdomain.oneforall.config import logger from client.subdomain.oneforall.common import utils from client.subdomain.oneforall.common.module import Module from client.subdomain.oneforall.common.domain import Domain def get_fingerprint(): """ 获取指纹信息。 这个函数首先构建了一个指向指纹信息文件('fingerprints.json')的路径, 然后打开这个文件,以 UTF-8 编码读取内容并忽略可能出现的编码错误。 接着,使用 JSON 解析器将文件内容加载为 Python 对象,并将其作为指纹信息返回。 """ path = config.data_storage_dir.joinpath('fingerprints.json') with open(path, encoding='utf-8', errors='ignore') as file: fingerprints = json.load(file) return fingerprints def get_cname(subdomain): """ 获取给定子域名的 CNAME(规范名称)记录。 此函数首先获取一个 DNS 解析器,然后尝试查询给定子域名的 CNAME 记录。 如果在查询过程中出现异常,会记录跟踪信息并返回 None。 如果查询成功,会遍历查询结果并返回第一个 CNAME 记录(假设一个子域只有一个 CNAME 记录)。 """ resolver = utils.dns_resolver() try: answers = resolver.query(subdomain, 'CNAME') except Exception as e: logger.log('TRACE', e.args) return None for answer in answers: return answer.to_text() # 一个子域只有一个 CNAME 记录 def get_maindomain(subdomain): """ 获取给定子域名的主域名。 这个函数接受一个子域名作为参数,然后使用 Domain 类对其进行处理, 调用 registered 方法获取子域名对应的注册域名部分,即主域名,并返回该主域名。 """ return Domain(subdomain).registered() class Takeover(Module): """ OneForAll多线程子域接管风险检查模块 Example: python3 takeover.py --target www.example.com --format csv run python3 takeover.py --target ./subdomains.txt --thread 10 run Note: 参数format可选格式有'txt', 'rst', 'csv', 'tsv', 'json', 'yaml', 'html', 'jira', 'xls', 'xlsx', 'dbf', 'latex', 'ods' 参数path默认None使用OneForAll结果目录生成路径 :param any target: 单个子域或者每行一个子域的文件路径(必需参数) :param int thread: 线程数(默认100) :param str format: 导出格式(默认csv) :param str path: 导出路径(默认None) """ def __init__(self, target, thread=100, path=None, format='csv'): Module.__init__(self) self.subdomains = set() self.module = 'Check' self.source = 'Takeover' self.target = target self.thread = thread self.path = path self.format = format self.fingerprints = None self.subdomainq = Queue() self.cnames = list() self.results = Dataset() def save(self): logger.log('DEBUG', '正在保存检查结果') if self.format == 'txt': data = str(self.results) else: data = self.results.export(self.format) utils.save_data(self.path, data) def compare(self, subdomain, cname, responses): domain_resp = self.get('http://' + subdomain, check=False) cname_resp = self.get('http://' + cname, check=False) if domain_resp is None or cname_resp is None: return for resp in responses: if resp in domain_resp.text and resp in cname_resp.text: logger.log('ALERT', f'{subdomain}存在子域接管风险') self.results.append([subdomain, cname]) break def worker(self, subdomain): cname = get_cname(subdomain) if cname is None: return maindomain = get_maindomain(cname) for fingerprint in self.fingerprints: cnames = fingerprint.get('cname') if maindomain not in cnames: continue responses = fingerprint.get('response') self.compare(subdomain, cname, responses) def check(self): while not self.subdomainq.empty(): # 保证域名队列遍历结束后能退出线程 subdomain = self.subdomainq.get() # 从队列中获取域名 self.worker(subdomain) self.subdomainq.task_done() def progress(self): # 设置进度 bar = tqdm() bar.total = len(self.subdomains) bar.desc = 'Check Progress' bar.ncols = 80 while True: done = bar.total - self.subdomainq.qsize() bar.n = done bar.update() if done == bar.total: # 完成队列中所有子域的检查退出 break # bar.close() def run(self): start = time.time() logger.log('INFOR', f'开始执行{self.source}模块') self.subdomains = utils.get_domains(self.target) self.format = utils.check_format(self.format, len(self.subdomains)) timestamp = utils.get_timestamp() name = f'takeover_check_result_{timestamp}' self.path = utils.check_path(self.path, name, self.format) if self.subdomains: logger.log('INFOR', f'正在检查子域接管风险') self.fingerprints = get_fingerprint() self.results.headers = ['subdomain', 'cname'] # 创建待检查的子域队列 for domain in self.subdomains: self.subdomainq.put(domain) # 检查线程 for _ in range(self.thread): check_thread = Thread(target=self.check, daemon=True) check_thread.start() # 进度线程 progress_thread = Thread(target=self.progress, daemon=True) progress_thread.start() self.subdomainq.join() self.save() else: logger.log('FATAL', f'获取域名失败') end = time.time() elapse = round(end - start, 1) logger.log('INFOR', f'{self.source}模块耗时{elapse}秒' f'发现{len(self.results)}个子域存在接管风险') logger.log('INFOR', f'子域接管风险检查结果 {self.path}') logger.log('INFOR', f'结束执行{self.source}模块') if __name__ == '__main__': fire.Fire(Takeover) # takeover = Takeover('www.example.com') # takeover = Takeover('./subdomains.txt') # takeover.run()