parent
5b94b9feb1
commit
d23ee5348f
@ -0,0 +1,38 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
调试后端启动问题
|
||||
"""
|
||||
import sys
|
||||
import os
|
||||
|
||||
# 添加项目路径
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), 'backend'))
|
||||
|
||||
try:
|
||||
print("1. 测试导入数据库模块...")
|
||||
from app.database import init_db, get_db
|
||||
print("✓ 数据库模块导入成功")
|
||||
|
||||
print("2. 测试导入模型...")
|
||||
from app.models import Project, Scan, Vulnerability
|
||||
print("✓ 模型导入成功")
|
||||
|
||||
print("3. 测试导入API路由...")
|
||||
from app.api import projects, scans, reports, vulnerabilities, files
|
||||
print("✓ API路由导入成功")
|
||||
|
||||
print("4. 测试初始化数据库...")
|
||||
init_db()
|
||||
print("✓ 数据库初始化成功")
|
||||
|
||||
print("5. 测试创建FastAPI应用...")
|
||||
from fastapi import FastAPI
|
||||
app = FastAPI()
|
||||
print("✓ FastAPI应用创建成功")
|
||||
|
||||
print("\n所有测试通过!后端应该可以正常启动。")
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ 错误: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,8 @@
|
||||
# 示例配置文件
|
||||
database_host=localhost
|
||||
database_port=3306
|
||||
database_user=admin
|
||||
database_password=admin123
|
||||
api_key=sk-1234567890abcdef
|
||||
debug_mode=true
|
||||
log_level=debug
|
||||
@ -0,0 +1,109 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
示例Python项目 - 包含一些常见的代码漏洞
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import hashlib
|
||||
import subprocess
|
||||
from urllib.request import urlopen
|
||||
|
||||
class UserManager:
|
||||
def __init__(self):
|
||||
self.users = {}
|
||||
self.admin_password = "admin123" # 硬编码密码
|
||||
|
||||
def create_user(self, username, password):
|
||||
"""创建用户 - 存在SQL注入风险"""
|
||||
# 模拟SQL查询 - 未使用参数化查询
|
||||
query = f"INSERT INTO users (username, password) VALUES ('{username}', '{password}')"
|
||||
print(f"执行查询: {query}")
|
||||
|
||||
# 存储明文密码
|
||||
self.users[username] = password
|
||||
return True
|
||||
|
||||
def authenticate(self, username, password):
|
||||
"""用户认证"""
|
||||
if username in self.users:
|
||||
# 明文密码比较
|
||||
return self.users[username] == password
|
||||
return False
|
||||
|
||||
def hash_password(self, password):
|
||||
"""密码哈希 - 使用弱哈希算法"""
|
||||
# 使用MD5 - 已被认为不安全
|
||||
return hashlib.md5(password.encode()).hexdigest()
|
||||
|
||||
def read_file(filename):
|
||||
"""读取文件 - 未处理异常"""
|
||||
# 未检查文件是否存在
|
||||
with open(filename, 'r') as f:
|
||||
return f.read()
|
||||
|
||||
def download_file(url):
|
||||
"""下载文件 - 存在安全风险"""
|
||||
# 未验证URL格式
|
||||
response = urlopen(url)
|
||||
return response.read()
|
||||
|
||||
def execute_command(cmd):
|
||||
"""执行系统命令 - 存在命令注入风险"""
|
||||
# 直接执行用户输入的命令
|
||||
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
|
||||
return result.stdout
|
||||
|
||||
def process_user_input(data):
|
||||
"""处理用户输入 - 未进行输入验证"""
|
||||
# 未验证输入长度和内容
|
||||
if len(data) > 1000: # 简单的长度检查
|
||||
return "输入过长"
|
||||
|
||||
# 未过滤危险字符
|
||||
return data.replace('<script>', '').replace('</script>', '')
|
||||
|
||||
def calculate_total(items):
|
||||
"""计算总数 - 存在除零风险"""
|
||||
total = 0
|
||||
for item in items:
|
||||
# 未检查除零
|
||||
total += item['price'] / item['quantity']
|
||||
return total
|
||||
|
||||
def main():
|
||||
"""主函数"""
|
||||
print("代码漏洞检测系统示例")
|
||||
|
||||
# 硬编码的敏感信息
|
||||
api_key = "sk-1234567890abcdef"
|
||||
database_url = "mysql://user:password@localhost/db"
|
||||
|
||||
# 未使用HTTPS
|
||||
external_url = "http://api.example.com/data"
|
||||
|
||||
# 创建用户管理器
|
||||
user_mgr = UserManager()
|
||||
|
||||
# 模拟用户操作
|
||||
username = input("请输入用户名: ")
|
||||
password = input("请输入密码: ")
|
||||
|
||||
# 未验证输入
|
||||
if user_mgr.create_user(username, password):
|
||||
print("用户创建成功")
|
||||
|
||||
# 尝试读取文件
|
||||
try:
|
||||
content = read_file("config.txt")
|
||||
print("配置文件内容:", content)
|
||||
except:
|
||||
print("文件读取失败")
|
||||
|
||||
# 执行命令
|
||||
command = input("请输入要执行的命令: ")
|
||||
output = execute_command(command)
|
||||
print("命令输出:", output)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@ -0,0 +1,113 @@
|
||||
import tkinter as tk
|
||||
from tkinter import filedialog, messagebox, scrolledtext
|
||||
import requests
|
||||
import json
|
||||
import time
|
||||
|
||||
# 替换为你的 Deepseek API 地址和密钥
|
||||
API_URL = "https://api.deepseek.com/v1/chat/completions"
|
||||
API_KEY = "your_deepseek_api_key_here"
|
||||
|
||||
HEADERS = {
|
||||
"Authorization": f"Bearer {API_KEY}",
|
||||
"Content-Type": "application/json"
|
||||
}
|
||||
|
||||
def split_code(code, max_tokens=3000):
|
||||
"""将大段代码分割成适合 API 调用的块"""
|
||||
lines = code.splitlines()
|
||||
chunks = []
|
||||
current_chunk = []
|
||||
current_length = 0
|
||||
|
||||
for line in lines:
|
||||
line_length = len(line.encode('utf-8'))
|
||||
if current_length + line_length > max_tokens:
|
||||
chunks.append("\n".join(current_chunk))
|
||||
current_chunk = []
|
||||
current_length = 0
|
||||
current_chunk.append(line)
|
||||
current_length += line_length
|
||||
|
||||
if current_chunk:
|
||||
chunks.append("\n".join(current_chunk))
|
||||
|
||||
return chunks
|
||||
|
||||
def analyze_code(code_chunk):
|
||||
"""调用 Deepseek API 分析代码"""
|
||||
prompt = f"请检查以下代码中的潜在问题,包括语法错误、风格问题、逻辑漏洞等:\n\n{code_chunk}"
|
||||
data = {
|
||||
"model": "deepseek-chat",
|
||||
"messages": [
|
||||
{"role": "system", "content": "你是一个专业的代码审查助手。"},
|
||||
{"role": "user", "content": prompt}
|
||||
],
|
||||
"temperature": 0.3
|
||||
}
|
||||
|
||||
try:
|
||||
response = requests.post(API_URL, headers=HEADERS, json=data)
|
||||
response.raise_for_status()
|
||||
result = response.json()
|
||||
return result['choices'][0]['message']['content']
|
||||
except Exception as e:
|
||||
return f"API 调用失败: {str(e)}"
|
||||
|
||||
def load_file():
|
||||
file_path = filedialog.askopenfilename(filetypes=[("Python 文件", "*.py"), ("所有文件", "*.*")])
|
||||
if file_path:
|
||||
with open(file_path, 'r', encoding='utf-8') as f:
|
||||
code_text.delete(1.0, tk.END)
|
||||
code_text.insert(tk.END, f.read())
|
||||
|
||||
def analyze():
|
||||
code = code_text.get(1.0, tk.END)
|
||||
if not code.strip():
|
||||
messagebox.showwarning("警告", "请先加载代码")
|
||||
return
|
||||
|
||||
result_text.delete(1.0, tk.END)
|
||||
result_text.insert(tk.END, "正在分析代码...\n")
|
||||
root.update()
|
||||
|
||||
chunks = split_code(code)
|
||||
all_results = []
|
||||
|
||||
for i, chunk in enumerate(chunks):
|
||||
result = analyze_code(chunk)
|
||||
all_results.append(f"=== 第 {i+1} 段代码分析结果 ===\n{result}\n")
|
||||
time.sleep(1) # 避免 API 请求过快
|
||||
|
||||
result_text.delete(1.0, tk.END)
|
||||
result_text.insert(tk.END, "\n".join(all_results))
|
||||
|
||||
# GUI 构建
|
||||
root = tk.Tk()
|
||||
root.title("Deepseek 代码检查器")
|
||||
|
||||
frame_top = tk.Frame(root)
|
||||
frame_top.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
|
||||
|
||||
code_label = tk.Label(frame_top, text="代码内容:")
|
||||
code_label.pack(anchor='w')
|
||||
|
||||
code_text = scrolledtext.ScrolledText(frame_top, height=20, width=100)
|
||||
code_text.pack(fill=tk.BOTH, expand=True)
|
||||
|
||||
frame_btn = tk.Frame(root)
|
||||
frame_btn.pack(pady=5)
|
||||
|
||||
load_btn = tk.Button(frame_btn, text="加载代码文件", command=load_file)
|
||||
load_btn.pack(side=tk.LEFT, padx=5)
|
||||
|
||||
analyze_btn = tk.Button(frame_btn, text="开始分析", command=analyze)
|
||||
analyze_btn.pack(side=tk.LEFT, padx=5)
|
||||
|
||||
result_label = tk.Label(root, text="分析结果:")
|
||||
result_label.pack(anchor='w', padx=10)
|
||||
|
||||
result_text = scrolledtext.ScrolledText(root, height=15, width=100)
|
||||
result_text.pack(fill=tk.BOTH, expand=True, padx=10, pady=(0, 10))
|
||||
|
||||
root.mainloop()
|
||||
@ -0,0 +1,55 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
测试API连接
|
||||
"""
|
||||
import requests
|
||||
import json
|
||||
|
||||
def test_backend_api():
|
||||
"""测试后端API"""
|
||||
base_url = "http://localhost:8000"
|
||||
|
||||
try:
|
||||
# 测试健康检查
|
||||
print("测试健康检查...")
|
||||
response = requests.get(f"{base_url}/health")
|
||||
print(f"健康检查状态: {response.status_code}")
|
||||
print(f"响应: {response.json()}")
|
||||
|
||||
# 测试获取项目列表
|
||||
print("\n测试获取项目列表...")
|
||||
response = requests.get(f"{base_url}/api/projects")
|
||||
print(f"项目列表状态: {response.status_code}")
|
||||
if response.status_code == 200:
|
||||
print(f"项目数量: {len(response.json())}")
|
||||
else:
|
||||
print(f"错误: {response.text}")
|
||||
|
||||
# 测试创建项目
|
||||
print("\n测试创建项目...")
|
||||
project_data = {
|
||||
"name": "测试项目",
|
||||
"description": "这是一个测试项目",
|
||||
"language": "python",
|
||||
"project_path": "C:\\Users\\31576\\Desktop\\新建文件夹\\sample_project"
|
||||
}
|
||||
|
||||
response = requests.post(
|
||||
f"{base_url}/api/projects",
|
||||
json=project_data,
|
||||
headers={"Content-Type": "application/json"}
|
||||
)
|
||||
print(f"创建项目状态: {response.status_code}")
|
||||
if response.status_code == 201:
|
||||
print("项目创建成功!")
|
||||
print(f"响应: {response.json()}")
|
||||
else:
|
||||
print(f"创建失败: {response.text}")
|
||||
|
||||
except requests.exceptions.ConnectionError:
|
||||
print("无法连接到后端服务,请确保后端正在运行")
|
||||
except Exception as e:
|
||||
print(f"测试失败: {e}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_backend_api()
|
||||
Binary file not shown.
Loading…
Reference in new issue