import api from client.subdomain.oneforall.common.query import Query from client.subdomain.oneforall.config import logger class CensysAPI(Query): def __init__(self, domain): # 初始化父类 Query,注册 domain,设置 API 信息和查询配置 Query.__init__(self) self.domain = self.register(domain) # 注册域名进行后续匹配 self.module = 'Certificate' # 模块名称,表示这是一个证书查询模块 self.source = "CensysAPIQuery" # 数据源标识 self.addr = 'https://www.censys.io/api/v1/search/certificates' # Censys API 查询地址 self.id = api.censys_api_id # 从 api 配置中获取 Censys API ID self.secret = api.censys_api_secret # 从 api 配置中获取 Censys API Secret self.delay = 3.0 # 设置查询速率限制,Censys 接口最小查询间隔为 2.5 秒 def query(self): """ 向接口查询子域并做子域匹配 """ # 获取请求头和代理 self.header = self.get_header() self.proxy = self.get_proxy(self.source) # 构建查询数据 data = { 'query': f'parsed.names: {self.domain}', # 查询目标域名相关证书 'page': 1, # 默认查询第一页 'fields': ['parsed.subject_dn', 'parsed.names'], # 返回字段,获取证书的 subject_dn 和 names 'flatten': True # 扁平化数据结构 } # 发送 POST 请求,查询 Censys 数据 resp = self.post(self.addr, json=data, auth=(self.id, self.secret)) # 如果没有响应,直接返回 if not resp: return # 解析返回的 JSON 数据 json = resp.json() status = json.get('status') # 获取状态 # 如果状态不是 ok,打印警告信息 if status != 'ok': logger.log('ALERT', status) return # 匹配返回的子域名 subdomains = self.match(self.domain, str(json)) self.subdomains = self.subdomains.union(subdomains) # 将匹配到的子域加入集合 # 获取查询结果页数,并循环请求其他页 pages = json.get('metadata').get('pages') for page in range(2, pages + 1): # 从第 2 页开始查询 data['page'] = page resp = self.post(self.addr, json=data, auth=(self.id, self.secret)) # 如果没有响应,直接返回 if not resp: return # 匹配当前页面的子域名 subdomains = self.match(self.domain, str(resp.json())) self.subdomains = self.subdomains.union(subdomains) def run(self): """ 类执行入口 """ # 检查 API 密钥是否有效 if not self.check(self.id, self.secret): return # 开始查询流程 self.begin() self.query() # 执行查询操作 self.finish() # 完成查询后进行清理操作 self.save_json() # 保存查询结果为 JSON 文件 self.gen_result() # 生成结果 self.save_db() # 将结果保存到数据库 def do(domain): # 统一入口名字 方便多线程调用 """ 类统一调用入口 :param str domain: 域名 """ query = CensysAPI(domain) # 创建 CensysAPI 查询对象 query.run() # 执行查询 if __name__ == '__main__': do('example.com') # 执行示例查询