|
|
"""
|
|
|
AI增强服务 - 基于现有的DeepSeek集成
|
|
|
"""
|
|
|
import requests
|
|
|
import json
|
|
|
import time
|
|
|
from typing import Dict, Any, List
|
|
|
|
|
|
class AIService:
|
|
|
"""AI增强服务"""
|
|
|
|
|
|
def __init__(self):
|
|
|
# 从环境变量或配置文件读取API配置
|
|
|
self.api_url = "https://api.deepseek.com/v1/chat/completions"
|
|
|
self.api_key = "your_deepseek_api_key_here" # 实际使用时从环境变量获取
|
|
|
self.headers = {
|
|
|
"Authorization": f"Bearer {self.api_key}",
|
|
|
"Content-Type": "application/json"
|
|
|
}
|
|
|
|
|
|
async def enhance_vulnerability(self, vulnerability: Dict[str, Any]) -> Dict[str, Any]:
|
|
|
"""AI增强漏洞分析"""
|
|
|
try:
|
|
|
# 构建AI分析提示
|
|
|
prompt = self._build_enhancement_prompt(vulnerability)
|
|
|
|
|
|
# 调用AI API
|
|
|
ai_response = await self._call_ai_api(prompt)
|
|
|
|
|
|
# 解析AI响应
|
|
|
enhancement = self._parse_ai_response(ai_response)
|
|
|
|
|
|
return {
|
|
|
'ai_enhanced': True,
|
|
|
'ai_confidence': enhancement.get('confidence', 0.8),
|
|
|
'ai_suggestion': enhancement.get('suggestion', ''),
|
|
|
'ai_explanation': enhancement.get('explanation', '')
|
|
|
}
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f"AI增强失败: {str(e)}")
|
|
|
return {
|
|
|
'ai_enhanced': False,
|
|
|
'ai_confidence': 0.0,
|
|
|
'ai_suggestion': '',
|
|
|
'ai_explanation': f'AI分析失败: {str(e)}'
|
|
|
}
|
|
|
|
|
|
def _build_enhancement_prompt(self, vulnerability: Dict[str, Any]) -> str:
|
|
|
"""构建AI分析提示"""
|
|
|
prompt = f"""
|
|
|
请分析以下代码漏洞,并提供详细的修复建议:
|
|
|
|
|
|
漏洞信息:
|
|
|
- 规则ID: {vulnerability.get('rule_id', 'N/A')}
|
|
|
- 严重程度: {vulnerability.get('severity', 'N/A')}
|
|
|
- 分类: {vulnerability.get('category', 'N/A')}
|
|
|
- 文件路径: {vulnerability.get('file_path', 'N/A')}
|
|
|
- 行号: {vulnerability.get('line_number', 'N/A')}
|
|
|
- 描述: {vulnerability.get('message', 'N/A')}
|
|
|
|
|
|
相关代码:
|
|
|
```{vulnerability.get('language', 'text')}
|
|
|
{vulnerability.get('code_snippet', '')}
|
|
|
```
|
|
|
|
|
|
请提供:
|
|
|
1. 漏洞的详细解释
|
|
|
2. 可能的修复方案
|
|
|
3. 修复后的代码示例
|
|
|
4. 预防类似问题的最佳实践
|
|
|
|
|
|
请以JSON格式回复,包含以下字段:
|
|
|
- explanation: 详细解释
|
|
|
- suggestion: 修复建议
|
|
|
- fixed_code: 修复后的代码示例
|
|
|
- best_practices: 最佳实践建议
|
|
|
- confidence: 分析置信度(0-1)
|
|
|
"""
|
|
|
return prompt
|
|
|
|
|
|
async def _call_ai_api(self, prompt: str) -> str:
|
|
|
"""调用AI API"""
|
|
|
data = {
|
|
|
"model": "deepseek-chat",
|
|
|
"messages": [
|
|
|
{"role": "system", "content": "你是一个专业的代码安全分析专家。"},
|
|
|
{"role": "user", "content": prompt}
|
|
|
],
|
|
|
"temperature": 0.3,
|
|
|
"max_tokens": 2000
|
|
|
}
|
|
|
|
|
|
response = requests.post(self.api_url, headers=self.headers, json=data)
|
|
|
response.raise_for_status()
|
|
|
result = response.json()
|
|
|
|
|
|
return result['choices'][0]['message']['content']
|
|
|
|
|
|
def _parse_ai_response(self, response: str) -> Dict[str, Any]:
|
|
|
"""解析AI响应"""
|
|
|
try:
|
|
|
# 尝试解析JSON响应
|
|
|
if response.strip().startswith('{'):
|
|
|
return json.loads(response)
|
|
|
|
|
|
# 如果不是JSON,返回原始响应
|
|
|
return {
|
|
|
'explanation': response,
|
|
|
'suggestion': '',
|
|
|
'fixed_code': '',
|
|
|
'best_practices': '',
|
|
|
'confidence': 0.7
|
|
|
}
|
|
|
|
|
|
except json.JSONDecodeError:
|
|
|
return {
|
|
|
'explanation': response,
|
|
|
'suggestion': '',
|
|
|
'fixed_code': '',
|
|
|
'best_practices': '',
|
|
|
'confidence': 0.7
|
|
|
}
|
|
|
|
|
|
async def batch_enhance_vulnerabilities(self, vulnerabilities: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
|
|
"""批量AI增强漏洞"""
|
|
|
enhanced_vulnerabilities = []
|
|
|
|
|
|
for vulnerability in vulnerabilities:
|
|
|
enhancement = await self.enhance_vulnerability(vulnerability)
|
|
|
vulnerability.update(enhancement)
|
|
|
enhanced_vulnerabilities.append(vulnerability)
|
|
|
|
|
|
# 避免API请求过快
|
|
|
time.sleep(0.5)
|
|
|
|
|
|
return enhanced_vulnerabilities
|