forked from pfqgauxfb/code-analysis
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
380 lines
14 KiB
380 lines
14 KiB
"""
|
|
验证和测试模块
|
|
"""
|
|
import subprocess
|
|
import time
|
|
import json
|
|
from pathlib import Path
|
|
from typing import List, Optional
|
|
|
|
from .models import CppcheckIssue
|
|
|
|
|
|
def verify_single_test(cpp_file: Path, timeout: int = 30, project_root: Optional[Path] = None, include_dirs: Optional[List[str]] = None) -> dict:
    """Compile one C++ test case with g++, run it, and classify the outcome.

    Args:
        cpp_file: Path of the C++ source file to verify.
        timeout: Seconds allowed for the compile step and, separately, for
            the run step.
        project_root: When given, added to the compiler command as an
            ``-I`` include directory.
        include_dirs: Additional ``-I`` include directories.

    Returns:
        A dict with the keys ``file``, ``compiled``, ``executed``,
        ``exit_code``, ``output``, ``error``, ``duration``, ``timeout``,
        ``vulnerability_confirmed`` and ``vulnerability_type``.
        ``duration`` holds the compile time, and is replaced by the run
        time once the binary has run to completion.

    Compiler and run failures (including timeouts) are reported through
    the ``error`` / ``timeout`` fields rather than raised.
    """
    result = {
        "file": cpp_file.name,
        "compiled": False,
        "executed": False,
        "exit_code": None,
        "output": "",
        "error": "",
        "duration": 0,
        "timeout": False,
        "vulnerability_confirmed": False,
        "vulnerability_type": "unknown"
    }

    # Use an absolute path for the binary.  A bare name such as "case.exe"
    # (a cpp_file given without a directory part) would otherwise be looked
    # up on PATH by subprocess instead of in the directory it was built in.
    exe_file = cpp_file.with_suffix(".exe").resolve()

    try:
        # Compile
        start_time = time.time()
        compile_cmd = [
            "g++", "-std=c++17", "-O0", "-g", "-Wall", "-Wextra", "-pedantic"
        ]

        # Project-specific include paths
        if project_root:
            compile_cmd.extend(["-I", str(project_root)])

        if include_dirs:
            for include_dir in include_dirs:
                compile_cmd.extend(["-I", include_dir])

        compile_cmd.extend(["-o", str(exe_file), str(cpp_file)])

        compile_result = subprocess.run(
            compile_cmd,
            capture_output=True,
            text=True,
            timeout=timeout
        )

        result["compiled"] = (compile_result.returncode == 0)
        result["duration"] = time.time() - start_time

        if not result["compiled"]:
            result["error"] = compile_result.stderr
            return result

        # Run
        if exe_file.exists():
            start_time = time.time()
            try:
                execute_result = subprocess.run(
                    [str(exe_file)],
                    capture_output=True,
                    text=True,
                    timeout=timeout
                )

                result["executed"] = True
                result["exit_code"] = execute_result.returncode
                result["output"] = execute_result.stdout
                result["error"] = execute_result.stderr
                result["duration"] = time.time() - start_time

                # Classify the vulnerability; the type must be stored first
                # because the confirmation check reads it from `result`.
                result["vulnerability_type"] = analyze_vulnerability_type(cpp_file.name, result)
                result["vulnerability_confirmed"] = determine_vulnerability_confirmed(result)

            except subprocess.TimeoutExpired:
                result["timeout"] = True
                result["error"] = f"执行超时({timeout}秒)"
            except Exception as e:
                result["error"] = f"执行异常: {str(e)}"

    except subprocess.TimeoutExpired:
        result["timeout"] = True
        result["error"] = f"编译超时({timeout}秒)"
    except Exception as e:
        result["error"] = f"编译异常: {str(e)}"
    finally:
        # Clean up the built binary, including on the early-return path.
        if exe_file.exists():
            exe_file.unlink()

    return result
