# MiaCTFer/client-1/urlscan/url_probe/urlscan_run.py
'''Probe whether a port runs an HTTP service and, if so, add it to the web asset table.'''
import os
import sys
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + '/../../../')
import requests
import chardet
from bs4 import BeautifulSoup
import random
import ipaddress
from concurrent import futures
import time
from urllib.parse import urlparse
import threading
from client.database import session, SrcPorts, SrcAssets
from client.webinfo.run import SelectIP, Check_Waf
requests.packages.urllib3.disable_warnings()
LOCK = threading.RLock()
user_agents = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
    '(KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 '
    '(KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36',
    'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 '
    '(KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36',
    'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:54.0) Gecko/20100101 Firefox/68.0',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.13; rv:61.0) '
    'Gecko/20100101 Firefox/68.0',
    'Mozilla/5.0 (X11; Linux i586; rv:31.0) Gecko/20100101 Firefox/68.0',
]


class UrlProbe:
    def __init__(self, ip_dict):
        '''ip_dict: {'ip': 'xxx', 'port': 123}'''
        self.ip_dict = ip_dict

    def _gen_random_ip(self):
        """Generate a random dotted-decimal IP string."""
        while True:
            ip = ipaddress.IPv4Address(random.randint(0, 2 ** 32 - 1))
            if ip.is_global:
                return ip.exploded

    def _gen_fake_header(self):
        """Build a forged request header with a random User-Agent and spoofed client IP."""
        ua = random.choice(user_agents)
        ip = self._gen_random_ip()
        headers = {
            'Accept': 'text/html,application/xhtml+xml,'
                      'application/xml;q=0.9,*/*;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Accept-Language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7',
            'Cache-Control': 'max-age=0',
            'Connection': 'keep-alive',
            'DNT': '1',
            'Referer': 'https://www.google.com/',
            'Upgrade-Insecure-Requests': '1',
            'User-Agent': ua,
            'X-Forwarded-For': ip,
            'X-Real-IP': ip
        }
        return headers

    def _check_http(self):
        '''Probe for an HTTP service; fall back to HTTPS on an SSL error.'''
        url = f"http://{self.ip_dict['ip']}:{self.ip_dict['port']}"
        headers = self._gen_fake_header()
        try:
            response = requests.get(url, timeout=20, headers=headers)
        except requests.exceptions.SSLError:
            url = f"https://{self.ip_dict['ip']}:{self.ip_dict['port']}"
            try:
                response = requests.get(url, timeout=20, verify=False, headers=headers)
            except Exception:
                return None
            else:
                return response
        except Exception:
            return None
        else:
            return response

    def _get_banner(self, headers):
        '''Build a banner string from the 'Server' and 'X-Powered-By' response headers.'''
        server = headers.get('Server')
        powered = headers.get('X-Powered-By')
        # If either header is present, return them joined; otherwise return an empty string
        if server or powered:
            return f'{server},{powered}'
        else:
            return ''

    def _get_title(self, markup):
        '''Extract a page title, falling back to headings, meta tags, and short body text.'''
        try:
            soup = BeautifulSoup(markup, 'lxml')
        except Exception:
            return None
        title = soup.title
        if title:
            return title.text.strip()
        h1 = soup.h1
        if h1:
            return h1.text.strip()
        h2 = soup.h2
        if h2:
            return h2.text.strip()
        h3 = soup.h3
        if h3:
            return h3.text.strip()
        desc = soup.find('meta', attrs={'name': 'description'})
        if desc:
            return desc['content'].strip()
        word = soup.find('meta', attrs={'name': 'keywords'})
        if word:
            return word['content'].strip()
        if len(markup) <= 200:
            return markup.strip()
        text = soup.text
        if len(text) <= 200:
            return text.strip()
        return None

    def run(self):
        print(f'[+]URL probe started: [{self.ip_dict}]')
        # Check whether the port speaks HTTP; bail out if not
        response = self._check_http()
        if response is None:
            print(f'[-]URL probe: [{self.ip_dict}] is not an HTTP service')
            return None
        if response.status_code == 200:
            # Detect the response encoding before decoding the body
            detected = chardet.detect(response.content)
            response.encoding = detected['encoding']
            # Collect the page title and the banner from the response headers
            title = self._get_title(markup=response.text)
            banner = self._get_banner(response.headers)
            # Return a dict with IP, port, title, banner and the final URL
            assets_dict = self.ip_dict
            assets_dict['title'] = title
            assets_dict['banner'] = banner
            assets_dict['host'] = response.url
            return assets_dict
        else:
            print(f'[-]URL probe: [{self.ip_dict}] returned a non-200 status code')
            return None
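

# Illustrative sketch, not part of the original module: probing a single ip/port
# pair directly with UrlProbe. The target values below are placeholders chosen
# for demonstration only; in this project the pairs normally come from SrcPorts rows.
def _example_single_probe():
    probe = UrlProbe({'ip': '127.0.0.1', 'port': 8080})
    result = probe.run()
    if result:
        print(result['host'], result['title'], result['banner'])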


def ReadPorts():
    '''Read up to 10 ports that have not been URL-scanned yet and mark them as scanned.'''
    port_sql = session.query(SrcPorts).filter(SrcPorts.port_url_scan == False).limit(10).all()
    session.commit()
    if port_sql:
        for port in port_sql:
            port.port_url_scan = True
            session.add(port)
            try:
                session.commit()
            except Exception as error:
                print(f'[-]URL scan - failed to update port scan status: {error}')
            else:
                session.refresh(port, ['port_url_scan'])
    return port_sql


def WriteAsset(http_info, port_sql):
    '''Insert the probed host into the web asset table if it is not already recorded.'''
    # Lock to avoid concurrent writes from multiple threads; the context manager
    # guarantees the lock is released on every exit path
    with LOCK:
        # Check whether an asset record for this host already exists
        asset_count = session.query(SrcAssets).filter(SrcAssets.asset_host == http_info['host']).count()
        session.commit()
        if not asset_count:
            # No record for this host yet: build a new asset row
            srcasset_sql = SrcAssets(asset_name=port_sql.port_name, asset_host=http_info['host'],
                                     asset_subdomain=http_info['subdomain'],
                                     asset_title=http_info['title'],
                                     asset_ip=port_sql.port_ip, asset_area=http_info['area'],
                                     asset_waf=http_info['waf'], asset_cdn=False,
                                     asset_banner=http_info['banner'], asset_info='', asset_whois='')
            session.add(srcasset_sql)
            try:
                # Write the new asset record to the database
                session.commit()
            except Exception as error:
                # Roll back on failure and report the error
                session.rollback()
                print(f'[-]URL probe - failed to insert asset into the database: {error}')


def main():
    print('[+]URL scan started')
    # Thread pool with at most 10 workers
    pool = futures.ThreadPoolExecutor(max_workers=10)
    while True:
        # Fetch the next batch of ports to probe
        port_sql = ReadPorts()
        if not port_sql:
            # Nothing to do: wait 30 seconds before polling again
            time.sleep(30)
        else:
            # Submit one probe task per port and wait for all of them to finish
            wait_for = [pool.submit(action, sql_port) for sql_port in port_sql]
            for f in futures.as_completed(wait_for):
                f.result()


def action(sql_port):
    # Skip port 80 and return None immediately
    if sql_port.port_port == 80:
        return None
    try:
        # Parse the host from the port record's URL
        host = urlparse(sql_port.port_host)
    except Exception:
        return None
    host = host.netloc
    print(f'[+]URL probe starting: {host}:{sql_port.port_port}')
    # Probe the ip/port pair for an HTTP service
    ip_dict = {'ip': host, 'port': sql_port.port_port}
    http_info = UrlProbe(ip_dict)
    info = http_info.run()
    if info:
        # Look up the geographic area for the IP and check for a WAF
        area = SelectIP(sql_port.port_ip)
        flag, waf = Check_Waf(info['host'])
        info['area'] = area
        info['waf'] = waf
        info['subdomain'] = host
        # Persist the probe result as a web asset
        WriteAsset(info, sql_port)
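

# Illustrative sketch, not part of the original script: driving UrlProbe over an
# ad-hoc list of targets with the same thread-pool pattern main() uses, but
# without the SrcPorts/SrcAssets tables. The target values are placeholders
# for demonstration only.
def _example_batch_probe(targets=None):
    targets = targets or [{'ip': '127.0.0.1', 'port': 8080},
                          {'ip': '127.0.0.1', 'port': 8443}]
    with futures.ThreadPoolExecutor(max_workers=5) as pool:
        tasks = [pool.submit(UrlProbe(target).run) for target in targets]
        # Keep only the targets that answered as HTTP with a 200 response
        return [task.result() for task in futures.as_completed(tasks) if task.result()]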


if __name__ == '__main__':
    main()