#!/usr/bin/env python3
"""
Header dependency analysis utility for CBMC SpecGen.

This module provides comprehensive header dependency resolution for C/C++ files,
ensuring that CBMC verification has access to all required header files.
"""

import os
import re
import shutil
import logging
import threading
from pathlib import Path
from typing import List, Set, Dict, Optional, Tuple


class HeaderDependencyAnalyzer:
    """
    Analyzes and resolves header dependencies for C/C++ files.

    This class provides methods to:
    - Extract local include statements from C/C++ files
    - Resolve header file paths
    - Collect complete dependency trees
    - Copy dependencies to temporary directories
    """

    def __init__(self, logger: Optional[logging.Logger] = None):
        """
        Initialize the header dependency analyzer.

        Args:
            logger: Optional logger instance for dependency analysis logging
        """
        self.logger = logger or logging.getLogger(__name__)

        # Regex pattern to match local includes: #include "header.h"
        self.include_pattern = re.compile(r'^\s*#\s*include\s*"([^"]+)"', re.MULTILINE)
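        # The pattern captures only quoted includes, which conventionally name
        # project-local headers, e.g.:
        #     #include "utils.h"       -> captures 'utils.h'
        #     #  include "sub/conf.h"  -> captures 'sub/conf.h'
        # Angle-bracket includes such as #include <stdio.h> never match, so
        # system headers are left to the compiler's normal search path.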

        # Cache for dependency analysis to avoid re-processing
        self.dependency_cache: Dict[str, Set[str]] = {}

        # Track processed files to avoid circular dependencies
        self.processed_files: Set[str] = set()

        # Thread safety locks
        self._cache_lock = threading.Lock()
        self._processed_files_lock = threading.Lock()

    def extract_local_includes(self, file_path: str) -> List[str]:
        """
        Extract local include statements from a C/C++ file.

        Args:
            file_path: Path to the C/C++ file to analyze

        Returns:
            List of header files included with #include "..." syntax
        """
        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()

            # Find all local includes
            includes = self.include_pattern.findall(content)

            if self.logger.isEnabledFor(logging.DEBUG):
                self.logger.debug(f"Found {len(includes)} local includes in {file_path}: {includes}")

            return includes

        except Exception as e:
            self.logger.error(f"Error reading file {file_path}: {e}")
            return []
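
    # Usage sketch (illustrative; "src/main.c" is a hypothetical path):
    #
    #     analyzer = HeaderDependencyAnalyzer()
    #     analyzer.extract_local_includes("src/main.c")
    #     # -> e.g. ['utils.h', 'config.h'] if main.c contains
    #     #    #include "utils.h" and #include "config.h"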

    def resolve_header_path(self, header_name: str, source_file_path: str,
                            project_root: Optional[str] = None) -> Optional[str]:
        """
        Resolve a header file path relative to the source file and project root.

        Args:
            header_name: Header file name from an #include statement
            source_file_path: Path to the source file that includes the header
            project_root: Optional project root directory for absolute resolution

        Returns:
            Resolved absolute path to the header file, or None if not found
        """
        source_dir = Path(source_file_path).parent

        # Try relative to the source file first
        header_path = source_dir / header_name
        if header_path.exists() and header_path.is_file():
            return str(header_path.absolute())

        # Try the project root if specified
        if project_root:
            project_path = Path(project_root) / header_name
            if project_path.exists() and project_path.is_file():
                return str(project_path.absolute())

        # Try common include directories relative to the source file
        for include_dir in ['include', 'inc', 'headers']:
            include_path = source_dir / include_dir / header_name
            if include_path.exists() and include_path.is_file():
                return str(include_path.absolute())

        # Walk up the parent directories
        parent_dir = source_dir
        for _ in range(5):  # Limit search depth
            parent_dir = parent_dir.parent
            if parent_dir == parent_dir.parent:  # Reached filesystem root
                break
            header_path = parent_dir / header_name
            if header_path.exists() and header_path.is_file():
                return str(header_path.absolute())

        if self.logger.isEnabledFor(logging.DEBUG):
            self.logger.debug(f"Could not resolve header '{header_name}' for {source_file_path}")

        return None
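
    # Resolution order, summarized (paths below are hypothetical):
    #
    #     analyzer.resolve_header_path("config.h", "/proj/src/main.c", "/proj")
    #     # checks, in order:
    #     #   /proj/src/config.h                       (next to the source file)
    #     #   /proj/config.h                           (project root)
    #     #   /proj/src/{include,inc,headers}/config.h (common include dirs)
    #     #   parent directories of /proj/src          (up to 5 levels)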

    def collect_file_dependencies(self, file_path: str,
                                  project_root: Optional[str] = None,
                                  max_depth: int = 10,
                                  current_depth: int = 0) -> Set[str]:
        """
        Recursively collect all header dependencies for a single C/C++ file.

        Args:
            file_path: Path to the C/C++ file to analyze
            project_root: Optional project root directory
            max_depth: Maximum recursion depth for dependency analysis
            current_depth: Current recursion depth (for internal use)

        Returns:
            Set of absolute paths to all required header files
        """
        # Check the cache first (thread-safe); return a copy to avoid
        # concurrent modification of the cached set
        with self._cache_lock:
            if file_path in self.dependency_cache:
                return self.dependency_cache[file_path].copy()

        # Prevent infinite recursion and excessive depth
        if current_depth > max_depth:
            self.logger.warning(f"Max dependency depth ({max_depth}) exceeded for {file_path}")
            return set()

        # Prevent circular dependencies (thread-safe)
        with self._processed_files_lock:
            if file_path in self.processed_files:
                self.logger.debug(f"Circular dependency detected for {file_path}")
                return set()
            self.processed_files.add(file_path)

        dependencies = set()

        try:
            # Extract includes from the current file
            includes = self.extract_local_includes(file_path)

            for include in includes:
                # Resolve the header path
                header_path = self.resolve_header_path(include, file_path, project_root)

                if header_path:
                    dependencies.add(header_path)

                    # Recursively analyze the header file itself
                    if current_depth < max_depth:
                        header_dependencies = self.collect_file_dependencies(
                            header_path, project_root, max_depth, current_depth + 1
                        )
                        dependencies.update(header_dependencies)
                else:
                    self.logger.warning(f"Could not resolve header '{include}' for {file_path}")

        except Exception as e:
            self.logger.error(f"Error analyzing dependencies for {file_path}: {e}")

        # Remove from processed files to allow processing in other contexts (thread-safe)
        with self._processed_files_lock:
            self.processed_files.discard(file_path)

        # Cache the result (thread-safe); store a copy to avoid concurrent modification
        with self._cache_lock:
            self.dependency_cache[file_path] = dependencies.copy()

        if self.logger.isEnabledFor(logging.DEBUG):
            self.logger.debug(f"Dependencies for {file_path}: {list(dependencies)}")

        return dependencies
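
    # Usage sketch (illustrative paths):
    #
    #     deps = analyzer.collect_file_dependencies("/proj/src/main.c", "/proj")
    #     # deps is a set of absolute header paths, transitively closed, e.g.
    #     # {'/proj/src/utils.h', '/proj/include/config.h'}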

    def collect_project_dependencies(self, project_root: str,
                                     file_patterns: Optional[List[str]] = None,
                                     max_depth: int = 10) -> Dict[str, Set[str]]:
        """
        Analyze all C/C++ files in a project to build a comprehensive dependency map.

        Args:
            project_root: Root directory of the project
            file_patterns: List of glob patterns to analyze (default: common C/C++ extensions)
            max_depth: Maximum recursion depth for dependency analysis

        Returns:
            Dictionary mapping file paths to their dependency sets
        """
        if file_patterns is None:
            file_patterns = ['*.c', '*.cpp', '*.cc', '*.cxx', '*.h', '*.hpp']

        project_path = Path(project_root)
        dependency_map = {}

        if not project_path.exists():
            self.logger.error(f"Project root does not exist: {project_root}")
            return dependency_map

        # Find all source and header files
        source_files = []
        for pattern in file_patterns:
            source_files.extend(project_path.rglob(pattern))

        self.logger.info(f"Analyzing dependencies for {len(source_files)} files in {project_root}")

        # Analyze dependencies for each file
        for file_path in source_files:
            abs_path = str(file_path.absolute())
            dependencies = self.collect_file_dependencies(abs_path, project_root, max_depth)
            dependency_map[abs_path] = dependencies

        # Log summary statistics
        total_dependencies = sum(len(deps) for deps in dependency_map.values())
        self.logger.info(f"Dependency analysis complete: {len(dependency_map)} files, "
                         f"{total_dependencies} total dependencies")

        return dependency_map
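
    # Usage sketch (illustrative):
    #
    #     dep_map = analyzer.collect_project_dependencies("/proj")
    #     # dep_map: {'/proj/src/main.c': {'/proj/src/utils.h', ...}, ...}
    #     # Headers themselves appear as keys too, since *.h/*.hpp are scanned.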

    def copy_dependencies_to_temp(self, dependencies: Set[str],
                                  temp_dir: str,
                                  preserve_structure: bool = True,
                                  project_root: Optional[str] = None) -> Dict[str, str]:
        """
        Copy header dependencies to a temporary directory.

        Args:
            dependencies: Set of absolute paths to header files
            temp_dir: Temporary directory to copy files to
            preserve_structure: Whether to preserve directory structure
            project_root: Project root directory for computing relative paths

        Returns:
            Dictionary mapping original paths to temporary paths
        """
        temp_path = Path(temp_dir)
        temp_path.mkdir(parents=True, exist_ok=True)

        copied_files = {}

        for header_path in dependencies:
            try:
                src_path = Path(header_path)

                if not src_path.exists():
                    self.logger.warning(f"Header file not found: {header_path}")
                    continue

                if preserve_structure and project_root:
                    try:
                        # Preserve the directory structure relative to the project root
                        rel_path = src_path.relative_to(Path(project_root))
                    except ValueError:
                        # Not under the project root; fall back to the bare filename
                        rel_path = Path(src_path.name)
                else:
                    # No structure to preserve (or no project root): copy to the temp root
                    rel_path = Path(src_path.name)

                # Create the destination path and any needed parent directories
                dest_path = temp_path / rel_path
                dest_path.parent.mkdir(parents=True, exist_ok=True)

                # Copy the file, preserving metadata
                shutil.copy2(src_path, dest_path)
                copied_files[header_path] = str(dest_path)

                if self.logger.isEnabledFor(logging.DEBUG):
                    self.logger.debug(f"Copied {header_path} -> {dest_path}")

            except Exception as e:
                self.logger.error(f"Error copying header {header_path}: {e}")

        self.logger.info(f"Copied {len(copied_files)} header files to {temp_dir}")
        return copied_files
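
    # Usage sketch (illustrative; tempfile is from the standard library):
    #
    #     import tempfile
    #     with tempfile.TemporaryDirectory() as tmp:
    #         mapping = analyzer.copy_dependencies_to_temp(deps, tmp, project_root="/proj")
    #         # mapping: {'/proj/include/config.h': f'{tmp}/include/config.h', ...}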

    def validate_dependencies(self, dependencies: Set[str]) -> Tuple[bool, List[str]]:
        """
        Validate that all dependency files exist and are readable.

        Args:
            dependencies: Set of header file paths to validate

        Returns:
            Tuple of (is_valid, list_of_error_messages)
        """
        is_valid = True
        errors = []

        for header_path in dependencies:
            path = Path(header_path)

            if not path.exists():
                is_valid = False
                errors.append(f"Header file does not exist: {header_path}")
                continue

            if not path.is_file():
                is_valid = False
                errors.append(f"Header path is not a file: {header_path}")
                continue

            if not os.access(header_path, os.R_OK):
                is_valid = False
                errors.append(f"Header file is not readable: {header_path}")

        return is_valid, errors
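
    # Usage sketch (illustrative):
    #
    #     ok, problems = analyzer.validate_dependencies(deps)
    #     if not ok:
    #         for msg in problems:
    #             analyzer.logger.warning(msg)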

    def clear_cache(self):
        """Clear the dependency analysis cache (thread-safe)."""
        with self._cache_lock:
            self.dependency_cache.clear()
        with self._processed_files_lock:
            self.processed_files.clear()
        self.logger.debug("Dependency cache cleared")

    def get_dependency_statistics(self, dependency_map: Dict[str, Set[str]]) -> Dict[str, float]:
        """
        Generate statistics about project dependencies.

        Args:
            dependency_map: Dictionary mapping files to their dependencies

        Returns:
            Dictionary with dependency statistics
        """
        if not dependency_map:
            return {}

        total_files = len(dependency_map)
        total_dependencies = sum(len(deps) for deps in dependency_map.values())
        avg_dependencies = total_dependencies / total_files

        # Find the largest and smallest per-file dependency counts
        dependency_counts = [len(deps) for deps in dependency_map.values()]
        max_deps = max(dependency_counts)
        min_deps = min(dependency_counts)

        return {
            'total_files': total_files,
            'total_dependencies': total_dependencies,
            'average_dependencies_per_file': avg_dependencies,
            'max_dependencies': max_deps,
            'min_dependencies': min_deps
        }
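
    # Usage sketch (illustrative values):
    #
    #     stats = analyzer.get_dependency_statistics(dep_map)
    #     # e.g. {'total_files': 12, 'total_dependencies': 30,
    #     #       'average_dependencies_per_file': 2.5,
    #     #       'max_dependencies': 7, 'min_dependencies': 0}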


def create_header_analyzer(logger: Optional[logging.Logger] = None, thread_safe: bool = True) -> HeaderDependencyAnalyzer:
    """
    Factory function to create a header dependency analyzer.

    Args:
        logger: Optional logger instance
        thread_safe: Accepted for API compatibility; the analyzer is always
            thread-safe, so this flag currently has no effect

    Returns:
        Configured HeaderDependencyAnalyzer instance
    """
    return HeaderDependencyAnalyzer(logger)


# Convenience functions for direct usage
def extract_local_includes(file_path: str) -> List[str]:
    """Extract local includes from a file (convenience function)."""
    analyzer = create_header_analyzer()
    return analyzer.extract_local_includes(file_path)


def collect_file_dependencies(file_path: str, project_root: Optional[str] = None) -> Set[str]:
    """Collect dependencies for a file (convenience function)."""
    analyzer = create_header_analyzer()
    return analyzer.collect_file_dependencies(file_path, project_root)


def copy_dependencies_to_temp(dependencies: Set[str], temp_dir: str,
                              preserve_structure: bool = True,
                              project_root: Optional[str] = None) -> Dict[str, str]:
    """Copy dependencies to a temp directory (convenience function)."""
    analyzer = create_header_analyzer()
    return analyzer.copy_dependencies_to_temp(dependencies, temp_dir, preserve_structure, project_root)
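

if __name__ == "__main__":
    # Minimal self-contained smoke test (illustrative, not part of the original
    # module): builds a throwaway project with one source file and two headers,
    # then exercises the main entry points.
    import tempfile

    logging.basicConfig(level=logging.DEBUG)

    with tempfile.TemporaryDirectory() as project:
        src_dir = Path(project) / "src"
        src_dir.mkdir()
        (src_dir / "util.h").write_text('#include "types.h"\n')
        (src_dir / "types.h").write_text("typedef int myint;\n")
        (src_dir / "main.c").write_text('#include "util.h"\nint main(void) { return 0; }\n')

        analyzer = create_header_analyzer()
        deps = analyzer.collect_file_dependencies(str(src_dir / "main.c"), project)
        print("dependencies:", sorted(deps))  # util.h and, transitively, types.h

        ok, errs = analyzer.validate_dependencies(deps)
        print("valid:", ok, errs)

        with tempfile.TemporaryDirectory() as tmp:
            copied = analyzer.copy_dependencies_to_temp(deps, tmp, project_root=project)
            print("copied:", copied)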