#!/usr/bin/env python3 # CodeDetect测试数据生成器 import os import sys import json import random import string from pathlib import Path from typing import Dict, List, Any, Optional from dataclasses import dataclass, asdict import tempfile @dataclass class TestCase: """测试用例""" name: str category: str code: str expected_result: Dict[str, Any] complexity_score: float metadata: Dict[str, Any] @dataclass class TestData: """测试数据集""" name: str description: str test_cases: List[TestCase] config: Dict[str, Any] class TestDataGenerator: """测试数据生成器""" def __init__(self, output_dir: str = "tests/fixtures"): self.output_dir = Path(output_dir) self.output_dir.mkdir(parents=True, exist_ok=True) def generate_all_test_data(self): """生成所有测试数据""" print("🚀 开始生成测试数据...") # 生成各种测试数据 self._generate_basic_test_data() self._generate_freertos_test_data() self._generate_edge_case_data() self._generate_performance_data() print("✅ 测试数据生成完成") def _generate_basic_test_data(self): """生成基础测试数据""" print("📝 生成基础测试数据...") test_cases = [ TestCase( name="simple_function", category="basic", code=""" int add(int a, int b) { return a + b; } """, expected_result={"status": "success", "properties": ["deterministic", "no_side_effects"]}, complexity_score=0.1, metadata={"function_count": 1, "line_count": 3} ), TestCase( name="recursive_function", category="basic", code=""" int factorial(int n) { if (n <= 1) return 1; return n * factorial(n - 1); } """, expected_result={"status": "success", "properties": ["recursive", "terminating"]}, complexity_score=0.3, metadata={"function_count": 1, "line_count": 4, "recursion_depth": 10} ), TestCase( name="array_operations", category="basic", code=""" #include int array_sum(int* arr, int size) { if (arr == NULL || size <= 0) return 0; int sum = 0; for (int i = 0; i < size; i++) { sum += arr[i]; } return sum; } """, expected_result={"status": "success", "properties": ["null_safe", "bounded_loop"]}, complexity_score=0.4, metadata={"function_count": 1, "line_count": 8, "complexity_metrics": {"cyclomatic": 3}} ) ] test_data = TestData( name="basic_test_data", description="基础C代码测试用例", test_cases=test_cases, config={"language": "c", "complexity_range": [0.1, 0.5]} ) self._save_test_data(test_data, "basic_test_data.json") def _generate_freertos_test_data(self): """生成FreeRTOS测试数据""" print("🔧 生成FreeRTOS测试数据...") test_cases = [ TestCase( name="freertos_task_creation", category="freertos", code=""" #include "FreeRTOS.h" #include "task.h" void vTaskFunction(void *pvParameters) { TaskParameters_t *params = (TaskParameters_t *)pvParameters; configASSERT(params != NULL); while (1) { vTaskDelay(pdMS_TO_TICKS(1000)); } } int main() { TaskParameters_t params = {0}; xTaskCreate(vTaskFunction, "Task1", 128, ¶ms, 1, NULL); vTaskStartScheduler(); return 0; } """, expected_result={"status": "success", "properties": ["task_safe", "parameter_validation"]}, complexity_score=0.6, metadata={"freertos_features": ["tasks", "delays"], "safety_checks": 2} ), TestCase( name="freertos_queue_operations", category="freertos", code=""" #include "FreeRTOS.h" #include "queue.h" QueueHandle_t xQueue; void vProducerTask(void *pvParameters) { int data = 0; while (1) { data++; xQueueSend(xQueue, &data, pdMS_TO_TICKS(100)); vTaskDelay(pdMS_TO_TICKS(10)); } } void vConsumerTask(void *pvParameters) { int received_data; while (1) { if (xQueueReceive(xQueue, &received_data, pdMS_TO_TICKS(100)) == pdPASS) { // Process data } } } """, expected_result={"status": "success", "properties": ["queue_safe", "timeout_handling"]}, complexity_score=0.7, metadata={"freertos_features": ["queues", "tasks", "timeouts"], "concurrency_patterns": ["producer_consumer"]} ) ] test_data = TestData( name="freertos_test_data", description="FreeRTOS相关测试用例", test_cases=test_cases, config={"language": "c", "framework": "freertos", "headers": ["FreeRTOS.h", "queue.h", "task.h"]} ) self._save_test_data(test_data, "freertos_test_data.json") def _generate_edge_case_data(self): """生成边缘情况测试数据""" print("⚠️ 生成边缘情况测试数据...") test_cases = [ TestCase( name="null_pointer_dereference", category="edge_cases", code=""" #include int risky_function(int *ptr) { // 危险的空指针解引用 return *ptr; } """, expected_result={"status": "warning", "properties": ["null_pointer_risk"]}, complexity_score=0.2, metadata={"vulnerabilities": ["null_pointer_dereference"], "severity": "high"} ), TestCase( name="buffer_overflow", category="edge_cases", code=""" void copy_data(char *dest, char *src, int size) { for (int i = 0; i <= size; i++) { // 注意:<= 导致缓冲区溢出 dest[i] = src[i]; } } """, expected_result={"status": "warning", "properties": ["buffer_overflow_risk"]}, complexity_score=0.3, metadata={"vulnerabilities": ["buffer_overflow"], "severity": "high"} ), TestCase( name="resource_leak", category="edge_cases", code=""" #include void leaky_function() { int *ptr = malloc(sizeof(int)); *ptr = 42; // 忘记释放内存 } """, expected_result={"status": "warning", "properties": ["memory_leak"]}, complexity_score=0.2, metadata={"vulnerabilities": ["memory_leak"], "severity": "medium"} ) ] test_data = TestData( name="edge_case_test_data", description="边缘情况和漏洞测试用例", test_cases=test_cases, config={"language": "c", "focus": "security", "expected_warnings": True} ) self._save_test_data(test_data, "edge_case_test_data.json") def _generate_performance_data(self): """生成性能测试数据""" print("⚡ 生成性能测试数据...") # 生成大规模测试数据 large_function_code = "" for i in range(100): large_function_code += f""" int function_{i}(int x) {{ int result = 0; for (int j = 0; j < {random.randint(10, 100)}; j++) {{ result += x * j; }} return result; }} """ test_cases = [ TestCase( name="large_codebase", category="performance", code=large_function_code, expected_result={"status": "success", "processing_time_ms": "<1000"}, complexity_score=0.8, metadata={"function_count": 100, "line_count": 600, "estimated_processing_time": "medium"} ), TestCase( name="deep_recursion", category="performance", code=""" int deep_recursive(int n) { if (n <= 0) return 1; return deep_recursive(n - 1) + deep_recursive(n - 2); } """, expected_result={"status": "success", "properties": ["exponential_complexity"]}, complexity_score=0.9, metadata={"function_count": 1, "line_count": 4, "complexity_class": "exponential"} ) ] test_data = TestData( name="performance_test_data", description="性能相关测试用例", test_cases=test_cases, config={"language": "c", "focus": "performance", "timeout_seconds": 30} ) self._save_test_data(test_data, "performance_test_data.json") def _save_test_data(self, test_data: TestData, filename: str): """保存测试数据到文件""" filepath = self.output_dir / filename with open(filepath, 'w', encoding='utf-8') as f: json.dump(asdict(test_data), f, indent=2, ensure_ascii=False) print(f" 保存测试数据到: {filepath}") def load_test_data(self, filename: str) -> TestData: """从文件加载测试数据""" filepath = self.output_dir / filename if not filepath.exists(): raise FileNotFoundError(f"测试数据文件不存在: {filepath}") with open(filepath, 'r', encoding='utf-8') as f: data = json.load(f) return TestData( name=data["name"], description=data["description"], test_cases=[TestCase(**tc) for tc in data["test_cases"]], config=data["config"] ) def main(): """主函数""" generator = TestDataGenerator() generator.generate_all_test_data() if __name__ == "__main__": main()