import api from client.subdomain.oneforall.common import utils from client.subdomain.oneforall.common.query import Query class SpyseAPI(Query): def __init__(self, domain): """ 初始化 SpyseAPI 查询类 :param domain: 待查询的域名 """ Query.__init__(self) # 调用父类 Query 的初始化方法 self.domain = domain # 设置目标域名 self.module = 'Certificate' # 模块名称 self.source = 'CertDBAPIQuery' # 数据源标识 self.addr = 'https://api.spyse.com/v1/subdomains' # Spyse API 地址 self.token = api.spyse_api_token # 从 api 配置中获取 API Token def query(self): """ 向 Spyse API 查询子域并进行子域匹配 """ page_num = 1 # 从第一页开始查询 while True: # 获取请求头和代理 self.header = self.get_header() self.proxy = self.get_proxy(self.source) # 构建查询参数,查询目标域名的子域信息 params = {'domain': self.domain, 'api_token': self.token, 'page': page_num} resp = self.get(self.addr, params) # 发送 GET 请求 # 如果响应为空,则返回 if not resp: return json = resp.json() # 获取响应的 JSON 数据 # 使用 utils.match_subdomain 方法匹配并提取子域名 subdomains = utils.match_subdomain(self.domain, str(json)) if not subdomains: # 如果没有找到子域名,则停止查询 break # 将找到的子域名合并到已知子域名集合中 self.subdomains = self.subdomains.union(subdomains) page_num += 1 # 查询下一页 # 如果当前查询结果少于 30 条记录,则表示查询已结束 if json.get('count') < 30: break def run(self): """ 类执行入口,执行查询操作并保存结果 """ # 检查 API Token 是否有效 if not self.check(self.token): return # 执行查询过程 self.begin() self.query() self.finish() # 保存查询结果 self.save_json() self.gen_result() self.save_db() def do(domain): # 统一入口,方便多线程调用 """ 类统一调用入口 :param str domain: 域名 """ query = SpyseAPI(domain) # 创建 SpyseAPI 查询实例 query.run() # 执行查询 if __name__ == '__main__': do('example.com') # 执行对 example.com 的查询