#!/usr/bin/env python3
"""
Adapter that converts cppcheck_test_generator outputs into the unified Issue schema.
Typical usage:
python3 cppcheck_adapter.py \
--report /path/to/cppcheck_report.xml \
--issues-dir /path/to/cppcheck_tests \
--output unified_report/cppcheck_issues.json \
--verification /path/to/cppcheck_tests/verification_results.json
"""
from __future__ import annotations
import argparse
import json
import re
import sys
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple
# Ensure we can import the cppcheck_test_generator package: prepend-style path
# fix so the script works regardless of the caller's working directory.
# parents[1] is the repository root (one level above this script's directory).
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.append(str(PROJECT_ROOT))
# noqa: E402 — these project imports must come after the sys.path manipulation above.
from cppcheck_test_generator.models import CppcheckIssue # type: ignore # noqa: E402
from cppcheck_test_generator.parsers import parse_cppcheck_xml # type: ignore # noqa: E402
# cppcheck severity string -> unified severity level. Unmapped values fall
# back to the upper-cased original (see _normalize_severity).
SEVERITY_MAP = {
    "error": "HIGH",
    "warning": "MEDIUM",
    "style": "LOW",
    "performance": "LOW",
    "portability": "LOW",
    "information": "INFO",
    "note": "INFO",
}
# Baseline CVSS score assigned per unified severity level (unknown -> 0.0).
CVSS_BASE = {
    "CRITICAL": 9.0,
    "HIGH": 8.0,
    "MEDIUM": 6.5,
    "LOW": 3.5,
    "INFO": 0.0,
}
# Lower-cased cppcheck rule id -> coarse unified category ("logic_bug" default).
CATEGORY_MAP = {
    "memleak": "resource_management",
    "nullpointer": "memory_safety",
    "arrayindexoutofbounds": "memory_safety",
    "doublefree": "memory_safety",
    "useafterfree": "memory_safety",
    "uninitvar": "logic_bug",
    "zerodiv": "logic_bug",
    "mismatchallocdealloc": "resource_management",
}
# Per-rule impact description (Chinese, user-facing) used when the markdown
# analysis has no impact section. These are runtime strings — do not translate.
IMPACT_HINT = {
    "memleak": "持续泄漏会耗尽资源,导致服务性能下降或崩溃。",
    "nullpointer": "空指针解引用可能导致进程崩溃,可被用作拒绝服务。",
    "arrayindexoutofbounds": "数组越界访问可能破坏内存,造成未定义行为或远程代码执行。",
    "doublefree": "重复释放可能被利用进行堆喷射,从而执行任意代码。",
    "useafterfree": "释放后继续使用指针可能导致信息泄露或执行任意代码。",
    "uninitvar": "使用未初始化变量会导致不可预测行为或安全边界被绕过。",
    "zerodiv": "除零错误可能导致服务崩溃。",
}
# Per-rule CWE reference URLs used when the markdown provides no references.
REFERENCE_MAP = {
    "memleak": ["https://cwe.mitre.org/data/definitions/401.html"],
    "nullpointer": ["https://cwe.mitre.org/data/definitions/476.html"],
    "arrayindexoutofbounds": ["https://cwe.mitre.org/data/definitions/119.html"],
    "doublefree": ["https://cwe.mitre.org/data/definitions/415.html"],
    "useafterfree": ["https://cwe.mitre.org/data/definitions/416.html"],
    "uninitvar": ["https://cwe.mitre.org/data/definitions/457.html"],
    "zerodiv": ["https://cwe.mitre.org/data/definitions/369.html"],
}
@dataclass
class Issue:
    """One unified issue record; serialized to JSON via dataclasses.asdict."""

    id: str                       # e.g. "CPPC-<rule>-<seq>" (see _build_issue)
    source: Dict[str, Any]        # engine/tool provenance and raw report path
    basic: Dict[str, Any]         # title, type, cwe, category
    location: Dict[str, Any]      # file, line, optional snippet
    severity: Dict[str, Any]      # level, cvss, cvss_vector
    status: Dict[str, Any]        # state plus what confirmed it
    description: Dict[str, Any]   # summary and details
    reproduce: Dict[str, Any]     # steps, inputs, artifacts
    root_cause: Dict[str, Any]    # short statement and technical details
    impact: Dict[str, Any]        # technical and business impact
    fix: Dict[str, Any]           # recommendation, patch hint, references
def _normalize_severity(raw: str) -> str:
    """Map a cppcheck severity string to the unified scale.

    Empty input yields "INFO"; unrecognized severities pass through upper-cased.
    """
    if raw:
        lowered = raw.lower()
        return SEVERITY_MAP[lowered] if lowered in SEVERITY_MAP else raw.upper()
    return "INFO"
def _cvss_for(severity: str) -> float:
    """Return the baseline CVSS score for a unified severity level (0.0 if unknown)."""
    try:
        return CVSS_BASE[severity]
    except KeyError:
        return 0.0
