#!/usr/bin/env python3
"""
Header dependency analysis utility for CBMC SpecGen.

This module provides comprehensive header dependency resolution for C/C++
files, ensuring that CBMC verification has access to all required header
files.
"""

import os
import re
import shutil
import logging
import threading
from pathlib import Path
from typing import List, Set, Dict, Optional, Tuple


class HeaderDependencyAnalyzer:
    """
    Analyzes and resolves header dependencies for C/C++ files.

    This class provides methods to:
    - Extract local include statements from C/C++ files
    - Resolve header file paths
    - Collect complete dependency trees
    - Copy dependencies to temporary directories
    """

    def __init__(self, logger: Optional[logging.Logger] = None):
        """
        Initialize the header dependency analyzer.

        Args:
            logger: Optional logger instance for dependency analysis logging
        """
        self.logger = logger or logging.getLogger(__name__)

        # Regex pattern to match local includes: #include "header.h"
        self.include_pattern = re.compile(r'^\s*#\s*include\s*"([^"]+)"', re.MULTILINE)

        # Cache for dependency analysis to avoid re-processing
        self.dependency_cache: Dict[str, Set[str]] = {}

        # Track processed files to avoid circular dependencies
        self.processed_files: Set[str] = set()

        # Thread safety locks
        self._cache_lock = threading.Lock()
        self._processed_files_lock = threading.Lock()

    def extract_local_includes(self, file_path: str) -> List[str]:
        """
        Extract local include statements from a C/C++ file.

        Args:
            file_path: Path to the C/C++ file to analyze

        Returns:
            List of header files included with #include "..." syntax
        """
        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()

            # Find all local includes
            includes = self.include_pattern.findall(content)

            if self.logger.isEnabledFor(logging.DEBUG):
                self.logger.debug(f"Found {len(includes)} local includes in {file_path}: {includes}")

            return includes

        except Exception as e:
            self.logger.error(f"Error reading file {file_path}: {e}")
            return []
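
    # Example (hypothetical file): given src/main.c containing
    #     #include "util.h"
    #     #include <stdio.h>
    # extract_local_includes("src/main.c") returns ["util.h"]. Only quoted,
    # project-local includes are captured by the regex; angle-bracket system
    # includes such as <stdio.h> are intentionally ignored.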

    def resolve_header_path(self, header_name: str, source_file_path: str,
                            project_root: Optional[str] = None) -> Optional[str]:
        """
        Resolve a header file path relative to the source file and project root.

        Args:
            header_name: Header file name from #include statement
            source_file_path: Path to the source file that includes the header
            project_root: Optional project root directory for absolute resolution

        Returns:
            Resolved absolute path to the header file, or None if not found
        """
        source_dir = Path(source_file_path).parent

        # Try relative to source file first
        header_path = source_dir / header_name
        if header_path.exists() and header_path.is_file():
            return str(header_path.absolute())

        # Try in project root if specified
        if project_root:
            project_path = Path(project_root) / header_name
            if project_path.exists() and project_path.is_file():
                return str(project_path.absolute())

        # Try common include directories relative to source
        for include_dir in ['include', 'inc', 'headers']:
            include_path = source_dir / include_dir / header_name
            if include_path.exists() and include_path.is_file():
                return str(include_path.absolute())

        # Try parent directories
        parent_dir = source_dir
        for _ in range(5):  # Limit search depth
            parent_dir = parent_dir.parent
            if parent_dir == parent_dir.parent:  # Reached filesystem root
                break
            header_path = parent_dir / header_name
            if header_path.exists() and header_path.is_file():
                return str(header_path.absolute())

        if self.logger.isEnabledFor(logging.DEBUG):
            self.logger.debug(f"Could not resolve header '{header_name}' for {source_file_path}")

        return None

    def collect_file_dependencies(self, file_path: str,
                                  project_root: Optional[str] = None,
                                  max_depth: int = 10,
                                  current_depth: int = 0) -> Set[str]:
        """
        Collect all header dependencies for a single C/C++ file recursively.

        Args:
            file_path: Path to the C/C++ file to analyze
            project_root: Optional project root directory
            max_depth: Maximum recursion depth for dependency analysis
            current_depth: Current recursion depth (for internal use)

        Returns:
            Set of absolute paths to all required header files
        """
        # Normalize to an absolute path so cache keys and circular-dependency
        # tracking stay consistent with the absolute paths produced by
        # resolve_header_path()
        file_path = str(Path(file_path).absolute())

        # Check cache first (thread-safe)
        with self._cache_lock:
            if file_path in self.dependency_cache:
                # Return a copy to avoid concurrent modification
                return self.dependency_cache[file_path].copy()

        # Prevent infinite recursion and excessive depth
        if current_depth > max_depth:
            self.logger.warning(f"Max dependency depth ({max_depth}) exceeded for {file_path}")
            return set()

        # Prevent circular dependencies (thread-safe)
        with self._processed_files_lock:
            if file_path in self.processed_files:
                self.logger.debug(f"Circular dependency detected for {file_path}")
                return set()
            self.processed_files.add(file_path)

        dependencies = set()

        try:
            # Extract includes from the current file
            includes = self.extract_local_includes(file_path)

            for include in includes:
                # Resolve the header path
                header_path = self.resolve_header_path(include, file_path, project_root)

                if header_path:
                    dependencies.add(header_path)

                    # Recursively analyze the header file itself
                    if current_depth < max_depth:
                        header_dependencies = self.collect_file_dependencies(
                            header_path, project_root, max_depth, current_depth + 1
                        )
                        dependencies.update(header_dependencies)
                else:
                    self.logger.warning(f"Could not resolve header '{include}' for {file_path}")

        except Exception as e:
            self.logger.error(f"Error analyzing dependencies for {file_path}: {e}")

        # Remove from processed files to allow processing in other contexts (thread-safe)
        with self._processed_files_lock:
            self.processed_files.discard(file_path)

        # Cache the result (thread-safe); store a copy to avoid concurrent modification
        with self._cache_lock:
            self.dependency_cache[file_path] = dependencies.copy()

        if self.logger.isEnabledFor(logging.DEBUG):
            self.logger.debug(f"Dependencies for {file_path}: {list(dependencies)}")

        return dependencies
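
    # Example (hypothetical layout): with src/main.c including "util.h"
    # (which sits next to it), and util.h in turn including "types.h" that
    # lives in src/include/, collect_file_dependencies("src/main.c") returns
    # the absolute paths of both util.h and types.h; types.h is found via
    # the include/ fallback in resolve_header_path().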
self.logger.debug(f"Dependencies for {file_path}: {list(dependencies)}") return dependencies def collect_project_dependencies(self, project_root: str, file_patterns: List[str] = None, max_depth: int = 10) -> Dict[str, Set[str]]: """ Analyze all C/C++ files in a project to build comprehensive dependency map. Args: project_root: Root directory of the project file_patterns: List of file patterns to analyze (default: common C/C++ extensions) max_depth: Maximum recursion depth for dependency analysis Returns: Dictionary mapping file paths to their dependency sets """ if file_patterns is None: file_patterns = ['*.c', '*.cpp', '*.cc', '*.cxx', '*.h', '*.hpp'] project_path = Path(project_root) dependency_map = {} if not project_path.exists(): self.logger.error(f"Project root does not exist: {project_root}") return dependency_map # Find all source and header files source_files = [] for pattern in file_patterns: source_files.extend(project_path.rglob(pattern)) self.logger.info(f"Analyzing dependencies for {len(source_files)} files in {project_root}") # Analyze dependencies for each file for file_path in source_files: abs_path = str(file_path.absolute()) dependencies = self.collect_file_dependencies(abs_path, project_root, max_depth) dependency_map[abs_path] = dependencies # Log summary statistics total_dependencies = sum(len(deps) for deps in dependency_map.values()) self.logger.info(f"Dependency analysis complete: {len(dependency_map)} files, " f"{total_dependencies} total dependencies") return dependency_map def copy_dependencies_to_temp(self, dependencies: Set[str], temp_dir: str, preserve_structure: bool = True, project_root: Optional[str] = None) -> Dict[str, str]: """ Copy header dependencies to temporary directory. Args: dependencies: Set of absolute paths to header files temp_dir: Temporary directory to copy files to preserve_structure: Whether to preserve directory structure project_root: Project root directory for computing relative paths Returns: Dictionary mapping original paths to temporary paths """ temp_path = Path(temp_dir) temp_path.mkdir(parents=True, exist_ok=True) copied_files = {} for header_path in dependencies: try: src_path = Path(header_path) if not src_path.exists(): self.logger.warning(f"Header file not found: {header_path}") continue if preserve_structure: # Preserve relative directory structure based on project_root if project_root: try: # Calculate relative path from project root project_path = Path(project_root) rel_path = src_path.relative_to(project_path) except ValueError: # If not under project root, use filename rel_path = Path(src_path.name) else: # If no project root, try to find common parent among all dependencies rel_path = Path(src_path.name) else: # Copy all headers to temp directory root rel_path = Path(src_path.name) # Create destination path dest_path = temp_path / rel_path # Create destination directory if needed dest_path.parent.mkdir(parents=True, exist_ok=True) # Copy the file import shutil shutil.copy2(src_path, dest_path) copied_files[header_path] = str(dest_path) if self.logger.isEnabledFor(logging.DEBUG): self.logger.debug(f"Copied {header_path} -> {dest_path}") except Exception as e: self.logger.error(f"Error copying header {header_path}: {e}") self.logger.info(f"Copied {len(copied_files)} header files to {temp_dir}") return copied_files def validate_dependencies(self, dependencies: Set[str]) -> Tuple[bool, List[str]]: """ Validate that all dependency files exist and are readable. 

    def validate_dependencies(self, dependencies: Set[str]) -> Tuple[bool, List[str]]:
        """
        Validate that all dependency files exist and are readable.

        Args:
            dependencies: Set of header file paths to validate

        Returns:
            Tuple of (is_valid, list_of_error_messages)
        """
        is_valid = True
        errors = []

        for header_path in dependencies:
            path = Path(header_path)

            if not path.exists():
                is_valid = False
                errors.append(f"Header file does not exist: {header_path}")
                continue

            if not path.is_file():
                is_valid = False
                errors.append(f"Header path is not a file: {header_path}")
                continue

            if not os.access(header_path, os.R_OK):
                is_valid = False
                errors.append(f"Header file is not readable: {header_path}")

        return is_valid, errors

    def clear_cache(self):
        """Clear the dependency analysis cache (thread-safe)."""
        with self._cache_lock:
            self.dependency_cache.clear()
        with self._processed_files_lock:
            self.processed_files.clear()
        self.logger.debug("Dependency cache cleared")

    def get_dependency_statistics(self, dependency_map: Dict[str, Set[str]]) -> Dict[str, float]:
        """
        Generate statistics about project dependencies.

        Args:
            dependency_map: Dictionary mapping files to their dependencies

        Returns:
            Dictionary with dependency statistics
        """
        if not dependency_map:
            return {}

        total_files = len(dependency_map)
        total_dependencies = sum(len(deps) for deps in dependency_map.values())
        avg_dependencies = total_dependencies / total_files

        # Find files with the most/fewest dependencies
        dependency_counts = [len(deps) for deps in dependency_map.values()]

        return {
            'total_files': total_files,
            'total_dependencies': total_dependencies,
            'average_dependencies_per_file': avg_dependencies,
            'max_dependencies': max(dependency_counts),
            'min_dependencies': min(dependency_counts)
        }


def create_header_analyzer(logger: Optional[logging.Logger] = None,
                           thread_safe: bool = True) -> HeaderDependencyAnalyzer:
    """
    Factory function to create a header dependency analyzer.

    Args:
        logger: Optional logger instance
        thread_safe: Accepted for API compatibility; the returned analyzer
            always uses internal locks, so this flag currently has no effect

    Returns:
        Configured HeaderDependencyAnalyzer instance
    """
    return HeaderDependencyAnalyzer(logger)


# Convenience functions for direct usage

def extract_local_includes(file_path: str) -> List[str]:
    """Extract local includes from a file (convenience function)."""
    analyzer = create_header_analyzer()
    return analyzer.extract_local_includes(file_path)


def collect_file_dependencies(file_path: str, project_root: Optional[str] = None) -> Set[str]:
    """Collect dependencies for a file (convenience function)."""
    analyzer = create_header_analyzer()
    return analyzer.collect_file_dependencies(file_path, project_root)


def copy_dependencies_to_temp(dependencies: Set[str], temp_dir: str,
                              preserve_structure: bool = True,
                              project_root: Optional[str] = None) -> Dict[str, str]:
    """Copy dependencies to temp directory (convenience function)."""
    analyzer = create_header_analyzer()
    return analyzer.copy_dependencies_to_temp(dependencies, temp_dir,
                                              preserve_structure, project_root)
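

# Minimal smoke test (illustrative sketch, not part of the public API). It
# builds a tiny throwaway project in a temp directory — main.c including
# "util.h", which in turn includes "include/types.h" — then exercises
# dependency collection, validation, project statistics, and copying. All
# file names below are hypothetical examples.
if __name__ == "__main__":
    import tempfile

    logging.basicConfig(level=logging.INFO)

    with tempfile.TemporaryDirectory() as project_root:
        root = Path(project_root)
        (root / "include").mkdir()
        (root / "include" / "types.h").write_text("typedef int my_int;\n")
        (root / "util.h").write_text('#include "include/types.h"\n')
        (root / "main.c").write_text('#include "util.h"\n#include <stdio.h>\n')

        analyzer = create_header_analyzer()

        # Transitive dependencies of main.c: util.h and include/types.h
        deps = analyzer.collect_file_dependencies(str(root / "main.c"), project_root)
        print(f"Resolved dependencies: {sorted(deps)}")

        ok, problems = analyzer.validate_dependencies(deps)
        print(f"Dependencies valid: {ok} {problems}")

        dep_map = analyzer.collect_project_dependencies(project_root)
        print(f"Project statistics: {analyzer.get_dependency_statistics(dep_map)}")

        with tempfile.TemporaryDirectory() as temp_dir:
            copied = analyzer.copy_dependencies_to_temp(
                deps, temp_dir, preserve_structure=True, project_root=project_root
            )
            print(f"Copied {len(copied)} headers into {temp_dir}")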