You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
MiaCTFer/client-1/subdomain/oneforall/brute.py

637 lines
24 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/python3
# coding=utf-8
"""
OneForAll子域爆破模块
:copyright: Copyright (c) 2019, Jing Ling. All rights reserved.
:license: GNU General Public License v3.0, see LICENSE for more details.
"""
import gc
import json
import time
import random
import secrets
import exrex
import fire
import tenacity
from tenacity import RetryError
from dns.exception import Timeout
from dns.resolver import NXDOMAIN, YXDOMAIN, NoAnswer, NoNameservers
import client.subdomain.oneforall.config as config
import client.subdomain.oneforall.dbexport as dbexport
from client.subdomain.oneforall.common import utils
from client.subdomain.oneforall.common.module import Module
from client.subdomain.oneforall.config import logger
@tenacity.retry(reraise=True, stop=tenacity.stop_after_attempt(3))
def do_query_a(domain, resolver):
try:
answer = resolver.query(domain, 'A')
# 如果查询随机域名A记录时抛出Timeout异常则重新探测
except Timeout as e:
logger.log('ALERT', f'探测超时重新探测中')
logger.log('DEBUG', e.args)
raise tenacity.TryAgain
# 如果查询随机域名A记录时抛出NXDOMAIN异常
# 则说明不存在随机子域的A记录 即没有开启泛解析
except (NXDOMAIN, YXDOMAIN, NoAnswer, NoNameservers) as e:
logger.log('DEBUG', e.args)
logger.log('INFOR', f'{domain}没有使用泛解析')
return False
except Exception as e:
logger.log('ALERT', f'探测{domain}是否使用泛解析出错')
logger.log('FATAL', e.args)
exit(1)
else:
if answer.rrset is None:
logger.log('ALERT', f'结果无记录重新探测中')
raise tenacity.TryAgain
ttl = answer.ttl
name = answer.name
ips = {item.address for item in answer}
logger.log('ALERT', f'{domain}使用了泛解析')
logger.log('ALERT', f'{domain} 解析到域名: {name} '
f'IP: {ips} TTL: {ttl}')
return True
def detect_wildcard(domain, authoritative_ns):
"""
探测域名是否使用泛解析
:param str domain: 域名
:param list authoritative_ns: 权威DNS
:return: 是否使用泛解析
"""
logger.log('INFOR', f'正在探测{domain}是否使用泛解析')
token = secrets.token_hex(4)
random_subdomain = f'{token}.{domain}'
resolver = utils.dns_resolver()
resolver.nameservers = authoritative_ns
resolver.rotate = True
resolver.cache = None
try:
wildcard = do_query_a(random_subdomain, resolver)
except Timeout as e:
logger.log('DEBUG', e.args)
logger.log('ALERT', f'多次探测超时暂且认为{domain}没有使用泛解析')
return False
else:
return wildcard
def gen_fuzz_subdomains(expression, rule):
"""
生成基于fuzz模式的爆破子域
:param str expression: 子域域名生成表达式
:param str rule: 生成子域所需的正则规则
:return: 用于爆破的子域
"""
subdomains = list()
fuzz_count = exrex.count(rule)
if fuzz_count > 10000000:
logger.log('ALERT', f'请注意该规则生成的字典太大:{fuzz_count} > 10000000')
logger.log('DEBUG', f'fuzz模式下生成的字典大小{fuzz_count}')
for fuzz_string in exrex.generate(rule):
fuzz_string = fuzz_string.lower()
if not fuzz_string.isalnum():
continue
fuzz_domain = expression.replace('*', fuzz_string)
subdomains.append(fuzz_domain)
random_domain = random.choice(subdomains)
logger.log('ALERT', f'请注意检查基于fuzz模式生成的{random_domain}是否正确')
return subdomains
def gen_word_subdomains(expression, path):
"""
生成基于word模式的爆破子域
:param str expression: 子域域名生成表达式
:param str path: 字典路径
:return: 用于爆破的子域
"""
subdomains = list()
with open(path, encoding='utf-8', errors='ignore') as fd:
for line in fd:
word = line.strip().lower()
if not word.isalnum():
continue
if word.endswith('.'):
word = word[:-1]
subdomain = expression.replace('*', word)
subdomains.append(subdomain)
random_domain = random.choice(subdomains)
logger.log('DEBUG', f'fuzz模式下生成的字典大小{len(subdomains)}')
logger.log('ALERT', f'请注意检查基于word模式生成的{random_domain}是否正确')
return subdomains
def query_domain_ns_a(ns_list):
logger.log('INFOR', f'正在查询权威DNS名称服务器{ns_list}的A记录')
if not isinstance(ns_list, list):
return list()
ns_ip_list = []
resolver = utils.dns_resolver()
for ns in ns_list:
try:
answer = resolver.query(ns, 'A')
except Exception as e:
logger.log('ERROR', e.args)
logger.log('ERROR', f'查询权威DNS名称服务器{ns}的A记录出错')
continue
if answer:
for item in answer:
ns_ip_list.append(item.address)
logger.log('INFOR', f'权威DNS名称服务器对应A记录 {ns_ip_list}')
return ns_ip_list
def query_domain_ns(domain):
logger.log('INFOR', f'正在查询{domain}的NS记录')
domain = utils.get_maindomain(domain)
resolver = utils.dns_resolver()
try:
answer = resolver.query(domain, 'NS')
except Exception as e:
logger.log('ERROR', e.args)
logger.log('ERROR', f'查询{domain}的NS记录出错')
return list()
ns = [item.to_text() for item in answer]
logger.log('INFOR', f'{domain}的权威DNS名称服务器 {ns}')
return ns
@tenacity.retry(reraise=True, stop=tenacity.stop_after_attempt(2))
def get_wildcard_record(domain, resolver):
logger.log('INFOR', f'查询{domain}在权威DNS名称服务器的泛解析记录')
try:
answer = resolver.query(domain, 'A')
# 如果查询随机域名A记录时抛出Timeout异常则重新查询
except Timeout as e:
logger.log('ALERT', f'查询超时重新查询中')
logger.log('DEBUG', e.args)
raise tenacity.TryAgain
except (NXDOMAIN, YXDOMAIN, NoAnswer, NoNameservers) as e:
logger.log('DEBUG', e.args)
logger.log('INFOR', f'{domain}在权威DNS名称服务器上没有A记录')
return None, None
except Exception as e:
logger.log('ERROR', e.args)
logger.log('ERROR', f'查询{domain}在权威DNS名称服务器泛解析记录出错')
exit(1)
else:
if answer.rrset is None:
logger.log('DEBUG', f'查询结果无记录')
return None, None
name = answer.name
ip = {item.address for item in answer}
ttl = answer.ttl
logger.log('INFOR', f'{domain} 在权威DNS上解析到域名: {name} '
f'IP: {ip} TTL: {ttl}')
return ip, ttl
def collect_wildcard_record(domain, authoritative_ns):
logger.log('INFOR', f'正在收集{domain}的泛解析记录')
if not authoritative_ns:
return list(), int()
resolver = utils.dns_resolver()
resolver.nameservers = authoritative_ns
resolver.rotate = True
resolver.cache = None
ips = set()
ips_stat = dict()
while True:
token = secrets.token_hex(4)
random_subdomain = f'{token}.{domain}'
try:
ip, ttl = get_wildcard_record(random_subdomain, resolver)
except Timeout as e:
logger.log('DEBUG', e.args)
logger.log('ALERT', f'多次查询超时将尝试查询新的随机子域')
continue
if ip is None:
continue
ips = ips.union(ip)
# 统计每个泛解析IP出现次数
for addr in ip:
count = ips_stat.setdefault(addr, 0)
ips_stat[addr] = count + 1
# 筛选出出现次数2次以上的IP地址
addrs = list()
for addr, times in ips_stat.items():
if times >= 2:
addrs.append(addr)
# 大部分的IP地址出现次数大于2次停止收集泛解析IP记录
if len(addrs) / len(ips) >= 0.8:
break
logger.log('DEBUG', f'收集到{domain}的泛解析记录\n{ips}\n{ttl}')
return ips, ttl
def get_nameservers_path(enable_wildcard, ns_ip_list):
path = config.brute_nameservers_path
if not enable_wildcard:
return path
if not ns_ip_list:
return path
path = config.authoritative_dns_path
ns_data = '\n'.join(ns_ip_list)
utils.save_data(path, ns_data)
return path
def check_dict():
if not config.enable_check_dict:
return
sec = config.check_time
logger.log('ALERT', f'你有{sec}秒时间检查爆破配置是否正确')
logger.log('ALERT', f'退出爆破请使用`Ctrl+C`')
try:
time.sleep(sec)
except KeyboardInterrupt:
logger.log('INFOR', '爆破配置有误退出爆破')
exit(0)
def gen_records(items, records, subdomains, ip_times, wc_ips, wc_ttl):
qname = items.get('name')[:-1] # 去出最右边的`.`点号
reason = items.get('status')
resolver = items.get('resolver')
data = items.get('data')
answers = data.get('answers')
record = dict()
cname = list()
ips = list()
public = list()
times = list()
ttls = list()
is_valid_flags = list()
have_a_record = False
for answer in answers:
if answer.get('type') != 'A':
logger.log('TRACE', f'查询{qname}返回的应答没有A记录\n{answer}')
continue
logger.log('TRACE', f'查询{qname}返回的应答具有A记录\n{answer}')
have_a_record = True
ttl = answer.get('ttl')
ttls.append(ttl)
cname.append(answer.get('name')[:-1]) # 去出最右边的`.`点号
ip = answer.get('data')
ips.append(ip)
public.append(utils.ip_is_public(ip))
num = ip_times.get(ip)
times.append(num)
isvalid, reason = is_valid_subdomain(ip, ttl, num, wc_ips, wc_ttl)
logger.log('TRACE', f'{ip}是否有效:{isvalid} 原因:{reason}')
is_valid_flags.append(isvalid)
if not have_a_record:
logger.log('TRACE', f'查询{qname}返回的所有应答都中没有A记录{answers}')
# 为了优化内存 只添加有A记录且通过判断的子域到记录中
if have_a_record and all(is_valid_flags):
record['resolve'] = 1
record['reason'] = reason
record['ttl'] = ttls
record['cname'] = cname
record['content'] = ips
record['public'] = public
record['times'] = times
record['resolver'] = resolver
records[qname] = record
subdomains.append(qname)
return records, subdomains
def stat_ip_times(result_path):
logger.log('INFOR', f'正在统计IP次数')
times = dict()
with open(result_path) as fd:
for line in fd:
line = line.strip()
try:
items = json.loads(line)
except Exception as e:
logger.log('ERROR', e.args)
logger.log('ERROR', f'解析行{line}出错跳过解析该行')
continue
status = items.get('status')
if status != 'NOERROR':
continue
data = items.get('data')
if 'answers' not in data:
continue
answers = data.get('answers')
for answer in answers:
if answer.get('type') == 'A':
ip = answer.get('data')
# 取值 如果是首次出现的IP集合 出现次数先赋值0
value = times.setdefault(ip, 0)
times[ip] = value + 1
return times
def deal_output(output_path, ip_times, wildcard_ips, wildcard_ttl):
logger.log('INFOR', f'正在处理解析结果')
records = dict() # 用来记录所有域名解析数据
subdomains = list() # 用来保存所有通过有效性检查的子域
with open(output_path) as fd:
for line in fd:
line = line.strip()
try:
items = json.loads(line)
except Exception as e:
logger.log('ERROR', e.args)
logger.log('ERROR', f'解析行{line}出错跳过解析该行')
continue
qname = items.get('name')[:-1] # 去出最右边的`.`点号
status = items.get('status')
if status != 'NOERROR':
logger.log('TRACE', f'处理{line}时发现{qname}查询结果状态{status}')
continue
data = items.get('data')
if 'answers' not in data:
logger.log('TRACE', f'处理{line}时发现{qname}返回的结果无应答')
continue
records, subdomains = gen_records(items, records, subdomains,
ip_times, wildcard_ips,
wildcard_ttl)
return records, subdomains
def check_by_compare(ip, ttl, wc_ips, wc_ttl):
"""
通过与泛解析返回的IP集合和返回的TTL值进行对比判断发现的子域是否是泛解析子域
:param set ip: 子域A记录查询出的IP
:param int ttl: 子域A记录查询出的TTL
:param set wc_ips: 泛解析的IP集合
:param int wc_ttl: 泛解析的TTL
:return: 判断结果
"""
# 参考http://sh3ll.me/archives/201704041222.txt
if ip not in wc_ips:
return False # 子域IP不在泛解析IP集合则不是泛解析
if ttl != wc_ttl and ttl % 60 == 0 and wc_ttl % 60 == 0:
return False
return True
def check_ip_times(times):
"""
根据ip出现次数判断是否为泛解析
:param times: 子域IP出现次数
:return: 判断结果
"""
if times > config.ip_appear_maximum:
return True
return False
def is_valid_subdomain(ip, ttl, times, wc_ips, wc_ttl):
ip_blacklist = config.brute_ip_blacklist
if ip in ip_blacklist: # 解析ip在黑名单ip则为非法子域
return 0, 'IP blacklist'
if all([wc_ips, wc_ttl]): # 有泛解析记录才进行对比
if check_by_compare(ip, ttl, wc_ips, wc_ttl):
return 0, 'IP wildcard'
if check_ip_times(times):
return 0, 'IP exceeded'
return 1, 'OK'
def save_brute_dict(dict_path, dict_set):
dict_data = '\n'.join(dict_set)
if not utils.save_data(dict_path, dict_data):
logger.log('FATAL', '保存生成的字典出错')
exit(1)
def delete_file(dict_path, output_path):
if config.delete_generated_dict:
dict_path.unlink()
if config.delete_massdns_result:
output_path.unlink()
class Brute(Module):
"""
OneForAll子域爆破模块
Example
brute.py --target domain.com --word True run
brute.py --target ./domains.txt --word True run
brute.py --target domain.com --word True --process 1 run
brute.py --target domain.com --word True --wordlist subnames.txt run
brute.py --target domain.com --word True --recursive True --depth 2 run
brute.py --target d.com --fuzz True --place m.*.d.com --rule '[a-z]' run
Note:
参数alive可选值TrueFalse分别表示导出存活全部子域结果
参数format可选格式有'txt', 'rst', 'csv', 'tsv', 'json', 'yaml', 'html',
'jira', 'xls', 'xlsx', 'dbf', 'latex', 'ods'
参数path默认None使用OneForAll结果目录自动生成路径
:param str target: 单个域名或者每行一个域名的文件路径
:param int process: 爆破进程数(默认1)
:param int concurrent: 并发爆破数量(默认10000)
:param bool word: 是否使用word模式进行爆破(默认False)
:param str wordlist: word模式爆破使用的字典路径(默认使用config.py配置)
:param bool recursive: 是否使用递归进行爆破(默认False)
:param int depth: 递归爆破的深度(默认2)
:param str nextlist: 递归爆破所使用的字典路径(默认使用config.py配置)
:param bool fuzz: 是否使用fuzz模式进行爆破(默认False)
:param str place: 指定爆破位置(开启fuzz模式时必需指定此参数)
:param str rule: 指定fuzz模式爆破使用的正则规则(开启fuzz模式时必需指定此参数)
:param bool export: 是否导出爆破结果(默认True)
:param bool alive: 只导出存活的子域结果(默认True)
:param str format: 结果导出格式(默认csv)
:param str path: 结果导出路径(默认None)
"""
def __init__(self, target, process=None, concurrent=None, word=False,
wordlist=None, recursive=False, depth=None, nextlist=None,
fuzz=False, place=None, rule=None, export=True, alive=True,
format='csv', path=None):
Module.__init__(self)
self.module = 'Brute'
self.source = 'Brute'
self.target = target
self.process_num = process or utils.get_process_num()
self.concurrent_num = concurrent or config.brute_concurrent_num
self.word = word
self.wordlist = wordlist or config.brute_wordlist_path
self.recursive_brute = recursive or config.enable_recursive_brute
self.recursive_depth = depth or config.brute_recursive_depth
self.recursive_nextlist = nextlist or config.recursive_nextlist_path
self.fuzz = fuzz or config.enable_fuzz
self.place = place or config.fuzz_place
self.rule = rule or config.fuzz_rule
self.export = export
self.alive = alive
self.format = format
self.path = path
self.bulk = False # 是否是批量爆破场景
self.domains = list() # 待爆破的所有域名集合
self.domain = str() # 当前正在进行爆破的域名
self.ips_times = dict() # IP集合出现次数
self.enable_wildcard = False # 当前域名是否使用泛解析
self.wildcard_check = config.enable_wildcard_check
self.wildcard_deal = config.enable_wildcard_deal
self.check_env = True
def gen_brute_dict(self, domain):
logger.log('INFOR', f'正在为{domain}生成爆破字典')
dict_set = set()
# 如果domain不是self.subdomain 而是self.domain的子域则生成递归爆破字典
if self.place is None:
self.place = '*.' + domain
wordlist = self.wordlist
main_domain = self.register(domain)
if domain != main_domain:
wordlist = self.recursive_nextlist
if self.word:
word_subdomains = gen_word_subdomains(self.place, wordlist)
# set可以合并list
dict_set = dict_set.union(word_subdomains)
if self.fuzz:
fuzz_subdomains = gen_fuzz_subdomains(self.place, self.rule)
dict_set = dict_set.union(fuzz_subdomains)
# logger.log('INFOR', f'正在去重爆破字典')
# dict_set = utils.uniq_dict_list(dict_set)
count = len(dict_set)
logger.log('INFOR', f'生成的爆破字典大小为{count}')
if count > 10000000:
logger.log('ALERT', f'注意生成的爆破字典太大:{count} > 10000000')
return dict_set
def check_brute_params(self):
if not (self.word or self.fuzz):
logger.log('FATAL', f'请至少指定一种爆破模式')
exit(1)
if len(self.domains) > 1:
self.bulk = True
if self.fuzz:
if self.place is None or self.rule is None:
logger.log('FATAL', f'没有指定fuzz位置或规则')
exit(1)
if self.bulk:
logger.log('FATAL', f'批量爆破的场景下不能使用fuzz模式')
exit(1)
if self.recursive_brute:
logger.log('FATAL', f'使用fuzz模式下不能使用递归爆破')
exit(1)
fuzz_count = self.place.count('*')
if fuzz_count < 1:
logger.log('FATAL', f'没有指定fuzz位置')
exit(1)
if fuzz_count > 1:
logger.log('FATAL', f'只能指定1个fuzz位置')
exit(1)
if self.domain not in self.place:
logger.log('FATAL', f'指定fuzz的域名有误')
exit(1)
def main(self, domain):
start = time.time()
logger.log('INFOR', f'正在爆破域名{domain}')
massdns_dir = config.third_party_dir.joinpath('massdns')
result_dir = config.result_save_dir
temp_dir = result_dir.joinpath('temp')
utils.check_dir(temp_dir)
massdns_path = utils.get_massdns_path(massdns_dir)
timestring = utils.get_timestring()
wildcard_ips = list() # 泛解析IP列表
wildcard_ttl = int() # 泛解析TTL整型值
ns_list = query_domain_ns(self.domain)
ns_ip_list = query_domain_ns_a(ns_list) # DNS权威名称服务器对应A记录列表
self.enable_wildcard = detect_wildcard(domain, ns_ip_list)
if self.enable_wildcard:
wildcard_ips, wildcard_ttl = collect_wildcard_record(domain,
ns_ip_list)
ns_path = get_nameservers_path(self.enable_wildcard, ns_ip_list)
dict_set = self.gen_brute_dict(domain)
dict_len = len(dict_set)
dict_name = f'generated_subdomains_{domain}_{timestring}.txt'
dict_path = temp_dir.joinpath(dict_name)
save_brute_dict(dict_path, dict_set)
del dict_set
gc.collect()
output_name = f'resolved_result_{domain}_{timestring}.json'
output_path = temp_dir.joinpath(output_name)
log_path = result_dir.joinpath('massdns.log')
check_dict()
utils.call_massdns(massdns_path, dict_path, ns_path, output_path,
log_path, process_num=self.process_num,
concurrent_num=self.concurrent_num)
ip_times = stat_ip_times(output_path)
self.records, self.subdomains = deal_output(output_path, ip_times,
wildcard_ips, wildcard_ttl)
delete_file(dict_path, output_path)
end = time.time()
self.elapse = round(end - start, 1)
logger.log('INFOR', f'{self.source}模块耗时{self.elapse}'
f'发现{domain}的子域{len(self.subdomains)}')
logger.log('DEBUG', f'{self.source}模块发现{domain}的子域:\n'
f'{self.subdomains}')
self.gen_result(brute=dict_len, valid=len(self.subdomains))
self.save_db()
return self.subdomains
def run(self):
logger.log('INFOR', f'开始执行{self.source}模块')
if self.check_env:
utils.check_env()
self.domains = utils.get_domains(self.target)
all_subdomains = list()
for self.domain in self.domains:
self.check_brute_params()
if self.recursive_brute:
logger.log('INFOR', f'开始递归爆破{self.domain}的第1层子域')
valid_subdomains = self.main(self.domain)
all_subdomains.extend(valid_subdomains)
# 递归爆破下一层的子域
# fuzz模式不使用递归爆破
if self.recursive_brute:
for layer_num in range(1, self.recursive_depth):
# 之前已经做过1层子域爆破 当前实际递归层数是layer+1
logger.log('INFOR', f'开始递归爆破{self.domain}'
f'{layer_num + 1}层子域')
for subdomain in all_subdomains:
self.place = '*.' + subdomain
# 进行下一层子域爆破的限制条件
num = subdomain.count('.') - self.domain.count('.')
if num == layer_num:
valid_subdomains = self.main(subdomain)
all_subdomains.extend(valid_subdomains)
logger.log('INFOR', f'结束执行{self.source}模块爆破域名{self.domain}')
if not self.path:
name = f'{self.domain}_brute_result.{self.format}'
self.path = config.result_save_dir.joinpath(name)
# 数据库导出
if self.export:
dbexport.export(self.domain,
alive=self.alive,
limit='resolve',
path=self.path,
format=self.format)
if __name__ == '__main__':
fire.Fire(Brute)