You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

193 lines
5.5 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
工具函数 - 包含更多漏洞示例
"""
import json
import pickle
import base64
import socket
import threading
import time
class DataProcessor:
def __init__(self):
self.cache = {}
self.lock = threading.Lock()
def serialize_data(self, data):
"""序列化数据 - 使用不安全的pickle"""
# pickle存在安全风险可能执行任意代码
return pickle.dumps(data)
def deserialize_data(self, data):
"""反序列化数据 - 使用不安全的pickle"""
return pickle.loads(data)
def base64_encode(self, data):
"""Base64编码"""
return base64.b64encode(data.encode()).decode()
def base64_decode(self, data):
"""Base64解码 - 未处理异常"""
return base64.b64decode(data).decode()
def json_parse(self, json_str):
"""JSON解析 - 未处理异常"""
return json.loads(json_str)
def process_large_file(self, filename):
"""处理大文件 - 可能导致内存溢出"""
# 一次性读取整个文件到内存
with open(filename, 'r') as f:
content = f.read()
# 未限制处理的数据量
lines = content.split('\n')
processed_lines = []
for line in lines:
# 模拟复杂处理
processed_lines.append(line.upper())
return processed_lines
class NetworkManager:
def __init__(self):
self.connections = []
def connect_to_server(self, host, port):
"""连接到服务器 - 未处理异常"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 未设置超时
sock.connect((host, port))
return sock
def send_data(self, sock, data):
"""发送数据 - 未验证数据"""
# 未检查数据大小
sock.send(data.encode())
def receive_data(self, sock, buffer_size=1024):
"""接收数据 - 缓冲区溢出风险"""
# 固定缓冲区大小可能导致溢出
return sock.recv(buffer_size)
def start_server(self, port):
"""启动服务器 - 未处理异常"""
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(('0.0.0.0', port)) # 绑定到所有接口
server_socket.listen(5)
while True:
client_socket, addr = server_socket.accept()
# 未验证客户端
print(f"连接来自: {addr}")
# 在新线程中处理客户端
client_thread = threading.Thread(
target=self.handle_client,
args=(client_socket,)
)
client_thread.start()
def handle_client(self, client_socket):
"""处理客户端连接 - 未验证输入"""
while True:
data = client_socket.recv(1024)
if not data:
break
# 直接执行接收到的命令
command = data.decode()
if command.startswith('exec:'):
# 命令注入风险
exec(command[5:])
client_socket.send(b"OK")
class FileManager:
def __init__(self):
self.open_files = {}
def read_config(self, filename):
"""读取配置文件 - 路径遍历风险"""
# 未验证文件路径
with open(filename, 'r') as f:
return f.read()
def write_log(self, message):
"""写入日志 - 未检查磁盘空间"""
with open('app.log', 'a') as f:
f.write(f"{time.time()}: {message}\n")
def backup_file(self, source, destination):
"""备份文件 - 未验证路径"""
import shutil
# 未检查目标路径
shutil.copy2(source, destination)
def delete_file(self, filename):
"""删除文件 - 未验证权限"""
import os
# 未检查文件是否存在和权限
os.remove(filename)
def vulnerable_function(data):
"""包含多个漏洞的函数"""
# 1. 未验证输入
if isinstance(data, str):
# 2. 字符串格式化漏洞
query = "SELECT * FROM users WHERE name = '%s'" % data
print(query)
# 3. 未检查异常
try:
result = eval(data) # 使用eval存在代码注入风险
return result
except:
return None
def race_condition_example():
"""竞态条件示例"""
counter = 0
def increment():
nonlocal counter
temp = counter
time.sleep(0.001) # 模拟处理时间
counter = temp + 1
# 创建多个线程同时修改counter
threads = []
for i in range(10):
thread = threading.Thread(target=increment)
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
print(f"最终计数器值: {counter}") # 可能不是10
if __name__ == "__main__":
# 测试各种漏洞
processor = DataProcessor()
network_mgr = NetworkManager()
file_mgr = FileManager()
# 测试序列化
test_data = {"user": "admin", "password": "secret"}
serialized = processor.serialize_data(test_data)
print("序列化完成")
# 测试网络连接
try:
sock = network_mgr.connect_to_server("example.com", 80)
print("连接成功")
sock.close()
except:
print("连接失败")
# 测试竞态条件
race_condition_example()