analyzer¶
Full name: tenets.core.analysis.analyzer
Main code analyzer orchestrator for Tenets.
This module coordinates language-specific analyzers and provides a unified interface for analyzing source code files. It handles analyzer selection, caching, parallel processing, and fallback strategies.
Classes¶
CodeAnalyzer¶
Main code analysis orchestrator.
Coordinates language-specific analyzers and provides a unified interface for analyzing source code files. Handles caching, parallel processing, analyzer selection, and fallback strategies.
ATTRIBUTE | DESCRIPTION |
---|---|
config | TenetsConfig instance for configuration |
logger | Logger instance for logging |
cache | AnalysisCache for caching analysis results |
analyzers | Dictionary mapping file extensions to analyzer instances |
stats | Analysis statistics and metrics |
Initialize the code analyzer.
PARAMETER | DESCRIPTION |
---|---|
config | Tenets configuration object TYPE: TenetsConfig |
Source code in tenets/core/analysis/analyzer.py
def __init__(self, config: TenetsConfig):
    """Initialize the code analyzer.

    Args:
        config: Tenets configuration object
    """
    self.config = config
    self.logger = get_logger(__name__)

    # Initialize cache if enabled
    self.cache = None
    if config.cache.enabled:
        self.cache = AnalysisCache(config.cache.directory)
        self.logger.info(f"Cache initialized at {config.cache.directory}")

    # Initialize language analyzers
    self.analyzers = self._initialize_analyzers()

    # Thread pool for parallel analysis
    self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=config.scanner.workers)

    # Analysis statistics
    self.stats = {
        "files_analyzed": 0,
        "cache_hits": 0,
        "cache_misses": 0,
        "errors": 0,
        "total_time": 0,
        "languages": {},
    }

    self.logger.info(f"CodeAnalyzer initialized with {len(self.analyzers)} language analyzers")
Functions¶
analyze_file¶
analyze_file(file_path: Path, deep: bool = False, extract_keywords: bool = True, use_cache: bool = True, progress_callback: Optional[Callable] = None) -> FileAnalysis
Analyze a single file.
Performs language-specific analysis on a file, extracting imports, structure, complexity metrics, and other relevant information.
PARAMETER | DESCRIPTION |
---|---|
file_path | Path to the file to analyze TYPE: Path |
deep | Whether to perform deep analysis (AST parsing, etc.) TYPE: bool |
extract_keywords | Whether to extract keywords from content TYPE: bool |
use_cache | Whether to use cached results if available TYPE: bool |
progress_callback | Optional callback for progress updates |
RETURNS | DESCRIPTION |
---|---|
FileAnalysis | FileAnalysis object with complete analysis results |
RAISES | DESCRIPTION |
---|---|
FileNotFoundError | If file doesn't exist |
PermissionError | If file cannot be read |
Source code in tenets/core/analysis/analyzer.py
def analyze_file(
    self,
    file_path: Path,
    deep: bool = False,
    extract_keywords: bool = True,
    use_cache: bool = True,
    progress_callback: Optional[Callable] = None,
) -> FileAnalysis:
    """Analyze a single file.

    Performs language-specific analysis on a file, extracting imports,
    structure, complexity metrics, and other relevant information.

    Args:
        file_path: Path to the file to analyze
        deep: Whether to perform deep analysis (AST parsing, etc.)
        extract_keywords: Whether to extract keywords from content
        use_cache: Whether to use cached results if available
        progress_callback: Optional callback for progress updates

    Returns:
        FileAnalysis object with complete analysis results

    Raises:
        FileNotFoundError: If file doesn't exist
        PermissionError: If file cannot be read
    """
    file_path = Path(file_path)

    # Check cache first
    if use_cache and self.cache:
        cached_analysis = self.cache.get_file_analysis(file_path)
        if cached_analysis:
            self.stats["cache_hits"] += 1
            self.logger.debug(f"Cache hit for {file_path}")
            if progress_callback:
                progress_callback("cache_hit", file_path)
            return cached_analysis
        else:
            self.stats["cache_misses"] += 1

    self.logger.debug(f"Analyzing file: {file_path}")

    try:
        # Read file content
        content = self._read_file_content(file_path)

        # Create base analysis
        analysis = FileAnalysis(
            path=str(file_path),
            content=content,
            size=file_path.stat().st_size,
            lines=content.count("\n") + 1,
            language=self._detect_language(file_path),
            file_name=file_path.name,
            file_extension=file_path.suffix,
            last_modified=datetime.fromtimestamp(file_path.stat().st_mtime),
            hash=self._calculate_file_hash(content),
        )

        # Get appropriate analyzer
        analyzer = self._get_analyzer(file_path)
        if analyzer is None and deep:
            analyzer = GenericAnalyzer()

        if analyzer and deep:
            try:
                # Run language-specific analysis
                self.logger.debug(f"Running {analyzer.language_name} analyzer on {file_path}")
                analysis_results = analyzer.analyze(content, file_path)

                # Update analysis object with results
                # Collect results
                imports = analysis_results.get("imports", [])
                analysis.imports = imports
                analysis.exports = analysis_results.get("exports", [])
                structure = analysis_results.get("structure", CodeStructure())

                # Ensure imports are accessible via structure as well for downstream tools
                try:
                    if hasattr(structure, "imports"):
                        # Only set if empty to respect analyzers that already populate it
                        if not getattr(structure, "imports", None):
                            structure.imports = imports
                except Exception:
                    # Be defensive; never fail analysis due to structure syncing
                    pass

                analysis.structure = structure
                analysis.complexity = analysis_results.get("complexity", ComplexityMetrics())

                # Extract additional information
                if analysis.structure:
                    analysis.classes = analysis.structure.classes
                    analysis.functions = analysis.structure.functions
                    analysis.modules = getattr(analysis.structure, "modules", [])
            except Exception as e:
                self.logger.warning(f"Language-specific analysis failed for {file_path}: {e}")
                analysis.error = str(e)
                self.stats["errors"] += 1

        # Extract keywords if requested
        if extract_keywords:
            analysis.keywords = self._extract_keywords(content, analysis.language)

        # Add code quality metrics
        analysis.quality_score = self._calculate_quality_score(analysis)

        # Cache the result
        if use_cache and self.cache and not analysis.error:
            try:
                self.cache.put_file_analysis(file_path, analysis)
            except Exception as e:
                self.logger.debug(f"Failed to write analysis cache for {file_path}: {e}")
                analysis.error = "Cache write error"

        # Update statistics
        self.stats["files_analyzed"] += 1
        self.stats["languages"][analysis.language] = (
            self.stats["languages"].get(analysis.language, 0) + 1
        )

        if progress_callback:
            progress_callback("analyzed", file_path)

        return analysis

    except FileNotFoundError:
        # Propagate not found to satisfy tests expecting exception
        self.logger.error(f"File not found: {file_path}")
        raise
    except Exception as e:
        self.logger.error(f"Failed to analyze {file_path}: {e}")
        self.stats["errors"] += 1
        return FileAnalysis(
            path=str(file_path),
            error=str(e),
            file_name=file_path.name,
            file_extension=file_path.suffix,
        )
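A usage sketch for analyze_file, assuming analyzer is the CodeAnalyzer instance built above and the file path is hypothetical. Note that FileNotFoundError propagates, while other failures are returned as a FileAnalysis with error set:

```python
from pathlib import Path

analysis = analyzer.analyze_file(
    Path("src/app/main.py"),  # hypothetical path
    deep=True,                # run language-specific analysis (imports, structure, complexity)
    extract_keywords=True,
    use_cache=True,
)

if analysis.error:
    print(f"Analysis failed: {analysis.error}")
else:
    print(analysis.language, analysis.lines, analysis.quality_score)
```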
analyze_files¶
analyze_files(file_paths: list[Path], deep: bool = False, parallel: bool = True, progress_callback: Optional[Callable] = None) -> list[FileAnalysis]
Analyze multiple files.
PARAMETER | DESCRIPTION |
---|---|
file_paths | List of file paths to analyze |
deep | Whether to perform deep analysis TYPE: bool |
parallel | Whether to analyze files in parallel TYPE: bool |
progress_callback | Optional callback for progress updates |
RETURNS | DESCRIPTION |
---|---|
list[FileAnalysis] | List of FileAnalysis objects |
Source code in tenets/core/analysis/analyzer.py
def analyze_files(
    self,
    file_paths: list[Path],
    deep: bool = False,
    parallel: bool = True,
    progress_callback: Optional[Callable] = None,
) -> list[FileAnalysis]:
    """Analyze multiple files.

    Args:
        file_paths: List of file paths to analyze
        deep: Whether to perform deep analysis
        parallel: Whether to analyze files in parallel
        progress_callback: Optional callback for progress updates

    Returns:
        List of FileAnalysis objects
    """
    self.logger.info(f"Analyzing {len(file_paths)} files (parallel={parallel})")

    if parallel and len(file_paths) > 1:
        # Parallel analysis
        futures = []
        for file_path in file_paths:
            future = self._executor.submit(
                self.analyze_file, file_path, deep=deep, progress_callback=progress_callback
            )
            futures.append((future, file_path))

        # Collect results
        results = []
        for future, file_path in futures:
            try:
                result = future.result(timeout=self.config.scanner.timeout)
                results.append(result)
            except concurrent.futures.TimeoutError:
                self.logger.warning(f"Analysis timeout for {file_path}")
                results.append(FileAnalysis(path=str(file_path), error="Analysis timeout"))
            except Exception as e:
                self.logger.warning(f"Failed to analyze {file_path}: {e}")
                results.append(FileAnalysis(path=str(file_path), error=str(e)))

        return results
    else:
        # Sequential analysis
        results = []
        for i, file_path in enumerate(file_paths):
            result = self.analyze_file(file_path, deep=deep)
            results.append(result)
            if progress_callback:
                progress_callback(i + 1, len(file_paths))

        return results
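A batch-analysis sketch with hypothetical paths. As the source above shows, the callback signature differs between branches: the sequential path calls progress_callback(index, total), while the parallel path forwards the callback to analyze_file, which calls it with an event string and the file path:

```python
from pathlib import Path

paths = [Path("src/a.py"), Path("src/b.py")]  # hypothetical inputs

results = analyzer.analyze_files(paths, deep=True, parallel=True)

failed = [r for r in results if r.error]
print(f"{len(results) - len(failed)} analyzed, {len(failed)} failed")
```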
analyze_project¶
analyze_project(project_path: Path, patterns: Optional[list[str]] = None, exclude_patterns: Optional[list[str]] = None, deep: bool = True, parallel: bool = True, progress_callback: Optional[Callable] = None) -> ProjectAnalysis
Analyze an entire project.
PARAMETER | DESCRIPTION |
---|---|
project_path | Path to the project root TYPE: Path |
patterns | File patterns to include (e.g., ['*.py', '*.js']) |
exclude_patterns | File patterns to exclude |
deep | Whether to perform deep analysis TYPE: bool |
parallel | Whether to analyze files in parallel TYPE: bool |
progress_callback | Optional callback for progress updates |
RETURNS | DESCRIPTION |
---|---|
ProjectAnalysis | ProjectAnalysis object with complete project analysis |
Source code in tenets/core/analysis/analyzer.py
def analyze_project(
    self,
    project_path: Path,
    patterns: Optional[list[str]] = None,
    exclude_patterns: Optional[list[str]] = None,
    deep: bool = True,
    parallel: bool = True,
    progress_callback: Optional[Callable] = None,
) -> ProjectAnalysis:
    """Analyze an entire project.

    Args:
        project_path: Path to the project root
        patterns: File patterns to include (e.g., ['*.py', '*.js'])
        exclude_patterns: File patterns to exclude
        deep: Whether to perform deep analysis
        parallel: Whether to analyze files in parallel
        progress_callback: Optional callback for progress updates

    Returns:
        ProjectAnalysis object with complete project analysis
    """
    self.logger.info(f"Analyzing project: {project_path}")

    # Collect files to analyze
    files = self._collect_project_files(project_path, patterns, exclude_patterns)
    self.logger.info(f"Found {len(files)} files to analyze")

    # Analyze all files
    file_analyses = self.analyze_files(
        files, deep=deep, parallel=parallel, progress_callback=progress_callback
    )

    # Build project analysis
    project_analysis = ProjectAnalysis(
        path=str(project_path),
        name=project_path.name,
        files=file_analyses,
        total_files=len(file_analyses),
        analyzed_files=len([f for f in file_analyses if not f.error]),
        failed_files=len([f for f in file_analyses if f.error]),
    )

    # Calculate project-level metrics
    self._calculate_project_metrics(project_analysis)

    # Build dependency graph
    project_analysis.dependency_graph = self._build_dependency_graph(file_analyses)

    # Detect project type and framework
    project_analysis.project_type = self._detect_project_type(project_path, file_analyses)
    project_analysis.frameworks = self._detect_frameworks(file_analyses)

    # Generate summary
    project_analysis.summary = self._generate_project_summary(project_analysis)

    return project_analysis
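A whole-project sketch; the include and exclude patterns below are illustrative choices, not defaults:

```python
from pathlib import Path

project = analyzer.analyze_project(
    Path("."),                     # project root
    patterns=["*.py"],             # hypothetical include filter
    exclude_patterns=["tests/*"],  # hypothetical exclusion
    deep=True,
    parallel=True,
)

print(f"{project.analyzed_files}/{project.total_files} files analyzed, "
      f"{project.failed_files} failed")
print(project.project_type, project.frameworks)
```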
generate_report¶
generate_report(analysis: Union[FileAnalysis, ProjectAnalysis, list[FileAnalysis]], format: str = 'json', output_path: Optional[Path] = None) -> AnalysisReport
Generate an analysis report.
PARAMETER | DESCRIPTION |
---|---|
analysis | Analysis results to report on TYPE: Union[FileAnalysis, ProjectAnalysis, list[FileAnalysis]] |
format | Report format ('json', 'html', 'markdown', 'csv') TYPE: str |
output_path | Optional path to save the report |
RETURNS | DESCRIPTION |
---|---|
AnalysisReport | AnalysisReport object |
Source code in tenets/core/analysis/analyzer.py
def generate_report(
    self,
    analysis: Union[FileAnalysis, ProjectAnalysis, list[FileAnalysis]],
    format: str = "json",
    output_path: Optional[Path] = None,
) -> AnalysisReport:
    """Generate an analysis report.

    Args:
        analysis: Analysis results to report on
        format: Report format ('json', 'html', 'markdown', 'csv')
        output_path: Optional path to save the report

    Returns:
        AnalysisReport object
    """
    self.logger.info(f"Generating {format} report")

    report = AnalysisReport(
        timestamp=datetime.now(), format=format, statistics=self.stats.copy()
    )

    # Generate report content based on format
    if format == "json":
        report.content = self._generate_json_report(analysis)
    elif format == "html":
        report.content = self._generate_html_report(analysis)
    elif format == "markdown":
        report.content = self._generate_markdown_report(analysis)
    elif format == "csv":
        report.content = self._generate_csv_report(analysis)
    else:
        raise ValueError(f"Unsupported report format: {format}")

    # Save report if output path provided
    if output_path:
        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)

        if format in ["json", "csv"]:
            output_path.write_text(report.content)
        else:
            output_path.write_text(report.content, encoding="utf-8")

        self.logger.info(f"Report saved to {output_path}")
        report.output_path = str(output_path)

    return report
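A reporting sketch, assuming project is the ProjectAnalysis from the previous example; an unsupported format raises ValueError:

```python
from pathlib import Path

report = analyzer.generate_report(project, format="markdown", output_path=Path("report.md"))
print(report.output_path)  # path the report was written to
print(report.statistics)   # snapshot of the analyzer's running stats
```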
shutdown¶
Shut down the analyzer and clean up resources.
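Because the analyzer owns a thread pool, pairing it with try/finally is a reasonable pattern. This sketch assumes shutdown() takes no arguments, which is not shown on this page:

```python
try:
    results = analyzer.analyze_files(paths, deep=False)
finally:
    analyzer.shutdown()  # assumed no-argument call; releases the thread pool and other resources
```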