|
|
|
|
|
|
def analyze_vulnerability_type(filename: str, result: dict) -> str:
    """Derive a vulnerability category label from a test file name.

    The lower-cased file name is searched for known keywords in a fixed
    priority order; the first keyword found decides the label.  ``result``
    is accepted for interface compatibility but is not consulted.

    Returns:
        The category label, or the "unknown type" label when no keyword
        matches.
    """
    # Order matters: earlier entries win when several keywords are present.
    keyword_labels = (
        ("uninitvar", "未初始化变量"),
        ("memleak", "内存泄漏"),
        ("nullpointer", "空指针解引用"),
        ("arrayindex", "数组越界"),
        ("buffer", "缓冲区溢出"),
        ("useafterfree", "释放后使用"),
        ("doublefree", "重复释放"),
    )

    name = filename.lower()
    for keyword, label in keyword_labels:
        if keyword in name:
            return label
    return "未知类型"
|
|
|
|
|
|
def determine_vulnerability_confirmed(result: dict) -> bool:
    """Decide whether a run's observed behaviour confirms the vulnerability.

    Args:
        result: Verification record; the keys ``compiled``, ``executed``,
            ``vulnerability_type``, ``exit_code``, ``output`` and ``error``
            are read.

    Returns:
        ``False`` when the test was not both compiled and executed.
        Otherwise the verdict of a per-category heuristic based on the
        exit code and the captured output / error text.
    """
    if not (result["compiled"] and result["executed"]):
        return False

    category = result["vulnerability_type"]
    code = result["exit_code"]
    stdout_text = result["output"]
    stderr_text = result["error"]

    def crashed_or_reports(marker):
        # A non-zero exit code counts as a crash; otherwise look for the
        # marker in the captured error text.
        return code != 0 or marker in stderr_text.lower()

    # Uninitialized variable: expected to exit normally while its output
    # mentions "buffer".
    if "未初始化变量" in category:
        return code == 0 and "buffer" in stdout_text.lower()

    # Memory leak: the program is expected to exit normally.
    if "内存泄漏" in category:
        return code == 0

    # Null-pointer dereference / array out of bounds: expected to crash.
    if "空指针解引用" in category or "数组越界" in category:
        return crashed_or_reports("segmentation fault")

    # Buffer overflow: expected to crash or trip the stack protector.
    if "缓冲区溢出" in category:
        return crashed_or_reports("stack smashing")

    # Any other category: a normal exit is treated as confirmation.
    return code == 0
|
|
|
|
|
|
def verify_test_case(test_file_path: Path, issue: CppcheckIssue) -> dict:
    """Check that a generated test case builds, runs, and is flagged by cppcheck.

    The source is copied into a temporary directory as ``test.cpp``.  It is
    compiled with g++ (30 s limit); when compilation succeeds the binary is
    run (10 s limit); finally cppcheck is run on the copy (30 s limit).

    Args:
        test_file_path: Path of the test source, read as UTF-8.
        issue: The cppcheck issue under test; only ``issue.id`` is read and
            it is searched for, case-insensitively, in cppcheck's stderr.

    Returns:
        A dict with the keys ``compiles``, ``runs``, ``triggers_cppcheck``,
        ``cppcheck_warnings``, ``compilation_errors`` and
        ``runtime_errors``.  Failures of the individual steps are recorded
        in the list fields instead of being raised.
    """
    report = {
        'compiles': False,
        'runs': False,
        'triggers_cppcheck': False,
        'cppcheck_warnings': [],
        'compilation_errors': [],
        'runtime_errors': []
    }
    # Short aliases for the lists stored in the report (same objects).
    build_errors = report['compilation_errors']
    run_errors = report['runtime_errors']
    findings = report['cppcheck_warnings']

    try:
        import tempfile

        with tempfile.TemporaryDirectory() as workdir:
            source_copy = Path(workdir) / "test.cpp"
            binary = Path(workdir) / "test"

            # Copy the test source into the scratch directory.
            source_text = Path(test_file_path).read_text(encoding='utf-8')
            source_copy.write_text(source_text, encoding='utf-8')

            # Steps 1 and 2: compile, then run only if the build succeeded.
            try:
                build = subprocess.run(
                    ['g++', '-std=c++17', '-o', str(binary), str(source_copy)],
                    capture_output=True, text=True, timeout=30
                )

                if build.returncode != 0:
                    build_errors.append(build.stderr)
                else:
                    report['compiles'] = True

                    try:
                        run = subprocess.run(
                            [str(binary)],
                            capture_output=True, text=True, timeout=10
                        )
                        if run.returncode == 0:
                            report['runs'] = True
                        else:
                            run_errors.append(run.stderr)
                    except subprocess.TimeoutExpired:
                        run_errors.append("Runtime timeout")
                    except Exception as exc:
                        run_errors.append(str(exc))

            except subprocess.TimeoutExpired:
                build_errors.append("Compilation timeout")
            except Exception as exc:
                build_errors.append(str(exc))

            # Step 3: static analysis with cppcheck (diagnostics are read
            # from its stderr).
            try:
                check = subprocess.run(
                    ['cppcheck', '--enable=all', '--std=c++17', str(source_copy)],
                    capture_output=True, text=True, timeout=30
                )
                diagnostics = check.stderr

                if check.returncode != 0 or diagnostics:
                    if issue.id.lower() in diagnostics.lower():
                        report['triggers_cppcheck'] = True

                    # Keep only the warning / error diagnostic lines.
                    findings.extend(
                        line.strip()
                        for line in diagnostics.split('\n')
                        if 'warning:' in line or 'error:' in line
                    )

            except subprocess.TimeoutExpired:
                findings.append("cppcheck timeout")
            except Exception as exc:
                findings.append(f"cppcheck error: {str(exc)}")

    except Exception as exc:
        # Anything outside the individual steps (e.g. the source file
        # cannot be read) is reported as a verification error.
        build_errors.append(f"Verification error: {str(exc)}")

    return report