def _category_for(issue: CppcheckIssue) -> str:
    """Classify a cppcheck rule id into a coarse category ("logic_bug" default)."""
    return CATEGORY_MAP.get(issue.id.lower(), "logic_bug")
def _impact_for(issue: CppcheckIssue) -> str:
    """Return the canned impact description for the rule, or a generic fallback."""
    hint = IMPACT_HINT.get(issue.id.lower())
    return hint if hint is not None else "可能影响系统稳定性与可用性。"
def _references_for(issue: CppcheckIssue) -> List[str]:
    """Return CWE reference links for the rule id (empty list when unmapped)."""
    key = issue.id.lower()
    if key in REFERENCE_MAP:
        return REFERENCE_MAP[key]
    return []
def _parse_markdown_sections(text: str) -> Dict[str, str]:
sections: Dict[str, str] = {}
current = "body"
buffer: List[str] = []
heading_pattern = re.compile(r"^(#{1,6})\s+(.*)")
def flush():
if buffer:
sections[current] = "\n".join(buffer).strip()
elif current not in sections:
sections[current] = ""
for line in text.splitlines():
match = heading_pattern.match(line.strip())
if match:
flush()
current = match.group(2).strip().lower()
buffer = []
else:
buffer.append(line)
flush()
return sections
def _extract_section(sections: Dict[str, str], keywords: Iterable[str]) -> Optional[str]:
for key in keywords:
key_lower = key.lower()
for section_key, value in sections.items():
if key_lower in section_key:
return value.strip()
return None
def _extract_list(text: Optional[str]) -> List[str]:
if not text:
return []
items = []
for line in text.splitlines():
stripped = line.strip()
if not stripped:
continue
if stripped[0] in ("-", "*"):
stripped = stripped[1:].strip()
items.append(stripped)
return items
def _load_verification_map(verification_path: Optional[Path], issues_dir: Path) -> Dict[str, Dict[str, Any]]:
mapping: Dict[str, Dict[str, Any]] = {}
def record(entry: Dict[str, Any]) -> None:
file_name = entry.get("file")
if not file_name:
return
key = Path(file_name).stem # e.g., issue_001_memleak
mapping[key] = entry
if verification_path and verification_path.exists():
try:
data = json.loads(verification_path.read_text(encoding="utf-8"))
for entry in data.get("results", []):
record(entry)
except Exception:
pass
# Also load per-issue verification JSON files if present
for json_file in issues_dir.glob("verification_*.json"):
try:
entry = json.loads(json_file.read_text(encoding="utf-8"))
record(entry)
except Exception:
continue
return mapping
def _match_issue_files(issues_dir: Path) -> Dict[str, Tuple[Path, Optional[Path]]]:
mapping: Dict[str, Tuple[Path, Optional[Path]]] = {}
for md_file in sorted(issues_dir.glob("issue_*_*.md")):
base = md_file.stem # issue_001_rule
parts = base.split("_", 2)
if len(parts) < 3:
continue
issue_id = parts[2]
cpp_path = md_file.with_suffix(".cpp")
mapping[issue_id.lower()] = (md_file, cpp_path if cpp_path.exists() else None)
return mapping
def _build_issue(
    cpp_issue: CppcheckIssue,
    md_path: Path,
    cpp_path: Optional[Path],
    verification_info: Optional[Dict[str, Any]],
    report_path: Path,
) -> Issue:
    """Assemble one unified Issue from a cppcheck finding and its artifacts.

    Args:
        cpp_issue: Parsed cppcheck finding from the XML report.
        md_path: Markdown analysis file generated for this finding.
        cpp_path: Generated C++ test case, or None if absent.
        verification_info: Optional verification entry for this finding.
        report_path: Original cppcheck XML report path (recorded as provenance).

    Returns:
        Fully populated Issue dataclass instance.
    """
    severity_level = _normalize_severity(cpp_issue.severity)
    # ID embeds the rule and the sequence number taken from the markdown
    # filename (stem "issue_001_<rule>" -> "001").
    issue_id = f"CPPC-{cpp_issue.id}-{md_path.stem.split('_')[1]}"
    # Only the first reported location is carried into the unified record.
    location = cpp_issue.locations[0] if cpp_issue.locations else None
    sections = _parse_markdown_sections(md_path.read_text(encoding="utf-8"))
    # Pull named sections from the markdown; headings may be Chinese or English.
    description_section = _extract_section(sections, ["漏洞描述", "问题描述", "description"])
    repro_section = _extract_section(sections, ["复现步骤", "重现步骤", "reproduction"])
    root_cause_section = _extract_section(sections, ["根本原因", "原因分析", "root cause"])
    impact_section = _extract_section(sections, ["潜在影响", "影响", "impact"])
    fix_section = _extract_section(sections, ["修复建议", "修复方案", "mitigation"])
    reference_section = _extract_section(sections, ["参考链接", "references"])
    confirmed = False
    confirmed_by: List[str] = []
    if verification_info:
        # Either flag counts as confirmation by the generated test.
        confirmed = bool(
            verification_info.get("vulnerability_confirmed")
            or verification_info.get("triggers_cppcheck")
        )
        if confirmed:
            confirmed_by.append("generated_test")
    # Fall back to generic reproduction instructions when the markdown has none.
    reproduce_steps = _extract_list(repro_section) or [
        f"参阅 {md_path.name} 中的复现说明。",
        "编译并运行对应的测试用例以验证漏洞。",
    ]
    if cpp_path:
        reproduce_steps.append(f"测试用例: {cpp_path}")
    artifacts: Dict[str, Any] = {"analysis_markdown": str(md_path)}
    if cpp_path:
        artifacts["generated_test"] = str(cpp_path)
    if verification_info:
        artifacts["verification"] = verification_info
    return Issue(
        id=issue_id,
        source={
            "engine": "cppcheck_ai",
            "sub_tool": "cppcheck",
            "raw_ids": [cpp_issue.id],
            "report_path": str(report_path),
        },
        basic={
            "title": f"{cpp_issue.id} - {location.file_path if location else '未知文件'}",
            "type": cpp_issue.id,
            "cwe": None,  # not derivable from the cppcheck report; left unset
            "category": _category_for(cpp_issue),
        },
        location={
            "file": str(location.file_path) if location else None,
            "function": None,
            "line": location.line if location else None,
            "column": None,
            "snippet": description_section or cpp_issue.message,
        },
        severity={
            "level": severity_level,
            "cvss": _cvss_for(severity_level),
            "cvss_vector": None,
        },
        status={
            "state": "confirmed" if confirmed else "new",
            "confirmed_by": confirmed_by,
            "first_seen": None,
            "last_seen": None,
        },
        description={
            "summary": cpp_issue.message,
            # Prefer the markdown description; fall back to the pre-heading
            # "body" text, then to the raw cppcheck message.
            "details": description_section or sections.get("body", cpp_issue.message),
        },
        reproduce={
            "steps": reproduce_steps,
            "inputs": {},
            "artifacts": artifacts,
        },
        root_cause={
            "short": root_cause_section or cpp_issue.message,
            "technical_details": root_cause_section or "",
        },
        impact={
            "technical": impact_section or _impact_for(cpp_issue),
            "business": "可能影响系统稳定性与可用性。",
        },
        fix={
            "recommendation": _extract_list(fix_section) or ["参考安全开发规范修复该漏洞。"],
            "code_patch_hint": fix_section or "",
            "references": _extract_list(reference_section) or _references_for(cpp_issue),
        },
    )
