You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
MiaCTFer/client/subdomain/oneforall/modules/certificates/censys_api.py

97 lines
3.5 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import api
from client.subdomain.oneforall.common.query import Query
from client.subdomain.oneforall.config import logger
class CensysAPI(Query):
def __init__(self, domain):
# 初始化父类 Query注册 domain设置 API 信息和查询配置
Query.__init__(self)
self.domain = self.register(domain) # 注册域名进行后续匹配
self.module = 'Certificate' # 模块名称,表示这是一个证书查询模块
self.source = "CensysAPIQuery" # 数据源标识
self.addr = 'https://www.censys.io/api/v1/search/certificates' # Censys API 查询地址
self.id = api.censys_api_id # 从 api 配置中获取 Censys API ID
self.secret = api.censys_api_secret # 从 api 配置中获取 Censys API Secret
self.delay = 3.0 # 设置查询速率限制Censys 接口最小查询间隔为 2.5 秒
def query(self):
"""
向接口查询子域并做子域匹配
"""
# 获取请求头和代理
self.header = self.get_header()
self.proxy = self.get_proxy(self.source)
# 构建查询数据
data = {
'query': f'parsed.names: {self.domain}', # 查询目标域名相关证书
'page': 1, # 默认查询第一页
'fields': ['parsed.subject_dn', 'parsed.names'], # 返回字段,获取证书的 subject_dn 和 names
'flatten': True # 扁平化数据结构
}
# 发送 POST 请求,查询 Censys 数据
resp = self.post(self.addr, json=data, auth=(self.id, self.secret))
# 如果没有响应,直接返回
if not resp:
return
# 解析返回的 JSON 数据
json = resp.json()
status = json.get('status') # 获取状态
# 如果状态不是 ok打印警告信息
if status != 'ok':
logger.log('ALERT', status)
return
# 匹配返回的子域名
subdomains = self.match(self.domain, str(json))
self.subdomains = self.subdomains.union(subdomains) # 将匹配到的子域加入集合
# 获取查询结果页数,并循环请求其他页
pages = json.get('metadata').get('pages')
for page in range(2, pages + 1): # 从第 2 页开始查询
data['page'] = page
resp = self.post(self.addr, json=data, auth=(self.id, self.secret))
# 如果没有响应,直接返回
if not resp:
return
# 匹配当前页面的子域名
subdomains = self.match(self.domain, str(resp.json()))
self.subdomains = self.subdomains.union(subdomains)
def run(self):
"""
类执行入口
"""
# 检查 API 密钥是否有效
if not self.check(self.id, self.secret):
return
# 开始查询流程
self.begin()
self.query() # 执行查询操作
self.finish() # 完成查询后进行清理操作
self.save_json() # 保存查询结果为 JSON 文件
self.gen_result() # 生成结果
self.save_db() # 将结果保存到数据库
def do(domain): # 统一入口名字 方便多线程调用
"""
类统一调用入口
:param str domain: 域名
"""
query = CensysAPI(domain) # 创建 CensysAPI 查询对象
query.run() # 执行查询
if __name__ == '__main__':
do('example.com') # 执行示例查询