You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

194 lines
8.2 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
Python代码分析器
"""
import ast
import os
from typing import List, Dict, Any
from .base_analyzer import BaseAnalyzer
class PythonAnalyzer(BaseAnalyzer):
"""Python代码分析器"""
def __init__(self):
super().__init__()
self.name = "Python Analyzer"
self.version = "1.0.0"
self.supported_extensions = ["py"]
self.description = "Python代码静态分析器"
async def analyze(self, project_path: str, config: Dict[str, Any] = None) -> List[Dict[str, Any]]:
"""分析Python代码"""
vulnerabilities = []
# 获取所有Python文件
python_files = self.get_project_files(project_path)
for file_path in python_files:
try:
# 读取文件内容
content = self.read_file_content(file_path)
if not content:
continue
# 解析AST
tree = ast.parse(content, filename=file_path)
# 执行各种检查
vulnerabilities.extend(self._check_security_issues(tree, file_path, content))
vulnerabilities.extend(self._check_performance_issues(tree, file_path, content))
vulnerabilities.extend(self._check_maintainability_issues(tree, file_path, content))
except SyntaxError as e:
# 语法错误
vulnerabilities.append(self.create_vulnerability(
rule_id="PY001",
message=f"语法错误: {str(e)}",
file_path=file_path,
line_number=e.lineno,
severity="critical",
category="reliability"
))
except Exception as e:
# 其他错误
vulnerabilities.append(self.create_vulnerability(
rule_id="PY000",
message=f"分析错误: {str(e)}",
file_path=file_path,
severity="high",
category="reliability"
))
return vulnerabilities
def _check_security_issues(self, tree: ast.AST, file_path: str, content: str) -> List[Dict[str, Any]]:
"""检查安全问题"""
vulnerabilities = []
lines = content.split('\n')
for node in ast.walk(tree):
# 检查eval使用
if isinstance(node, ast.Call) and isinstance(node.func, ast.Name) and node.func.id == 'eval':
vulnerabilities.append(self.create_vulnerability(
rule_id="PY101",
message="使用了eval()函数,存在代码注入风险",
file_path=file_path,
line_number=node.lineno,
severity="critical",
category="security",
code_snippet=lines[node.lineno - 1].strip() if node.lineno <= len(lines) else ""
))
# 检查exec使用
elif isinstance(node, ast.Call) and isinstance(node.func, ast.Name) and node.func.id == 'exec':
vulnerabilities.append(self.create_vulnerability(
rule_id="PY102",
message="使用了exec()函数,存在代码注入风险",
file_path=file_path,
line_number=node.lineno,
severity="critical",
category="security",
code_snippet=lines[node.lineno - 1].strip() if node.lineno <= len(lines) else ""
))
# 检查pickle使用
elif isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute):
if (isinstance(node.func.value, ast.Name) and
node.func.value.id == 'pickle' and
node.func.attr in ['loads', 'load']):
vulnerabilities.append(self.create_vulnerability(
rule_id="PY103",
message="使用了pickle反序列化存在安全风险",
file_path=file_path,
line_number=node.lineno,
severity="high",
category="security",
code_snippet=lines[node.lineno - 1].strip() if node.lineno <= len(lines) else ""
))
return vulnerabilities
def _check_performance_issues(self, tree: ast.AST, file_path: str, content: str) -> List[Dict[str, Any]]:
"""检查性能问题"""
vulnerabilities = []
lines = content.split('\n')
for node in ast.walk(tree):
# 检查列表推导式中的循环
if isinstance(node, ast.ListComp):
if len(node.generators) > 1:
vulnerabilities.append(self.create_vulnerability(
rule_id="PY201",
message="复杂的列表推导式可能影响性能",
file_path=file_path,
line_number=node.lineno,
severity="medium",
category="performance",
code_snippet=lines[node.lineno - 1].strip() if node.lineno <= len(lines) else ""
))
# 检查全局变量使用
elif isinstance(node, ast.Global):
vulnerabilities.append(self.create_vulnerability(
rule_id="PY202",
message="使用全局变量可能影响性能",
file_path=file_path,
line_number=node.lineno,
severity="low",
category="performance",
code_snippet=lines[node.lineno - 1].strip() if node.lineno <= len(lines) else ""
))
return vulnerabilities
def _check_maintainability_issues(self, tree: ast.AST, file_path: str, content: str) -> List[Dict[str, Any]]:
"""检查可维护性问题"""
vulnerabilities = []
lines = content.split('\n')
# 检查函数长度
for node in ast.walk(tree):
if isinstance(node, ast.FunctionDef):
if len(node.body) > 20:
vulnerabilities.append(self.create_vulnerability(
rule_id="PY301",
message=f"函数 '{node.name}' 过长,建议拆分",
file_path=file_path,
line_number=node.lineno,
severity="medium",
category="maintainability",
code_snippet=f"def {node.name}(...):"
))
# 检查类长度
elif isinstance(node, ast.ClassDef):
if len(node.body) > 15:
vulnerabilities.append(self.create_vulnerability(
rule_id="PY302",
message=f"'{node.name}' 过长,建议拆分",
file_path=file_path,
line_number=node.lineno,
severity="medium",
category="maintainability",
code_snippet=f"class {node.name}:"
))
# 检查循环嵌套
elif isinstance(node, (ast.For, ast.While)):
nested_loops = 0
for child in ast.walk(node):
if isinstance(child, (ast.For, ast.While)) and child != node:
nested_loops += 1
if nested_loops > 2:
vulnerabilities.append(self.create_vulnerability(
rule_id="PY303",
message="循环嵌套过深,影响代码可读性",
file_path=file_path,
line_number=node.lineno,
severity="low",
category="maintainability",
code_snippet=lines[node.lineno - 1].strip() if node.lineno <= len(lines) else ""
))
return vulnerabilities