import cdx_toolkit from client.subdomain.oneforall.common.crawl import Crawl from client.subdomain.oneforall.config import logger class ArchiveCrawl(Crawl): def __init__(self, domain): # 初始化父类 Crawl,设置爬取域名和相关信息 Crawl.__init__(self) self.domain = domain # 注册目标域名 self.module = 'Crawl' # 爬虫模块 self.source = 'ArchiveCrawl' # 数据源标识 def crawl(self, domain, limit): """ 使用 Wayback Machine 爬取目标域名的历史记录以发现子域名 :param domain: 目标域名 :param limit: 每次爬取的最大历史记录数量 """ # 获取请求头和代理 self.header = self.get_header() self.proxy = self.get_proxy(self.source) # 使用 cdx_toolkit 库与 Internet Archive 交互 cdx = cdx_toolkit.CDXFetcher(source='ia') # 通过 Internet Archive 的 API 获取数据 url = f'*.{domain}/*' # 构建 URL 模式,表示查询该域名下的所有子域 size = cdx.get_size_estimate(url) # 获取该模式的历史记录大小估算 logger.log('DEBUG', f'{url} ArchiveCrawl size estimate {size}') # 输出估算结果 # 遍历爬取的 URL 响应 for resp in cdx.iter(url, limit=limit): # 如果状态码不是 301 或 302,则认为是有效的记录 if resp.data.get('status') not in ['301', '302']: url = resp.data.get('url') # 获取 URL subdomains = self.match(self.register(domain), url + resp.text) # 匹配子域名 # 合并找到的子域名 self.subdomains = self.subdomains.union(subdomains) def run(self): """ 类执行入口 """ self.begin() # 开始执行 self.crawl(self.domain, 50) # 爬取目标域名历史记录,限制为 50 条 # 对已发现的子域名进行进一步爬取,以发现更多子域 for subdomain in self.subdomains: if subdomain != self.domain: # 避免重复爬取目标域名本身 self.crawl(subdomain, 10) # 对子域名进行爬取,限制为 10 条历史记录 self.finish() # 爬取结束 self.save_json() # 保存结果为 JSON 格式 self.gen_result() # 生成最终结果 self.save_db() # 将结果保存到数据库 def do(domain): # 统一入口名字,方便多线程调用 """ 类统一调用入口 :param str domain: 域名 """ crawl = ArchiveCrawl(domain) # 创建 ArchiveCrawl 对象 crawl.run() # 执行爬取操作 if __name__ == '__main__': do('example.com') # 执行对 example.com 域名的爬取