|
|
|
|
|
|
def auto_verify_tests(output_dir: Path, timeout: int = 30, project_root: Optional[Path] = None, include_dirs: List[str] = None) -> dict:
    """Verify every ``*.cpp`` test case directly inside ``output_dir``.

    Each file is passed to ``verify_single_test`` together with ``timeout``,
    ``project_root`` and ``include_dirs``; progress and a one-line verdict
    per file are printed.

    Returns:
        A dict with ``total`` (number of files verified), ``results`` (the
        per-file result dicts) and ``summary`` (aggregate counters).  When
        no ``*.cpp`` file is found, ``total`` is 0, ``results`` is empty and
        ``summary`` is an empty dict.
    """
    print("开始自动验证测试用例...")

    sources = list(output_dir.glob("*.cpp"))
    if len(sources) == 0:
        print("未找到测试用例文件")
        return {"total": 0, "results": [], "summary": {}}

    source_count = len(sources)
    outcomes = []
    for position, source in enumerate(sources, start=1):
        print(f"验证 [{position}/{source_count}]: {source.name}")
        outcome = verify_single_test(source, timeout, project_root, include_dirs)
        outcomes.append(outcome)

        # Report the verdict for this file.
        if outcome["vulnerability_confirmed"]:
            verdict = f" ✓ 漏洞确认: {outcome['vulnerability_type']}"
        elif outcome["compiled"] and outcome["executed"]:
            verdict = f" - 程序正常: {outcome['vulnerability_type']} (可能误报)"
        else:
            verdict = f" ✗ 验证失败: {outcome['error']}"
        print(verdict)

    def count_truthy(field):
        # Number of results whose given field is truthy.
        return len([entry for entry in outcomes if entry[field]])

    # Aggregate statistics over all results.
    summary = {
        "total": len(outcomes),
        "compiled": count_truthy("compiled"),
        "executed": count_truthy("executed"),
        "vulnerabilities_confirmed": count_truthy("vulnerability_confirmed"),
        "timeouts": count_truthy("timeout"),
        "errors": len([
            entry for entry in outcomes
            if not (entry["compiled"] and entry["executed"])
        ]),
    }

    return {"total": len(outcomes), "results": outcomes, "summary": summary}
