""" 验证和测试模块 """ import subprocess import time import json from pathlib import Path from typing import List, Optional from .models import CppcheckIssue def verify_single_test(cpp_file: Path, timeout: int = 30, project_root: Optional[Path] = None, include_dirs: List[str] = None) -> dict: """验证单个测试用例""" result = { "file": cpp_file.name, "compiled": False, "executed": False, "exit_code": None, "output": "", "error": "", "duration": 0, "timeout": False, "vulnerability_confirmed": False, "vulnerability_type": "unknown" } exe_file = cpp_file.with_suffix(".exe") try: # 编译 start_time = time.time() compile_cmd = [ "g++", "-std=c++17", "-O0", "-g", "-Wall", "-Wextra", "-pedantic" ] # 添加项目相关的编译选项 if project_root: compile_cmd.extend(["-I", str(project_root)]) if include_dirs: for include_dir in include_dirs: compile_cmd.extend(["-I", include_dir]) compile_cmd.extend(["-o", str(exe_file), str(cpp_file)]) compile_result = subprocess.run( compile_cmd, capture_output=True, text=True, timeout=timeout ) result["compiled"] = (compile_result.returncode == 0) result["duration"] = time.time() - start_time if not result["compiled"]: result["error"] = compile_result.stderr return result # 执行 if exe_file.exists(): start_time = time.time() try: execute_result = subprocess.run( [str(exe_file)], capture_output=True, text=True, timeout=timeout ) result["executed"] = True result["exit_code"] = execute_result.returncode result["output"] = execute_result.stdout result["error"] = execute_result.stderr result["duration"] = time.time() - start_time # 分析漏洞类型 result["vulnerability_type"] = analyze_vulnerability_type(cpp_file.name, result) result["vulnerability_confirmed"] = determine_vulnerability_confirmed(result) except subprocess.TimeoutExpired: result["timeout"] = True result["error"] = f"执行超时({timeout}秒)" except Exception as e: result["error"] = f"执行异常: {str(e)}" except subprocess.TimeoutExpired: result["timeout"] = True result["error"] = f"编译超时({timeout}秒)" except Exception as e: result["error"] = f"编译异常: {str(e)}" finally: # 清理 if exe_file.exists(): exe_file.unlink() return result def analyze_vulnerability_type(filename: str, result: dict) -> str: """分析漏洞类型""" filename_lower = filename.lower() if "uninitvar" in filename_lower: return "未初始化变量" elif "memleak" in filename_lower: return "内存泄漏" elif "nullpointer" in filename_lower: return "空指针解引用" elif "arrayindex" in filename_lower: return "数组越界" elif "buffer" in filename_lower: return "缓冲区溢出" elif "useafterfree" in filename_lower: return "释放后使用" elif "doublefree" in filename_lower: return "重复释放" else: return "未知类型" def determine_vulnerability_confirmed(result: dict) -> bool: """判断漏洞是否被确认""" if not result["compiled"] or not result["executed"]: return False # 根据漏洞类型和程序行为判断 vuln_type = result["vulnerability_type"] exit_code = result["exit_code"] output = result["output"] error = result["error"] if "未初始化变量" in vuln_type: # 未初始化变量:程序应该能正常运行,但输出随机值 return exit_code == 0 and "buffer" in output.lower() elif "内存泄漏" in vuln_type: # 内存泄漏:程序应该能正常运行 return exit_code == 0 elif "空指针解引用" in vuln_type: # 空指针解引用:程序应该崩溃 return exit_code != 0 or "segmentation fault" in error.lower() elif "数组越界" in vuln_type: # 数组越界:程序可能崩溃 return exit_code != 0 or "segmentation fault" in error.lower() elif "缓冲区溢出" in vuln_type: # 缓冲区溢出:程序可能崩溃 return exit_code != 0 or "stack smashing" in error.lower() else: # 默认:程序能正常运行就认为漏洞存在 return exit_code == 0 def verify_test_case(test_file_path: Path, issue: CppcheckIssue) -> dict: """验证生成的测试用例是否能有效触发cppcheck检测""" verification_result = { 'compiles': False, 'runs': False, 'triggers_cppcheck': False, 'cppcheck_warnings': [], 'compilation_errors': [], 'runtime_errors': [] } try: # 1. 尝试编译测试用例 import tempfile # 创建临时目录 with tempfile.TemporaryDirectory() as temp_dir: temp_cpp = Path(temp_dir) / "test.cpp" temp_exe = Path(temp_dir) / "test" # 复制测试文件到临时目录 with open(test_file_path, 'r', encoding='utf-8') as f: test_content = f.read() with open(temp_cpp, 'w', encoding='utf-8') as f: f.write(test_content) # 尝试编译 try: result = subprocess.run( ['g++', '-std=c++17', '-o', str(temp_exe), str(temp_cpp)], capture_output=True, text=True, timeout=30 ) if result.returncode == 0: verification_result['compiles'] = True # 2. 尝试运行 try: run_result = subprocess.run( [str(temp_exe)], capture_output=True, text=True, timeout=10 ) if run_result.returncode == 0: verification_result['runs'] = True else: verification_result['runtime_errors'].append(run_result.stderr) except subprocess.TimeoutExpired: verification_result['runtime_errors'].append("Runtime timeout") except Exception as e: verification_result['runtime_errors'].append(str(e)) else: verification_result['compilation_errors'].append(result.stderr) except subprocess.TimeoutExpired: verification_result['compilation_errors'].append("Compilation timeout") except Exception as e: verification_result['compilation_errors'].append(str(e)) # 3. 使用cppcheck检查 try: cppcheck_result = subprocess.run( ['cppcheck', '--enable=all', '--std=c++17', str(temp_cpp)], capture_output=True, text=True, timeout=30 ) if cppcheck_result.returncode != 0 or cppcheck_result.stderr: # 解析cppcheck输出 output = cppcheck_result.stderr if issue.id.lower() in output.lower(): verification_result['triggers_cppcheck'] = True # 提取警告信息 lines = output.split('\n') for line in lines: if 'warning:' in line or 'error:' in line: verification_result['cppcheck_warnings'].append(line.strip()) except subprocess.TimeoutExpired: verification_result['cppcheck_warnings'].append("cppcheck timeout") except Exception as e: verification_result['cppcheck_warnings'].append(f"cppcheck error: {str(e)}") except Exception as e: verification_result['compilation_errors'].append(f"Verification error: {str(e)}") return verification_result def auto_verify_tests(output_dir: Path, timeout: int = 30, project_root: Optional[Path] = None, include_dirs: List[str] = None) -> dict: """自动验证所有测试用例""" print("开始自动验证测试用例...") cpp_files = list(output_dir.glob("*.cpp")) if not cpp_files: print("未找到测试用例文件") return {"total": 0, "results": [], "summary": {}} results = [] for i, cpp_file in enumerate(cpp_files, 1): print(f"验证 [{i}/{len(cpp_files)}]: {cpp_file.name}") result = verify_single_test(cpp_file, timeout, project_root, include_dirs) results.append(result) # 显示验证结果 if result["vulnerability_confirmed"]: print(f" ✓ 漏洞确认: {result['vulnerability_type']}") elif result["compiled"] and result["executed"]: print(f" - 程序正常: {result['vulnerability_type']} (可能误报)") else: print(f" ✗ 验证失败: {result['error']}") # 生成汇总统计 summary = { "total": len(results), "compiled": sum(1 for r in results if r["compiled"]), "executed": sum(1 for r in results if r["executed"]), "vulnerabilities_confirmed": sum(1 for r in results if r["vulnerability_confirmed"]), "timeouts": sum(1 for r in results if r["timeout"]), "errors": sum(1 for r in results if not r["compiled"] or not r["executed"]) } return {"total": len(results), "results": results, "summary": summary} def generate_verification_report(output_dir: Path, verification_results: dict) -> Path: """生成验证结果报告""" report_path = output_dir / "vulnerability_verification_report.md" results = verification_results["results"] summary = verification_results["summary"] # 按漏洞类型分组 vuln_groups = {} for result in results: vuln_type = result["vulnerability_type"] if vuln_type not in vuln_groups: vuln_groups[vuln_type] = [] vuln_groups[vuln_type].append(result) # 生成报告内容 report_content = f"""# 漏洞验证结果报告 ## 验证汇总 - **总测试用例**: {summary['total']} - **编译成功**: {summary['compiled']} - **执行成功**: {summary['executed']} - **漏洞确认**: {summary['vulnerabilities_confirmed']} - **验证超时**: {summary['timeouts']} - **验证错误**: {summary['errors']} ## 漏洞确认列表 """ # 按漏洞类型生成详细报告 for vuln_type, vuln_results in vuln_groups.items(): confirmed_count = sum(1 for r in vuln_results if r["vulnerability_confirmed"]) total_count = len(vuln_results) report_content += f"### {vuln_type} ({confirmed_count}/{total_count} 确认)\n\n" for result in vuln_results: status = "✓ 确认" if result["vulnerability_confirmed"] else "✗ 未确认" report_content += f"- **{result['file']}**: {status}\n" if result["vulnerability_confirmed"]: report_content += f" - 返回码: {result['exit_code']}\n" if result["output"]: report_content += f" - 输出: {result['output'][:100]}...\n" elif result["error"]: report_content += f" - 错误: {result['error']}\n" report_content += "\n" # 添加修复建议 report_content += """## 修复建议 ### 确认的漏洞 以下漏洞已被验证确认,建议优先修复: """ for vuln_type, vuln_results in vuln_groups.items(): confirmed_results = [r for r in vuln_results if r["vulnerability_confirmed"]] if confirmed_results: report_content += f"#### {vuln_type}\n" for result in confirmed_results: report_content += f"- {result['file']}: 需要修复\n" report_content += "\n" report_content += """### 未确认的问题 以下问题可能是误报或需要进一步分析: """ for vuln_type, vuln_results in vuln_groups.items(): unconfirmed_results = [r for r in vuln_results if not r["vulnerability_confirmed"]] if unconfirmed_results: report_content += f"#### {vuln_type}\n" for result in unconfirmed_results: report_content += f"- {result['file']}: 需要进一步分析\n" report_content += "\n" # 写入报告文件 report_path.write_text(report_content, encoding="utf-8") return report_path def generate_json_report(output_dir: Path, verification_results: dict) -> Path: """生成JSON格式的详细报告""" json_path = output_dir / "verification_results.json" # 添加时间戳 verification_results["timestamp"] = str(Path().cwd()) verification_results["generated_at"] = str(Path().cwd()) # 写入JSON文件 json_path.write_text(json.dumps(verification_results, indent=2, ensure_ascii=False), encoding="utf-8") return json_path