You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
MiaCTFer/client/subdomain/oneforall/modules/crawl/commoncrawl.py

69 lines
2.6 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import cdx_toolkit
from tqdm import tqdm
from client.subdomain.oneforall.common.crawl import Crawl
from client.subdomain.oneforall.config import logger
class CommonCrawl(Crawl):
    """Subdomain collector backed by the Common Crawl web archive."""

    def __init__(self, domain):
        """
        Register the target domain and module metadata.

        :param str domain: target domain to enumerate subdomains for
        """
        super().__init__()  # idiomatic base-class initialization (was Crawl.__init__(self))
        self.domain = domain  # target domain
        self.module = 'Crawl'  # module category
        self.source = 'CommonCrawl'  # data-source identifier

    def crawl(self, domain, limit):
        """
        Query the Common Crawl archive for historical records of *domain*
        and harvest subdomains from the record bodies.

        :param str domain: domain to query
        :param int limit: maximum number of archive records to fetch
        """
        self.header = self.get_header()
        self.proxy = self.get_proxy(self.source)
        cdx = cdx_toolkit.CDXFetcher()  # client for the Common Crawl CDX index
        url = f'*.{domain}/*'  # pattern matching every host under the domain
        size = cdx.get_size_estimate(url)
        print(url, 'CommonCrawl size estimate', size)
        for resp in tqdm(cdx.iter(url, limit=limit), total=limit):
            # Skip redirect records (301/302): their bodies carry no content
            # worth scanning for subdomains.
            if resp.data.get('status') not in ['301', '302']:
                subdomains = self.match(self.register(domain), resp.text)
                # union() returns a new set, leaving the previous one intact
                self.subdomains = self.subdomains.union(subdomains)

    def run(self):
        """
        Class entry point: crawl the target domain, then each discovered
        subdomain, and persist the combined results.
        """
        self.begin()
        self.crawl(self.domain, 50)  # target domain first, up to 50 records
        # Iterate over a snapshot: crawl() rebinds self.subdomains inside the
        # loop, which is only safe because union() creates a new set. Freezing
        # the set here makes the iteration explicitly safe rather than
        # accidentally so.
        for subdomain in set(self.subdomains):
            if subdomain != self.domain:  # don't re-crawl the target itself
                self.crawl(subdomain, 10)  # up to 10 records per subdomain
        self.finish()
        self.save_json()  # persist results as JSON
        self.gen_result()  # build the final result set
        self.save_db()  # write results to the database
def do(domain):  # uniform entry-point name so threaded callers can dispatch by module
    """
    Module-level invocation entry.

    :param str domain: domain to enumerate
    """
    CommonCrawl(domain).run()  # build the collector and execute it in one step
if __name__ == '__main__':
    # Manual smoke test: enumerate subdomains of a public sample domain.
    do('example.com')