You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
cbmc/codedetect/tests/integration/test_freertos_verification.py

983 lines
31 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
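FreeRTOS verification integration tests.

This module provides dedicated integration tests for FreeRTOS verification, including:
- FreeRTOS task creation verification
- FreeRTOS queue operation verification
- FreeRTOS semaphore verification
- FreeRTOS timer verification
- Concurrency verification
- Interrupt handling verification
"""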
import pytest
import asyncio
import tempfile
import json
import os
from pathlib import Path
from unittest.mock import Mock, patch, AsyncMock, MagicMock
from typing import Dict, List, Any, Optional
import time
from src.ui.workflow import WorkflowManager, WorkflowConfig, WorkflowMode
from src.ui.job_manager import JobManager, JobStatus, JobType
from src.ui.api import init_api
from src.ui.web_app import create_app
from src.utils.config import get_config, ConfigManager
# Import test fixtures
from tests.fixtures.cbmc_outputs import cbmc_fixtures
from tests.fixtures.llm_responses import llm_fixtures
class TestFreeRTOSVerificationIntegration:
    """Integration tests that drive FreeRTOS verification through the workflow API.

    Every test writes a small FreeRTOS C program into a per-test temporary
    directory, submits it to ``POST /api/workflow``, polls
    ``GET /api/workflow/<id>`` until the workflow reports a terminal status and
    then asserts on the returned JSON.
    """

    # Workflow statuses that end the polling loop.
    _TERMINAL_STATES = ('completed', 'failed', 'cancelled')

    def setup_method(self):
        """Create the config, managers, test application and API for one test."""
        self.config = get_config()
        self.temp_dir = tempfile.mkdtemp()
        # NOTE(review): if get_config() returns a shared object this assignment
        # outlives the test; it is never restored in teardown_method.
        self.config.working_directory = self.temp_dir
        # Initialise the individual components.
        self.job_manager = JobManager(self.config)
        self.workflow_manager = WorkflowManager(self.job_manager, config=self.config)
        # Create the test application.
        self.app = create_app(self.config)
        self.app.config['TESTING'] = True
        self.app.config['SECRET_KEY'] = 'test-secret'
        # Initialise the API.
        init_api(
            config=self.config,
            job_manager=self.job_manager,
            workflow_manager=self.workflow_manager,
            websocket_handler=None
        )

    def teardown_method(self):
        """Remove the per-test temporary directory if it still exists."""
        import shutil
        if os.path.exists(self.temp_dir):
            shutil.rmtree(self.temp_dir)

    def _run_workflow(self, file_name, source_code, target_functions, *,
                      mode, verification_types, freertos_config):
        """Write a C source file, run a workflow on it and return the final JSON.

        Args:
            file_name: Name of the C file to create inside ``self.temp_dir``.
            source_code: Text written to that file.
            target_functions: Function names sent as ``target_functions``.
            mode: Value of ``workflow_config['mode']``.
            verification_types: Value of ``workflow_config['verification_types']``.
            freertos_config: Value of ``workflow_config['freertos_config']``.

        Returns:
            The decoded JSON body of the final ``GET /api/workflow/<id>`` call.

        The source file is deleted again before returning, whether or not the
        workflow requests succeed.
        """
        test_file = os.path.join(self.temp_dir, file_name)
        with open(test_file, 'w', encoding='utf-8') as f:
            f.write(source_code)
        try:
            workflow_data = {
                'source_files': [test_file],
                'target_functions': target_functions,
                'workflow_config': {
                    'mode': mode,
                    'enable_parsing': True,
                    'enable_generation': True,
                    'enable_mutation': True,
                    'enable_verification': True,
                    'verification_types': verification_types,
                    'freertos_config': freertos_config
                }
            }
            with self.app.test_client() as client:
                response = client.post('/api/workflow', json=workflow_data)
                assert response.status_code == 200
                workflow_id = response.get_json()['workflow_id']
                # Block until the workflow reaches a terminal status.
                self._wait_for_workflow_completion(workflow_id)
                response = client.get(f'/api/workflow/{workflow_id}')
                assert response.status_code == 200
                return response.get_json()
        finally:
            if os.path.exists(test_file):
                os.unlink(test_file)

    # NOTE(review): only this test is skipped when FREERTOS_PATH is unset; the
    # other tests in the class include the same headers - confirm whether they
    # need the same guard.
    @pytest.mark.skipif(not os.environ.get('FREERTOS_PATH'), reason="FreeRTOS headers not available")
    def test_freertos_task_creation_verification(self):
        """Run the workflow on task creation/deletion code and check its statistics."""
        freertos_code = """
#include "FreeRTOS.h"
#include "task.h"
void vTaskFunction(void *pvParameters) {
int *param = (int *)pvParameters;
for (;;) {
// Task implementation
if (*param > 0) {
(*param)--;
}
vTaskDelay(pdMS_TO_TICKS(100));
}
}
void create_test_task(void) {
int task_param = 10;
TaskHandle_t xHandle = NULL;
BaseType_t xResult = xTaskCreate(
vTaskFunction,
"TestTask",
configMINIMAL_STACK_SIZE,
&task_param,
tskIDLE_PRIORITY + 1,
&xHandle
);
if (xResult == pdPASS) {
// Task created successfully
vTaskDelete(xHandle);
}
}
"""
        final_result = self._run_workflow(
            "freertos_task_test.c",
            freertos_code,
            ['create_test_task', 'vTaskFunction'],
            mode='freertos_verification',
            verification_types=['memory_safety', 'concurrency', 'pointer_validity'],
            freertos_config={
                'include_headers': ['FreeRTOS.h', 'task.h'],
                'task_stack_size': 128,
                'task_priority': 1,
                'timeout_ms': 1000
            },
        )
        assert final_result['success'] is True
        assert final_result['workflow']['status'] == 'completed'
        # Check the statistics reported for the two target functions.
        stats = final_result['workflow']['statistics']
        assert 'total_functions' in stats
        assert stats['total_functions'] == 2

    def test_freertos_queue_operations_verification(self):
        """Run the workflow on producer/consumer queue code."""
        freertos_code = """
#include "FreeRTOS.h"
#include "queue.h"
#define QUEUE_LENGTH 10
#define ITEM_SIZE sizeof(uint32_t)
void queue_producer_task(void *pvParameters) {
QueueHandle_t xQueue = (QueueHandle_t)pvParameters;
uint32_t value = 0;
for (;;) {
value++;
if (xQueueSend(xQueue, &value, pdMS_TO_TICKS(100)) == pdPASS) {
// Item sent successfully
}
vTaskDelay(pdMS_TO_TICKS(50));
}
}
void queue_consumer_task(void *pvParameters) {
QueueHandle_t xQueue = (QueueHandle_t)pvParameters;
uint32_t received_value;
for (;;) {
if (xQueueReceive(xQueue, &received_value, pdMS_TO_TICKS(200)) == pdPASS) {
// Item received successfully
}
vTaskDelay(pdMS_TO_TICKS(50));
}
}
void create_queue_example(void) {
QueueHandle_t xQueue;
xQueue = xQueueCreate(QUEUE_LENGTH, ITEM_SIZE);
if (xQueue != NULL) {
// Queue created successfully
// Create producer and consumer tasks
xTaskCreate(queue_producer_task, "Producer", configMINIMAL_STACK_SIZE, xQueue, 1, NULL);
xTaskCreate(queue_consumer_task, "Consumer", configMINIMAL_STACK_SIZE, xQueue, 1, NULL);
// Let tasks run for a while
vTaskDelay(pdMS_TO_TICKS(1000));
// Cleanup
vQueueDelete(xQueue);
}
}
"""
        final_result = self._run_workflow(
            "freertos_queue_test.c",
            freertos_code,
            ['queue_producer_task', 'queue_consumer_task', 'create_queue_example'],
            mode='freertos_verification',
            verification_types=['memory_safety', 'concurrency', 'queue_safety'],
            freertos_config={
                'include_headers': ['FreeRTOS.h', 'queue.h'],
                'queue_length': 10,
                'queue_item_size': 4,
                'timeout_ms': 2000
            },
        )
        assert final_result['success'] is True

    def test_freertos_semaphore_verification(self):
        """Run the workflow on code where several tasks share a binary semaphore."""
        freertos_code = """
#include "FreeRTOS.h"
#include "semphr.h"
SemaphoreHandle_t xSemaphore;
void task_using_semaphore(void *pvParameters) {
for (;;) {
// Try to take the semaphore
if (xSemaphoreTake(xSemaphore, pdMS_TO_TICKS(100)) == pdTRUE) {
// Critical section
// Access shared resource
// Release the semaphore
xSemaphoreGive(xSemaphore);
}
vTaskDelay(pdMS_TO_TICKS(50));
}
}
void create_semaphore_example(void) {
// Create a binary semaphore
xSemaphore = xSemaphoreCreateBinary();
if (xSemaphore != NULL) {
// Give the semaphore initially
xSemaphoreGive(xSemaphore);
// Create tasks that use the semaphore
for (int i = 0; i < 3; i++) {
xTaskCreate(task_using_semaphore, "SemaphoreTask", configMINIMAL_STACK_SIZE, NULL, 1, NULL);
}
// Let tasks run
vTaskDelay(pdMS_TO_TICKS(2000));
// Cleanup
vSemaphoreDelete(xSemaphore);
}
}
"""
        final_result = self._run_workflow(
            "freertos_semaphore_test.c",
            freertos_code,
            ['task_using_semaphore', 'create_semaphore_example'],
            mode='freertos_verification',
            verification_types=['memory_safety', 'concurrency', 'race_condition'],
            freertos_config={
                'include_headers': ['FreeRTOS.h', 'semphr.h'],
                'max_tasks': 3,
                'timeout_ms': 3000
            },
        )
        assert final_result['success'] is True

    def test_freertos_timer_verification(self):
        """Run the workflow on software-timer creation, start, stop and delete code."""
        freertos_code = """
#include "FreeRTOS.h"
#include "timers.h"
TimerHandle_t xTimer;
void vTimerCallback(TimerHandle_t xTimer) {
// Timer callback function
uint32_t timer_id = (uint32_t)pvTimerGetTimerID(xTimer);
// Handle timer event
// This could be a periodic operation
}
void create_timer_example(void) {
// Create a software timer
xTimer = xTimerCreate(
"Timer", // Text name
pdMS_TO_TICKS(1000), // Period
pdTRUE, // Auto-reload
(void *)1, // Timer ID
vTimerCallback // Callback function
);
if (xTimer != NULL) {
// Start the timer
if (xTimerStart(xTimer, 0) == pdPASS) {
// Timer started successfully
// Let it run for a while
vTaskDelay(pdMS_TO_TICKS(5000));
// Stop and delete the timer
xTimerStop(xTimer, 0);
xTimerDelete(xTimer);
}
}
}
"""
        final_result = self._run_workflow(
            "freertos_timer_test.c",
            freertos_code,
            ['vTimerCallback', 'create_timer_example'],
            mode='freertos_verification',
            verification_types=['memory_safety', 'timer_safety'],
            freertos_config={
                'include_headers': ['FreeRTOS.h', 'timers.h'],
                'timer_period_ms': 1000,
                'timeout_ms': 6000
            },
        )
        assert final_result['success'] is True

    def test_freertos_interrupt_handling_verification(self):
        """Run the workflow on ISR-to-task queue communication code."""
        freertos_code = """
#include "FreeRTOS.h"
#include "task.h"
#include "queue.h"
// Queue for interrupt-to-task communication
QueueHandle_t xInterruptQueue;
// Interrupt service routine
void vISR_Handler(void) {
BaseType_t xHigherPriorityTaskWoken = pdFALSE;
uint32_t interrupt_value = 42;
// Send data from ISR to task
xQueueSendFromISR(xInterruptQueue, &interrupt_value, &xHigherPriorityTaskWoken);
// Context switch if needed
portYIELD_FROM_ISR(xHigherPriorityTaskWoken);
}
// Task that processes interrupt data
void vInterruptHandlerTask(void *pvParameters) {
uint32_t received_value;
for (;;) {
if (xQueueReceive(xInterruptQueue, &received_value, portMAX_DELAY) == pdPASS) {
// Process interrupt data
if (received_value == 42) {
// Handle specific interrupt
}
}
}
}
void setup_interrupt_handling(void) {
// Create queue for interrupt communication
xInterruptQueue = xQueueCreate(10, sizeof(uint32_t));
if (xInterruptQueue != NULL) {
// Create task to handle interrupts
xTaskCreate(
vInterruptHandlerTask,
"IntHandler",
configMINIMAL_STACK_SIZE,
NULL,
tskIDLE_PRIORITY + 2,
NULL
);
// Setup interrupt (simulation)
// In real code, this would configure hardware interrupts
// Let the system run
vTaskDelay(pdMS_TO_TICKS(1000));
// Cleanup
vQueueDelete(xInterruptQueue);
}
}
"""
        final_result = self._run_workflow(
            "freertos_interrupt_test.c",
            freertos_code,
            ['vISR_Handler', 'vInterruptHandlerTask', 'setup_interrupt_handling'],
            mode='freertos_verification',
            verification_types=['memory_safety', 'concurrency', 'interrupt_safety'],
            freertos_config={
                'include_headers': ['FreeRTOS.h', 'task.h', 'queue.h'],
                'queue_size': 10,
                'task_priority': 2,
                'timeout_ms': 2000
            },
        )
        assert final_result['success'] is True

    def test_freertos_complex_scenario_verification(self):
        """Run the comprehensive workflow on code combining tasks, a queue, a mutex and a timer."""
        freertos_code = """
#include "FreeRTOS.h"
#include "task.h"
#include "queue.h"
#include "semphr.h"
#include "timers.h"
// Shared resources
QueueHandle_t xCommandQueue;
SemaphoreHandle_t xResourceSemaphore;
TimerHandle_t xSystemTimer;
// Task states
typedef enum {
TASK_STATE_IDLE,
TASK_STATE_PROCESSING,
TASK_STATE_COMPLETED
} TaskState_t;
// Task data structure
typedef struct {
TaskState_t state;
uint32_t processed_commands;
uint32_t error_count;
} TaskData_t;
void worker_task(void *pvParameters) {
TaskData_t *task_data = (TaskData_t *)pvParameters;
uint32_t command;
for (;;) {
// Wait for command
if (xQueueReceive(xCommandQueue, &command, pdMS_TO_TICKS(100)) == pdPASS) {
// Acquire resource semaphore
if (xSemaphoreTake(xResourceSemaphore, pdMS_TO_TICKS(50)) == pdTRUE) {
// Process command
task_data->state = TASK_STATE_PROCESSING;
// Simulate processing
if (command < 1000) {
task_data->processed_commands++;
} else {
task_data->error_count++;
}
task_data->state = TASK_STATE_COMPLETED;
// Release resource
xSemaphoreGive(xResourceSemaphore);
}
}
// Small delay to yield CPU
vTaskDelay(pdMS_TO_TICKS(10));
}
}
void timer_callback(TimerHandle_t xTimer) {
// Periodic system maintenance
uint32_t timer_id = (uint32_t)pvTimerGetTimerID(xTimer);
// Perform maintenance tasks
// This could include cleanup, statistics collection, etc.
}
void create_freertos_system(void) {
TaskData_t task_data = {TASK_STATE_IDLE, 0, 0};
// Create communication queue
xCommandQueue = xQueueCreate(20, sizeof(uint32_t));
// Create resource semaphore
xResourceSemaphore = xSemaphoreCreateMutex();
// Create system timer
xSystemTimer = xTimerCreate(
"SystemTimer",
pdMS_TO_TICKS(5000),
pdTRUE,
(void *)1,
timer_callback
);
if (xCommandQueue != NULL && xResourceSemaphore != NULL && xSystemTimer != NULL) {
// Create worker tasks
for (int i = 0; i < 3; i++) {
xTaskCreate(
worker_task,
"WorkerTask",
configMINIMAL_STACK_SIZE * 2,
&task_data,
tskIDLE_PRIORITY + 1,
NULL
);
}
// Start system timer
xTimerStart(xSystemTimer, 0);
// Send some test commands
uint32_t test_commands[] = {100, 200, 1500, 300, 2500};
for (int i = 0; i < 5; i++) {
xQueueSend(xCommandQueue, &test_commands[i], 0);
}
// Let system run
vTaskDelay(pdMS_TO_TICKS(3000));
// Cleanup
xTimerDelete(xSystemTimer);
vSemaphoreDelete(xResourceSemaphore);
vQueueDelete(xCommandQueue);
}
}
"""
        final_result = self._run_workflow(
            "freertos_complex_test.c",
            freertos_code,
            ['worker_task', 'timer_callback', 'create_freertos_system'],
            mode='freertos_comprehensive',
            verification_types=[
                'memory_safety', 'concurrency', 'race_condition',
                'queue_safety', 'timer_safety', 'interrupt_safety'
            ],
            freertos_config={
                'include_headers': ['FreeRTOS.h', 'task.h', 'queue.h', 'semphr.h', 'timers.h'],
                'num_tasks': 3,
                'queue_size': 20,
                'timer_period_ms': 5000,
                'timeout_ms': 5000
            },
        )
        assert final_result['success'] is True
        # Check the statistics reported for the three target functions.
        stats = final_result['workflow']['statistics']
        assert 'total_functions' in stats
        assert stats['total_functions'] == 3

    def test_freertos_error_scenarios(self):
        """Run the error-detection workflow once per error scenario.

        Each scenario is written to its own C file that defines exactly one
        function, so only that function is requested as a verification target.
        """
        # NULL task-handle handling.
        null_pointer_code = """
#include "FreeRTOS.h"
#include "task.h"
void task_with_null_check(void *pvParameters) {
TaskHandle_t task_handle = (TaskHandle_t)pvParameters;
// Should handle NULL task handle gracefully
if (task_handle != NULL) {
vTaskDelete(task_handle);
}
}
"""
        # Sending to a full queue.
        queue_overflow_code = """
#include "FreeRTOS.h"
#include "queue.h"
void queue_overflow_test(void) {
QueueHandle_t queue = xQueueCreate(1, sizeof(int));
if (queue != NULL) {
int value = 42;
// This should fail gracefully when queue is full
xQueueSend(queue, &value, 0); // Should succeed
xQueueSend(queue, &value, 0); // Should fail (queue full)
vQueueDelete(queue);
}
}
"""
        # Taking two mutexes in sequence.
        semaphore_deadlock_code = """
#include "FreeRTOS.h"
#include "semphr.h"
void semaphore_deadlock_test(void) {
SemaphoreHandle_t sem1 = xSemaphoreCreateMutex();
SemaphoreHandle_t sem2 = xSemaphoreCreateMutex();
if (sem1 != NULL && sem2 != NULL) {
// Potential deadlock scenario
xSemaphoreTake(sem1, portMAX_DELAY);
xSemaphoreTake(sem2, portMAX_DELAY);
// Should release in reverse order to avoid deadlock
xSemaphoreGive(sem2);
xSemaphoreGive(sem1);
vSemaphoreDelete(sem1);
vSemaphoreDelete(sem2);
}
}
"""
        # (scenario name, the single function defined by the source, source)
        error_scenarios = [
            ("null_pointer", "task_with_null_check", null_pointer_code),
            ("queue_overflow", "queue_overflow_test", queue_overflow_code),
            ("semaphore_deadlock", "semaphore_deadlock_test", semaphore_deadlock_code)
        ]
        for scenario_name, target_function, scenario_code in error_scenarios:
            final_result = self._run_workflow(
                f"freertos_error_{scenario_name}.c",
                scenario_code,
                # Only request the function this file actually defines.
                [target_function],
                mode='freertos_error_detection',
                verification_types=['memory_safety', 'error_handling', 'null_pointer'],
                freertos_config={
                    'include_headers': ['FreeRTOS.h', 'task.h', 'queue.h', 'semphr.h'],
                    'timeout_ms': 2000
                },
            )
            assert final_result['success'] is True, f"scenario {scenario_name!r} did not succeed"

    def _wait_for_workflow_completion(self, workflow_id: str, timeout: int = 120,
                                      poll_interval: float = 2):
        """Poll the workflow endpoint until the workflow reaches a terminal status.

        Args:
            workflow_id: Identifier used in ``GET /api/workflow/<workflow_id>``.
            timeout: Maximum number of seconds to keep polling.
            poll_interval: Seconds to sleep between two polls.

        Returns:
            None as soon as a successful response reports one of the statuses
            in ``_TERMINAL_STATES``.

        Fails the running test via ``pytest.fail`` when the timeout elapses
        first, and asserts that every poll answers with HTTP 200.
        """
        # A monotonic clock keeps the timeout immune to wall-clock adjustments.
        deadline = time.monotonic() + timeout
        with self.app.test_client() as client:
            while time.monotonic() < deadline:
                response = client.get(f'/api/workflow/{workflow_id}')
                assert response.status_code == 200
                workflow_data = response.get_json()
                if workflow_data['success']:
                    if workflow_data['workflow']['status'] in self._TERMINAL_STATES:
                        return
                time.sleep(poll_interval)
        pytest.fail(f"FreeRTOS workflow {workflow_id} did not complete within {timeout} seconds")
class TestFreeRTOSSpecificFeatures:
    """Tests for FreeRTOS-specific handling in the parser, spec generator, mutator and verifier."""

    def setup_method(self):
        """Load the configuration and create a per-test temporary directory."""
        self.config = get_config()
        self.temp_dir = tempfile.mkdtemp()

    def teardown_method(self):
        """Remove the per-test temporary directory if it still exists."""
        import shutil
        if os.path.exists(self.temp_dir):
            shutil.rmtree(self.temp_dir)

    def test_freertos_header_detection(self):
        """Parsing code that includes FreeRTOS headers must report ``FreeRTOS.h``."""
        from src.parser import CodeParser
        parser = CodeParser(self.config)
        freertos_code = """
#include "FreeRTOS.h"
#include "task.h"
#include "queue.h"
void freertos_function(void) {
// FreeRTOS specific code
}
"""
        result = parser.parse_code(freertos_code)
        assert result.success
        # The FreeRTOS header must appear among the parsed includes.
        assert any('FreeRTOS.h' in str(include) for include in result.includes)

    def test_freertos_api_recognition(self):
        """The first parsed function's body must mention the FreeRTOS API calls it makes."""
        from src.parser import CodeParser
        parser = CodeParser(self.config)
        freertos_code = """
#include "FreeRTOS.h"
#include "task.h"
void api_test(void) {
TaskHandle_t handle;
xTaskCreate(vTaskFunction, "Task", 128, NULL, 1, &handle);
vTaskDelay(100);
vTaskDelete(handle);
}
"""
        result = parser.parse_code(freertos_code)
        assert result.success
        # Each FreeRTOS API call must be visible in the function body.
        function = result.functions[0]
        body_text = str(function.body)
        assert 'xTaskCreate' in body_text
        assert 'vTaskDelay' in body_text
        assert 'vTaskDelete' in body_text

    def test_freertos_task_parameter_handling(self):
        """A generated task specification must reference ``pvParameters``.

        NOTE(review): ``generate_specification`` is itself patched here, so the
        assertion only checks the canned return value, not the real generator.
        """
        from src.spec import SpecGenerator
        generator = SpecGenerator(self.config)
        # Metadata describing a task function.
        task_metadata = {
            'name': 'vTaskFunction',
            'return_type': 'void',
            'parameters': [
                {'name': 'pvParameters', 'type': 'void *'}
            ],
            'complexity_score': 0.6,
            'is_freertos_task': True
        }
        with patch.object(generator, 'generate_specification') as mock_generate:
            mock_generate.return_value = {
                'specification': 'void vTaskFunction(void *pvParameters) { __CPROVER_assume(pvParameters != NULL); }',
                'confidence': 0.8,
                'explanation': 'FreeRTOS task specification with parameter validation'
            }
            result = generator.generate_specification(task_metadata)
            assert 'pvParameters' in result['specification']

    def test_freertos_concurrency_verification(self):
        """Mutating an unprotected shared-counter task must yield at least one candidate.

        NOTE(review): ``execute_mutation`` is itself patched here, so the
        assertion only checks the canned return value, not the real engine.
        """
        from src.mutate import MutationEngine
        specification = """
void concurrent_task(void *pvParameters) {
int *shared_counter = (int *)pvParameters;
// Critical section without protection
(*shared_counter)++;
}
"""
        function_metadata = {
            'name': 'concurrent_task',
            'parameters': [{'name': 'pvParameters', 'type': 'void *'}],
            'is_freertos_task': True
        }
        config = Mock()
        config.max_candidates_per_function = 10
        config.max_selected_candidates = 5
        config.quality_threshold = 0.6
        mutator = MutationEngine(config)
        with patch.object(mutator, 'execute_mutation') as mock_mutate:
            mock_result = Mock()
            mock_result.selected_candidates = [
                Mock(
                    mutated_spec='void concurrent_task(void *pvParameters) { '
                                 'int *shared_counter = (int *)pvParameters; '
                                 '// Add semaphore protection for critical section '
                                 '(*shared_counter)++; '
                                 '}',
                    confidence=0.8
                )
            ]
            mock_mutate.return_value = mock_result
            result = mutator.execute_mutation(specification, function_metadata)
            assert len(result.selected_candidates) > 0

    def test_freertos_memory_safety_verification(self):
        """Awaiting the (patched) verification of a NULL-checked task must report success.

        NOTE(review): ``run_verification`` is itself patched here, so the
        assertion only checks the canned return value, not a real CBMC run.
        """
        from src.verify import CBMCRunner
        freertos_spec = """
void memory_safe_task(void *pvParameters) {
TaskHandle_t *task_handle = (TaskHandle_t *)pvParameters;
if (task_handle != NULL && *task_handle != NULL) {
// Safe task handle access
vTaskDelete(*task_handle);
}
}
"""
        function_metadata = {
            'name': 'memory_safe_task',
            'parameters': [{'name': 'pvParameters', 'type': 'void *'}]
        }
        verifier = CBMCRunner()
        # The patched method is awaited below, so force an AsyncMock instead of
        # relying on patch's auto-detection of whether the real method is a
        # coroutine function.
        with patch.object(verifier, 'run_verification', new_callable=AsyncMock) as mock_verify:
            mock_result = Mock()
            mock_result.status = Mock()
            mock_result.status.value = 'successful'
            mock_verify.return_value = mock_result

            async def run_verification_once():
                result = await verifier.run_verification(function_metadata, "test.c", freertos_spec)
                assert result.status.value == 'successful'

            asyncio.run(run_verification_once())
if __name__ == "__main__":
    # Propagate pytest's exit code so that running this file directly reports
    # test failures to the calling shell instead of always exiting with 0.
    raise SystemExit(pytest.main([__file__, "-v"]))