|
|
"""
|
|
|
分析器基类
|
|
|
"""
|
|
|
from abc import ABC, abstractmethod
|
|
|
from typing import List, Dict, Any
|
|
|
import os
|
|
|
import glob
|
|
|
|
|
|
class BaseAnalyzer(ABC):
    """Abstract base class for analyzers.

    Subclasses must implement :meth:`analyze`. The helper methods here
    cover file discovery (driven by ``supported_extensions``), tolerant
    file reading, and building the finding dictionaries.
    """

    def __init__(self):
        """Set the default descriptive attributes of the analyzer."""
        self.name = "Base Analyzer"
        self.version = "1.0.0"
        # File extensions consumed by get_project_files(); empty by default.
        self.supported_extensions: List[str] = []
        self.description = "基础分析器"

    @abstractmethod
    async def analyze(self, project_path: str, config: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        """Analyze the code of a project.

        Args:
            project_path: Path of the project to analyze.
            config: Analysis configuration.

        Returns:
            A list of vulnerabilities.
        """

    def get_project_files(self, project_path: str) -> List[str]:
        """Collect the project files matching ``supported_extensions``.

        The directory tree below ``project_path`` is searched recursively
        once per extension.

        Args:
            project_path: Root directory to search.

        Returns:
            Paths of the matching regular files, without duplicates, in
            the order they were discovered.
        """
        # Escape the root so characters such as "[" or "*" in the project
        # path are matched literally instead of being read as wildcards.
        escaped_root = glob.escape(project_path)

        seen = set()
        files: List[str] = []
        for ext in self.supported_extensions:
            # Accept both "py" and ".py" spellings of an extension.
            suffix = ext.lstrip(".")
            pattern = os.path.join(escaped_root, "**", f"*.{suffix}")
            for path in glob.glob(pattern, recursive=True):
                # Skip directories whose name merely ends with the
                # extension, and paths already found via another entry.
                if path in seen or not os.path.isfile(path):
                    continue
                seen.add(path)
                files.append(path)
        return files

    def read_file_content(self, file_path: str) -> str:
        """Read a text file, trying UTF-8 first and GBK as a fallback.

        Args:
            file_path: Path of the file to read.

        Returns:
            The decoded file content, or an empty string when the file
            cannot be opened, read, or decoded with either encoding.
        """
        for encoding in ("utf-8", "gbk"):
            try:
                with open(file_path, 'r', encoding=encoding) as handle:
                    return handle.read()
            except UnicodeDecodeError:
                # Wrong encoding guess: move on to the next candidate.
                continue
            except Exception:
                # Best-effort read: any other failure yields no content.
                # ``Exception`` (not a bare ``except``) keeps
                # KeyboardInterrupt and SystemExit propagating.
                return ""
        return ""

    def create_vulnerability(
        self,
        rule_id: str,
        message: str,
        file_path: str,
        line_number: int = None,
        severity: str = "medium",
        category: str = "maintainability",
        code_snippet: str = "",
        context_before: str = "",
        context_after: str = ""
    ) -> Dict[str, Any]:
        """Build a vulnerability dictionary.

        Args:
            rule_id: Identifier of the rule that produced the finding.
            message: Message describing the finding.
            file_path: Path of the file the finding refers to.
            line_number: Line number of the finding; defaults to ``None``.
            severity: Severity label; defaults to ``"medium"``.
            category: Category label; defaults to ``"maintainability"``.
            code_snippet: Code snippet associated with the finding.
            context_before: Context preceding the snippet.
            context_after: Context following the snippet.

        Returns:
            A dictionary holding every argument under a key of the same
            name.
        """
        return {
            'rule_id': rule_id,
            'message': message,
            'file_path': file_path,
            'line_number': line_number,
            'severity': severity,
            'category': category,
            'code_snippet': code_snippet,
            'context_before': context_before,
            'context_after': context_after,
        }
|