|
|
"""
|
|
|
Python代码分析器
|
|
|
"""
|
|
|
import ast
|
|
|
import os
|
|
|
from typing import List, Dict, Any
|
|
|
from .base_analyzer import BaseAnalyzer
|
|
|
|
|
|
class PythonAnalyzer(BaseAnalyzer):
|
|
|
"""Python代码分析器"""
|
|
|
|
|
|
def __init__(self):
|
|
|
super().__init__()
|
|
|
self.name = "Python Analyzer"
|
|
|
self.version = "1.0.0"
|
|
|
self.supported_extensions = ["py"]
|
|
|
self.description = "Python代码静态分析器"
|
|
|
|
|
|
async def analyze(self, project_path: str, config: Dict[str, Any] = None) -> List[Dict[str, Any]]:
|
|
|
"""分析Python代码"""
|
|
|
vulnerabilities = []
|
|
|
|
|
|
# 获取所有Python文件
|
|
|
python_files = self.get_project_files(project_path)
|
|
|
|
|
|
for file_path in python_files:
|
|
|
try:
|
|
|
# 读取文件内容
|
|
|
content = self.read_file_content(file_path)
|
|
|
if not content:
|
|
|
continue
|
|
|
|
|
|
# 解析AST
|
|
|
tree = ast.parse(content, filename=file_path)
|
|
|
|
|
|
# 执行各种检查
|
|
|
vulnerabilities.extend(self._check_security_issues(tree, file_path, content))
|
|
|
vulnerabilities.extend(self._check_performance_issues(tree, file_path, content))
|
|
|
vulnerabilities.extend(self._check_maintainability_issues(tree, file_path, content))
|
|
|
|
|
|
except SyntaxError as e:
|
|
|
# 语法错误
|
|
|
vulnerabilities.append(self.create_vulnerability(
|
|
|
rule_id="PY001",
|
|
|
message=f"语法错误: {str(e)}",
|
|
|
file_path=file_path,
|
|
|
line_number=e.lineno,
|
|
|
severity="critical",
|
|
|
category="reliability"
|
|
|
))
|
|
|
except Exception as e:
|
|
|
# 其他错误
|
|
|
vulnerabilities.append(self.create_vulnerability(
|
|
|
rule_id="PY000",
|
|
|
message=f"分析错误: {str(e)}",
|
|
|
file_path=file_path,
|
|
|
severity="high",
|
|
|
category="reliability"
|
|
|
))
|
|
|
|
|
|
return vulnerabilities
|
|
|
|
|
|
def _check_security_issues(self, tree: ast.AST, file_path: str, content: str) -> List[Dict[str, Any]]:
|
|
|
"""检查安全问题"""
|
|
|
vulnerabilities = []
|
|
|
lines = content.split('\n')
|
|
|
|
|
|
for node in ast.walk(tree):
|
|
|
# 检查eval使用
|
|
|
if isinstance(node, ast.Call) and isinstance(node.func, ast.Name) and node.func.id == 'eval':
|
|
|
vulnerabilities.append(self.create_vulnerability(
|
|
|
rule_id="PY101",
|
|
|
message="使用了eval()函数,存在代码注入风险",
|
|
|
file_path=file_path,
|
|
|
line_number=node.lineno,
|
|
|
severity="critical",
|
|
|
category="security",
|
|
|
code_snippet=lines[node.lineno - 1].strip() if node.lineno <= len(lines) else ""
|
|
|
))
|
|
|
|
|
|
# 检查exec使用
|
|
|
elif isinstance(node, ast.Call) and isinstance(node.func, ast.Name) and node.func.id == 'exec':
|
|
|
vulnerabilities.append(self.create_vulnerability(
|
|
|
rule_id="PY102",
|
|
|
message="使用了exec()函数,存在代码注入风险",
|
|
|
file_path=file_path,
|
|
|
line_number=node.lineno,
|
|
|
severity="critical",
|
|
|
category="security",
|
|
|
code_snippet=lines[node.lineno - 1].strip() if node.lineno <= len(lines) else ""
|
|
|
))
|
|
|
|
|
|
# 检查pickle使用
|
|
|
elif isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute):
|
|
|
if (isinstance(node.func.value, ast.Name) and
|
|
|
node.func.value.id == 'pickle' and
|
|
|
node.func.attr in ['loads', 'load']):
|
|
|
vulnerabilities.append(self.create_vulnerability(
|
|
|
rule_id="PY103",
|
|
|
message="使用了pickle反序列化,存在安全风险",
|
|
|
file_path=file_path,
|
|
|
line_number=node.lineno,
|
|
|
severity="high",
|
|
|
category="security",
|
|
|
code_snippet=lines[node.lineno - 1].strip() if node.lineno <= len(lines) else ""
|
|
|
))
|
|
|
|
|
|
return vulnerabilities
|
|
|
|
|
|
def _check_performance_issues(self, tree: ast.AST, file_path: str, content: str) -> List[Dict[str, Any]]:
|
|
|
"""检查性能问题"""
|
|
|
vulnerabilities = []
|
|
|
lines = content.split('\n')
|
|
|
|
|
|
for node in ast.walk(tree):
|
|
|
# 检查列表推导式中的循环
|
|
|
if isinstance(node, ast.ListComp):
|
|
|
if len(node.generators) > 1:
|
|
|
vulnerabilities.append(self.create_vulnerability(
|
|
|
rule_id="PY201",
|
|
|
message="复杂的列表推导式可能影响性能",
|
|
|
file_path=file_path,
|
|
|
line_number=node.lineno,
|
|
|
severity="medium",
|
|
|
category="performance",
|
|
|
code_snippet=lines[node.lineno - 1].strip() if node.lineno <= len(lines) else ""
|
|
|
))
|
|
|
|
|
|
# 检查全局变量使用
|
|
|
elif isinstance(node, ast.Global):
|
|
|
vulnerabilities.append(self.create_vulnerability(
|
|
|
rule_id="PY202",
|
|
|
message="使用全局变量可能影响性能",
|
|
|
file_path=file_path,
|
|
|
line_number=node.lineno,
|
|
|
severity="low",
|
|
|
category="performance",
|
|
|
code_snippet=lines[node.lineno - 1].strip() if node.lineno <= len(lines) else ""
|
|
|
))
|
|
|
|
|
|
return vulnerabilities
|
|
|
|
|
|
def _check_maintainability_issues(self, tree: ast.AST, file_path: str, content: str) -> List[Dict[str, Any]]:
|
|
|
"""检查可维护性问题"""
|
|
|
vulnerabilities = []
|
|
|
lines = content.split('\n')
|
|
|
|
|
|
# 检查函数长度
|
|
|
for node in ast.walk(tree):
|
|
|
if isinstance(node, ast.FunctionDef):
|
|
|
if len(node.body) > 20:
|
|
|
vulnerabilities.append(self.create_vulnerability(
|
|
|
rule_id="PY301",
|
|
|
message=f"函数 '{node.name}' 过长,建议拆分",
|
|
|
file_path=file_path,
|
|
|
line_number=node.lineno,
|
|
|
severity="medium",
|
|
|
category="maintainability",
|
|
|
code_snippet=f"def {node.name}(...):"
|
|
|
))
|
|
|
|
|
|
# 检查类长度
|
|
|
elif isinstance(node, ast.ClassDef):
|
|
|
if len(node.body) > 15:
|
|
|
vulnerabilities.append(self.create_vulnerability(
|
|
|
rule_id="PY302",
|
|
|
message=f"类 '{node.name}' 过长,建议拆分",
|
|
|
file_path=file_path,
|
|
|
line_number=node.lineno,
|
|
|
severity="medium",
|
|
|
category="maintainability",
|
|
|
code_snippet=f"class {node.name}:"
|
|
|
))
|
|
|
|
|
|
# 检查循环嵌套
|
|
|
elif isinstance(node, (ast.For, ast.While)):
|
|
|
nested_loops = 0
|
|
|
for child in ast.walk(node):
|
|
|
if isinstance(child, (ast.For, ast.While)) and child != node:
|
|
|
nested_loops += 1
|
|
|
|
|
|
if nested_loops > 2:
|
|
|
vulnerabilities.append(self.create_vulnerability(
|
|
|
rule_id="PY303",
|
|
|
message="循环嵌套过深,影响代码可读性",
|
|
|
file_path=file_path,
|
|
|
line_number=node.lineno,
|
|
|
severity="low",
|
|
|
category="maintainability",
|
|
|
code_snippet=lines[node.lineno - 1].strip() if node.lineno <= len(lines) else ""
|
|
|
))
|
|
|
|
|
|
return vulnerabilities
|