def convert(report_path: Path, issues_dir: Path, output_path: Path, verification_path: Optional[Path]) -> None:
    """Run the full conversion and write the unified issues JSON.

    Parses the cppcheck XML report, pairs findings with their generated
    markdown/cpp artifacts and verification results, builds unified Issue
    records, and dumps them (UTF-8, non-ASCII preserved) to output_path.
    """
    report_issues = {item.id.lower(): item for item in parse_cppcheck_xml(report_path)}
    verification_map = _load_verification_map(verification_path, issues_dir)

    def find_report_issue(rule_key: str):
        # Exact id match first; otherwise retry with non-alphanumerics removed
        # from both sides (e.g. "use_after_free" vs "useAfterFree").
        candidate = report_issues.get(rule_key)
        if not candidate:
            squashed = re.sub(r"[^a-z0-9]", "", rule_key)
            candidate = next(
                (
                    item
                    for key, item in report_issues.items()
                    if re.sub(r"[^a-z0-9]", "", key) == squashed
                ),
                None,
            )
        return candidate

    converted: List[Issue] = []
    for rule_key, (md_path, cpp_path) in _match_issue_files(issues_dir).items():
        report_issue = find_report_issue(rule_key)
        if not report_issue:
            print(f"[cppcheck_adapter] 跳过 {md_path.name}: 在报告中找不到对应的 issue id")
            continue
        converted.append(
            _build_issue(
                report_issue,
                md_path,
                cpp_path,
                verification_map.get(md_path.stem.replace(".md", "")),
                report_path,
            )
        )
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with output_path.open("w", encoding="utf-8") as handle:
        json.dump([asdict(entry) for entry in converted], handle, ensure_ascii=False, indent=2)
    print(f"[cppcheck_adapter] Converted {len(converted)} issues -> {output_path}")
def main() -> None:
    """CLI entry point: parse the adapter's arguments and run the conversion."""
    cli = argparse.ArgumentParser(description="Convert cppcheck_test_generator outputs to unified issues.")
    cli.add_argument("--report", type=Path, required=True, help="Path to cppcheck XML report.")
    cli.add_argument("--issues-dir", type=Path, required=True, help="Directory containing generated issue markdown/cpp files.")
    cli.add_argument("--output", type=Path, required=True, help="Path to write unified issues JSON.")
    cli.add_argument("--verification", type=Path, help="Optional verification_results.json path.")
    options = cli.parse_args()
    convert(options.report, options.issues_dir, options.output, options.verification)
# Run the CLI only when executed as a script, not on import.
if __name__ == "__main__":
    main()