"""In-memory mock queues of detected viruses and bugs, with Flask JSON helpers.

Importing this module fills both queues with random entries and starts a
daemon thread that tops them back up to their target size every 10 minutes.
"""

import random
import string
import threading
import time

from flask import jsonify

# Simulated queues (plain lists of dicts).
virus_queue = []
bugs_queue = []

# "type" is picked at random from these options.
# "木马" is listed twice, so random.choice returns it twice as often.
# NOTE(review): confirm whether that weighting is intentional.
virus_type_options = ["木马", "木马", "隐藏文件"]
bugs_type_options = ["高危", "中危", "低危"]

# The name prefix is picked at random from these options.
virus_prefix_options = [
    "Worm.Linux32.FakeFolder.BS",
    "VHEUR/QVM41.2.ABB5.Malware.Gen",
    "Hidden.dir.Generic.DWsdh",
]
bugs_prefix_options = [
    "KB3023052-Microsoft Edge 2023 BUG",
    "KB2880463-Kubectl Sslolo BUG-fix",
    "KG3054816-LiveLinux-KSDCO5",
]

# Target number of entries kept in each queue.
virus_size = 6
bugs_size = 5


def generate_random_string(length=16):
    """Return a random string of ``length`` lowercase ASCII letters."""
    letters = string.ascii_lowercase
    return ''.join(random.choice(letters) for _ in range(length))


def generate_random_path():
    """Return a random path of the form ``/tmp/{5 random lowercase letters}``."""
    return f"/tmp/{generate_random_string(5)}"


def _make_virus_object():
    """Build one random virus entry with "name", "path" and "type" keys."""
    # The order of the random draws matches the original initializer, so a
    # seeded run still produces the same initial entries.
    random_string = generate_random_string()
    virus_prefix = random.choice(virus_prefix_options)
    virus_name = f'{virus_prefix}-{random_string}'
    path = generate_random_path()
    virus_type = random.choice(virus_type_options)
    return {
        "name": virus_name,
        "path": path,
        "type": virus_type,
    }


def _make_bugs_object():
    """Build one random bug entry with "name" and "type" keys."""
    random_string = generate_random_string()
    bugs_prefix = random.choice(bugs_prefix_options)
    bugs_name = f'{bugs_prefix}-{random_string}'
    bugs_type = random.choice(bugs_type_options)
    return {
        "name": bugs_name,
        "type": bugs_type,
    }


def _top_up(queue, target_size, factory):
    """Append ``factory()`` results to ``queue`` until it holds ``target_size`` items.

    Does nothing when the queue is already at or above the target, because
    ``range`` of a non-positive number is empty.
    """
    for _ in range(target_size - len(queue)):
        queue.append(factory())


def initialize_virus_queue():
    """Append ``virus_size`` freshly generated entries to ``virus_queue``."""
    virus_queue.extend(_make_virus_object() for _ in range(virus_size))


def initialize_bugs_queue():
    """Append ``bugs_size`` freshly generated entries to ``bugs_queue``."""
    bugs_queue.extend(_make_bugs_object() for _ in range(bugs_size))


def _refill_queues():
    """Run one refill pass: top both queues back up to their target sizes."""
    # The globals are looked up on every pass because the delete_* functions
    # rebind them to new list objects.
    _top_up(virus_queue, virus_size, _make_virus_object)
    _top_up(bugs_queue, bugs_size, _make_bugs_object)


def check_queue_length():
    """Periodically check the virus and bug counts and refill the queues.

    Loops forever, sleeping 10 minutes before each check, so it is meant to
    run in a background thread.

    Refilled virus entries now get a full ``<prefix>-<random string>`` name.
    Previously ``random.choice`` was applied to the formatted name itself,
    which reduced it to a single random character.
    """
    while True:
        time.sleep(600)  # check every 10 minutes
        _refill_queues()


def get_virus_queue_data():
    """Return the current virus queue as a JSON response."""
    return jsonify({"virus_queue": virus_queue})


def delete_virus_queue_data(names_list):
    """Remove every virus entry whose "name" appears in ``names_list``.

    Args:
        names_list: iterable of names to delete; unknown names are ignored.

    Returns:
        A JSON response carrying a success message.
    """
    global virus_queue
    # Materialize once so a one-shot iterable is handled like a list.
    names = list(names_list)
    virus_queue = [entry for entry in virus_queue if entry["name"] not in names]
    return jsonify({"message": "Data deleted successfully"})


def get_bugs_queue_data():
    """Return the current bugs queue as a JSON response."""
    return jsonify({"bugs_queue": bugs_queue})


def delete_bugs_queue_data(names_list):
    """Remove every bug entry whose "name" appears in ``names_list``.

    Args:
        names_list: iterable of names to delete; unknown names are ignored.

    Returns:
        A JSON response carrying a success message.
    """
    global bugs_queue
    # Materialize once so a one-shot iterable is handled like a list.
    names = list(names_list)
    bugs_queue = [entry for entry in bugs_queue if entry["name"] not in names]
    return jsonify({"message": "Data deleted successfully"})


# Initialize the queues.
initialize_virus_queue()
initialize_bugs_queue()

# Start the background refill thread; as a daemon it does not keep the
# interpreter alive at shutdown.
thread = threading.Thread(target=check_queue_length)
thread.daemon = True
thread.start()