""" FreeRTOS验证集成测试 本模块为FreeRTOS验证提供专门的集成测试,包括: - FreeRTOS任务创建验证 - FreeRTOS队列操作验证 - FreeRTOS信号量验证 - FreeRTOS定时器验证 - 并发性验证 - 中断处理验证 """ import pytest import asyncio import tempfile import json import os from pathlib import Path from unittest.mock import Mock, patch, AsyncMock, MagicMock from typing import Dict, List, Any, Optional import time from src.ui.workflow import WorkflowManager, WorkflowConfig, WorkflowMode from src.ui.job_manager import JobManager, JobStatus, JobType from src.ui.api import init_api from src.ui.web_app import create_app from src.utils.config import get_config, ConfigManager # 导入测试固件 from tests.fixtures.cbmc_outputs import cbmc_fixtures from tests.fixtures.llm_responses import llm_fixtures class TestFreeRTOSVerificationIntegration: """FreeRTOS验证集成测试""" def setup_method(self): """测试方法设置""" self.config = get_config() self.temp_dir = tempfile.mkdtemp() self.config.working_directory = self.temp_dir # 初始化各个组件 self.job_manager = JobManager(self.config) self.workflow_manager = WorkflowManager(self.job_manager, config=self.config) # 创建测试应用 self.app = create_app(self.config) self.app.config['TESTING'] = True self.app.config['SECRET_KEY'] = 'test-secret' # 初始化API init_api( config=self.config, job_manager=self.job_manager, workflow_manager=self.workflow_manager, websocket_handler=None ) def teardown_method(self): """测试方法清理""" import shutil if os.path.exists(self.temp_dir): shutil.rmtree(self.temp_dir) @pytest.mark.skipif(not os.environ.get('FREERTOS_PATH'), reason="FreeRTOS headers not available") def test_freertos_task_creation_verification(self): """测试FreeRTOS任务创建验证""" freertos_code = """ #include "FreeRTOS.h" #include "task.h" void vTaskFunction(void *pvParameters) { int *param = (int *)pvParameters; for (;;) { // Task implementation if (*param > 0) { (*param)--; } vTaskDelay(pdMS_TO_TICKS(100)); } } void create_test_task(void) { int task_param = 10; TaskHandle_t xHandle = NULL; BaseType_t xResult = xTaskCreate( vTaskFunction, "TestTask", configMINIMAL_STACK_SIZE, &task_param, tskIDLE_PRIORITY + 1, &xHandle ); if (xResult == pdPASS) { // Task created successfully vTaskDelete(xHandle); } } """ test_file = os.path.join(self.temp_dir, "freertos_task_test.c") with open(test_file, 'w') as f: f.write(freertos_code) try: # 创建FreeRTOS专用工作流 workflow_data = { 'source_files': [test_file], 'target_functions': ['create_test_task', 'vTaskFunction'], 'workflow_config': { 'mode': 'freertos_verification', 'enable_parsing': True, 'enable_generation': True, 'enable_mutation': True, 'enable_verification': True, 'verification_types': ['memory_safety', 'concurrency', 'pointer_validity'], 'freertos_config': { 'include_headers': ['FreeRTOS.h', 'task.h'], 'task_stack_size': 128, 'task_priority': 1, 'timeout_ms': 1000 } } } with self.app.test_client() as client: response = client.post('/api/workflow', json=workflow_data) assert response.status_code == 200 workflow_result = response.get_json() workflow_id = workflow_result['workflow_id'] # 等待工作流完成 self._wait_for_workflow_completion(workflow_id) # 验证结果 response = client.get(f'/api/workflow/{workflow_id}') assert response.status_code == 200 final_result = response.get_json() assert final_result['success'] is True assert final_result['workflow']['status'] == 'completed' # 验证FreeRTOS特定的统计 stats = final_result['workflow']['statistics'] assert 'total_functions' in stats assert stats['total_functions'] == 2 finally: if os.path.exists(test_file): os.unlink(test_file) def test_freertos_queue_operations_verification(self): """测试FreeRTOS队列操作验证""" freertos_code = """ #include "FreeRTOS.h" #include "queue.h" #define QUEUE_LENGTH 10 #define ITEM_SIZE sizeof(uint32_t) void queue_producer_task(void *pvParameters) { QueueHandle_t xQueue = (QueueHandle_t)pvParameters; uint32_t value = 0; for (;;) { value++; if (xQueueSend(xQueue, &value, pdMS_TO_TICKS(100)) == pdPASS) { // Item sent successfully } vTaskDelay(pdMS_TO_TICKS(50)); } } void queue_consumer_task(void *pvParameters) { QueueHandle_t xQueue = (QueueHandle_t)pvParameters; uint32_t received_value; for (;;) { if (xQueueReceive(xQueue, &received_value, pdMS_TO_TICKS(200)) == pdPASS) { // Item received successfully } vTaskDelay(pdMS_TO_TICKS(50)); } } void create_queue_example(void) { QueueHandle_t xQueue; xQueue = xQueueCreate(QUEUE_LENGTH, ITEM_SIZE); if (xQueue != NULL) { // Queue created successfully // Create producer and consumer tasks xTaskCreate(queue_producer_task, "Producer", configMINIMAL_STACK_SIZE, xQueue, 1, NULL); xTaskCreate(queue_consumer_task, "Consumer", configMINIMAL_STACK_SIZE, xQueue, 1, NULL); // Let tasks run for a while vTaskDelay(pdMS_TO_TICKS(1000)); // Cleanup vQueueDelete(xQueue); } } """ test_file = os.path.join(self.temp_dir, "freertos_queue_test.c") with open(test_file, 'w') as f: f.write(freertos_code) try: workflow_data = { 'source_files': [test_file], 'target_functions': ['queue_producer_task', 'queue_consumer_task', 'create_queue_example'], 'workflow_config': { 'mode': 'freertos_verification', 'enable_parsing': True, 'enable_generation': True, 'enable_mutation': True, 'enable_verification': True, 'verification_types': ['memory_safety', 'concurrency', 'queue_safety'], 'freertos_config': { 'include_headers': ['FreeRTOS.h', 'queue.h'], 'queue_length': 10, 'queue_item_size': 4, 'timeout_ms': 2000 } } } with self.app.test_client() as client: response = client.post('/api/workflow', json=workflow_data) assert response.status_code == 200 workflow_result = response.get_json() workflow_id = workflow_result['workflow_id'] self._wait_for_workflow_completion(workflow_id) response = client.get(f'/api/workflow/{workflow_id}') assert response.status_code == 200 final_result = response.get_json() assert final_result['success'] is True finally: if os.path.exists(test_file): os.unlink(test_file) def test_freertos_semaphore_verification(self): """测试FreeRTOS信号量验证""" freertos_code = """ #include "FreeRTOS.h" #include "semphr.h" SemaphoreHandle_t xSemaphore; void task_using_semaphore(void *pvParameters) { for (;;) { // Try to take the semaphore if (xSemaphoreTake(xSemaphore, pdMS_TO_TICKS(100)) == pdTRUE) { // Critical section // Access shared resource // Release the semaphore xSemaphoreGive(xSemaphore); } vTaskDelay(pdMS_TO_TICKS(50)); } } void create_semaphore_example(void) { // Create a binary semaphore xSemaphore = xSemaphoreCreateBinary(); if (xSemaphore != NULL) { // Give the semaphore initially xSemaphoreGive(xSemaphore); // Create tasks that use the semaphore for (int i = 0; i < 3; i++) { xTaskCreate(task_using_semaphore, "SemaphoreTask", configMINIMAL_STACK_SIZE, NULL, 1, NULL); } // Let tasks run vTaskDelay(pdMS_TO_TICKS(2000)); // Cleanup vSemaphoreDelete(xSemaphore); } } """ test_file = os.path.join(self.temp_dir, "freertos_semaphore_test.c") with open(test_file, 'w') as f: f.write(freertos_code) try: workflow_data = { 'source_files': [test_file], 'target_functions': ['task_using_semaphore', 'create_semaphore_example'], 'workflow_config': { 'mode': 'freertos_verification', 'enable_parsing': True, 'enable_generation': True, 'enable_mutation': True, 'enable_verification': True, 'verification_types': ['memory_safety', 'concurrency', 'race_condition'], 'freertos_config': { 'include_headers': ['FreeRTOS.h', 'semphr.h'], 'max_tasks': 3, 'timeout_ms': 3000 } } } with self.app.test_client() as client: response = client.post('/api/workflow', json=workflow_data) assert response.status_code == 200 workflow_result = response.get_json() workflow_id = workflow_result['workflow_id'] self._wait_for_workflow_completion(workflow_id) response = client.get(f'/api/workflow/{workflow_id}') assert response.status_code == 200 final_result = response.get_json() assert final_result['success'] is True finally: if os.path.exists(test_file): os.unlink(test_file) def test_freertos_timer_verification(self): """测试FreeRTOS定时器验证""" freertos_code = """ #include "FreeRTOS.h" #include "timers.h" TimerHandle_t xTimer; void vTimerCallback(TimerHandle_t xTimer) { // Timer callback function uint32_t timer_id = (uint32_t)pvTimerGetTimerID(xTimer); // Handle timer event // This could be a periodic operation } void create_timer_example(void) { // Create a software timer xTimer = xTimerCreate( "Timer", // Text name pdMS_TO_TICKS(1000), // Period pdTRUE, // Auto-reload (void *)1, // Timer ID vTimerCallback // Callback function ); if (xTimer != NULL) { // Start the timer if (xTimerStart(xTimer, 0) == pdPASS) { // Timer started successfully // Let it run for a while vTaskDelay(pdMS_TO_TICKS(5000)); // Stop and delete the timer xTimerStop(xTimer, 0); xTimerDelete(xTimer); } } } """ test_file = os.path.join(self.temp_dir, "freertos_timer_test.c") with open(test_file, 'w') as f: f.write(freertos_code) try: workflow_data = { 'source_files': [test_file], 'target_functions': ['vTimerCallback', 'create_timer_example'], 'workflow_config': { 'mode': 'freertos_verification', 'enable_parsing': True, 'enable_generation': True, 'enable_mutation': True, 'enable_verification': True, 'verification_types': ['memory_safety', 'timer_safety'], 'freertos_config': { 'include_headers': ['FreeRTOS.h', 'timers.h'], 'timer_period_ms': 1000, 'timeout_ms': 6000 } } } with self.app.test_client() as client: response = client.post('/api/workflow', json=workflow_data) assert response.status_code == 200 workflow_result = response.get_json() workflow_id = workflow_result['workflow_id'] self._wait_for_workflow_completion(workflow_id) response = client.get(f'/api/workflow/{workflow_id}') assert response.status_code == 200 final_result = response.get_json() assert final_result['success'] is True finally: if os.path.exists(test_file): os.unlink(test_file) def test_freertos_interrupt_handling_verification(self): """测试FreeRTOS中断处理验证""" freertos_code = """ #include "FreeRTOS.h" #include "task.h" #include "queue.h" // Queue for interrupt-to-task communication QueueHandle_t xInterruptQueue; // Interrupt service routine void vISR_Handler(void) { BaseType_t xHigherPriorityTaskWoken = pdFALSE; uint32_t interrupt_value = 42; // Send data from ISR to task xQueueSendFromISR(xInterruptQueue, &interrupt_value, &xHigherPriorityTaskWoken); // Context switch if needed portYIELD_FROM_ISR(xHigherPriorityTaskWoken); } // Task that processes interrupt data void vInterruptHandlerTask(void *pvParameters) { uint32_t received_value; for (;;) { if (xQueueReceive(xInterruptQueue, &received_value, portMAX_DELAY) == pdPASS) { // Process interrupt data if (received_value == 42) { // Handle specific interrupt } } } } void setup_interrupt_handling(void) { // Create queue for interrupt communication xInterruptQueue = xQueueCreate(10, sizeof(uint32_t)); if (xInterruptQueue != NULL) { // Create task to handle interrupts xTaskCreate( vInterruptHandlerTask, "IntHandler", configMINIMAL_STACK_SIZE, NULL, tskIDLE_PRIORITY + 2, NULL ); // Setup interrupt (simulation) // In real code, this would configure hardware interrupts // Let the system run vTaskDelay(pdMS_TO_TICKS(1000)); // Cleanup vQueueDelete(xInterruptQueue); } } """ test_file = os.path.join(self.temp_dir, "freertos_interrupt_test.c") with open(test_file, 'w') as f: f.write(freertos_code) try: workflow_data = { 'source_files': [test_file], 'target_functions': ['vISR_Handler', 'vInterruptHandlerTask', 'setup_interrupt_handling'], 'workflow_config': { 'mode': 'freertos_verification', 'enable_parsing': True, 'enable_generation': True, 'enable_mutation': True, 'enable_verification': True, 'verification_types': ['memory_safety', 'concurrency', 'interrupt_safety'], 'freertos_config': { 'include_headers': ['FreeRTOS.h', 'task.h', 'queue.h'], 'queue_size': 10, 'task_priority': 2, 'timeout_ms': 2000 } } } with self.app.test_client() as client: response = client.post('/api/workflow', json=workflow_data) assert response.status_code == 200 workflow_result = response.get_json() workflow_id = workflow_result['workflow_id'] self._wait_for_workflow_completion(workflow_id) response = client.get(f'/api/workflow/{workflow_id}') assert response.status_code == 200 final_result = response.get_json() assert final_result['success'] is True finally: if os.path.exists(test_file): os.unlink(test_file) def test_freertos_complex_scenario_verification(self): """测试FreeRTOS复杂场景验证""" freertos_code = """ #include "FreeRTOS.h" #include "task.h" #include "queue.h" #include "semphr.h" #include "timers.h" // Shared resources QueueHandle_t xCommandQueue; SemaphoreHandle_t xResourceSemaphore; TimerHandle_t xSystemTimer; // Task states typedef enum { TASK_STATE_IDLE, TASK_STATE_PROCESSING, TASK_STATE_COMPLETED } TaskState_t; // Task data structure typedef struct { TaskState_t state; uint32_t processed_commands; uint32_t error_count; } TaskData_t; void worker_task(void *pvParameters) { TaskData_t *task_data = (TaskData_t *)pvParameters; uint32_t command; for (;;) { // Wait for command if (xQueueReceive(xCommandQueue, &command, pdMS_TO_TICKS(100)) == pdPASS) { // Acquire resource semaphore if (xSemaphoreTake(xResourceSemaphore, pdMS_TO_TICKS(50)) == pdTRUE) { // Process command task_data->state = TASK_STATE_PROCESSING; // Simulate processing if (command < 1000) { task_data->processed_commands++; } else { task_data->error_count++; } task_data->state = TASK_STATE_COMPLETED; // Release resource xSemaphoreGive(xResourceSemaphore); } } // Small delay to yield CPU vTaskDelay(pdMS_TO_TICKS(10)); } } void timer_callback(TimerHandle_t xTimer) { // Periodic system maintenance uint32_t timer_id = (uint32_t)pvTimerGetTimerID(xTimer); // Perform maintenance tasks // This could include cleanup, statistics collection, etc. } void create_freertos_system(void) { TaskData_t task_data = {TASK_STATE_IDLE, 0, 0}; // Create communication queue xCommandQueue = xQueueCreate(20, sizeof(uint32_t)); // Create resource semaphore xResourceSemaphore = xSemaphoreCreateMutex(); // Create system timer xSystemTimer = xTimerCreate( "SystemTimer", pdMS_TO_TICKS(5000), pdTRUE, (void *)1, timer_callback ); if (xCommandQueue != NULL && xResourceSemaphore != NULL && xSystemTimer != NULL) { // Create worker tasks for (int i = 0; i < 3; i++) { xTaskCreate( worker_task, "WorkerTask", configMINIMAL_STACK_SIZE * 2, &task_data, tskIDLE_PRIORITY + 1, NULL ); } // Start system timer xTimerStart(xSystemTimer, 0); // Send some test commands uint32_t test_commands[] = {100, 200, 1500, 300, 2500}; for (int i = 0; i < 5; i++) { xQueueSend(xCommandQueue, &test_commands[i], 0); } // Let system run vTaskDelay(pdMS_TO_TICKS(3000)); // Cleanup xTimerDelete(xSystemTimer); vSemaphoreDelete(xResourceSemaphore); vQueueDelete(xCommandQueue); } } """ test_file = os.path.join(self.temp_dir, "freertos_complex_test.c") with open(test_file, 'w') as f: f.write(freertos_code) try: workflow_data = { 'source_files': [test_file], 'target_functions': ['worker_task', 'timer_callback', 'create_freertos_system'], 'workflow_config': { 'mode': 'freertos_comprehensive', 'enable_parsing': True, 'enable_generation': True, 'enable_mutation': True, 'enable_verification': True, 'verification_types': [ 'memory_safety', 'concurrency', 'race_condition', 'queue_safety', 'timer_safety', 'interrupt_safety' ], 'freertos_config': { 'include_headers': ['FreeRTOS.h', 'task.h', 'queue.h', 'semphr.h', 'timers.h'], 'num_tasks': 3, 'queue_size': 20, 'timer_period_ms': 5000, 'timeout_ms': 5000 } } } with self.app.test_client() as client: response = client.post('/api/workflow', json=workflow_data) assert response.status_code == 200 workflow_result = response.get_json() workflow_id = workflow_result['workflow_id'] self._wait_for_workflow_completion(workflow_id) response = client.get(f'/api/workflow/{workflow_id}') assert response.status_code == 200 final_result = response.get_json() assert final_result['success'] is True # 验证复杂场景的统计 stats = final_result['workflow']['statistics'] assert 'total_functions' in stats assert stats['total_functions'] == 3 finally: if os.path.exists(test_file): os.unlink(test_file) def test_freertos_error_scenarios(self): """测试FreeRTOS错误场景""" # 测试NULL指针处理 null_pointer_code = """ #include "FreeRTOS.h" #include "task.h" void task_with_null_check(void *pvParameters) { TaskHandle_t task_handle = (TaskHandle_t)pvParameters; // Should handle NULL task handle gracefully if (task_handle != NULL) { vTaskDelete(task_handle); } } """ # 测试队列溢出 queue_overflow_code = """ #include "FreeRTOS.h" #include "queue.h" void queue_overflow_test(void) { QueueHandle_t queue = xQueueCreate(1, sizeof(int)); if (queue != NULL) { int value = 42; // This should fail gracefully when queue is full xQueueSend(queue, &value, 0); // Should succeed xQueueSend(queue, &value, 0); // Should fail (queue full) vQueueDelete(queue); } } """ # 测试信号量死锁 semaphore_deadlock_code = """ #include "FreeRTOS.h" #include "semphr.h" void semaphore_deadlock_test(void) { SemaphoreHandle_t sem1 = xSemaphoreCreateMutex(); SemaphoreHandle_t sem2 = xSemaphoreCreateMutex(); if (sem1 != NULL && sem2 != NULL) { // Potential deadlock scenario xSemaphoreTake(sem1, portMAX_DELAY); xSemaphoreTake(sem2, portMAX_DELAY); // Should release in reverse order to avoid deadlock xSemaphoreGive(sem2); xSemaphoreGive(sem1); vSemaphoreDelete(sem1); vSemaphoreDelete(sem2); } } """ error_scenarios = [ ("null_pointer", null_pointer_code), ("queue_overflow", queue_overflow_code), ("semaphore_deadlock", semaphore_deadlock_code) ] for scenario_name, scenario_code in error_scenarios: test_file = os.path.join(self.temp_dir, f"freertos_error_{scenario_name}.c") with open(test_file, 'w') as f: f.write(scenario_code) try: workflow_data = { 'source_files': [test_file], 'target_functions': [f'task_with_null_check', f'queue_overflow_test', f'semaphore_deadlock_test'], 'workflow_config': { 'mode': 'freertos_error_detection', 'enable_parsing': True, 'enable_generation': True, 'enable_mutation': True, 'enable_verification': True, 'verification_types': ['memory_safety', 'error_handling', 'null_pointer'], 'freertos_config': { 'include_headers': ['FreeRTOS.h', 'task.h', 'queue.h', 'semphr.h'], 'timeout_ms': 2000 } } } with self.app.test_client() as client: response = client.post('/api/workflow', json=workflow_data) assert response.status_code == 200 workflow_result = response.get_json() workflow_id = workflow_result['workflow_id'] self._wait_for_workflow_completion(workflow_id) response = client.get(f'/api/workflow/{workflow_id}') assert response.status_code == 200 final_result = response.get_json() assert final_result['success'] is True finally: if os.path.exists(test_file): os.unlink(test_file) def _wait_for_workflow_completion(self, workflow_id: str, timeout: int = 120): """等待工作流完成""" import time start_time = time.time() with self.app.test_client() as client: while time.time() - start_time < timeout: response = client.get(f'/api/workflow/{workflow_id}') assert response.status_code == 200 workflow_data = response.get_json() if workflow_data['success']: workflow = workflow_data['workflow'] if workflow['status'] in ['completed', 'failed', 'cancelled']: return time.sleep(2) pytest.fail(f"FreeRTOS workflow {workflow_id} did not complete within {timeout} seconds") class TestFreeRTOSSpecificFeatures: """FreeRTOS特定功能测试""" def setup_method(self): """测试方法设置""" self.config = get_config() self.temp_dir = tempfile.mkdtemp() def teardown_method(self): """测试方法清理""" import shutil if os.path.exists(self.temp_dir): shutil.rmtree(self.temp_dir) def test_freertos_header_detection(self): """测试FreeRTOS头文件检测""" from src.parser import CodeParser parser = CodeParser(self.config) freertos_code = """ #include "FreeRTOS.h" #include "task.h" #include "queue.h" void freertos_function(void) { // FreeRTOS specific code } """ result = parser.parse_code(freertos_code) assert result.success # 验证检测到FreeRTOS头文件 assert any('FreeRTOS.h' in str(include) for include in result.includes) def test_freertos_api_recognition(self): """测试FreeRTOS API识别""" from src.parser import CodeParser parser = CodeParser(self.config) freertos_code = """ #include "FreeRTOS.h" #include "task.h" void api_test(void) { TaskHandle_t handle; xTaskCreate(vTaskFunction, "Task", 128, NULL, 1, &handle); vTaskDelay(100); vTaskDelete(handle); } """ result = parser.parse_code(freertos_code) assert result.success # 验证识别到FreeRTOS API调用 function = result.functions[0] assert 'xTaskCreate' in str(function.body) assert 'vTaskDelay' in str(function.body) assert 'vTaskDelete' in str(function.body) def test_freertos_task_parameter_handling(self): """测试FreeRTOS任务参数处理""" from src.spec import SpecGenerator generator = SpecGenerator(self.config) # 模拟任务函数的元数据 task_metadata = { 'name': 'vTaskFunction', 'return_type': 'void', 'parameters': [ {'name': 'pvParameters', 'type': 'void *'} ], 'complexity_score': 0.6, 'is_freertos_task': True } with patch.object(generator, 'generate_specification') as mock_generate: mock_generate.return_value = { 'specification': 'void vTaskFunction(void *pvParameters) { __CPROVER_assume(pvParameters != NULL); }', 'confidence': 0.8, 'explanation': 'FreeRTOS task specification with parameter validation' } result = generator.generate_specification(task_metadata) assert 'pvParameters' in result['specification'] def test_freertos_concurrency_verification(self): """测试FreeRTOS并发验证""" from src.mutate import MutationEngine specification = """ void concurrent_task(void *pvParameters) { int *shared_counter = (int *)pvParameters; // Critical section without protection (*shared_counter)++; } """ function_metadata = { 'name': 'concurrent_task', 'parameters': [{'name': 'pvParameters', 'type': 'void *'}], 'is_freertos_task': True } config = Mock() config.max_candidates_per_function = 10 config.max_selected_candidates = 5 config.quality_threshold = 0.6 mutator = MutationEngine(config) with patch.object(mutator, 'execute_mutation') as mock_mutate: mock_result = Mock() mock_result.selected_candidates = [ Mock( mutated_spec='void concurrent_task(void *pvParameters) { ' 'int *shared_counter = (int *)pvParameters; ' '// Add semaphore protection for critical section ' '(*shared_counter)++; ' '}', confidence=0.8 ) ] mock_mutate.return_value = mock_result result = mutator.execute_mutation(specification, function_metadata) assert len(result.selected_candidates) > 0 def test_freertos_memory_safety_verification(self): """测试FreeRTOS内存安全验证""" from src.verify import CBMCRunner freertos_spec = """ void memory_safe_task(void *pvParameters) { TaskHandle_t *task_handle = (TaskHandle_t *)pvParameters; if (task_handle != NULL && *task_handle != NULL) { // Safe task handle access vTaskDelete(*task_handle); } } """ function_metadata = { 'name': 'memory_safe_task', 'parameters': [{'name': 'pvParameters', 'type': 'void *'}] } verifier = CBMCRunner() with patch.object(verifier, 'run_verification') as mock_verify: mock_result = Mock() mock_result.status = Mock() mock_result.status.value = 'successful' mock_verify.return_value = mock_result async def test_verification(): result = await verifier.run_verification(function_metadata, "test.c", freertos_spec) assert result.status.value == 'successful' asyncio.run(test_verification()) if __name__ == "__main__": pytest.main([__file__, "-v"])