You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
cbmc/codedetect/tools/generate_test_data.py

762 lines
23 KiB

#!/usr/bin/env python3
"""
CodeDetect测试数据生成工具
自动生成各种类型的测试数据,包括:
- C代码样本
- CBMC规范
- 函数元数据
- 验证结果
- 性能测试数据
"""
import os
import sys
import json
import random
import string
import inspect
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, asdict
from abc import ABC, abstractmethod
import yaml
import numpy as np
from datetime import datetime, timedelta
@dataclass
class TestCase:
"""测试用例"""
name: str
description: str
code: str
specification: str
metadata: Dict[str, Any]
expected_result: Dict[str, Any]
difficulty: str # "easy", "medium", "hard"
category: str # "basic", "memory", "concurrency", "freertos"
@dataclass
class TestData:
"""测试数据集"""
name: str
description: str
test_cases: List[TestCase]
generation_config: Dict[str, Any]
created_at: datetime
class CodeGenerator(ABC):
"""代码生成器基类"""
@abstractmethod
def generate(self, **kwargs) -> str:
"""生成代码"""
pass
class BasicCodeGenerator(CodeGenerator):
"""基础C代码生成器"""
def __init__(self):
self.templates = {
"arithmetic": [
("int add(int a, int b) { return a + b; }", "加法函数"),
("int subtract(int a, int b) { return a - b; }", "减法函数"),
("int multiply(int a, int b) { return a * b; }", "乘法函数"),
("float divide(float a, float b) { return a / b; }", "除法函数"),
],
"logic": [
("int max(int a, int b) { return a > b ? a : b; }", "最大值函数"),
("int min(int a, int b) { return a < b ? a : b; }", "最小值函数"),
("int abs(int x) { return x >= 0 ? x : -x; }", "绝对值函数"),
],
"string": [
("int strlen(const char *s) { int i = 0; while (s[i]) i++; return i; }", "字符串长度"),
("void strcpy(char *dest, const char *src) { while ((*dest++ = *src++)); }", "字符串复制"),
]
}
def generate(self, category: str = "random", **kwargs) -> str:
"""生成基础C代码"""
if category == "random":
category = random.choice(list(self.templates.keys()))
if category not in self.templates:
category = "arithmetic"
template, _ = random.choice(self.templates[category])
# 添加随机变化
if random.random() < 0.3:
template = self._add_random_comments(template)
if random.random() < 0.2:
template = self._add_error_handling(template)
return template
def _add_random_comments(self, code: str) -> str:
"""添加随机注释"""
comments = [
"// 简单的计算函数",
"// 返回两个数的和",
"// 基础算术运算",
"// 处理整数运算"
]
comment = random.choice(comments)
return f"{comment}\n{code}"
def _add_error_handling(self, code: str) -> str:
"""添加错误处理"""
# 简单的错误处理示例
return f"""
/* 错误处理版本 */
{code}
"""
class MemoryCodeGenerator(CodeGenerator):
"""内存操作代码生成器"""
def __init__(self):
self.templates = [
"""
void* safe_malloc(size_t size) {
if (size == 0) return NULL;
void* ptr = malloc(size);
if (!ptr) return NULL;
return ptr;
}""",
"""
int array_sum(int* arr, int size) {
if (!arr || size <= 0) return 0;
int sum = 0;
for (int i = 0; i < size; i++) {
sum += arr[i];
}
return sum;
}""",
"""
void array_copy(int* dest, int* src, int size) {
if (!dest || !src || size <= 0) return;
for (int i = 0; i < size; i++) {
dest[i] = src[i];
}
}"""
]
def generate(self, **kwargs) -> str:
"""生成内存操作代码"""
template = random.choice(self.templates)
return template.strip()
class ConcurrencyCodeGenerator(CodeGenerator):
"""并发代码生成器"""
def __init__(self):
self.templates = [
"""
#include <pthread.h>
typedef struct {
int counter;
pthread_mutex_t mutex;
} SharedData;
void increment_counter(SharedData* data) {
pthread_mutex_lock(&data->mutex);
data->counter++;
pthread_mutex_unlock(&data->mutex);
}""",
"""
#include <stdatomic.h>
void atomic_increment(atomic_int* counter) {
atomic_fetch_add(counter, 1);
}""",
"""
#include <semaphore.h>
void producer_consumer() {
sem_t empty, full;
sem_init(&empty, 0, 10);
sem_init(&full, 0, 0);
// 生产者和消费者逻辑
sem_destroy(&empty);
sem_destroy(&full);
}"""
]
def generate(self, **kwargs) -> str:
"""生成并发代码"""
template = random.choice(self.templates)
return template.strip()
class FreeRTOSCodeGenerator(CodeGenerator):
"""FreeRTOS代码生成器"""
def __init__(self):
self.templates = [
"""
#include "FreeRTOS.h"
#include "task.h"
void vTaskFunction(void *pvParameters) {
TickType_t xLastWakeTime = xTaskGetTickCount();
for (;;) {
// 任务逻辑
vTaskDelayUntil(&xLastWakeTime, pdMS_TO_TICKS(100));
}
}""",
"""
#include "FreeRTOS.h"
#include "queue.h"
void vQueueSenderTask(void *pvParameters) {
QueueHandle_t xQueue = (QueueHandle_t)pvParameters;
int32_t lValueToSend = 0;
for (;;) {
xQueueSend(xQueue, &lValueToSend, portMAX_DELAY);
lValueToSend++;
vTaskDelay(pdMS_TO_TICKS(1000));
}
}""",
"""
#include "FreeRTOS.h"
#include "semphr.h"
void vSemaphoreTask(void *pvParameters) {
SemaphoreHandle_t xSemaphore = (SemaphoreHandle_t)pvParameters;
for (;;) {
if (xSemaphoreTake(xSemaphore, portMAX_DELAY) == pdTRUE) {
// 执行受保护的操作
xSemaphoreGive(xSemaphore);
}
vTaskDelay(pdMS_TO_TICKS(100));
}
}"""
]
def generate(self, task_type: str = "random", **kwargs) -> str:
"""生成FreeRTOS代码"""
template = random.choice(self.templates)
return template.strip()
class SpecificationGenerator:
"""CBMC规范生成器"""
def __init__(self):
self.templates = {
"basic": """
void {function_name}_test() {{
// 生成随机输入
{input_declaration}
// 调用被测函数
{function_call}
// 验证结果
{assertions}
}}""",
"memory": """
void {function_name}_memory_test() {{
// 内存安全测试
{input_declaration}
// 测试NULL指针
{null_pointer_test}
// 测试边界条件
{boundary_test}
// 测试正常情况
{normal_test}
}}""",
"concurrency": """
void {function_name}_concurrency_test() {{
// 并发安全测试
{shared_data_declaration}
// 模拟并发访问
{concurrent_access}
// 验证结果一致性
{consistency_check}
}}""",
"freertos": """
#include "FreeRTOS.h"
#include "task.h"
void {function_name}_freertos_test() {{
// FreeRTOS测试设置
TaskHandle_t xTaskHandle = NULL;
{task_parameters}
// 测试任务创建
BaseType_t xResult = xTaskCreate(
{function_name},
"TestTask",
configMINIMAL_STACK_SIZE,
{task_params},
1,
&xTaskHandle
);
__CPROVER_assert(xResult == pdPASS, "task_creation_success");
__CPROVER_assert(xTaskHandle != NULL, "task_handle_not_null");
}}"""
}
def generate(self, function_name: str, spec_type: str = "basic", **kwargs) -> str:
"""生成CBMC规范"""
if spec_type not in self.templates:
spec_type = "basic"
template = self.templates[spec_type]
# 填充模板
input_decl = self._generate_input_declaration(**kwargs)
function_call = self._generate_function_call(function_name, **kwargs)
assertions = self._generate_assertions(**kwargs)
return template.format(
function_name=function_name,
input_declaration=input_decl,
function_call=function_call,
assertions=assertions,
**kwargs
)
def _generate_input_declaration(self, **kwargs) -> str:
"""生成输入声明"""
return "int a = __CPROVER_nondet_int();\n int b = __CPROVER_nondet_int();"
def _generate_function_call(self, function_name: str, **kwargs) -> str:
"""生成函数调用"""
return f"int result = {function_name}(a, b);"
def _generate_assertions(self, **kwargs) -> str:
"""生成断言"""
return "__CPROVER_assert(result == result, \"basic_assertion\");"
class MetadataGenerator:
"""函数元数据生成器"""
def __init__(self):
self.function_names = [
"add", "subtract", "multiply", "divide", "max", "min", "abs",
"strlen", "strcpy", "strcmp", "concat", "reverse", "sort",
"find", "search", "filter", "transform", "validate", "process"
]
self.types = ["int", "float", "double", "char", "void", "bool"]
def generate(self, function_name: Optional[str] = None) -> Dict[str, Any]:
"""生成函数元数据"""
if not function_name:
function_name = random.choice(self.function_names)
return {
"name": function_name,
"return_type": random.choice(self.types),
"parameters": self._generate_parameters(),
"complexity_score": random.uniform(0.1, 1.0),
"is_recursive": random.random() < 0.1,
"has_loops": random.random() < 0.6,
"has_conditionals": random.random() < 0.8,
"estimated_lines": random.randint(5, 50),
"description": f"Generated function {function_name}"
}
def _generate_parameters(self) -> List[Dict[str, Any]]:
"""生成参数列表"""
num_params = random.randint(0, 5)
parameters = []
for i in range(num_params):
param = {
"name": f"param_{i}",
"type": random.choice(self.types),
"is_pointer": random.random() < 0.3,
"is_array": random.random() < 0.2,
"description": f"Parameter {i}"
}
parameters.append(param)
return parameters
class VerificationResultGenerator:
"""验证结果生成器"""
def __init__(self):
self.status_options = ["success", "failure", "timeout", "error"]
self.error_types = [
"memory_safety_violation",
"assertion_failure",
"null_pointer_dereference",
"buffer_overflow",
"integer_overflow",
"division_by_zero"
]
def generate(self, test_case: TestCase) -> Dict[str, Any]:
"""生成验证结果"""
status = random.choice(self.status_options)
result = {
"status": status,
"execution_time": random.uniform(0.1, 30.0),
"memory_usage": random.randint(1024, 1024 * 1024),
"test_case": test_case.name,
"timestamp": datetime.now().isoformat(),
"cbmc_version": "5.12.0",
"platform": "Linux x86_64"
}
if status == "success":
result.update({
"properties_verified": random.randint(1, 10),
"coverage_percentage": random.uniform(0.5, 1.0),
"verification_depth": random.randint(1, 20)
})
elif status == "failure":
result.update({
"error_type": random.choice(self.error_types),
"error_location": f"{test_case.name}:{random.randint(1, 50)}",
"counterexample": self._generate_counterexample(),
"properties_failed": random.randint(1, 5)
})
elif status == "timeout":
result.update({
"timeout_duration": 30.0,
"progress_at_timeout": random.uniform(0.1, 0.9)
})
else: # error
result.update({
"error_message": "CBMC execution error",
"system_error": random.choice(["memory_limit", "process_killed", "invalid_input"])
})
return result
def _generate_counterexample(self) -> Dict[str, Any]:
"""生成反例"""
return {
"inputs": {
"a": random.randint(-1000, 1000),
"b": random.randint(-1000, 1000)
},
"execution_trace": [
f"Line {i}: {random.choice(['enter', 'exit', 'branch'])}"
for i in range(random.randint(3, 10))
],
"failed_assertion": "assertion_failed_example"
}
class TestDataGenerator:
"""测试数据生成器主类"""
def __init__(self, output_dir: str):
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self.code_generators = {
"basic": BasicCodeGenerator(),
"memory": MemoryCodeGenerator(),
"concurrency": ConcurrencyCodeGenerator(),
"freertos": FreeRTOSCodeGenerator()
}
self.spec_generator = SpecificationGenerator()
self.metadata_generator = MetadataGenerator()
self.result_generator = VerificationResultGenerator()
def generate_test_suite(self, name: str, config: Dict[str, Any]) -> TestData:
"""生成测试套件"""
print(f"🔄 生成测试套件: {name}")
test_cases = []
# 根据配置生成测试用例
for category in config.get("categories", ["basic"]):
num_tests = config.get("tests_per_category", 5)
for i in range(num_tests):
test_case = self._generate_test_case(category, f"{category}_test_{i}")
test_cases.append(test_case)
test_data = TestData(
name=name,
description=f"Generated test suite: {name}",
test_cases=test_cases,
generation_config=config,
created_at=datetime.now()
)
# 保存测试数据
self._save_test_data(test_data)
return test_data
def _generate_test_case(self, category: str, name: str) -> TestCase:
"""生成单个测试用例"""
print(f" 📝 生成测试用例: {name}")
# 生成代码
code_generator = self.code_generators.get(category, self.code_generators["basic"])
code = code_generator.generate()
# 提取函数名
function_name = self._extract_function_name(code)
# 生成元数据
metadata = self.metadata_generator.generate(function_name)
# 生成规范
spec_type = self._get_spec_type_for_category(category)
specification = self.spec_generator.generate(
function_name,
spec_type,
**metadata.get("parameters", {})
)
# 生成期望结果
expected_result = self.result_generator.generate(
TestCase(name, "", code, specification, metadata, {}, category, category)
)
# 确定难度
difficulty = self._determine_difficulty(category, len(code))
test_case = TestCase(
name=name,
description=f"Generated {category} test case",
code=code,
specification=specification,
metadata=metadata,
expected_result=expected_result,
difficulty=difficulty,
category=category
)
# 保存测试用例文件
self._save_test_case(test_case)
return test_case
def _extract_function_name(self, code: str) -> str:
"""从代码中提取函数名"""
import re
match = re.search(r'\b(\w+)\s*\([^)]*\)\s*{', code)
return match.group(1) if match else "test_function"
def _get_spec_type_for_category(self, category: str) -> str:
"""根据类别获取规范类型"""
spec_mapping = {
"basic": "basic",
"memory": "memory",
"concurrency": "concurrency",
"freertos": "freertos"
}
return spec_mapping.get(category, "basic")
def _determine_difficulty(self, category: str, code_length: int) -> str:
"""确定测试难度"""
if category == "basic":
return "easy"
elif category == "memory":
return "medium"
elif code_length > 200:
return "hard"
else:
return "medium"
def _save_test_data(self, test_data: TestData):
"""保存测试数据"""
# 转换为字典格式
data_dict = asdict(test_data)
data_dict["created_at"] = test_data.created_at.isoformat()
data_dict["test_cases"] = [asdict(tc) for tc in test_data.test_cases]
# 保存为JSON
output_file = self.output_dir / f"{test_data.name}.json"
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(data_dict, f, indent=2, ensure_ascii=False)
# 保存为YAML
yaml_file = self.output_dir / f"{test_data.name}.yaml"
with open(yaml_file, 'w', encoding='utf-8') as f:
yaml.dump(data_dict, f, default_flow_style=False, allow_unicode=True)
def _save_test_case(self, test_case: TestCase):
"""保存单个测试用例"""
# 创建分类目录
category_dir = self.output_dir / "cases" / test_case.category
category_dir.mkdir(parents=True, exist_ok=True)
# 保存代码文件
code_file = category_dir / f"{test_case.name}.c"
with open(code_file, 'w', encoding='utf-8') as f:
f.write(test_case.code)
# 保存规范文件
spec_file = category_dir / f"{test_case.name}_spec.c"
with open(spec_file, 'w', encoding='utf-8') as f:
f.write(test_case.specification)
# 保存元数据文件
metadata_file = category_dir / f"{test_case.name}_meta.json"
with open(metadata_file, 'w', encoding='utf-8') as f:
json.dump(test_case.metadata, f, indent=2, ensure_ascii=False)
def generate_performance_data(self, config: Dict[str, Any]) -> None:
"""生成性能测试数据"""
print("📊 生成性能测试数据...")
performance_data = {
"baseline_metrics": self._generate_baseline_metrics(),
"scalability_data": self._generate_scalability_data(),
"memory_usage_data": self._generate_memory_usage_data(),
"concurrent_performance": self._generate_concurrent_performance_data()
}
# 保存性能数据
perf_file = self.output_dir / "performance_data.json"
with open(perf_file, 'w', encoding='utf-8') as f:
json.dump(performance_data, f, indent=2, ensure_ascii=False)
def _generate_baseline_metrics(self) -> Dict[str, Any]:
"""生成基准性能数据"""
return {
"parsing_time": {
"mean": 0.5,
"std": 0.1,
"min": 0.2,
"max": 2.0,
"unit": "seconds"
},
"verification_time": {
"mean": 10.0,
"std": 5.0,
"min": 1.0,
"max": 60.0,
"unit": "seconds"
},
"memory_usage": {
"mean": 50.0,
"std": 20.0,
"min": 10.0,
"max": 200.0,
"unit": "MB"
}
}
def _generate_scalability_data(self) -> Dict[str, Any]:
"""生成扩展性数据"""
code_sizes = [100, 500, 1000, 2000, 5000]
return {
"code_size_vs_time": [
{
"code_size": size,
"parsing_time": size * 0.001 + random.uniform(-0.1, 0.1),
"verification_time": size * 0.01 + random.uniform(-1, 1)
}
for size in code_sizes
]
}
def _generate_memory_usage_data(self) -> Dict[str, Any]:
"""生成内存使用数据"""
return {
"memory_profile": [
{
"timestamp": (datetime.now() - timedelta(minutes=i)).isoformat(),
"memory_mb": random.randint(50, 200),
"cpu_usage": random.uniform(10, 90)
}
for i in range(60)
]
}
def _generate_concurrent_performance_data(self) -> Dict[str, Any]:
"""生成并发性能数据"""
concurrency_levels = [1, 2, 4, 8, 16]
return {
"concurrent_jobs": [
{
"concurrent_jobs": level,
"throughput": level * 0.8 + random.uniform(-0.2, 0.2),
"avg_response_time": 10.0 / level + random.uniform(-1, 1)
}
for level in concurrency_levels
]
}
def main():
"""主函数"""
import argparse
parser = argparse.ArgumentParser(description='CodeDetect测试数据生成工具')
parser.add_argument('--output-dir', type=str, default='test_data',
help='输出目录 (默认: test_data)')
parser.add_argument('--suite', type=str, default='default',
help='测试套件名称 (默认: default)')
parser.add_argument('--categories', nargs='+',
default=['basic', 'memory', 'concurrency', 'freertos'],
help='测试类别')
parser.add_argument('--tests-per-category', type=int, default=5,
help='每类别测试数量 (默认: 5)')
parser.add_argument('--include-performance', action='store_true',
help='生成性能测试数据')
parser.add_argument('--verbose', action='store_true',
help='详细输出')
args = parser.parse_args()
print("🚀 CodeDetect测试数据生成器")
print("=" * 50)
# 创建生成器
generator = TestDataGenerator(args.output_dir)
# 生成配置
config = {
"categories": args.categories,
"tests_per_category": args.tests_per_category,
"generated_at": datetime.now().isoformat()
}
# 生成测试套件
test_data = generator.generate_test_suite(args.suite, config)
# 生成性能数据
if args.include_performance:
generator.generate_performance_data(config)
print(f"\n✅ 测试数据生成完成!")
print(f"📁 输出目录: {args.output_dir}")
print(f"📊 测试用例: {len(test_data.test_cases)}")
print(f"📋 类别: {', '.join(args.categories)}")
if args.verbose:
print("\n📈 测试用例分布:")
for category in args.categories:
count = sum(1 for tc in test_data.test_cases if tc.category == category)
print(f" {category}: {count}")
if __name__ == "__main__":
main()