|
|
|
|
|
|
def generate_verification_report(output_dir: Path, verification_results: dict) -> Path:
    """Write a Markdown report of the verification results.

    Args:
        output_dir: Directory in which ``vulnerability_verification_report.md``
            is written (UTF-8).
        verification_results: Dict with a ``results`` list of per-file
            result dicts and a ``summary`` dict of counters.  The summary
            may be empty or missing (``auto_verify_tests`` returns an empty
            summary when no test files exist); missing counters are
            reported as 0 instead of raising ``KeyError``.

    Returns:
        Path of the written report file.
    """
    report_path = output_dir / "vulnerability_verification_report.md"

    results = verification_results["results"]
    summary = verification_results.get("summary") or {}

    # Group results by vulnerability type, keeping first-seen order.
    vuln_groups = {}
    for result in results:
        vuln_groups.setdefault(result["vulnerability_type"], []).append(result)

    # Collect the report as pieces and join once at the end.
    parts = [
        "# 漏洞验证结果报告\n"
        "\n"
        "## 验证汇总\n"
        "\n"
        f"- **总测试用例**: {summary.get('total', 0)}\n"
        f"- **编译成功**: {summary.get('compiled', 0)}\n"
        f"- **执行成功**: {summary.get('executed', 0)}\n"
        f"- **漏洞确认**: {summary.get('vulnerabilities_confirmed', 0)}\n"
        f"- **验证超时**: {summary.get('timeouts', 0)}\n"
        f"- **验证错误**: {summary.get('errors', 0)}\n"
        "\n"
        "## 漏洞确认列表\n"
        "\n"
    ]

    # Detailed listing, one section per vulnerability type.
    for vuln_type, vuln_results in vuln_groups.items():
        confirmed_count = sum(1 for r in vuln_results if r["vulnerability_confirmed"])
        total_count = len(vuln_results)

        parts.append(f"### {vuln_type} ({confirmed_count}/{total_count} 确认)\n\n")

        for result in vuln_results:
            status = "✓ 确认" if result["vulnerability_confirmed"] else "✗ 未确认"
            parts.append(f"- **{result['file']}**: {status}\n")

            if result["vulnerability_confirmed"]:
                parts.append(f" - 返回码: {result['exit_code']}\n")
                if result["output"]:
                    # Only the first 100 characters of the output are shown.
                    parts.append(f" - 输出: {result['output'][:100]}...\n")
            elif result["error"]:
                parts.append(f" - 错误: {result['error']}\n")

        parts.append("\n")

    # Remediation advice: confirmed vulnerabilities first.
    parts.append(
        "## 修复建议\n"
        "\n"
        "### 确认的漏洞\n"
        "以下漏洞已被验证确认,建议优先修复:\n"
        "\n"
    )

    for vuln_type, vuln_results in vuln_groups.items():
        confirmed_results = [r for r in vuln_results if r["vulnerability_confirmed"]]
        if confirmed_results:
            parts.append(f"#### {vuln_type}\n")
            parts.extend(f"- {r['file']}: 需要修复\n" for r in confirmed_results)
            parts.append("\n")

    # Then the unconfirmed findings.
    parts.append(
        "### 未确认的问题\n"
        "以下问题可能是误报或需要进一步分析:\n"
        "\n"
    )

    for vuln_type, vuln_results in vuln_groups.items():
        unconfirmed_results = [r for r in vuln_results if not r["vulnerability_confirmed"]]
        if unconfirmed_results:
            parts.append(f"#### {vuln_type}\n")
            parts.extend(f"- {r['file']}: 需要进一步分析\n" for r in unconfirmed_results)
            parts.append("\n")

    # Write the report file.
    report_path.write_text("".join(parts), encoding="utf-8")
    return report_path
|
|
|
|
|
|
def generate_json_report(output_dir: Path, verification_results: dict) -> Path:
    """Write the verification results as a detailed JSON report.

    Args:
        output_dir: Directory in which ``verification_results.json`` is
            written (UTF-8, non-ASCII characters kept as-is).
        verification_results: The results dict to serialize.  It is updated
            in place with two string fields describing the moment of
            generation: ``timestamp`` (ISO 8601 local time with UTC offset)
            and ``generated_at`` (``YYYY-MM-DD HH:MM:SS`` local time).

    Returns:
        Path of the written JSON file.
    """
    from datetime import datetime

    json_path = output_dir / "verification_results.json"

    # Record when the report was generated.  These fields previously held
    # the current working directory instead of a time.
    now = datetime.now().astimezone()
    verification_results["timestamp"] = now.isoformat(timespec="seconds")
    verification_results["generated_at"] = now.strftime("%Y-%m-%d %H:%M:%S")

    # Write the JSON file.
    json_path.write_text(json.dumps(verification_results, indent=2, ensure_ascii=False), encoding="utf-8")
    return json_path
|