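"""
FreeRTOS verification integration tests.

This module provides dedicated integration tests for FreeRTOS verification, covering:
- FreeRTOS task creation verification
- FreeRTOS queue operation verification
- FreeRTOS semaphore verification
- FreeRTOS timer verification
- Concurrency verification
- Interrupt handling verification
"""
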
import asyncio
import json
import os
import shutil
import tempfile
import time
from pathlib import Path
from typing import Dict, List, Any, Optional
from unittest.mock import Mock, patch, AsyncMock, MagicMock

import pytest

from src.ui.workflow import WorkflowManager, WorkflowConfig, WorkflowMode
from src.ui.job_manager import JobManager, JobStatus, JobType
from src.ui.api import init_api
from src.ui.web_app import create_app
from src.utils.config import get_config, ConfigManager

# Import test fixtures
from tests.fixtures.cbmc_outputs import cbmc_fixtures
from tests.fixtures.llm_responses import llm_fixtures


class TestFreeRTOSVerificationIntegration:
    """Integration tests that submit FreeRTOS C sources to the workflow API.

    Every test writes a C snippet into a per-test temporary directory, POSTs
    it to ``/api/workflow``, polls until the workflow reaches a terminal
    status and then checks the final JSON payload.
    """

    # NOTE(review): only test_freertos_task_creation_verification is skipped
    # when FREERTOS_PATH is unset, although every snippet here includes
    # FreeRTOS headers — confirm whether the other tests need the same guard.

    def setup_method(self):
        """Create the config, managers, Flask test app and API for one test."""
        self.config = get_config()
        self.temp_dir = tempfile.mkdtemp()
        # NOTE(review): if get_config() returns a shared object, this
        # assignment outlives the test (it is not restored in teardown) —
        # confirm against src.utils.config.
        self.config.working_directory = self.temp_dir

        # Initialise the individual components.
        self.job_manager = JobManager(self.config)
        self.workflow_manager = WorkflowManager(self.job_manager, config=self.config)

        # Create the test application.
        self.app = create_app(self.config)
        self.app.config['TESTING'] = True
        self.app.config['SECRET_KEY'] = 'test-secret'

        # Initialise the API.
        init_api(
            config=self.config,
            job_manager=self.job_manager,
            workflow_manager=self.workflow_manager,
            websocket_handler=None,
        )

    def teardown_method(self):
        """Remove the per-test temporary directory."""
        if os.path.exists(self.temp_dir):
            shutil.rmtree(self.temp_dir)

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _freertos_workflow_config(mode: str,
                                  verification_types: List[str],
                                  freertos_config: Dict[str, Any]) -> Dict[str, Any]:
        """Return a ``workflow_config`` dict with all four pipeline stages enabled.

        Args:
            mode: Value sent as ``workflow_config['mode']``.
            verification_types: Value sent as ``verification_types``.
            freertos_config: Value sent as ``freertos_config``.
        """
        return {
            'mode': mode,
            'enable_parsing': True,
            'enable_generation': True,
            'enable_mutation': True,
            'enable_verification': True,
            'verification_types': verification_types,
            'freertos_config': freertos_config,
        }

    def _run_workflow(self,
                      file_name: str,
                      source_code: str,
                      target_functions: List[str],
                      workflow_config: Dict[str, Any]) -> Dict[str, Any]:
        """Submit one C source to the workflow API and return the final result.

        Writes ``source_code`` to ``file_name`` inside ``self.temp_dir``,
        POSTs the workflow, waits for a terminal status, fetches the workflow
        again and asserts both HTTP responses are 200 and the final payload
        has ``success is True``. The source file is deleted afterwards, also
        on failure.

        Returns:
            The JSON payload of the final ``GET /api/workflow/<id>`` call.
        """
        test_file = os.path.join(self.temp_dir, file_name)
        with open(test_file, 'w', encoding='utf-8') as f:
            f.write(source_code)

        try:
            workflow_data = {
                'source_files': [test_file],
                'target_functions': target_functions,
                'workflow_config': workflow_config,
            }

            with self.app.test_client() as client:
                response = client.post('/api/workflow', json=workflow_data)
                assert response.status_code == 200
                workflow_result = response.get_json()
                workflow_id = workflow_result['workflow_id']

                # Wait for the workflow to finish.
                self._wait_for_workflow_completion(workflow_id)

                # Fetch and check the final result.
                response = client.get(f'/api/workflow/{workflow_id}')
                assert response.status_code == 200
                final_result = response.get_json()
                assert final_result['success'] is True
                return final_result
        finally:
            if os.path.exists(test_file):
                os.unlink(test_file)

    def _wait_for_workflow_completion(self, workflow_id: str, timeout: int = 120,
                                      poll_interval: float = 2.0):
        """Poll the workflow endpoint until the workflow reaches a terminal status.

        Terminal statuses are ``completed``, ``failed`` and ``cancelled``.

        Args:
            workflow_id: Identifier returned by ``POST /api/workflow``.
            timeout: Maximum number of seconds to keep polling.
            poll_interval: Seconds to sleep between two polls.

        Fails the test via ``pytest.fail`` when the timeout elapses first.
        """
        # monotonic() is unaffected by wall-clock adjustments during the wait.
        deadline = time.monotonic() + timeout

        with self.app.test_client() as client:
            while time.monotonic() < deadline:
                response = client.get(f'/api/workflow/{workflow_id}')
                assert response.status_code == 200
                workflow_data = response.get_json()

                if workflow_data['success']:
                    workflow = workflow_data['workflow']
                    if workflow['status'] in ('completed', 'failed', 'cancelled'):
                        return

                time.sleep(poll_interval)

        pytest.fail(f"FreeRTOS workflow {workflow_id} did not complete within {timeout} seconds")

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------

    @pytest.mark.skipif(not os.environ.get('FREERTOS_PATH'), reason="FreeRTOS headers not available")
    def test_freertos_task_creation_verification(self):
        """Verify a task-creation snippet completes and reports two functions."""
        freertos_code = """
        #include "FreeRTOS.h"
        #include "task.h"

        void vTaskFunction(void *pvParameters) {
            int *param = (int *)pvParameters;

            for (;;) {
                // Task implementation
                if (*param > 0) {
                    (*param)--;
                }
                vTaskDelay(pdMS_TO_TICKS(100));
            }
        }

        void create_test_task(void) {
            int task_param = 10;
            TaskHandle_t xHandle = NULL;

            BaseType_t xResult = xTaskCreate(
                vTaskFunction,
                "TestTask",
                configMINIMAL_STACK_SIZE,
                &task_param,
                tskIDLE_PRIORITY + 1,
                &xHandle
            );

            if (xResult == pdPASS) {
                // Task created successfully
                vTaskDelete(xHandle);
            }
        }
        """

        # Build the FreeRTOS-specific workflow and run it.
        final_result = self._run_workflow(
            "freertos_task_test.c",
            freertos_code,
            ['create_test_task', 'vTaskFunction'],
            self._freertos_workflow_config(
                'freertos_verification',
                ['memory_safety', 'concurrency', 'pointer_validity'],
                {
                    'include_headers': ['FreeRTOS.h', 'task.h'],
                    'task_stack_size': 128,
                    'task_priority': 1,
                    'timeout_ms': 1000,
                },
            ),
        )

        assert final_result['workflow']['status'] == 'completed'

        # Check the FreeRTOS-specific statistics.
        stats = final_result['workflow']['statistics']
        assert 'total_functions' in stats
        assert stats['total_functions'] == 2

    def test_freertos_queue_operations_verification(self):
        """Verify a producer/consumer queue snippet is processed successfully."""
        freertos_code = """
        #include "FreeRTOS.h"
        #include "queue.h"

        #define QUEUE_LENGTH 10
        #define ITEM_SIZE sizeof(uint32_t)

        void queue_producer_task(void *pvParameters) {
            QueueHandle_t xQueue = (QueueHandle_t)pvParameters;
            uint32_t value = 0;

            for (;;) {
                value++;
                if (xQueueSend(xQueue, &value, pdMS_TO_TICKS(100)) == pdPASS) {
                    // Item sent successfully
                }
                vTaskDelay(pdMS_TO_TICKS(50));
            }
        }

        void queue_consumer_task(void *pvParameters) {
            QueueHandle_t xQueue = (QueueHandle_t)pvParameters;
            uint32_t received_value;

            for (;;) {
                if (xQueueReceive(xQueue, &received_value, pdMS_TO_TICKS(200)) == pdPASS) {
                    // Item received successfully
                }
                vTaskDelay(pdMS_TO_TICKS(50));
            }
        }

        void create_queue_example(void) {
            QueueHandle_t xQueue;

            xQueue = xQueueCreate(QUEUE_LENGTH, ITEM_SIZE);

            if (xQueue != NULL) {
                // Queue created successfully

                // Create producer and consumer tasks
                xTaskCreate(queue_producer_task, "Producer", configMINIMAL_STACK_SIZE, xQueue, 1, NULL);
                xTaskCreate(queue_consumer_task, "Consumer", configMINIMAL_STACK_SIZE, xQueue, 1, NULL);

                // Let tasks run for a while
                vTaskDelay(pdMS_TO_TICKS(1000));

                // Cleanup
                vQueueDelete(xQueue);
            }
        }
        """

        self._run_workflow(
            "freertos_queue_test.c",
            freertos_code,
            ['queue_producer_task', 'queue_consumer_task', 'create_queue_example'],
            self._freertos_workflow_config(
                'freertos_verification',
                ['memory_safety', 'concurrency', 'queue_safety'],
                {
                    'include_headers': ['FreeRTOS.h', 'queue.h'],
                    'queue_length': 10,
                    'queue_item_size': 4,
                    'timeout_ms': 2000,
                },
            ),
        )

    def test_freertos_semaphore_verification(self):
        """Verify a binary-semaphore snippet is processed successfully."""
        freertos_code = """
        #include "FreeRTOS.h"
        #include "semphr.h"

        SemaphoreHandle_t xSemaphore;

        void task_using_semaphore(void *pvParameters) {
            for (;;) {
                // Try to take the semaphore
                if (xSemaphoreTake(xSemaphore, pdMS_TO_TICKS(100)) == pdTRUE) {
                    // Critical section
                    // Access shared resource

                    // Release the semaphore
                    xSemaphoreGive(xSemaphore);
                }
                vTaskDelay(pdMS_TO_TICKS(50));
            }
        }

        void create_semaphore_example(void) {
            // Create a binary semaphore
            xSemaphore = xSemaphoreCreateBinary();

            if (xSemaphore != NULL) {
                // Give the semaphore initially
                xSemaphoreGive(xSemaphore);

                // Create tasks that use the semaphore
                for (int i = 0; i < 3; i++) {
                    xTaskCreate(task_using_semaphore, "SemaphoreTask", configMINIMAL_STACK_SIZE, NULL, 1, NULL);
                }

                // Let tasks run
                vTaskDelay(pdMS_TO_TICKS(2000));

                // Cleanup
                vSemaphoreDelete(xSemaphore);
            }
        }
        """

        self._run_workflow(
            "freertos_semaphore_test.c",
            freertos_code,
            ['task_using_semaphore', 'create_semaphore_example'],
            self._freertos_workflow_config(
                'freertos_verification',
                ['memory_safety', 'concurrency', 'race_condition'],
                {
                    'include_headers': ['FreeRTOS.h', 'semphr.h'],
                    'max_tasks': 3,
                    'timeout_ms': 3000,
                },
            ),
        )

    def test_freertos_timer_verification(self):
        """Verify a software-timer snippet is processed successfully."""
        freertos_code = """
        #include "FreeRTOS.h"
        #include "timers.h"

        TimerHandle_t xTimer;

        void vTimerCallback(TimerHandle_t xTimer) {
            // Timer callback function
            uint32_t timer_id = (uint32_t)pvTimerGetTimerID(xTimer);

            // Handle timer event
            // This could be a periodic operation
        }

        void create_timer_example(void) {
            // Create a software timer
            xTimer = xTimerCreate(
                "Timer",                    // Text name
                pdMS_TO_TICKS(1000),       // Period
                pdTRUE,                    // Auto-reload
                (void *)1,                 // Timer ID
                vTimerCallback             // Callback function
            );

            if (xTimer != NULL) {
                // Start the timer
                if (xTimerStart(xTimer, 0) == pdPASS) {
                    // Timer started successfully

                    // Let it run for a while
                    vTaskDelay(pdMS_TO_TICKS(5000));

                    // Stop and delete the timer
                    xTimerStop(xTimer, 0);
                    xTimerDelete(xTimer);
                }
            }
        }
        """

        self._run_workflow(
            "freertos_timer_test.c",
            freertos_code,
            ['vTimerCallback', 'create_timer_example'],
            self._freertos_workflow_config(
                'freertos_verification',
                ['memory_safety', 'timer_safety'],
                {
                    'include_headers': ['FreeRTOS.h', 'timers.h'],
                    'timer_period_ms': 1000,
                    'timeout_ms': 6000,
                },
            ),
        )

    def test_freertos_interrupt_handling_verification(self):
        """Verify an ISR-to-task queue snippet is processed successfully."""
        freertos_code = """
        #include "FreeRTOS.h"
        #include "task.h"
        #include "queue.h"

        // Queue for interrupt-to-task communication
        QueueHandle_t xInterruptQueue;

        // Interrupt service routine
        void vISR_Handler(void) {
            BaseType_t xHigherPriorityTaskWoken = pdFALSE;
            uint32_t interrupt_value = 42;

            // Send data from ISR to task
            xQueueSendFromISR(xInterruptQueue, &interrupt_value, &xHigherPriorityTaskWoken);

            // Context switch if needed
            portYIELD_FROM_ISR(xHigherPriorityTaskWoken);
        }

        // Task that processes interrupt data
        void vInterruptHandlerTask(void *pvParameters) {
            uint32_t received_value;

            for (;;) {
                if (xQueueReceive(xInterruptQueue, &received_value, portMAX_DELAY) == pdPASS) {
                    // Process interrupt data
                    if (received_value == 42) {
                        // Handle specific interrupt
                    }
                }
            }
        }

        void setup_interrupt_handling(void) {
            // Create queue for interrupt communication
            xInterruptQueue = xQueueCreate(10, sizeof(uint32_t));

            if (xInterruptQueue != NULL) {
                // Create task to handle interrupts
                xTaskCreate(
                    vInterruptHandlerTask,
                    "IntHandler",
                    configMINIMAL_STACK_SIZE,
                    NULL,
                    tskIDLE_PRIORITY + 2,
                    NULL
                );

                // Setup interrupt (simulation)
                // In real code, this would configure hardware interrupts

                // Let the system run
                vTaskDelay(pdMS_TO_TICKS(1000));

                // Cleanup
                vQueueDelete(xInterruptQueue);
            }
        }
        """

        self._run_workflow(
            "freertos_interrupt_test.c",
            freertos_code,
            ['vISR_Handler', 'vInterruptHandlerTask', 'setup_interrupt_handling'],
            self._freertos_workflow_config(
                'freertos_verification',
                ['memory_safety', 'concurrency', 'interrupt_safety'],
                {
                    'include_headers': ['FreeRTOS.h', 'task.h', 'queue.h'],
                    'queue_size': 10,
                    'task_priority': 2,
                    'timeout_ms': 2000,
                },
            ),
        )

    def test_freertos_complex_scenario_verification(self):
        """Verify a combined queue/mutex/timer snippet and its function count."""
        freertos_code = """
        #include "FreeRTOS.h"
        #include "task.h"
        #include "queue.h"
        #include "semphr.h"
        #include "timers.h"

        // Shared resources
        QueueHandle_t xCommandQueue;
        SemaphoreHandle_t xResourceSemaphore;
        TimerHandle_t xSystemTimer;

        // Task states
        typedef enum {
            TASK_STATE_IDLE,
            TASK_STATE_PROCESSING,
            TASK_STATE_COMPLETED
        } TaskState_t;

        // Task data structure
        typedef struct {
            TaskState_t state;
            uint32_t processed_commands;
            uint32_t error_count;
        } TaskData_t;

        void worker_task(void *pvParameters) {
            TaskData_t *task_data = (TaskData_t *)pvParameters;
            uint32_t command;

            for (;;) {
                // Wait for command
                if (xQueueReceive(xCommandQueue, &command, pdMS_TO_TICKS(100)) == pdPASS) {
                    // Acquire resource semaphore
                    if (xSemaphoreTake(xResourceSemaphore, pdMS_TO_TICKS(50)) == pdTRUE) {
                        // Process command
                        task_data->state = TASK_STATE_PROCESSING;

                        // Simulate processing
                        if (command < 1000) {
                            task_data->processed_commands++;
                        } else {
                            task_data->error_count++;
                        }

                        task_data->state = TASK_STATE_COMPLETED;

                        // Release resource
                        xSemaphoreGive(xResourceSemaphore);
                    }
                }

                // Small delay to yield CPU
                vTaskDelay(pdMS_TO_TICKS(10));
            }
        }

        void timer_callback(TimerHandle_t xTimer) {
            // Periodic system maintenance
            uint32_t timer_id = (uint32_t)pvTimerGetTimerID(xTimer);

            // Perform maintenance tasks
            // This could include cleanup, statistics collection, etc.
        }

        void create_freertos_system(void) {
            TaskData_t task_data = {TASK_STATE_IDLE, 0, 0};

            // Create communication queue
            xCommandQueue = xQueueCreate(20, sizeof(uint32_t));

            // Create resource semaphore
            xResourceSemaphore = xSemaphoreCreateMutex();

            // Create system timer
            xSystemTimer = xTimerCreate(
                "SystemTimer",
                pdMS_TO_TICKS(5000),
                pdTRUE,
                (void *)1,
                timer_callback
            );

            if (xCommandQueue != NULL && xResourceSemaphore != NULL && xSystemTimer != NULL) {
                // Create worker tasks
                for (int i = 0; i < 3; i++) {
                    xTaskCreate(
                        worker_task,
                        "WorkerTask",
                        configMINIMAL_STACK_SIZE * 2,
                        &task_data,
                        tskIDLE_PRIORITY + 1,
                        NULL
                    );
                }

                // Start system timer
                xTimerStart(xSystemTimer, 0);

                // Send some test commands
                uint32_t test_commands[] = {100, 200, 1500, 300, 2500};
                for (int i = 0; i < 5; i++) {
                    xQueueSend(xCommandQueue, &test_commands[i], 0);
                }

                // Let system run
                vTaskDelay(pdMS_TO_TICKS(3000));

                // Cleanup
                xTimerDelete(xSystemTimer);
                vSemaphoreDelete(xResourceSemaphore);
                vQueueDelete(xCommandQueue);
            }
        }
        """

        final_result = self._run_workflow(
            "freertos_complex_test.c",
            freertos_code,
            ['worker_task', 'timer_callback', 'create_freertos_system'],
            self._freertos_workflow_config(
                'freertos_comprehensive',
                [
                    'memory_safety', 'concurrency', 'race_condition',
                    'queue_safety', 'timer_safety', 'interrupt_safety',
                ],
                {
                    'include_headers': ['FreeRTOS.h', 'task.h', 'queue.h', 'semphr.h', 'timers.h'],
                    'num_tasks': 3,
                    'queue_size': 20,
                    'timer_period_ms': 5000,
                    'timeout_ms': 5000,
                },
            ),
        )

        # Check the statistics of the complex scenario.
        stats = final_result['workflow']['statistics']
        assert 'total_functions' in stats
        assert stats['total_functions'] == 3

    def test_freertos_error_scenarios(self):
        """Run three error-prone snippets, each targeting its own function."""
        # NULL pointer handling.
        null_pointer_code = """
        #include "FreeRTOS.h"
        #include "task.h"

        void task_with_null_check(void *pvParameters) {
            TaskHandle_t task_handle = (TaskHandle_t)pvParameters;

            // Should handle NULL task handle gracefully
            if (task_handle != NULL) {
                vTaskDelete(task_handle);
            }
        }
        """

        # Queue overflow.
        queue_overflow_code = """
        #include "FreeRTOS.h"
        #include "queue.h"

        void queue_overflow_test(void) {
            QueueHandle_t queue = xQueueCreate(1, sizeof(int));

            if (queue != NULL) {
                int value = 42;

                // This should fail gracefully when queue is full
                xQueueSend(queue, &value, 0);  // Should succeed
                xQueueSend(queue, &value, 0);  // Should fail (queue full)

                vQueueDelete(queue);
            }
        }
        """

        # Semaphore deadlock.
        semaphore_deadlock_code = """
        #include "FreeRTOS.h"
        #include "semphr.h"

        void semaphore_deadlock_test(void) {
            SemaphoreHandle_t sem1 = xSemaphoreCreateMutex();
            SemaphoreHandle_t sem2 = xSemaphoreCreateMutex();

            if (sem1 != NULL && sem2 != NULL) {
                // Potential deadlock scenario
                xSemaphoreTake(sem1, portMAX_DELAY);
                xSemaphoreTake(sem2, portMAX_DELAY);

                // Should release in reverse order to avoid deadlock
                xSemaphoreGive(sem2);
                xSemaphoreGive(sem1);

                vSemaphoreDelete(sem1);
                vSemaphoreDelete(sem2);
            }
        }
        """

        # (scenario name, the one function its source defines, source code).
        # Each file defines a single function, so only that function is
        # requested as a target for that file.
        error_scenarios = [
            ("null_pointer", "task_with_null_check", null_pointer_code),
            ("queue_overflow", "queue_overflow_test", queue_overflow_code),
            ("semaphore_deadlock", "semaphore_deadlock_test", semaphore_deadlock_code),
        ]

        for scenario_name, target_function, scenario_code in error_scenarios:
            self._run_workflow(
                f"freertos_error_{scenario_name}.c",
                scenario_code,
                [target_function],
                self._freertos_workflow_config(
                    'freertos_error_detection',
                    ['memory_safety', 'error_handling', 'null_pointer'],
                    {
                        'include_headers': ['FreeRTOS.h', 'task.h', 'queue.h', 'semphr.h'],
                        'timeout_ms': 2000,
                    },
                ),
            )


class TestFreeRTOSSpecificFeatures:
    """Component-level tests for FreeRTOS-specific handling.

    Exercises the parser on FreeRTOS snippets and drives the specification
    generator, mutation engine and CBMC runner through patched entry points.
    """

    def setup_method(self):
        """Load the configuration and create a per-test temporary directory."""
        self.config = get_config()
        self.temp_dir = tempfile.mkdtemp()

    def teardown_method(self):
        """Remove the per-test temporary directory."""
        if os.path.exists(self.temp_dir):
            shutil.rmtree(self.temp_dir)

    def test_freertos_header_detection(self):
        """The parser reports the ``FreeRTOS.h`` include of a snippet."""
        from src.parser import CodeParser

        parser = CodeParser(self.config)

        freertos_code = """
        #include "FreeRTOS.h"
        #include "task.h"
        #include "queue.h"

        void freertos_function(void) {
            // FreeRTOS specific code
        }
        """

        result = parser.parse_code(freertos_code)
        assert result.success

        # The FreeRTOS header must show up among the parsed includes.
        assert any('FreeRTOS.h' in str(include) for include in result.includes)

    def test_freertos_api_recognition(self):
        """FreeRTOS API calls are present in the parsed function body."""
        from src.parser import CodeParser

        parser = CodeParser(self.config)

        freertos_code = """
        #include "FreeRTOS.h"
        #include "task.h"

        void api_test(void) {
            TaskHandle_t handle;
            xTaskCreate(vTaskFunction, "Task", 128, NULL, 1, &handle);
            vTaskDelay(100);
            vTaskDelete(handle);
        }
        """

        result = parser.parse_code(freertos_code)
        assert result.success

        # Fail with a clear assertion rather than an IndexError when the
        # parser returns no functions.
        assert result.functions
        body_text = str(result.functions[0].body)
        assert 'xTaskCreate' in body_text
        assert 'vTaskDelay' in body_text
        assert 'vTaskDelete' in body_text

    def test_freertos_task_parameter_handling(self):
        """The generated specification mentions the task parameter."""
        from src.spec import SpecGenerator

        generator = SpecGenerator(self.config)

        # Simulated metadata of a task function.
        task_metadata = {
            'name': 'vTaskFunction',
            'return_type': 'void',
            'parameters': [
                {'name': 'pvParameters', 'type': 'void *'}
            ],
            'complexity_score': 0.6,
            'is_freertos_task': True
        }

        # NOTE(review): the method under test is itself patched, so this only
        # checks the canned return value — confirm that is the intent.
        with patch.object(generator, 'generate_specification') as mock_generate:
            mock_generate.return_value = {
                'specification': 'void vTaskFunction(void *pvParameters) { __CPROVER_assume(pvParameters != NULL); }',
                'confidence': 0.8,
                'explanation': 'FreeRTOS task specification with parameter validation'
            }

            result = generator.generate_specification(task_metadata)
            assert 'pvParameters' in result['specification']

    def test_freertos_concurrency_verification(self):
        """Mutation of an unprotected shared-counter task yields candidates."""
        from src.mutate import MutationEngine

        specification = """
        void concurrent_task(void *pvParameters) {
            int *shared_counter = (int *)pvParameters;

            // Critical section without protection
            (*shared_counter)++;
        }
        """

        function_metadata = {
            'name': 'concurrent_task',
            'parameters': [{'name': 'pvParameters', 'type': 'void *'}],
            'is_freertos_task': True
        }

        config = Mock()
        config.max_candidates_per_function = 10
        config.max_selected_candidates = 5
        config.quality_threshold = 0.6

        mutator = MutationEngine(config)

        # NOTE(review): execute_mutation is patched and then called, so the
        # assertion below only sees the canned candidates — confirm intent.
        with patch.object(mutator, 'execute_mutation') as mock_mutate:
            mock_result = Mock()
            mock_result.selected_candidates = [
                Mock(
                    mutated_spec='void concurrent_task(void *pvParameters) { '
                                 'int *shared_counter = (int *)pvParameters; '
                                 '// Add semaphore protection for critical section '
                                 '(*shared_counter)++; '
                                 '}',
                    confidence=0.8
                )
            ]
            mock_mutate.return_value = mock_result

            result = mutator.execute_mutation(specification, function_metadata)
            assert len(result.selected_candidates) > 0

    def test_freertos_memory_safety_verification(self):
        """An awaited, patched CBMC verification reports a successful status."""
        from src.verify import CBMCRunner

        freertos_spec = """
        void memory_safe_task(void *pvParameters) {
            TaskHandle_t *task_handle = (TaskHandle_t *)pvParameters;

            if (task_handle != NULL && *task_handle != NULL) {
                // Safe task handle access
                vTaskDelete(*task_handle);
            }
        }
        """

        function_metadata = {
            'name': 'memory_safe_task',
            'parameters': [{'name': 'pvParameters', 'type': 'void *'}]
        }

        verifier = CBMCRunner()

        # run_verification is awaited below, so patch it with an AsyncMock
        # explicitly instead of relying on patch() detecting a coroutine.
        with patch.object(verifier, 'run_verification', new_callable=AsyncMock) as mock_verify:
            mock_result = Mock()
            mock_result.status = Mock()
            mock_result.status.value = 'successful'
            mock_verify.return_value = mock_result

            async def run_check():
                result = await verifier.run_verification(function_metadata, "test.c", freertos_spec)
                assert result.status.value == 'successful'

            asyncio.run(run_check())
            mock_verify.assert_awaited_once_with(function_metadata, "test.c", freertos_spec)


if __name__ == "__main__":
    # Propagate pytest's exit code so a failing run ends with a non-zero status.
    raise SystemExit(pytest.main([__file__, "-v"]))