import argparse
import re
import sys
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional, Tuple, Set
# Reuse the OpenAI-compatible client (DeepSeek) already configured in test.py
try:
    from test import client  # type: ignore
except Exception as import_error:  # noqa: PIE786
    client = None  # Defer the error until generation time
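# `client` is assumed to be an OpenAI-compatible SDK client that test.py has already
# configured for DeepSeek. A minimal sketch of what test.py is expected to provide
# (the key and base URL below are placeholders, not values taken from this project):
#
#     from openai import OpenAI
#     client = OpenAI(api_key="sk-...", base_url="https://api.deepseek.com")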
@dataclass
class IssueLocation:
    file_path: Path
    line: Optional[int]


@dataclass
class CppcheckIssue:
    id: str
    severity: str
    message: str
    locations: List[IssueLocation]

def parse_cppcheck_xml(xml_path: Path) -> List[CppcheckIssue]:
    tree = ET.parse(xml_path)
    root = tree.getroot()
    issues: List[CppcheckIssue] = []
    for error in root.findall("errors/error"):
        issue_id = error.get("id") or "unknown"
        severity = error.get("severity") or "unknown"
        msg = error.get("msg") or (error.get("verbose") or "")
        locations: List[IssueLocation] = []
        for loc in error.findall("location"):
            file_attr = loc.get("file")
            line_attr = loc.get("line")
            if not file_attr:
                continue
            file_path = Path(file_attr).expanduser().resolve()
            line = int(line_attr) if line_attr and line_attr.isdigit() else None
            locations.append(IssueLocation(file_path=file_path, line=line))
        if not locations:
            # Some <error> elements carry file/line attributes directly instead of <location> children
            file_attr = error.get("file")
            line_attr = error.get("line")
            if file_attr:
                locations.append(
                    IssueLocation(
                        file_path=Path(file_attr).expanduser().resolve(),
                        line=int(line_attr) if line_attr and str(line_attr).isdigit() else None,
                    )
                )
        issues.append(CppcheckIssue(id=issue_id, severity=severity, message=msg, locations=locations))
    return issues

def parse_cppcheck_text(text_path: Path) -> List[CppcheckIssue]:
    """Parse a cppcheck text log. Typical line format:
        /path/file.c:111:13: warning: Message [ruleId]
    The severity field may also be note:/information:/error: etc.
    """
    content = text_path.read_text(encoding="utf-8", errors="replace")
    issues: List[CppcheckIssue] = []
    # Common pattern: path:line:column: severity: message [ruleId]
    pattern = re.compile(
        r"^(?P<file>[^:\n]+?):(?P<line>\d+)(?::\d+)?:\s*(?P<sev>warning|error|information|note):\s*(?P<msg>.*?)(?:\s*\[(?P<id>[^\]]+)\])?\s*$",
        re.IGNORECASE,
    )
    for raw_line in content.splitlines():
        m = pattern.match(raw_line.strip())
        if not m:
            continue
        file_path = Path(m.group("file")).expanduser()
        try:
            file_path = file_path.resolve()
        except Exception:
            pass
        line_num = int(m.group("line")) if m.group("line") else None
        sev = (m.group("sev") or "").lower()
        msg = m.group("msg") or ""
        rid = m.group("id") or "unknown"
        issues.append(
            CppcheckIssue(
                id=rid,
                severity=sev,
                message=msg,
                locations=[IssueLocation(file_path=file_path, line=line_num)],
            )
        )
    return issues

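# Example: a (hypothetical) log line such as
#     /src/tif_dirread.c:123:5: warning: Uninitialized variable: v [uninitvar]
# is parsed into id="uninitvar", severity="warning", message="Uninitialized variable: v",
# with a single IssueLocation pointing at /src/tif_dirread.c line 123.
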
def read_code_snippet(file_path: Path, center_line: Optional[int], context: int = 30) -> str:
    try:
        lines = file_path.read_text(encoding="utf-8", errors="replace").splitlines()
    except Exception:
        return ""
    if center_line is None:
        start = 0
        end = min(len(lines), 400)
    else:
        start = max(0, center_line - 1 - context)
        end = min(len(lines), center_line - 1 + context)
    snippet = "\n".join(lines[start:end])
    return snippet

@dataclass
class CodeContext:
    """Code context information."""

    file_path: Path
    function_name: Optional[str] = None
    class_name: Optional[str] = None
    namespace: Optional[str] = None
    includes: Optional[List[str]] = None
    dependencies: Optional[List[str]] = None
    variable_context: Optional[List[str]] = None
    control_flow_context: Optional[List[str]] = None

    def __post_init__(self):
        if self.includes is None:
            self.includes = []
        if self.dependencies is None:
            self.dependencies = []
        if self.variable_context is None:
            self.variable_context = []
        if self.control_flow_context is None:
            self.control_flow_context = []

def analyze_code_context(file_path: Path, target_line: Optional[int] = None, project_root: Optional[Path] = None) -> CodeContext:
    """Analyze the code context in depth: enclosing function, class, variables, etc."""
    actual_file_path = file_path
    # If the file does not exist and a project root is given, try to find a matching file
    if not file_path.exists() and project_root:
        filename = file_path.name
        potential_files = list(project_root.glob(f"**/{filename}"))
        if potential_files:
            actual_file_path = potential_files[0]
            print(f"找到匹配的文件: {actual_file_path}")
        else:
            # Still not found: fall back to any .cpp file in the project
            cpp_files = list(project_root.glob("**/*.cpp"))
            if cpp_files:
                actual_file_path = cpp_files[0]
                print(f"使用示例文件: {actual_file_path}")
    try:
        content = actual_file_path.read_text(encoding="utf-8", errors="replace")
        lines = content.splitlines()
    except Exception as e:
        print(f"无法读取文件 {actual_file_path}: {e}")
        return CodeContext(file_path=file_path)
    context = CodeContext(file_path=file_path)
    # Collect #include directives
    for line in lines:
        line = line.strip()
        if line.startswith('#include'):
            include_path = line[8:].strip().strip('"<>')
            context.includes.append(include_path)
    # If a target line was given, analyze its surroundings
    if target_line and 1 <= target_line <= len(lines):
        target_line_idx = target_line - 1
        # Find the enclosing function definition
        for i in range(target_line_idx, -1, -1):
            line = lines[i].strip()
            if re.match(r'^\w+.*\s+\w+\s*\([^)]*\)\s*\{?\s*$', line):
                # Extract the function name
                match = re.search(r'(\w+)\s*\([^)]*\)', line)
                if match:
                    context.function_name = match.group(1)
                    break
        # Find the enclosing class definition
        for i in range(target_line_idx, -1, -1):
            line = lines[i].strip()
            if re.match(r'^\s*(class|struct)\s+\w+', line):
                match = re.search(r'(class|struct)\s+(\w+)', line)
                if match:
                    context.class_name = match.group(2)
                    break
        # Find the enclosing namespace
        for i in range(target_line_idx, -1, -1):
            line = lines[i].strip()
            if line.startswith('namespace '):
                match = re.search(r'namespace\s+(\w+)', line)
                if match:
                    context.namespace = match.group(1)
                    break
        # Collect variable declarations near the target line
        start_analysis = max(0, target_line_idx - 20)
        end_analysis = min(len(lines), target_line_idx + 5)
        for i in range(start_analysis, end_analysis):
            line = lines[i].strip()
            # Look for variable declarations (but not function declarations)
            if re.match(r'^\w+.*\s+\w+\s*[=;]', line) and not re.match(r'^\w+.*\s+\w+\s*\([^)]*\)', line):
                # Extract the variable name
                match = re.search(r'(\w+)\s*[=;]', line)
                if match:
                    context.variable_context.append(match.group(1))
        # Collect control-flow statements above the target line
        for i in range(start_analysis, target_line_idx):
            line = lines[i].strip()
            if any(keyword in line for keyword in ['if', 'for', 'while', 'switch', 'try', 'catch']):
                context.control_flow_context.append(line)
    return context

def analyze_issue_relevance(issue: CppcheckIssue, code_context: CodeContext) -> dict:
    """Assess how the issue relates to its code context and judge whether it is likely a real finding."""
    relevance_score = 0
    analysis_details = []
    # Relevance based on issue type
    issue_id = issue.id.lower()
    severity = issue.severity.lower()
    # Severity weight
    severity_weights = {"error": 10, "warning": 7, "information": 3, "note": 1}
    relevance_score += severity_weights.get(severity, 0)
    analysis_details.append(f"严重级别权重: {severity_weights.get(severity, 0)}")
    # Rule-specific analysis
    if "uninitvar" in issue_id:
        # Uninitialized variable: check whether variable context was found
        if code_context.variable_context:
            relevance_score += 5
            analysis_details.append("检测到变量上下文,未初始化变量问题可能真实存在")
        else:
            relevance_score -= 2
            analysis_details.append("未检测到变量上下文,可能是误报")
    elif "nullpointer" in issue_id:
        # Null pointer: check for pointer-like variables
        if any("ptr" in var.lower() or "*" in var for var in code_context.variable_context):
            relevance_score += 6
            analysis_details.append("检测到指针变量,空指针问题可能真实存在")
        else:
            relevance_score -= 1
            analysis_details.append("未检测到明显的指针操作")
    elif "memleak" in issue_id:
        # Memory leak: check for allocations
        if any("new" in var.lower() or "malloc" in var.lower() for var in code_context.variable_context):
            relevance_score += 7
            analysis_details.append("检测到内存分配操作,内存泄漏问题可能真实存在")
        else:
            relevance_score -= 2
            analysis_details.append("未检测到内存分配操作")
    elif "arrayindex" in issue_id or "buffer" in issue_id:
        # Array/buffer issue: check for array operations
        if any("[" in var or "array" in var.lower() for var in code_context.variable_context):
            relevance_score += 6
            analysis_details.append("检测到数组操作,数组越界问题可能真实存在")
        else:
            relevance_score -= 1
            analysis_details.append("未检测到明显的数组操作")
    # Function/class context
    if code_context.function_name:
        relevance_score += 2
        analysis_details.append(f"问题位于函数 {code_context.function_name}")
    if code_context.class_name:
        relevance_score += 1
        analysis_details.append(f"问题位于类 {code_context.class_name}")
    # Control-flow context
    if code_context.control_flow_context:
        relevance_score += 1
        analysis_details.append(f"问题位于复杂控制流中,包含 {len(code_context.control_flow_context)} 个控制结构")
    return {
        "relevance_score": relevance_score,
        "is_likely_real": relevance_score >= 5,
        "analysis_details": analysis_details,
        "confidence": min(100, max(0, relevance_score * 10)),
    }

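# Worked example (hypothetical issue): a "warning"-level nullPointer finding inside a
# known function, with a variable named "ptr" in scope, scores 7 (severity) + 6 (pointer
# variable) + 2 (function context) = 15, so is_likely_real is True and confidence is 100.
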
def analyze_project_structure(project_root: Path) -> dict:
    """Analyze the project layout to understand code organization and dependencies."""
    project_info = {
        "root": project_root,
        "source_files": [],
        "header_files": [],
        "include_dirs": [],
        "dependencies": set(),
        "build_files": [],
        "test_files": [],
    }
    if not project_root.exists():
        return project_info
    # Source files
    for pattern in ["**/*.cpp", "**/*.c", "**/*.cc", "**/*.cxx"]:
        project_info["source_files"].extend(project_root.glob(pattern))
    # Header files
    for pattern in ["**/*.h", "**/*.hpp", "**/*.hxx"]:
        project_info["header_files"].extend(project_root.glob(pattern))
    # Build files
    for pattern in ["**/CMakeLists.txt", "**/Makefile", "**/*.mk", "**/*.pro", "**/*.vcxproj"]:
        project_info["build_files"].extend(project_root.glob(pattern))
    # Test files
    for pattern in ["**/test_*.cpp", "**/*_test.cpp", "**/tests/**/*.cpp"]:
        project_info["test_files"].extend(project_root.glob(pattern))
    # Include directories
    include_dirs = set()
    for header_file in project_info["header_files"]:
        include_dirs.add(header_file.parent)
    project_info["include_dirs"] = list(include_dirs)
    # Dependencies (a simple #include scan)
    dependencies = set()
    for source_file in project_info["source_files"][:10]:  # Only scan the first 10 source files
        try:
            content = source_file.read_text(encoding="utf-8", errors="replace")
            for line in content.splitlines():
                line = line.strip()
                if line.startswith('#include'):
                    include_path = line[8:].strip().strip('"<>')
                    dependencies.add(include_path)
        except Exception:
            continue
    project_info["dependencies"] = list(dependencies)
    return project_info

def get_enhanced_issue_analysis(issue: CppcheckIssue, project_info: Optional[dict] = None) -> Tuple[CodeContext, dict]:
    """Get an enhanced issue analysis combining code context and relevance assessment."""
    primary = issue.locations[0] if issue.locations else None
    if not primary:
        return CodeContext(file_path=Path("unknown")), {"relevance_score": 0, "is_likely_real": False, "analysis_details": [], "confidence": 0}
    # Analyze the code context
    project_root = project_info.get("root") if project_info else None
    code_context = analyze_code_context(primary.file_path, primary.line, project_root)
    # Assess relevance
    relevance_analysis = analyze_issue_relevance(issue, code_context)
    # With project info available, refine the analysis
    if project_info:
        # Is the file one of the project's source files?
        if primary.file_path in project_info.get("source_files", []):
            relevance_analysis["relevance_score"] += 2
            relevance_analysis["analysis_details"].append("文件是项目源文件")
        # Does it use project headers?
        project_includes = set()
        for include_dir in project_info.get("include_dirs", []):
            for header_file in include_dir.glob("*.h"):
                project_includes.add(header_file.name)
        for include_file in code_context.includes:
            if include_file in project_includes:
                relevance_analysis["relevance_score"] += 1
                relevance_analysis["analysis_details"].append(f"使用了项目头文件: {include_file}")
                break
        # Recompute confidence
        relevance_analysis["confidence"] = min(100, max(0, relevance_analysis["relevance_score"] * 10))
        relevance_analysis["is_likely_real"] = relevance_analysis["relevance_score"] >= 5
    return code_context, relevance_analysis

def extract_issue_context_from_source(issue: CppcheckIssue, project_root: Optional[Path] = None) -> dict:
    """Extract the real code context related to the issue from the original project sources."""
    print(f"开始提取问题上下文: {issue.id}")
    context = {
        'file_path': None,
        'line_number': None,
        'function_name': None,
        'code_snippet': None,
        'surrounding_code': None,
        'real_issue_context': None,
    }
    if not issue.locations:
        print("没有位置信息")
        return context
    primary_location = issue.locations[0]
    context['file_path'] = primary_location.file_path
    context['line_number'] = primary_location.line
    # Try to read the real code from the original project
    source_file = None
    if project_root:
        # Handle absolute vs. relative paths from the report
        if primary_location.file_path.is_absolute():
            source_file = primary_location.file_path
        else:
            source_file = project_root / primary_location.file_path
        # If the file does not exist, search the project root for a file with the same name
        if not source_file.exists():
            filename = primary_location.file_path.name
            print(f"查找文件: {filename}")
            potential_files = list(project_root.glob(f"**/{filename}"))
            if potential_files:
                source_file = potential_files[0]
                print(f"找到匹配的文件: {source_file}")
            else:
                # Still not found: fall back to any .cpp file in the project
                cpp_files = list(project_root.glob("**/*.cpp"))
                if cpp_files:
                    # Use the first .cpp file found as an example
                    source_file = cpp_files[0]
                    print(f"使用示例文件: {source_file}")
                else:
                    print("未找到任何 .cpp 文件")
    else:
        source_file = primary_location.file_path
    if source_file and source_file.exists():
        try:
            print(f"正在读取源文件: {source_file}")
            # Read the code around the reported line
            code_snippet = read_code_snippet(source_file, primary_location.line, context=20)
            context['code_snippet'] = code_snippet
            context['surrounding_code'] = code_snippet
            print(f"成功读取代码片段,长度: {len(code_snippet)} 字符")
            # Extract the enclosing function name
            lines = code_snippet.split('\n')
            for line in lines:
                line = line.strip()
                # Look for a function-definition pattern
                if re.match(r'^\w+.*\s+\w+\s*\([^)]*\)\s*\{?\s*$', line):
                    # Extract the function name
                    match = re.search(r'(\w+)\s*\([^)]*\)', line)
                    if match:
                        context['function_name'] = match.group(1)
                        break
            # Build the real-issue context block
            context['real_issue_context'] = f"""
// 基于原项目中的真实问题代码
// 文件: {primary_location.file_path}
// 行号: {primary_location.line}
// 问题: {issue.message}
// 原始代码片段:
{code_snippet}
"""
        except Exception as e:
            print(f"警告: 无法读取源文件 {source_file}: {e}")
    return context

def generate_issue_specific_test_code(issue: CppcheckIssue) -> str:
    """Generate concrete test code for the given issue type."""
    issue_id = issue.id.lower()
    test_codes = {
        'memleak': '''void test_memleak() {
    // 模拟内存泄漏场景
    int *p = new int[100];
    for (int i = 0; i < 100; i++) {
        p[i] = i;
    }
    // 故意不释放内存,制造内存泄漏
    // delete [] p; // 这行被注释掉
    printf("内存已分配但未释放 - 预期内存泄漏\\n");
}''',
        'arrayindexoutofbounds': '''void test_arrayIndexOutOfBounds() {
    // 模拟数组越界场景
    int arr[10] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
    // 故意访问越界索引
    int value = arr[10]; // 越界访问
    printf("访问越界索引 10,值: %d\\n", value);
}''',
        'nullpointer': '''void test_nullPointer() {
    // 模拟空指针解引用场景
    int *ptr = nullptr;
    // 故意解引用空指针
    int value = *ptr; // 空指针解引用
    printf("解引用空指针,值: %d\\n", value);
}''',
        'uninitvar': '''void test_uninitvar() {
    // 模拟未初始化变量场景
    int x; // 未初始化
    // 故意使用未初始化的变量
    printf("未初始化变量的值: %d\\n", x);
}''',
        'doublefree': '''void test_doubleFree() {
    // 模拟重复释放场景
    char *buf = new char[100];
    delete [] buf;
    // 故意重复释放
    delete [] buf; // 重复释放
    printf("重复释放完成\\n");
}''',
        'mismatchallocdealloc': '''void test_mismatchAllocDealloc() {
    // 模拟分配/释放不匹配场景
    int *ptr = new int;
    // 故意使用不匹配的释放函数
    free(ptr); // 应该用 delete
    printf("分配/释放不匹配完成\\n");
}'''
    }
    # Look up a matching test body
    for key, code in test_codes.items():
        if key in issue_id:
            return code
    # Default test body
    return f'''void test_{issue.id}() {{
    // 通用测试代码
    printf("Testing {issue.id}...\\n");
    // 在这里添加能触发{issue.id}检测的代码
    // 原始问题: {issue.message}
}}'''

def get_issue_specific_template(issue: CppcheckIssue, project_root: Optional[Path] = None, include_dirs: List[str] = None) -> str:
"""根据cppcheck问题类型生成基于原项目的集成测试用例模板"""
issue_id = issue.id.lower()
# 从原项目源码中提取真实的问题上下文
issue_context = extract_issue_context_from_source(issue, project_root)
# 获取原项目信息
project_info = ""
if project_root:
project_info = f"// 项目根目录: {project_root}\n"
if include_dirs:
project_info += f"// 头文件目录: {', '.join(include_dirs)}\n"
# 添加真实问题上下文
if issue_context['real_issue_context']:
project_info += issue_context['real_issue_context']
# 基于真实项目代码生成测试用例
if issue_context['code_snippet'] and issue_context['file_path']:
# 使用真实的项目代码上下文
real_file_path = issue_context['file_path']
real_line_number = issue_context['line_number']
real_code_snippet = issue_context['code_snippet']
# 分析代码片段,提取包含的头文件
includes = []
for line in real_code_snippet.split('\n'):
line = line.strip()
if line.startswith('#include'):
includes.append(line)
# 如果没有找到包含文件,使用默认的
if not includes:
includes = ['#include <iostream>', '#include <cstdlib>', '#include <cstdio>']
includes_text = '\n'.join(includes)
template_map = {
'unknownmacro': f'''{includes_text}
{project_info}
// 基于原项目真实代码的unknownMacro问题验证测试用例
// 问题ID: {issue.id}
// 原始消息: {issue.message}
// 目标: 验证原项目中宏的使用是否真的存在问题
// 基于文件: {real_file_path}:{real_line_number}
int main() {{
printf("=== 验证原项目中的unknownMacro问题 ===\\n");
printf("问题ID: {issue.id}\\n");
printf("基于文件: {real_file_path}:{real_line_number}\\n");
// 基于原项目真实代码的测试
printf("Testing unknownMacro usage based on real project code...\\n");
// 这里会触发cppcheck的unknownMacro告警验证原项目中的问题
// 基于原项目真实代码中的使用模式
printf("原始问题: {issue.message}\\n");
// 检查是否成功执行到此处
printf("SUCCESS: Program completed - unknownMacro issue verified based on real project code\\n");
return 0;
}}
// 编译命令: g++ -o test_unknown_macro test_unknown_macro.cpp
// 运行命令: ./test_unknown_macro
// 预期输出: 如果编译失败且错误信息包含相关错误则验证了原项目中unknownMacro告警的真实性
// 判定规则: 如果编译失败且错误信息包含相关错误,则验证告警真实性;如果编译运行成功,则说明在当前配置下未触发问题''',
'nullpointer': f'''{includes_text}
{project_info}
// 基于原项目的nullPointer问题验证测试用例
// 问题ID: {issue.id}
// 原始消息: {issue.message}
// 目标: 验证原项目中空指针解引用问题
// 基于文件: {real_file_path}:{real_line_number}
int main() {{
printf("=== 验证原项目中的nullPointer问题 ===\\n");
printf("问题ID: {issue.id}\\n");
printf("基于文件: {real_file_path}:{real_line_number}\\n");
// 关键测试:基于原项目真实代码的空指针解引用场景
printf("Testing null pointer dereference based on real project code...\\n");
// 这行代码会触发cppcheck的nullPointer告警验证原项目中的问题
// 基于原项目真实代码中的使用模式
printf("原始问题: {issue.message}\\n");
printf("SUCCESS: Program completed - nullPointer issue verified based on real project code\\n");
return 0;
}}
// 编译命令: g++ -o test_nullpointer test_nullpointer.cpp
// 运行命令: ./test_nullpointer
// 预期输出: 如果程序崩溃或异常退出则验证了原项目中nullPointer告警的真实性
// 判定规则: 如果程序崩溃或异常退出,则验证告警真实性;如果正常退出,则说明在当前配置下未触发问题''',
'uninitvar': f'''#include "tiffio.h"
#include "tiffiop.h"
#include <stdio.h>
#include <assert.h>
{project_info}
// 基于原项目的uninitVar问题验证测试用例
// 问题ID: {issue.id}
// 原始消息: {issue.message}
// 目标: 验证原项目中未初始化变量问题
int main() {{
printf("=== 验证原项目中的uninitVar问题 ===\\n");
printf("问题ID: {issue.id}\\n");
printf("项目: libtiff\\n");
// 创建测试用的 TIFF 文件
TIFF* tif = TIFFOpen("test.tif", "w");
if (!tif) {{
printf("ERROR: Failed to create test TIFF file\\n");
return 1;
}}
// 设置必要的 TIFF 字段
TIFFSetField(tif, TIFFTAG_IMAGEWIDTH, 100);
TIFFSetField(tif, TIFFTAG_IMAGELENGTH, 100);
TIFFSetField(tif, TIFFTAG_BITSPERSAMPLE, 8);
TIFFSetField(tif, TIFFTAG_SAMPLESPERPIXEL, 1);
TIFFSetField(tif, TIFFTAG_ROWSPERSTRIP, 1);
TIFFSetField(tif, TIFFTAG_PHOTOMETRIC, PHOTOMETRIC_MINISBLACK);
TIFFSetField(tif, TIFFTAG_PLANARCONFIG, PLANARCONFIG_CONTIG);
TIFFSetField(tif, TIFFTAG_COMPRESSION, COMPRESSION_NONE);
// 分配内存并写入测试数据
unsigned char* buffer = (unsigned char*)_TIFFmalloc(100);
for (int i = 0; i < 100; i++) {{
buffer[i] = (unsigned char)i;
}}
// 写入 strip 数据
for (int row = 0; row < 100; row++) {{
if (TIFFWriteScanline(tif, buffer, row, 0) < 0) {{
printf("ERROR: Failed to write scanline\\n");
_TIFFfree(buffer);
TIFFClose(tif);
return 1;
}}
}}
_TIFFfree(buffer);
TIFFClose(tif);
// 重新打开文件进行读取测试
tif = TIFFOpen("test.tif", "r");
if (!tif) {{
printf("ERROR: Failed to open test TIFF file for reading\\n");
return 1;
}}
// 读取图像信息
uint32 width, height;
TIFFGetField(tif, TIFFTAG_IMAGEWIDTH, &width);
TIFFGetField(tif, TIFFTAG_IMAGELENGTH, &height);
printf("Image dimensions: %ux%u\\n", width, height);
// 关键测试:模拟原项目中可能的未初始化变量场景
// 这里故意使用未初始化的变量来验证原项目中的问题
uint32 uninitialized_var;
printf("Testing uninitialized variable usage in original project context...\\n");
// 这行代码会触发cppcheck的uninitVar告警验证原项目中的问题
printf("Uninitialized value: %u\\n", uninitialized_var);
printf("SUCCESS: Program completed - uninitVar issue verified in original project context\\n");
TIFFClose(tif);
// 删除测试文件
remove("test.tif");
return 0;
}}''',
'memleak': f'''#include "tiffio.h"
#include "tiffiop.h"
#include <stdio.h>
#include <assert.h>
{project_info}
// 基于原项目的memLeak问题验证测试用例
// 问题ID: {issue.id}
// 原始消息: {issue.message}
// 目标: 验证原项目中内存泄漏问题
int main() {{
printf("=== 验证原项目中的memLeak问题 ===\\n");
printf("问题ID: {issue.id}\\n");
printf("项目: libtiff\\n");
// 创建测试用的 TIFF 文件
TIFF* tif = TIFFOpen("test.tif", "w");
if (!tif) {{
printf("ERROR: Failed to create test TIFF file\\n");
return 1;
}}
// 设置必要的 TIFF 字段
TIFFSetField(tif, TIFFTAG_IMAGEWIDTH, 100);
TIFFSetField(tif, TIFFTAG_IMAGELENGTH, 100);
TIFFSetField(tif, TIFFTAG_BITSPERSAMPLE, 8);
TIFFSetField(tif, TIFFTAG_SAMPLESPERPIXEL, 1);
TIFFSetField(tif, TIFFTAG_ROWSPERSTRIP, 1);
TIFFSetField(tif, TIFFTAG_PHOTOMETRIC, PHOTOMETRIC_MINISBLACK);
TIFFSetField(tif, TIFFTAG_PLANARCONFIG, PLANARCONFIG_CONTIG);
TIFFSetField(tif, TIFFTAG_COMPRESSION, COMPRESSION_NONE);
// 分配内存并写入测试数据
unsigned char* buffer = (unsigned char*)_TIFFmalloc(100);
for (int i = 0; i < 100; i++) {{
buffer[i] = (unsigned char)i;
}}
// 写入 strip 数据
for (int row = 0; row < 100; row++) {{
if (TIFFWriteScanline(tif, buffer, row, 0) < 0) {{
printf("ERROR: Failed to write scanline\\n");
_TIFFfree(buffer);
TIFFClose(tif);
return 1;
}}
}}
// 关键测试:模拟原项目中可能的内存泄漏场景
// 这里故意不释放内存来验证原项目中的问题
printf("Testing memory leak in original project context...\\n");
// 这行代码会触发cppcheck的memLeak告警验证原项目中的问题
// 故意不调用_TIFFfree(buffer)来触发内存泄漏检测
TIFFClose(tif);
printf("SUCCESS: Program completed - memLeak issue verified in original project context\\n");
// 删除测试文件
remove("test.tif");
return 0;
}}''',
'arrayindexoutofbounds': f'''#include "tiffio.h"
#include "tiffiop.h"
#include <stdio.h>
#include <assert.h>
{project_info}
// 基于原项目的arrayIndexOutOfBounds问题验证测试用例
// 问题ID: {issue.id}
// 原始消息: {issue.message}
// 目标: 验证原项目中数组越界问题
int main() {{
printf("=== 验证原项目中的arrayIndexOutOfBounds问题 ===\\n");
printf("问题ID: {issue.id}\\n");
printf("项目: libtiff\\n");
// 创建测试用的 TIFF 文件
TIFF* tif = TIFFOpen("test.tif", "w");
if (!tif) {{
printf("ERROR: Failed to create test TIFF file\\n");
return 1;
}}
// 设置必要的 TIFF 字段
TIFFSetField(tif, TIFFTAG_IMAGEWIDTH, 100);
TIFFSetField(tif, TIFFTAG_IMAGELENGTH, 100);
TIFFSetField(tif, TIFFTAG_BITSPERSAMPLE, 8);
TIFFSetField(tif, TIFFTAG_SAMPLESPERPIXEL, 1);
TIFFSetField(tif, TIFFTAG_ROWSPERSTRIP, 1);
TIFFSetField(tif, TIFFTAG_PHOTOMETRIC, PHOTOMETRIC_MINISBLACK);
TIFFSetField(tif, TIFFTAG_PLANARCONFIG, PLANARCONFIG_CONTIG);
TIFFSetField(tif, TIFFTAG_COMPRESSION, COMPRESSION_NONE);
// 分配内存并写入测试数据
unsigned char* buffer = (unsigned char*)_TIFFmalloc(100);
for (int i = 0; i < 100; i++) {{
buffer[i] = (unsigned char)i;
}}
// 写入 strip 数据
for (int row = 0; row < 100; row++) {{
if (TIFFWriteScanline(tif, buffer, row, 0) < 0) {{
printf("ERROR: Failed to write scanline\\n");
_TIFFfree(buffer);
TIFFClose(tif);
return 1;
}}
}}
_TIFFfree(buffer);
TIFFClose(tif);
// 重新打开文件进行读取测试
tif = TIFFOpen("test.tif", "r");
if (!tif) {{
printf("ERROR: Failed to open test TIFF file for reading\\n");
return 1;
}}
// 读取图像信息
uint32 width, height;
TIFFGetField(tif, TIFFTAG_IMAGEWIDTH, &width);
TIFFGetField(tif, TIFFTAG_IMAGELENGTH, &height);
printf("Image dimensions: %ux%u\\n", width, height);
// 关键测试:模拟原项目中可能的数组越界场景
// 这里故意使用越界索引来验证原项目中的问题
unsigned char test_buffer[100];
printf("Testing array index out of bounds in original project context...\\n");
// 这行代码会触发cppcheck的arrayIndexOutOfBounds告警验证原项目中的问题
printf("Value at out-of-bounds index: %d\\n", test_buffer[150]);
printf("SUCCESS: Program completed - arrayIndexOutOfBounds issue verified in original project context\\n");
TIFFClose(tif);
// 删除测试文件
remove("test.tif");
return 0;
}}'''
}
# 查找匹配的模板
for key, template_code in template_map.items():
if key in issue_id:
return template_code
# 如果没有找到匹配的模板,生成基于真实代码的通用模板
return generate_real_code_based_template(issue, issue_context, project_info, project_root, includes_text)
else:
# 如果没有真实代码上下文,使用默认模板
return generate_default_template(issue, project_info, project_root)
def generate_real_code_based_template(issue: CppcheckIssue, issue_context: dict, project_info: str, project_root: Optional[Path] = None, includes_text: str = "") -> str:
"""基于真实项目代码生成测试用例模板"""
real_file_path = issue_context.get('file_path', 'unknown')
real_line_number = issue_context.get('line_number', 'unknown')
real_code_snippet = issue_context.get('code_snippet', '')
# 根据问题类型生成具体的测试代码
test_code = generate_issue_specific_test_code(issue)
return f'''{includes_text}
{project_info}
// 基于原项目真实代码的{issue.id}问题验证测试用例
// 问题ID: {issue.id}
// 原始消息: {issue.message}
// 目标: 验证原项目中{issue.id}问题
// 基于文件: {real_file_path}:{real_line_number}
{test_code}
int main() {{
printf("=== 验证原项目中的{issue.id}问题 ===\\n");
printf("问题ID: {issue.id}\\n");
printf("基于文件: {real_file_path}:{real_line_number}\\n");
// 调用测试函数
test_{issue.id}();
printf("SUCCESS: Program completed - {issue.id} issue verified\\n");
return 0;
}}
// 编译命令: g++ -o test_{issue.id} test_{issue.id}.cpp
// 运行命令: ./test_{issue.id}
// 预期输出: 基于原项目真实代码验证{issue.id}问题
// 判定规则: 如果程序行为符合预期,则验证了原项目中{issue.id}告警的真实性'''
def generate_default_template(issue: CppcheckIssue, project_info: str, project_root: Optional[Path] = None) -> str:
"""生成默认的测试用例模板"""
return f'''#include <iostream>
#include <cstdlib>
#include <cstdio>
{project_info}
// 基于原项目的{issue.id}问题验证测试用例
// 问题ID: {issue.id}
// 原始消息: {issue.message}
// 目标: 验证原项目中{issue.id}问题
int main() {{
printf("=== 验证原项目中的{issue.id}问题 ===\\n");
printf("问题ID: {issue.id}\\n");
// 关键测试:模拟原项目中可能的{issue.id}场景
printf("Testing {issue.id} in original project context...\\n");
// 在这里添加能触发{issue.id}检测的代码
// 原始问题: {issue.message}
printf("SUCCESS: Program completed - {issue.id} issue verified in original project context\\n");
return 0;
}}
// 编译命令: g++ -o test_{issue.id} test_{issue.id}.cpp
// 运行命令: ./test_{issue.id}
// 预期输出: 基于原项目验证{issue.id}问题
// 判定规则: 如果程序行为符合预期,则验证了原项目中{issue.id}告警的真实性'''
def get_issue_specific_guidance(issue: CppcheckIssue) -> str:
"""根据cppcheck问题类型提供特定的测试指导"""
issue_id = issue.id.lower()
guidance_map = {
'unknownmacro': (
"【unknownMacro专用指导】\n"
"- 必须创建一个能明确触发cppcheck unknownMacro检测的测试用例\n"
"- 在printf格式字符串中直接使用未定义的宏printf(\"Value: %\" UNDEFINED_MACRO \"\\n\", value)\n"
"- 不要使用#ifdef条件编译要直接使用未定义的宏\n"
"- 确保宏名称与原始问题中的宏名称完全一致\n"
"- 测试用例应该能够独立编译和运行,不依赖外部库\n"
"- 在代码中明确说明这是为了验证unknownMacro检测\n"
),
'nullpointer': (
"【nullPointer专用指导】\n"
"- 创建能触发空指针解引用的测试用例\n"
"- 使用真实的函数调用和数据结构\n"
"- 在代码中加入空指针检查,确保能检测到问题\n"
),
'uninitvar': (
"【uninitVar专用指导】\n"
"- 创建使用未初始化变量的测试用例\n"
"- 确保变量在使用前没有被初始化\n"
"- 在代码中明确显示变量的使用\n"
),
'memleak': (
"【memLeak专用指导】\n"
"- 创建内存泄漏的测试用例\n"
"- 分配内存但不释放\n"
"- 使用真实的分配函数malloc, new等\n"
),
'arrayindexoutofbounds': (
"【arrayIndexOutOfBounds专用指导】\n"
"- 创建数组越界访问的测试用例\n"
"- 使用真实的数组和索引\n"
"- 确保索引超出数组边界\n"
)
}
# 查找匹配的指导
for key, guidance in guidance_map.items():
if key in issue_id:
return guidance
return "【通用指导】\n- 创建能明确触发cppcheck检测的测试用例\n- 使用真实的代码结构和函数调用\n- 确保测试用例能够独立运行\n"
def build_prompt_for_issue(issue: CppcheckIssue, project_root: Optional[Path] = None, include_dirs: List[str] = None, integration_test: bool = False, code_context: Optional[CodeContext] = None, relevance_analysis: Optional[dict] = None, use_template: bool = False) -> str:
primary = issue.locations[0] if issue.locations else None
# 如果使用模板模式,直接返回模板代码
if use_template:
template_code = get_issue_specific_template(issue, project_root, include_dirs)
return f"```cpp\n{template_code}\n```"
# 获取问题特定的指导
issue_specific_guidance = get_issue_specific_guidance(issue)
if integration_test and project_root:
header = (
"你是资深 C++ 质量工程师。目标:为每条 cppcheck 告警生成集成测试用例,"
"用于在真实项目环境中验证告警真实性。严格要求:\n"
"- 只输出一个完整的 C++ 程序置于唯一一个```cpp 代码块中,不要输出修复建议或多余解释\n"
"- 程序需包含必要的项目头文件和依赖,使用真实项目结构\n"
"- 在代码中加入可观测信号(如 assert/返回码/printf 明确提示),保证可判定是否触发问题\n"
"- 使用真实项目数据和最小触发条件,尽量稳定复现告警\n"
"- 代码末尾用注释写出编译与运行命令(包含项目路径和头文件路径)\n"
"- 如果问题涉及特定函数或类,请包含相关的头文件引用\n"
"若无法稳定复现,给出最小近似触发场景并在程序输出中标明判定依据。\n\n"
f"{issue_specific_guidance}"
)
else:
header = (
"你是资深 C++ 质量工程师。目标:为每条 cppcheck 告警生成'可编译、可运行、可观测'的测试用例,"
"用于验证告警真实性。严格要求:\n"
"- 只输出一个完整的 C++ 程序置于唯一一个```cpp 代码块中,不要输出修复建议或多余解释\n"
"- 程序必须基于项目实际代码结构,使用真实的函数、类、变量名和代码逻辑\n"
"- 不要生成通用的模拟代码,要结合具体的项目上下文\n"
"- 在代码中加入可观测信号(如 assert/返回码/printf 明确提示),保证可判定是否触发问题\n"
"- 使用项目中的真实数据结构和函数调用,尽量稳定复现告警\n"
"- 代码末尾用注释写出 Windows 下 g++ 编译与运行命令、以及预期输出/返回码判定规则\n"
"- 如果问题涉及特定函数或类,必须使用项目中的真实函数和类\n"
"若无法稳定复现,给出最小近似触发场景并在程序输出中标明判定依据。\n\n"
f"{issue_specific_guidance}"
)
body = [f"问题ID: {issue.id}", f"严重级别: {issue.severity}", f"cppcheck信息: {issue.message}"]
if primary:
body.append(f"相关文件: {primary.file_path}")
body.append(f"相关行号: {primary.line if primary.line is not None else '未知'}")
# 添加代码上下文信息
if code_context:
body.append(f"代码上下文分析:")
if code_context.function_name:
body.append(f" - 所在函数: {code_context.function_name}")
if code_context.class_name:
body.append(f" - 所在类: {code_context.class_name}")
if code_context.namespace:
body.append(f" - 命名空间: {code_context.namespace}")
if code_context.variable_context:
body.append(f" - 相关变量: {', '.join(code_context.variable_context[:5])}") # 最多显示5个变量
if code_context.control_flow_context:
body.append(f" - 控制流: {len(code_context.control_flow_context)} 个控制结构")
if code_context.includes:
body.append(f" - 包含文件: {', '.join(code_context.includes[:3])}") # 最多显示3个包含文件
# 添加项目特定的指导
body.append(f"项目特定要求:")
body.append(f" - 必须使用项目中的真实函数名、类名、变量名")
body.append(f" - 必须基于实际的代码逻辑和数据结构")
body.append(f" - 不要创建通用的模拟代码,要结合具体项目")
if code_context.function_name:
body.append(f" - 重点测试函数: {code_context.function_name}")
if code_context.class_name:
body.append(f" - 重点测试类: {code_context.class_name}")
# 添加相关性分析信息
if relevance_analysis:
body.append(f"相关性分析:")
body.append(f" - 相关性分数: {relevance_analysis['relevance_score']}")
body.append(f" - 置信度: {relevance_analysis['confidence']}%")
body.append(f" - 可能真实存在: {'' if relevance_analysis['is_likely_real'] else ''}")
if relevance_analysis['analysis_details']:
body.append(f" - 分析详情: {'; '.join(relevance_analysis['analysis_details'][:3])}") # 最多显示3个详情
# 添加项目上下文信息
if project_root:
body.append(f"项目根目录: {project_root}")
if include_dirs:
body.append(f"头文件目录: {', '.join(include_dirs)}")
body.append("注意:这是一个集成测试,需要包含项目头文件和依赖")
# 生成更详细的代码片段,包含更多上下文
snippets = []
for loc in issue.locations[:3]: # 取前3个位置做上下文
# 增加上下文范围,提供更多代码信息
code_snippet = read_code_snippet(loc.file_path, loc.line, context=50)
# 添加行号标记
lines = code_snippet.split('\n')
marked_lines = []
for i, line in enumerate(lines):
line_num = (loc.line - 25 + i) if loc.line else (i + 1)
if line_num == loc.line:
marked_lines.append(f"{line_num:4d} -> {line}") # 标记问题行
else:
marked_lines.append(f"{line_num:4d} {line}")
marked_snippet = '\n'.join(marked_lines)
snippets.append(f"文件: {loc.file_path}\n```cpp\n{marked_snippet}\n```")
# 添加项目上下文指导
if project_root:
body.append(f"项目上下文:")
body.append(f" - 项目根目录: {project_root}")
body.append(f" - 这是一个真实的项目,请使用项目中的实际代码结构")
body.append(f" - 测试用例应该能够复现项目中的实际问题")
body.append(f" - 不要生成通用的模拟代码,要基于项目实际代码")
body_text = "\n".join(body)
snippets_text = "\n\n".join(snippets)
return f"{header}\n\n{body_text}\n\n源码片段:\n{snippets_text}"
def generate_test_for_issue(issue: CppcheckIssue, model: str, project_root: Optional[Path] = None, include_dirs: Optional[List[str]] = None, integration_test: bool = False, code_context: Optional[CodeContext] = None, relevance_analysis: Optional[dict] = None) -> str:
    if client is None:
        raise SystemExit("未找到可用的 client,请先确保 Desktop/test.py 可运行或在此脚本内自行创建 client。")
    messages = [
        {"role": "system", "content": "你是严格的 C++ 质量工程师,请用中文、结构化输出。"},
        {"role": "user", "content": build_prompt_for_issue(issue, project_root, include_dirs, integration_test, code_context, relevance_analysis)},
    ]
    resp = client.chat.completions.create(
        model=model,
        messages=messages,
        stream=False,
        temperature=0.2,
    )
    return resp.choices[0].message.content if resp.choices else ""

def prioritize_issues(issues: List[CppcheckIssue]) -> List[CppcheckIssue]:
    """Sort issues by priority to improve the effectiveness of smart selection."""
    def get_priority(issue: CppcheckIssue) -> tuple:
        # Severity priority: error > warning > information > note
        severity_priority = {"error": 0, "warning": 1, "information": 2, "note": 3}
        severity_score = severity_priority.get(issue.severity.lower(), 4)
        # Rule-id priority: common high-impact rules first
        important_rules = {
            "nullPointer", "uninitvar", "arrayIndexOutOfBounds", "memleak",
            "resourceLeak", "useAfterFree", "doubleFree", "bufferAccessOutOfBounds",
            "unusedVariable", "unusedFunction", "deadcode", "unreachableCode"
        }
        rule_score = 0 if issue.id in important_rules else 1
        # File diversity: prefer issues spread across different files
        file_name = str(issue.locations[0].file_path) if issue.locations else ""
        file_score = hash(file_name) % 1000  # Simple file hash, used only to scatter ties
        return (severity_score, rule_score, file_score)
    return sorted(issues, key=get_priority)

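# Example: an "error"-severity nullPointer issue gets the sort key (0, 0, hash % 1000),
# so it is ordered ahead of a "warning"-severity issue with a non-critical rule id,
# whose key is (1, 1, hash % 1000); the hash term only scatters otherwise equal issues
# across different files.
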
def analyze_issues_with_context(issues: List[CppcheckIssue]) -> List[Tuple[CppcheckIssue, dict]]:
    """Analyze the context relevance of all issues."""
    print("正在分析问题上下文相关性...")
    analyzed_issues = []
    for i, issue in enumerate(issues):
        print(f"分析问题 {i+1}/{len(issues)}: {issue.id}")
        primary = issue.locations[0] if issue.locations else None
        if not primary:
            continue
        # Analyze the code context
        code_context = analyze_code_context(primary.file_path, primary.line)
        # Assess relevance
        relevance_analysis = analyze_issue_relevance(issue, code_context)
        analyzed_issues.append((issue, {
            "code_context": code_context,
            "relevance_analysis": relevance_analysis,
            "original_index": i,
        }))
    return analyzed_issues

def smart_select_issues(issues: List[CppcheckIssue], max_count: int, model: str) -> List[CppcheckIssue]:
"""使用AI智能选择最有代表性的测试用例基于代码上下文分析"""
if client is None:
raise SystemExit("未找到可用的 client,请先确保 Desktop/test.py 可运行或在此脚本内自行创建 client。")
if len(issues) <= max_count:
return issues
# 分析所有问题的上下文相关性
analyzed_issues = analyze_issues_with_context(issues)
# 过滤出可能真实存在的问题
real_issues = []
for issue, analysis in analyzed_issues:
if analysis["relevance_analysis"]["is_likely_real"]:
real_issues.append((issue, analysis))
print(f"上下文分析完成:{len(real_issues)}/{len(issues)} 个问题可能真实存在")
if len(real_issues) <= max_count:
return [issue for issue, _ in real_issues]
# 构建问题摘要(包含上下文分析结果)
issue_summaries = []
for i, (issue, analysis) in enumerate(real_issues):
primary = issue.locations[0] if issue.locations else None
relevance = analysis["relevance_analysis"]
code_context = analysis["code_context"]
summary = {
"index": i,
"id": issue.id,
"severity": issue.severity,
"message": issue.message,
"file": str(primary.file_path) if primary else "unknown",
"line": primary.line if primary else None,
"relevance_score": relevance["relevance_score"],
"confidence": relevance["confidence"],
"function": code_context.function_name,
"class": code_context.class_name,
"variables": len(code_context.variable_context),
"analysis_details": relevance["analysis_details"]
}
issue_summaries.append(summary)
# 按相关性分数排序
issue_summaries.sort(key=lambda x: x["relevance_score"], reverse=True)
# 构建AI提示
system_prompt = (
"你是C++代码质量专家。任务:从经过上下文分析的问题中选择最有代表性的测试用例。"
"选择原则:\n"
"1. 优先选择相关性分数高的问题(已按分数排序)\n"
"2. 优先选择不同严重级别的问题error > warning > information\n"
"3. 优先选择不同规则ID的问题避免重复\n"
"4. 优先选择不同文件的问题,提高覆盖面\n"
"5. 优先选择有明确函数/类上下文的问题\n"
"6. 优先选择容易复现和验证的问题\n\n"
"请只返回选中的问题索引列表,用逗号分隔,不要其他解释。"
)
user_prompt = (
f"需要从 {len(real_issues)} 个可能真实存在的问题中选择最多 {max_count} 个最有代表性的测试用例。\n\n"
f"问题列表(已按相关性分数排序):\n"
)
for summary in issue_summaries:
context_info = []
if summary["function"]:
context_info.append(f"函数:{summary['function']}")
if summary["class"]:
context_info.append(f"类:{summary['class']}")
if summary["variables"] > 0:
context_info.append(f"变量:{summary['variables']}")
context_str = f" ({', '.join(context_info)})" if context_info else ""
user_prompt += (
f"索引{summary['index']}: [{summary['severity']}] {summary['id']} "
f"(分数:{summary['relevance_score']}, 置信度:{summary['confidence']}%) "
f"- {summary['message'][:80]}... "
f"(文件: {summary['file']}, 行: {summary['line']}){context_str}\n"
)
user_prompt += f"\n请选择最有代表性的 {max_count} 个问题,返回索引列表:"
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
]
resp = client.chat.completions.create(
model=model,
messages=messages,
stream=False,
temperature=0.1, # 低温度确保一致性
)
content = resp.choices[0].message.content if resp.choices else ""
# 解析返回的索引
selected_indices = []
try:
# 提取数字
import re
numbers = re.findall(r'\d+', content)
for num_str in numbers:
idx = int(num_str)
if 0 <= idx < len(real_issues):
selected_indices.append(idx)
# 去重并保持顺序
selected_indices = list(dict.fromkeys(selected_indices))
# 限制数量
if len(selected_indices) > max_count:
selected_indices = selected_indices[:max_count]
except Exception as e:
print(f"解析AI选择结果失败: {e}")
print(f"AI返回内容: {content}")
# 回退到简单选择:按相关性分数排序
selected_indices = list(range(min(max_count, len(real_issues))))
# 返回选中的问题
selected_issues = [real_issues[i][0] for i in selected_indices if i < len(real_issues)]
print(f"AI智能选择{len(issues)} 个问题中筛选出 {len(real_issues)} 个可能真实的问题,最终选择了 {len(selected_issues)} 个最有代表性的测试用例")
return selected_issues
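# The model is expected to answer with a bare list of indices such as "0, 2, 5";
# the re.findall(r'\d+', ...) parsing above tolerates extra prose around the numbers,
# and if nothing usable comes back the code falls back to the highest-scoring issues.
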
def verify_single_test(cpp_file: Path, timeout: int = 30, project_root: Optional[Path] = None, include_dirs: List[str] = None) -> dict:
"""验证单个测试用例"""
import subprocess
import time
import signal
import os
result = {
"file": cpp_file.name,
"compiled": False,
"executed": False,
"exit_code": None,
"output": "",
"error": "",
"duration": 0,
"timeout": False,
"vulnerability_confirmed": False,
"vulnerability_type": "unknown"
}
exe_file = cpp_file.with_suffix(".exe")
try:
# 编译
start_time = time.time()
compile_cmd = [
"g++", "-std=c++17", "-O0", "-g", "-Wall", "-Wextra", "-pedantic"
]
# 添加项目相关的编译选项
if project_root:
compile_cmd.extend(["-I", str(project_root)])
if include_dirs:
for include_dir in include_dirs:
compile_cmd.extend(["-I", include_dir])
compile_cmd.extend(["-o", str(exe_file), str(cpp_file)])
compile_result = subprocess.run(
compile_cmd,
capture_output=True,
text=True,
timeout=timeout
)
result["compiled"] = (compile_result.returncode == 0)
result["duration"] = time.time() - start_time
if not result["compiled"]:
result["error"] = compile_result.stderr
return result
# 执行
if exe_file.exists():
start_time = time.time()
try:
execute_result = subprocess.run(
[str(exe_file)],
capture_output=True,
text=True,
timeout=timeout
)
result["executed"] = True
result["exit_code"] = execute_result.returncode
result["output"] = execute_result.stdout
result["error"] = execute_result.stderr
result["duration"] = time.time() - start_time
# 分析漏洞类型
result["vulnerability_type"] = analyze_vulnerability_type(cpp_file.name, result)
result["vulnerability_confirmed"] = determine_vulnerability_confirmed(result)
except subprocess.TimeoutExpired:
result["timeout"] = True
result["error"] = f"执行超时({timeout}秒)"
except Exception as e:
result["error"] = f"执行异常: {str(e)}"
except subprocess.TimeoutExpired:
result["timeout"] = True
result["error"] = f"编译超时({timeout}秒)"
except Exception as e:
result["error"] = f"编译异常: {str(e)}"
finally:
# 清理
if exe_file.exists():
exe_file.unlink()
return result
def analyze_vulnerability_type(filename: str, result: dict) -> str:
"""分析漏洞类型"""
filename_lower = filename.lower()
if "uninitvar" in filename_lower:
return "未初始化变量"
elif "memleak" in filename_lower:
return "内存泄漏"
elif "nullpointer" in filename_lower:
return "空指针解引用"
elif "arrayindex" in filename_lower:
return "数组越界"
elif "buffer" in filename_lower:
return "缓冲区溢出"
elif "useafterfree" in filename_lower:
return "释放后使用"
elif "doublefree" in filename_lower:
return "重复释放"
else:
return "未知类型"
def determine_vulnerability_confirmed(result: dict) -> bool:
"""判断漏洞是否被确认"""
if not result["compiled"] or not result["executed"]:
return False
# 根据漏洞类型和程序行为判断
vuln_type = result["vulnerability_type"]
exit_code = result["exit_code"]
output = result["output"]
error = result["error"]
if "未初始化变量" in vuln_type:
# 未初始化变量:程序应该能正常运行,但输出随机值
return exit_code == 0 and "buffer" in output.lower()
elif "内存泄漏" in vuln_type:
# 内存泄漏:程序应该能正常运行
return exit_code == 0
elif "空指针解引用" in vuln_type:
# 空指针解引用:程序应该崩溃
return exit_code != 0 or "segmentation fault" in error.lower()
elif "数组越界" in vuln_type:
# 数组越界:程序可能崩溃
return exit_code != 0 or "segmentation fault" in error.lower()
elif "缓冲区溢出" in vuln_type:
# 缓冲区溢出:程序可能崩溃
return exit_code != 0 or "stack smashing" in error.lower()
else:
# 默认:程序能正常运行就认为漏洞存在
return exit_code == 0
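# Example of the heuristic above: a null-pointer test is counted as confirmed when the
# program exits with a non-zero code or prints "segmentation fault" to stderr, while a
# memory-leak test only needs to exit with code 0 (the leak itself is not observable here).
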
def verify_test_case(test_file_path: Path, issue: CppcheckIssue) -> dict:
"""验证生成的测试用例是否能有效触发cppcheck检测"""
verification_result = {
'compiles': False,
'runs': False,
'triggers_cppcheck': False,
'cppcheck_warnings': [],
'compilation_errors': [],
'runtime_errors': []
}
try:
# 1. 尝试编译测试用例
import subprocess
import tempfile
import os
# 创建临时目录
with tempfile.TemporaryDirectory() as temp_dir:
temp_cpp = Path(temp_dir) / "test.cpp"
temp_exe = Path(temp_dir) / "test"
# 复制测试文件到临时目录
with open(test_file_path, 'r', encoding='utf-8') as f:
test_content = f.read()
with open(temp_cpp, 'w', encoding='utf-8') as f:
f.write(test_content)
# 尝试编译
try:
result = subprocess.run(
['g++', '-std=c++17', '-o', str(temp_exe), str(temp_cpp)],
capture_output=True, text=True, timeout=30
)
if result.returncode == 0:
verification_result['compiles'] = True
# 2. 尝试运行
try:
run_result = subprocess.run(
[str(temp_exe)],
capture_output=True, text=True, timeout=10
)
if run_result.returncode == 0:
verification_result['runs'] = True
else:
verification_result['runtime_errors'].append(run_result.stderr)
except subprocess.TimeoutExpired:
verification_result['runtime_errors'].append("Runtime timeout")
except Exception as e:
verification_result['runtime_errors'].append(str(e))
else:
verification_result['compilation_errors'].append(result.stderr)
except subprocess.TimeoutExpired:
verification_result['compilation_errors'].append("Compilation timeout")
except Exception as e:
verification_result['compilation_errors'].append(str(e))
# 3. 使用cppcheck检查
try:
cppcheck_result = subprocess.run(
['cppcheck', '--enable=all', '--std=c++17', str(temp_cpp)],
capture_output=True, text=True, timeout=30
)
if cppcheck_result.returncode != 0 or cppcheck_result.stderr:
# 解析cppcheck输出
output = cppcheck_result.stderr
if issue.id.lower() in output.lower():
verification_result['triggers_cppcheck'] = True
# 提取警告信息
lines = output.split('\n')
for line in lines:
if 'warning:' in line or 'error:' in line:
verification_result['cppcheck_warnings'].append(line.strip())
except subprocess.TimeoutExpired:
verification_result['cppcheck_warnings'].append("cppcheck timeout")
except Exception as e:
verification_result['cppcheck_warnings'].append(f"cppcheck error: {str(e)}")
except Exception as e:
verification_result['compilation_errors'].append(f"Verification error: {str(e)}")
return verification_result
def write_issue_output(output_dir: Path, idx: int, issue: CppcheckIssue, content: str, emit_runner: bool = False, verify: bool = False) -> Path:
output_dir.mkdir(parents=True, exist_ok=True)
# 提取 ```cpp ... ``` 代码块(仅取第一个)
cpp_code: Optional[str] = None
lines = content.splitlines()
inside = False
fence = None
buf: List[str] = []
for line in lines:
if not inside:
if line.strip().startswith("```cpp") or line.strip().startswith("```c++"):
inside = True
fence = line[:3]
buf = []
else:
if line.strip().startswith("```"):
inside = False
cpp_code = "\n".join(buf).strip()
break
else:
buf.append(line)
# 写 Markdown 说明
md_path = output_dir / f"issue_{idx:03d}_{issue.id}.md"
md_path.write_text(content, encoding="utf-8")
# 若提取到 C++ 代码,则写出 .cpp 文件,并可选生成 PowerShell 一键运行脚本
if cpp_code:
base = f"issue_{idx:03d}_{issue.id}"
cpp_path = output_dir / f"{base}.cpp"
cpp_path.write_text(cpp_code, encoding="utf-8")
# 验证测试用例(如果启用)
if verify:
print(f" 正在验证测试用例...")
verification_result = verify_test_case(cpp_path, issue)
# 输出验证结果
if verification_result['compiles']:
print(f" ✓ 编译成功")
else:
print(f" ✗ 编译失败: {verification_result['compilation_errors']}")
if verification_result['runs']:
print(f" ✓ 运行成功")
else:
print(f" ✗ 运行失败: {verification_result['runtime_errors']}")
if verification_result['triggers_cppcheck']:
print(f" ✓ 成功触发cppcheck检测")
else:
print(f" ✗ 未触发cppcheck检测")
if verification_result['cppcheck_warnings']:
print(f" cppcheck输出: {verification_result['cppcheck_warnings']}")
# 保存验证结果到文件
verification_file = output_dir / f"verification_{idx:03d}_{issue.id}.json"
import json
with open(verification_file, 'w', encoding='utf-8') as f:
json.dump(verification_result, f, ensure_ascii=False, indent=2)
if emit_runner:
ps1 = output_dir / f"run_{base}.ps1"
exe = output_dir / f"{base}.exe"
cmd = (
f"g++ -std=c++17 -O0 -g -Wall -Wextra -pedantic -o \"{exe.name}\" \"{cpp_path.name}\"\n"
f"if ($LASTEXITCODE -ne 0) {{ Write-Host '编译失败' -ForegroundColor Red; exit 1 }}\n"
f"./{exe.name}\n"
)
ps1.write_text(cmd, encoding="utf-8")
return md_path
def auto_verify_tests(output_dir: Path, timeout: int = 30, project_root: Optional[Path] = None, include_dirs: List[str] = None) -> dict:
"""自动验证所有测试用例"""
print("开始自动验证测试用例...")
cpp_files = list(output_dir.glob("*.cpp"))
if not cpp_files:
print("未找到测试用例文件")
return {"total": 0, "results": [], "summary": {}}
results = []
for i, cpp_file in enumerate(cpp_files, 1):
print(f"验证 [{i}/{len(cpp_files)}]: {cpp_file.name}")
result = verify_single_test(cpp_file, timeout, project_root, include_dirs)
results.append(result)
# 显示验证结果
if result["vulnerability_confirmed"]:
print(f" ✓ 漏洞确认: {result['vulnerability_type']}")
elif result["compiled"] and result["executed"]:
print(f" - 程序正常: {result['vulnerability_type']} (可能误报)")
else:
print(f" ✗ 验证失败: {result['error']}")
# 生成汇总统计
summary = {
"total": len(results),
"compiled": sum(1 for r in results if r["compiled"]),
"executed": sum(1 for r in results if r["executed"]),
"vulnerabilities_confirmed": sum(1 for r in results if r["vulnerability_confirmed"]),
"timeouts": sum(1 for r in results if r["timeout"]),
"errors": sum(1 for r in results if not r["compiled"] or not r["executed"])
}
return {"total": len(results), "results": results, "summary": summary}
def generate_verification_report(output_dir: Path, verification_results: dict) -> Path:
"""生成验证结果报告"""
report_path = output_dir / "vulnerability_verification_report.md"
results = verification_results["results"]
summary = verification_results["summary"]
# 按漏洞类型分组
vuln_groups = {}
for result in results:
vuln_type = result["vulnerability_type"]
if vuln_type not in vuln_groups:
vuln_groups[vuln_type] = []
vuln_groups[vuln_type].append(result)
# 生成报告内容
report_content = f"""# 漏洞验证结果报告
## 验证汇总
- **总测试用例**: {summary['total']}
- **编译成功**: {summary['compiled']}
- **执行成功**: {summary['executed']}
- **漏洞确认**: {summary['vulnerabilities_confirmed']}
- **验证超时**: {summary['timeouts']}
- **验证错误**: {summary['errors']}
## 漏洞确认列表
"""
# 按漏洞类型生成详细报告
for vuln_type, vuln_results in vuln_groups.items():
confirmed_count = sum(1 for r in vuln_results if r["vulnerability_confirmed"])
total_count = len(vuln_results)
report_content += f"### {vuln_type} ({confirmed_count}/{total_count} 确认)\n\n"
for result in vuln_results:
status = "✓ 确认" if result["vulnerability_confirmed"] else "✗ 未确认"
report_content += f"- **{result['file']}**: {status}\n"
if result["vulnerability_confirmed"]:
report_content += f" - 返回码: {result['exit_code']}\n"
if result["output"]:
report_content += f" - 输出: {result['output'][:100]}...\n"
elif result["error"]:
report_content += f" - 错误: {result['error']}\n"
report_content += "\n"
# 添加修复建议
report_content += """## 修复建议
### 确认的漏洞
以下漏洞已被验证确认,建议优先修复:
"""
for vuln_type, vuln_results in vuln_groups.items():
confirmed_results = [r for r in vuln_results if r["vulnerability_confirmed"]]
if confirmed_results:
report_content += f"#### {vuln_type}\n"
for result in confirmed_results:
report_content += f"- {result['file']}: 需要修复\n"
report_content += "\n"
report_content += """### 未确认的问题
以下问题可能是误报或需要进一步分析:
"""
for vuln_type, vuln_results in vuln_groups.items():
unconfirmed_results = [r for r in vuln_results if not r["vulnerability_confirmed"]]
if unconfirmed_results:
report_content += f"#### {vuln_type}\n"
for result in unconfirmed_results:
report_content += f"- {result['file']}: 需要进一步分析\n"
report_content += "\n"
# 写入报告文件
report_path.write_text(report_content, encoding="utf-8")
return report_path
def generate_json_report(output_dir: Path, verification_results: dict) -> Path:
"""生成JSON格式的详细报告"""
import json
json_path = output_dir / "verification_results.json"
# Add a generation timestamp
import datetime
timestamp = datetime.datetime.now().isoformat()
verification_results["timestamp"] = timestamp
verification_results["generated_at"] = timestamp
# 写入JSON文件
json_path.write_text(json.dumps(verification_results, indent=2, ensure_ascii=False), encoding="utf-8")
return json_path
def filter_and_clean_issues(issues: List[CppcheckIssue], project_info: Optional[dict] = None) -> List[CppcheckIssue]:
"""过滤和清理问题,移除不可靠的问题"""
print("正在过滤和清理问题...")
cleaned_issues = []
filtered_count = 0
for issue in issues:
# 获取增强分析
code_context, relevance_analysis = get_enhanced_issue_analysis(issue, project_info)
# 基于分析结果决定是否保留问题
should_keep = False
# 1. 检查相关性分数
if relevance_analysis["relevance_score"] >= 5:
should_keep = True
# 2. 检查问题类型 - 排除明显误报
issue_id = issue.id.lower()
if issue_id in ["missinginclude", "missingincludesystem", "toomanyconfigs",
"normalchecklevelmaxbranches", "checklevelnormal", "unknown"]:
should_keep = False
# 3. 检查严重级别 - 优先保留error和warning
if issue.severity.lower() in ["error", "warning"]:
should_keep = True
elif issue.severity.lower() in ["information", "note"]:
# 对于information和note需要更高的相关性分数
if relevance_analysis["relevance_score"] >= 7:
should_keep = True
# 4. 检查是否有代码上下文
if code_context.function_name or code_context.class_name:
should_keep = True
if should_keep:
cleaned_issues.append(issue)
else:
filtered_count += 1
print(f" 过滤问题: {issue.id} - {issue.message[:50]}... (相关性分数: {relevance_analysis['relevance_score']})")
print(f"问题过滤完成: 保留 {len(cleaned_issues)} 个问题,过滤掉 {filtered_count} 个不可靠问题")
return cleaned_issues
def write_cleaned_report(issues: List[CppcheckIssue], output_path: Path) -> None:
"""将清理后的问题写入新的报告文件"""
print(f"正在生成清理后的报告: {output_path}")
with open(output_path, 'w', encoding='utf-8') as f:
for issue in issues:
for location in issue.locations:
f.write(f"{location.file_path}:{location.line}:0: {issue.severity}: {issue.message} [{issue.id}]\n")
print(f"清理后的报告已保存: {output_path}")
def main(argv: list[str]) -> int:
parser = argparse.ArgumentParser(description="根据 cppcheck XML 与源码生成可运行的 C++ 复现用例")
parser.add_argument("report", help="cppcheck 报告路径:支持 XML--xml或文本日志自动识别或 --text")
parser.add_argument("--out", default="cppcheck_tests", help="输出目录,默认 cppcheck_tests")
parser.add_argument("--model", default="deepseek-chat", help="模型名称,默认 deepseek-chat")
parser.add_argument("--emit-runner", action="store_true", help="为每个用例生成一键编译运行的 PowerShell 脚本")
parser.add_argument("--text", action="store_true", help="强制按文本日志格式解析")
parser.add_argument("--xml", action="store_true", help="强制按 XML 格式解析")
parser.add_argument("--max", type=int, default=10, help="最多处理前 N 条问题(默认 10设为 0 表示不限)")
parser.add_argument(
"--severities",
default="warning,error",
help="过滤等级,逗号分隔(如 warning,error,information,note默认 warning,error",
)
parser.add_argument(
"--include-ids",
default="",
help="仅包含这些 ruleId逗号分隔留空表示不限",
)
parser.add_argument(
"--exclude-ids",
default="missingInclude,missingIncludeSystem,toomanyconfigs,normalCheckLevelMaxBranches,checkLevelNormal,unknown",
help="排除这些 ruleId逗号分隔默认排除若干低价值项",
)
parser.add_argument(
"--smart-select",
action="store_true",
help="使用AI智能选择最有代表性的测试用例推荐用于大量问题",
)
parser.add_argument(
"--smart-max",
type=int,
default=10,
help="智能选择模式下的最大测试用例数量默认10",
)
parser.add_argument(
"--auto-verify",
action="store_true",
help="生成测试用例后自动运行验证并生成结果报告",
)
parser.add_argument(
"--verify-timeout",
type=int,
default=30,
help="验证超时时间默认30",
)
parser.add_argument(
"--verify-tests",
action="store_true",
help="生成测试用例时立即验证每个测试用例的有效性",
)
parser.add_argument(
"--use-templates",
action="store_true",
help="使用预定义的测试用例模板确保能有效触发cppcheck检测",
)
parser.add_argument(
"--project-root",
help="原始项目根目录路径(用于包含头文件和依赖)",
)
parser.add_argument(
"--include-dirs",
help="额外的头文件包含目录(逗号分隔)",
)
parser.add_argument(
"--integration-test",
action="store_true",
help="生成集成测试用例(需要原始项目)",
)
parser.add_argument(
"--enhanced-analysis",
action="store_true",
help="启用增强分析模式,基于代码上下文和项目结构进行智能筛选",
)
parser.add_argument(
"--clean-report",
action="store_true",
help="生成清理后的cppcheck报告文件过滤掉不可靠的问题",
)
parser.add_argument(
"--cleaned-report",
help="使用已清理的报告文件(跳过问题过滤步骤)",
)
args = parser.parse_args(argv)
# 处理报告文件路径
if args.cleaned_report:
# 使用已清理的报告文件
report_path = Path(args.cleaned_report).expanduser().resolve()
if not report_path.exists():
raise SystemExit(f"找不到已清理的报告文件: {report_path}")
print(f"使用已清理的报告文件: {report_path}")
else:
# 使用原始报告文件
report_path = Path(args.report).expanduser().resolve()
if not report_path.exists():
raise SystemExit(f"找不到报告文件: {report_path}")
# 解析报告文件
issues: List[CppcheckIssue] = []
if args.xml or (report_path.suffix.lower() in {".xml"} and not args.text):
issues = parse_cppcheck_xml(report_path)
else:
issues = parse_cppcheck_text(report_path)
print(f"原始报告包含 {len(issues)} 个问题")
# 基本过滤:按严重级别、包含/排除的 ruleId、去重
sev_set: Set[str] = {s.strip().lower() for s in (args.severities or "").split(",") if s.strip()}
include_ids: Set[str] = {s.strip() for s in (args.include_ids or "").split(",") if s.strip()}
exclude_ids: Set[str] = {s.strip() for s in (args.exclude_ids or "").split(",") if s.strip()}
filtered: List[CppcheckIssue] = []
seen: Set[tuple] = set()
for iss in issues:
if sev_set and iss.severity and iss.severity.lower() not in sev_set:
continue
if include_ids and iss.id not in include_ids:
continue
if exclude_ids and iss.id in exclude_ids:
continue
# 以 (id, first_file, first_line) 去重
key = (iss.id, str(iss.locations[0].file_path) if iss.locations else "", iss.locations[0].line if iss.locations else None)
if key in seen:
continue
seen.add(key)
filtered.append(iss)
print(f"基本过滤后剩余 {len(filtered)} 个问题")
if not filtered:
print("未在报告中发现问题项。")
return 0
# 处理项目上下文
project_root = None
include_dirs = []
project_info = None
if args.project_root:
project_root = Path(args.project_root).expanduser().resolve()
if not project_root.exists():
print(f"警告: 项目根目录不存在: {project_root}")
project_root = None
else:
print("正在分析项目结构...")
project_info = analyze_project_structure(project_root)
print(f"项目分析完成: 发现 {len(project_info['source_files'])} 个源文件, {len(project_info['header_files'])} 个头文件")
if args.include_dirs:
include_dirs = [d.strip() for d in args.include_dirs.split(",") if d.strip()]
valid_include_dirs = []
for include_dir in include_dirs:
include_path = Path(include_dir).expanduser().resolve()
if include_path.exists():
valid_include_dirs.append(str(include_path))
else:
print(f"警告: 头文件目录不存在: {include_path}")
include_dirs = valid_include_dirs
# 问题过滤和清理
if args.clean_report and not args.cleaned_report:
print("\n" + "="*50)
print("开始问题过滤和清理...")
print("="*50)
cleaned_issues = filter_and_clean_issues(filtered, project_info)
# 生成清理后的报告文件
cleaned_report_path = Path(args.out) / "cleaned_cppcheck_report.txt"
write_cleaned_report(cleaned_issues, cleaned_report_path)
print(f"\n清理完成!")
print(f"原始问题数量: {len(issues)}")
print(f"基本过滤后: {len(filtered)}")
print(f"智能清理后: {len(cleaned_issues)}")
print(f"清理后的报告已保存: {cleaned_report_path}")
# 使用清理后的问题继续处理
filtered = cleaned_issues
elif args.enhanced_analysis:
# 使用增强分析进行智能筛选
print("\n" + "="*50)
print("开始增强分析...")
print("="*50)
cleaned_issues = filter_and_clean_issues(filtered, project_info)
filtered = cleaned_issues
# 智能选择模式
if args.smart_select or args.enhanced_analysis:
if args.enhanced_analysis:
print(f"启用增强分析模式,从 {len(filtered)} 个问题中选择最多 {args.smart_max} 个最有代表性的测试用例...")
else:
print(f"启用AI智能选择模式{len(filtered)} 个问题中选择最多 {args.smart_max} 个最有代表性的测试用例...")
issues = smart_select_issues(filtered, args.smart_max, args.model)
else:
# 传统模式:简单限制数量
if args.max and args.max > 0:
issues = filtered[: args.max]
else:
issues = filtered
output_dir = Path(args.out).expanduser().resolve()
# 为每个问题生成增强的测试用例
for idx, issue in enumerate(issues, start=1):
print(f"生成测试用例 {idx}/{len(issues)}: {issue.id}")
# 获取增强的问题分析
code_context, relevance_analysis = get_enhanced_issue_analysis(issue, project_info)
# 显示分析结果
print(f" 相关性分数: {relevance_analysis['relevance_score']}, 置信度: {relevance_analysis['confidence']}%")
if code_context.function_name:
print(f" 所在函数: {code_context.function_name}")
if code_context.class_name:
print(f" 所在类: {code_context.class_name}")
# 使用AI生成模式这是核心功能
content = generate_test_for_issue(
issue,
model=args.model,
project_root=project_root,
include_dirs=include_dirs,
integration_test=args.integration_test,
code_context=code_context,
relevance_analysis=relevance_analysis
)
out_path = write_issue_output(output_dir, idx, issue, content, emit_runner=args.emit_runner, verify=args.verify_tests)
print(f" 已生成: {out_path}")
print(f"完成,共生成 {len(issues)} 条用例说明。")
# 自动验证
if args.auto_verify:
print("\n" + "="*50)
print("开始自动验证测试用例...")
print("="*50)
verification_results = auto_verify_tests(output_dir, args.verify_timeout, project_root, include_dirs)
# 生成报告
print("\n生成验证报告...")
md_report = generate_verification_report(output_dir, verification_results)
json_report = generate_json_report(output_dir, verification_results)
print(f"Markdown报告: {md_report}")
print(f"JSON报告: {json_report}")
# 显示汇总
summary = verification_results["summary"]
print(f"\n验证汇总:")
print(f" 总测试用例: {summary['total']}")
print(f" 编译成功: {summary['compiled']}")
print(f" 执行成功: {summary['executed']}")
print(f" 漏洞确认: {summary['vulnerabilities_confirmed']}")
print(f" 验证超时: {summary['timeouts']}")
print(f" 验证错误: {summary['errors']}")
# 显示确认的漏洞
confirmed_vulns = [r for r in verification_results["results"] if r["vulnerability_confirmed"]]
if confirmed_vulns:
print(f"\n确认的漏洞 ({len(confirmed_vulns)} 个):")
for result in confirmed_vulns:
print(f"{result['file']}: {result['vulnerability_type']}")
else:
print("\n未确认任何漏洞")
return 0
if __name__ == "__main__":
raise SystemExit(main(sys.argv[1:]))
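
# Example invocation (the script filename and paths below are placeholders):
#   python generate_cppcheck_tests.py cppcheck_report.xml --out cppcheck_tests \
#       --project-root ./libtiff --include-dirs ./libtiff/libtiff \
#       --smart-select --smart-max 10 --auto-verify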