""" 工具函数 - 包含更多漏洞示例 """ import json import pickle import base64 import socket import threading import time class DataProcessor: def __init__(self): self.cache = {} self.lock = threading.Lock() def serialize_data(self, data): """序列化数据 - 使用不安全的pickle""" # pickle存在安全风险,可能执行任意代码 return pickle.dumps(data) def deserialize_data(self, data): """反序列化数据 - 使用不安全的pickle""" return pickle.loads(data) def base64_encode(self, data): """Base64编码""" return base64.b64encode(data.encode()).decode() def base64_decode(self, data): """Base64解码 - 未处理异常""" return base64.b64decode(data).decode() def json_parse(self, json_str): """JSON解析 - 未处理异常""" return json.loads(json_str) def process_large_file(self, filename): """处理大文件 - 可能导致内存溢出""" # 一次性读取整个文件到内存 with open(filename, 'r') as f: content = f.read() # 未限制处理的数据量 lines = content.split('\n') processed_lines = [] for line in lines: # 模拟复杂处理 processed_lines.append(line.upper()) return processed_lines class NetworkManager: def __init__(self): self.connections = [] def connect_to_server(self, host, port): """连接到服务器 - 未处理异常""" sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 未设置超时 sock.connect((host, port)) return sock def send_data(self, sock, data): """发送数据 - 未验证数据""" # 未检查数据大小 sock.send(data.encode()) def receive_data(self, sock, buffer_size=1024): """接收数据 - 缓冲区溢出风险""" # 固定缓冲区大小可能导致溢出 return sock.recv(buffer_size) def start_server(self, port): """启动服务器 - 未处理异常""" server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.bind(('0.0.0.0', port)) # 绑定到所有接口 server_socket.listen(5) while True: client_socket, addr = server_socket.accept() # 未验证客户端 print(f"连接来自: {addr}") # 在新线程中处理客户端 client_thread = threading.Thread( target=self.handle_client, args=(client_socket,) ) client_thread.start() def handle_client(self, client_socket): """处理客户端连接 - 未验证输入""" while True: data = client_socket.recv(1024) if not data: break # 直接执行接收到的命令 command = data.decode() if command.startswith('exec:'): # 命令注入风险 exec(command[5:]) client_socket.send(b"OK") class FileManager: def __init__(self): self.open_files = {} def read_config(self, filename): """读取配置文件 - 路径遍历风险""" # 未验证文件路径 with open(filename, 'r') as f: return f.read() def write_log(self, message): """写入日志 - 未检查磁盘空间""" with open('app.log', 'a') as f: f.write(f"{time.time()}: {message}\n") def backup_file(self, source, destination): """备份文件 - 未验证路径""" import shutil # 未检查目标路径 shutil.copy2(source, destination) def delete_file(self, filename): """删除文件 - 未验证权限""" import os # 未检查文件是否存在和权限 os.remove(filename) def vulnerable_function(data): """包含多个漏洞的函数""" # 1. 未验证输入 if isinstance(data, str): # 2. 字符串格式化漏洞 query = "SELECT * FROM users WHERE name = '%s'" % data print(query) # 3. 未检查异常 try: result = eval(data) # 使用eval存在代码注入风险 return result except: return None def race_condition_example(): """竞态条件示例""" counter = 0 def increment(): nonlocal counter temp = counter time.sleep(0.001) # 模拟处理时间 counter = temp + 1 # 创建多个线程同时修改counter threads = [] for i in range(10): thread = threading.Thread(target=increment) threads.append(thread) thread.start() for thread in threads: thread.join() print(f"最终计数器值: {counter}") # 可能不是10 if __name__ == "__main__": # 测试各种漏洞 processor = DataProcessor() network_mgr = NetworkManager() file_mgr = FileManager() # 测试序列化 test_data = {"user": "admin", "password": "secret"} serialized = processor.serialize_data(test_data) print("序列化完成") # 测试网络连接 try: sock = network_mgr.connect_to_server("example.com", 80) print("连接成功") sock.close() except: print("连接失败") # 测试竞态条件 race_condition_example()