""" Python代码分析器 """ import ast import os from typing import List, Dict, Any from .base_analyzer import BaseAnalyzer class PythonAnalyzer(BaseAnalyzer): """Python代码分析器""" def __init__(self): super().__init__() self.name = "Python Analyzer" self.version = "1.0.0" self.supported_extensions = ["py"] self.description = "Python代码静态分析器" async def analyze(self, project_path: str, config: Dict[str, Any] = None) -> List[Dict[str, Any]]: """分析Python代码""" vulnerabilities = [] # 获取所有Python文件 python_files = self.get_project_files(project_path) for file_path in python_files: try: # 读取文件内容 content = self.read_file_content(file_path) if not content: continue # 解析AST tree = ast.parse(content, filename=file_path) # 执行各种检查 vulnerabilities.extend(self._check_security_issues(tree, file_path, content)) vulnerabilities.extend(self._check_performance_issues(tree, file_path, content)) vulnerabilities.extend(self._check_maintainability_issues(tree, file_path, content)) except SyntaxError as e: # 语法错误 vulnerabilities.append(self.create_vulnerability( rule_id="PY001", message=f"语法错误: {str(e)}", file_path=file_path, line_number=e.lineno, severity="critical", category="reliability" )) except Exception as e: # 其他错误 vulnerabilities.append(self.create_vulnerability( rule_id="PY000", message=f"分析错误: {str(e)}", file_path=file_path, severity="high", category="reliability" )) return vulnerabilities def _check_security_issues(self, tree: ast.AST, file_path: str, content: str) -> List[Dict[str, Any]]: """检查安全问题""" vulnerabilities = [] lines = content.split('\n') for node in ast.walk(tree): # 检查eval使用 if isinstance(node, ast.Call) and isinstance(node.func, ast.Name) and node.func.id == 'eval': vulnerabilities.append(self.create_vulnerability( rule_id="PY101", message="使用了eval()函数,存在代码注入风险", file_path=file_path, line_number=node.lineno, severity="critical", category="security", code_snippet=lines[node.lineno - 1].strip() if node.lineno <= len(lines) else "" )) # 检查exec使用 elif isinstance(node, ast.Call) and isinstance(node.func, ast.Name) and node.func.id == 'exec': vulnerabilities.append(self.create_vulnerability( rule_id="PY102", message="使用了exec()函数,存在代码注入风险", file_path=file_path, line_number=node.lineno, severity="critical", category="security", code_snippet=lines[node.lineno - 1].strip() if node.lineno <= len(lines) else "" )) # 检查pickle使用 elif isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute): if (isinstance(node.func.value, ast.Name) and node.func.value.id == 'pickle' and node.func.attr in ['loads', 'load']): vulnerabilities.append(self.create_vulnerability( rule_id="PY103", message="使用了pickle反序列化,存在安全风险", file_path=file_path, line_number=node.lineno, severity="high", category="security", code_snippet=lines[node.lineno - 1].strip() if node.lineno <= len(lines) else "" )) return vulnerabilities def _check_performance_issues(self, tree: ast.AST, file_path: str, content: str) -> List[Dict[str, Any]]: """检查性能问题""" vulnerabilities = [] lines = content.split('\n') for node in ast.walk(tree): # 检查列表推导式中的循环 if isinstance(node, ast.ListComp): if len(node.generators) > 1: vulnerabilities.append(self.create_vulnerability( rule_id="PY201", message="复杂的列表推导式可能影响性能", file_path=file_path, line_number=node.lineno, severity="medium", category="performance", code_snippet=lines[node.lineno - 1].strip() if node.lineno <= len(lines) else "" )) # 检查全局变量使用 elif isinstance(node, ast.Global): vulnerabilities.append(self.create_vulnerability( rule_id="PY202", message="使用全局变量可能影响性能", file_path=file_path, line_number=node.lineno, severity="low", category="performance", code_snippet=lines[node.lineno - 1].strip() if node.lineno <= len(lines) else "" )) return vulnerabilities def _check_maintainability_issues(self, tree: ast.AST, file_path: str, content: str) -> List[Dict[str, Any]]: """检查可维护性问题""" vulnerabilities = [] lines = content.split('\n') # 检查函数长度 for node in ast.walk(tree): if isinstance(node, ast.FunctionDef): if len(node.body) > 20: vulnerabilities.append(self.create_vulnerability( rule_id="PY301", message=f"函数 '{node.name}' 过长,建议拆分", file_path=file_path, line_number=node.lineno, severity="medium", category="maintainability", code_snippet=f"def {node.name}(...):" )) # 检查类长度 elif isinstance(node, ast.ClassDef): if len(node.body) > 15: vulnerabilities.append(self.create_vulnerability( rule_id="PY302", message=f"类 '{node.name}' 过长,建议拆分", file_path=file_path, line_number=node.lineno, severity="medium", category="maintainability", code_snippet=f"class {node.name}:" )) # 检查循环嵌套 elif isinstance(node, (ast.For, ast.While)): nested_loops = 0 for child in ast.walk(node): if isinstance(child, (ast.For, ast.While)) and child != node: nested_loops += 1 if nested_loops > 2: vulnerabilities.append(self.create_vulnerability( rule_id="PY303", message="循环嵌套过深,影响代码可读性", file_path=file_path, line_number=node.lineno, severity="low", category="maintainability", code_snippet=lines[node.lineno - 1].strip() if node.lineno <= len(lines) else "" )) return vulnerabilities