You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

138 lines
4.5 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
AI增强服务 - 基于现有的DeepSeek集成
"""
import requests
import json
import time
from typing import Dict, Any, List
class AIService:
"""AI增强服务"""
def __init__(self):
# 从环境变量或配置文件读取API配置
self.api_url = "https://api.deepseek.com/v1/chat/completions"
self.api_key = "your_deepseek_api_key_here" # 实际使用时从环境变量获取
self.headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
async def enhance_vulnerability(self, vulnerability: Dict[str, Any]) -> Dict[str, Any]:
"""AI增强漏洞分析"""
try:
# 构建AI分析提示
prompt = self._build_enhancement_prompt(vulnerability)
# 调用AI API
ai_response = await self._call_ai_api(prompt)
# 解析AI响应
enhancement = self._parse_ai_response(ai_response)
return {
'ai_enhanced': True,
'ai_confidence': enhancement.get('confidence', 0.8),
'ai_suggestion': enhancement.get('suggestion', ''),
'ai_explanation': enhancement.get('explanation', '')
}
except Exception as e:
print(f"AI增强失败: {str(e)}")
return {
'ai_enhanced': False,
'ai_confidence': 0.0,
'ai_suggestion': '',
'ai_explanation': f'AI分析失败: {str(e)}'
}
def _build_enhancement_prompt(self, vulnerability: Dict[str, Any]) -> str:
"""构建AI分析提示"""
prompt = f"""
请分析以下代码漏洞,并提供详细的修复建议:
漏洞信息:
- 规则ID: {vulnerability.get('rule_id', 'N/A')}
- 严重程度: {vulnerability.get('severity', 'N/A')}
- 分类: {vulnerability.get('category', 'N/A')}
- 文件路径: {vulnerability.get('file_path', 'N/A')}
- 行号: {vulnerability.get('line_number', 'N/A')}
- 描述: {vulnerability.get('message', 'N/A')}
相关代码:
```{vulnerability.get('language', 'text')}
{vulnerability.get('code_snippet', '')}
```
请提供:
1. 漏洞的详细解释
2. 可能的修复方案
3. 修复后的代码示例
4. 预防类似问题的最佳实践
请以JSON格式回复包含以下字段
- explanation: 详细解释
- suggestion: 修复建议
- fixed_code: 修复后的代码示例
- best_practices: 最佳实践建议
- confidence: 分析置信度(0-1)
"""
return prompt
async def _call_ai_api(self, prompt: str) -> str:
"""调用AI API"""
data = {
"model": "deepseek-chat",
"messages": [
{"role": "system", "content": "你是一个专业的代码安全分析专家。"},
{"role": "user", "content": prompt}
],
"temperature": 0.3,
"max_tokens": 2000
}
response = requests.post(self.api_url, headers=self.headers, json=data)
response.raise_for_status()
result = response.json()
return result['choices'][0]['message']['content']
def _parse_ai_response(self, response: str) -> Dict[str, Any]:
"""解析AI响应"""
try:
# 尝试解析JSON响应
if response.strip().startswith('{'):
return json.loads(response)
# 如果不是JSON返回原始响应
return {
'explanation': response,
'suggestion': '',
'fixed_code': '',
'best_practices': '',
'confidence': 0.7
}
except json.JSONDecodeError:
return {
'explanation': response,
'suggestion': '',
'fixed_code': '',
'best_practices': '',
'confidence': 0.7
}
async def batch_enhance_vulnerabilities(self, vulnerabilities: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""批量AI增强漏洞"""
enhanced_vulnerabilities = []
for vulnerability in vulnerabilities:
enhancement = await self.enhance_vulnerability(vulnerability)
vulnerability.update(enhancement)
enhanced_vulnerabilities.append(vulnerability)
# 避免API请求过快
time.sleep(0.5)
return enhanced_vulnerabilities