You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

380 lines
14 KiB

"""
验证和测试模块
"""
import subprocess
import time
import json
from pathlib import Path
from typing import List, Optional
from .models import CppcheckIssue
def verify_single_test(cpp_file: Path, timeout: int = 30, project_root: Optional[Path] = None, include_dirs: List[str] = None) -> dict:
"""验证单个测试用例"""
result = {
"file": cpp_file.name,
"compiled": False,
"executed": False,
"exit_code": None,
"output": "",
"error": "",
"duration": 0,
"timeout": False,
"vulnerability_confirmed": False,
"vulnerability_type": "unknown"
}
exe_file = cpp_file.with_suffix(".exe")
try:
# 编译
start_time = time.time()
compile_cmd = [
"g++", "-std=c++17", "-O0", "-g", "-Wall", "-Wextra", "-pedantic"
]
# 添加项目相关的编译选项
if project_root:
compile_cmd.extend(["-I", str(project_root)])
if include_dirs:
for include_dir in include_dirs:
compile_cmd.extend(["-I", include_dir])
compile_cmd.extend(["-o", str(exe_file), str(cpp_file)])
compile_result = subprocess.run(
compile_cmd,
capture_output=True,
text=True,
timeout=timeout
)
result["compiled"] = (compile_result.returncode == 0)
result["duration"] = time.time() - start_time
if not result["compiled"]:
result["error"] = compile_result.stderr
return result
# 执行
if exe_file.exists():
start_time = time.time()
try:
execute_result = subprocess.run(
[str(exe_file)],
capture_output=True,
text=True,
timeout=timeout
)
result["executed"] = True
result["exit_code"] = execute_result.returncode
result["output"] = execute_result.stdout
result["error"] = execute_result.stderr
result["duration"] = time.time() - start_time
# 分析漏洞类型
result["vulnerability_type"] = analyze_vulnerability_type(cpp_file.name, result)
result["vulnerability_confirmed"] = determine_vulnerability_confirmed(result)
except subprocess.TimeoutExpired:
result["timeout"] = True
result["error"] = f"执行超时({timeout}秒)"
except Exception as e:
result["error"] = f"执行异常: {str(e)}"
except subprocess.TimeoutExpired:
result["timeout"] = True
result["error"] = f"编译超时({timeout}秒)"
except Exception as e:
result["error"] = f"编译异常: {str(e)}"
finally:
# 清理
if exe_file.exists():
exe_file.unlink()
return result
def analyze_vulnerability_type(filename: str, result: dict) -> str:
"""分析漏洞类型"""
filename_lower = filename.lower()
if "uninitvar" in filename_lower:
return "未初始化变量"
elif "memleak" in filename_lower:
return "内存泄漏"
elif "nullpointer" in filename_lower:
return "空指针解引用"
elif "arrayindex" in filename_lower:
return "数组越界"
elif "buffer" in filename_lower:
return "缓冲区溢出"
elif "useafterfree" in filename_lower:
return "释放后使用"
elif "doublefree" in filename_lower:
return "重复释放"
else:
return "未知类型"
def determine_vulnerability_confirmed(result: dict) -> bool:
"""判断漏洞是否被确认"""
if not result["compiled"] or not result["executed"]:
return False
# 根据漏洞类型和程序行为判断
vuln_type = result["vulnerability_type"]
exit_code = result["exit_code"]
output = result["output"]
error = result["error"]
if "未初始化变量" in vuln_type:
# 未初始化变量:程序应该能正常运行,但输出随机值
return exit_code == 0 and "buffer" in output.lower()
elif "内存泄漏" in vuln_type:
# 内存泄漏:程序应该能正常运行
return exit_code == 0
elif "空指针解引用" in vuln_type:
# 空指针解引用:程序应该崩溃
return exit_code != 0 or "segmentation fault" in error.lower()
elif "数组越界" in vuln_type:
# 数组越界:程序可能崩溃
return exit_code != 0 or "segmentation fault" in error.lower()
elif "缓冲区溢出" in vuln_type:
# 缓冲区溢出:程序可能崩溃
return exit_code != 0 or "stack smashing" in error.lower()
else:
# 默认:程序能正常运行就认为漏洞存在
return exit_code == 0
def verify_test_case(test_file_path: Path, issue: CppcheckIssue) -> dict:
"""验证生成的测试用例是否能有效触发cppcheck检测"""
verification_result = {
'compiles': False,
'runs': False,
'triggers_cppcheck': False,
'cppcheck_warnings': [],
'compilation_errors': [],
'runtime_errors': []
}
try:
# 1. 尝试编译测试用例
import tempfile
# 创建临时目录
with tempfile.TemporaryDirectory() as temp_dir:
temp_cpp = Path(temp_dir) / "test.cpp"
temp_exe = Path(temp_dir) / "test"
# 复制测试文件到临时目录
with open(test_file_path, 'r', encoding='utf-8') as f:
test_content = f.read()
with open(temp_cpp, 'w', encoding='utf-8') as f:
f.write(test_content)
# 尝试编译
try:
result = subprocess.run(
['g++', '-std=c++17', '-o', str(temp_exe), str(temp_cpp)],
capture_output=True, text=True, timeout=30
)
if result.returncode == 0:
verification_result['compiles'] = True
# 2. 尝试运行
try:
run_result = subprocess.run(
[str(temp_exe)],
capture_output=True, text=True, timeout=10
)
if run_result.returncode == 0:
verification_result['runs'] = True
else:
verification_result['runtime_errors'].append(run_result.stderr)
except subprocess.TimeoutExpired:
verification_result['runtime_errors'].append("Runtime timeout")
except Exception as e:
verification_result['runtime_errors'].append(str(e))
else:
verification_result['compilation_errors'].append(result.stderr)
except subprocess.TimeoutExpired:
verification_result['compilation_errors'].append("Compilation timeout")
except Exception as e:
verification_result['compilation_errors'].append(str(e))
# 3. 使用cppcheck检查
try:
cppcheck_result = subprocess.run(
['cppcheck', '--enable=all', '--std=c++17', str(temp_cpp)],
capture_output=True, text=True, timeout=30
)
if cppcheck_result.returncode != 0 or cppcheck_result.stderr:
# 解析cppcheck输出
output = cppcheck_result.stderr
if issue.id.lower() in output.lower():
verification_result['triggers_cppcheck'] = True
# 提取警告信息
lines = output.split('\n')
for line in lines:
if 'warning:' in line or 'error:' in line:
verification_result['cppcheck_warnings'].append(line.strip())
except subprocess.TimeoutExpired:
verification_result['cppcheck_warnings'].append("cppcheck timeout")
except Exception as e:
verification_result['cppcheck_warnings'].append(f"cppcheck error: {str(e)}")
except Exception as e:
verification_result['compilation_errors'].append(f"Verification error: {str(e)}")
return verification_result
def auto_verify_tests(output_dir: Path, timeout: int = 30, project_root: Optional[Path] = None, include_dirs: List[str] = None) -> dict:
"""自动验证所有测试用例"""
print("开始自动验证测试用例...")
cpp_files = list(output_dir.glob("*.cpp"))
if not cpp_files:
print("未找到测试用例文件")
return {"total": 0, "results": [], "summary": {}}
results = []
for i, cpp_file in enumerate(cpp_files, 1):
print(f"验证 [{i}/{len(cpp_files)}]: {cpp_file.name}")
result = verify_single_test(cpp_file, timeout, project_root, include_dirs)
results.append(result)
# 显示验证结果
if result["vulnerability_confirmed"]:
print(f" ✓ 漏洞确认: {result['vulnerability_type']}")
elif result["compiled"] and result["executed"]:
print(f" - 程序正常: {result['vulnerability_type']} (可能误报)")
else:
print(f" ✗ 验证失败: {result['error']}")
# 生成汇总统计
summary = {
"total": len(results),
"compiled": sum(1 for r in results if r["compiled"]),
"executed": sum(1 for r in results if r["executed"]),
"vulnerabilities_confirmed": sum(1 for r in results if r["vulnerability_confirmed"]),
"timeouts": sum(1 for r in results if r["timeout"]),
"errors": sum(1 for r in results if not r["compiled"] or not r["executed"])
}
return {"total": len(results), "results": results, "summary": summary}
def generate_verification_report(output_dir: Path, verification_results: dict) -> Path:
"""生成验证结果报告"""
report_path = output_dir / "vulnerability_verification_report.md"
results = verification_results["results"]
summary = verification_results["summary"]
# 按漏洞类型分组
vuln_groups = {}
for result in results:
vuln_type = result["vulnerability_type"]
if vuln_type not in vuln_groups:
vuln_groups[vuln_type] = []
vuln_groups[vuln_type].append(result)
# 生成报告内容
report_content = f"""# 漏洞验证结果报告
## 验证汇总
- **总测试用例**: {summary['total']}
- **编译成功**: {summary['compiled']}
- **执行成功**: {summary['executed']}
- **漏洞确认**: {summary['vulnerabilities_confirmed']}
- **验证超时**: {summary['timeouts']}
- **验证错误**: {summary['errors']}
## 漏洞确认列表
"""
# 按漏洞类型生成详细报告
for vuln_type, vuln_results in vuln_groups.items():
confirmed_count = sum(1 for r in vuln_results if r["vulnerability_confirmed"])
total_count = len(vuln_results)
report_content += f"### {vuln_type} ({confirmed_count}/{total_count} 确认)\n\n"
for result in vuln_results:
status = "✓ 确认" if result["vulnerability_confirmed"] else "✗ 未确认"
report_content += f"- **{result['file']}**: {status}\n"
if result["vulnerability_confirmed"]:
report_content += f" - 返回码: {result['exit_code']}\n"
if result["output"]:
report_content += f" - 输出: {result['output'][:100]}...\n"
elif result["error"]:
report_content += f" - 错误: {result['error']}\n"
report_content += "\n"
# 添加修复建议
report_content += """## 修复建议
### 确认的漏洞
以下漏洞已被验证确认,建议优先修复:
"""
for vuln_type, vuln_results in vuln_groups.items():
confirmed_results = [r for r in vuln_results if r["vulnerability_confirmed"]]
if confirmed_results:
report_content += f"#### {vuln_type}\n"
for result in confirmed_results:
report_content += f"- {result['file']}: 需要修复\n"
report_content += "\n"
report_content += """### 未确认的问题
以下问题可能是误报或需要进一步分析:
"""
for vuln_type, vuln_results in vuln_groups.items():
unconfirmed_results = [r for r in vuln_results if not r["vulnerability_confirmed"]]
if unconfirmed_results:
report_content += f"#### {vuln_type}\n"
for result in unconfirmed_results:
report_content += f"- {result['file']}: 需要进一步分析\n"
report_content += "\n"
# 写入报告文件
report_path.write_text(report_content, encoding="utf-8")
return report_path
def generate_json_report(output_dir: Path, verification_results: dict) -> Path:
"""生成JSON格式的详细报告"""
json_path = output_dir / "verification_results.json"
# 添加时间戳
verification_results["timestamp"] = str(Path().cwd())
verification_results["generated_at"] = str(Path().cwd())
# 写入JSON文件
json_path.write_text(json.dumps(verification_results, indent=2, ensure_ascii=False), encoding="utf-8")
return json_path