analyzer

Full name: tenets.core.analysis.analyzer

Main code analyzer orchestrator for Tenets.

This module coordinates language-specific analyzers and provides a unified interface for analyzing source code files. It handles analyzer selection, caching, parallel processing, and fallback strategies.

Classes

CodeAnalyzer

Python
CodeAnalyzer(config: TenetsConfig)

Main code analysis orchestrator.

Coordinates language-specific analyzers and provides a unified interface for analyzing source code files. Handles caching, parallel processing, analyzer selection, and fallback strategies.

| Attribute   | Description                                              |
| ----------- | -------------------------------------------------------- |
| `config`    | TenetsConfig instance for configuration                  |
| `logger`    | Logger instance for logging                              |
| `cache`     | AnalysisCache for caching analysis results               |
| `analyzers` | Dictionary mapping file extensions to analyzer instances |
| `stats`     | Analysis statistics and metrics                          |

Initialize the code analyzer.

| Parameter | Type           | Description                 |
| --------- | -------------- | --------------------------- |
| `config`  | `TenetsConfig` | Tenets configuration object |

Source code in tenets/core/analysis/analyzer.py
Python
def __init__(self, config: TenetsConfig):
    """Initialize the code analyzer.

    Args:
        config: Tenets configuration object
    """
    self.config = config
    self.logger = get_logger(__name__)

    # Initialize cache if enabled
    self.cache = None
    if config.cache.enabled:
        self.cache = AnalysisCache(config.cache.directory)
        self.logger.info(f"Cache initialized at {config.cache.directory}")

    # Initialize language analyzers
    self.analyzers = self._initialize_analyzers()

    # Thread pool for parallel analysis
    self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=config.scanner.workers)

    # Analysis statistics
    self.stats = {
        "files_analyzed": 0,
        "cache_hits": 0,
        "cache_misses": 0,
        "errors": 0,
        "total_time": 0,
        "languages": {},
    }

    self.logger.info(f"CodeAnalyzer initialized with {len(self.analyzers)} language analyzers")
Functions
analyze_file
Python
analyze_file(file_path: Path, deep: bool = False, extract_keywords: bool = True, use_cache: bool = True, progress_callback: Optional[Callable] = None) -> FileAnalysis

Analyze a single file.

Performs language-specific analysis on a file, extracting imports, structure, complexity metrics, and other relevant information.

| Parameter           | Type                 | Description                                          | Default  |
| ------------------- | -------------------- | ---------------------------------------------------- | -------- |
| `file_path`         | `Path`               | Path to the file to analyze                          | required |
| `deep`              | `bool`               | Whether to perform deep analysis (AST parsing, etc.) | `False`  |
| `extract_keywords`  | `bool`               | Whether to extract keywords from content             | `True`   |
| `use_cache`         | `bool`               | Whether to use cached results if available           | `True`   |
| `progress_callback` | `Optional[Callable]` | Optional callback for progress updates               | `None`   |

| Returns        | Description                                        |
| -------------- | -------------------------------------------------- |
| `FileAnalysis` | FileAnalysis object with complete analysis results |

| Raises              | Description            |
| ------------------- | ---------------------- |
| `FileNotFoundError` | If file doesn't exist  |
| `PermissionError`   | If file cannot be read |

Source code in tenets/core/analysis/analyzer.py
Python
def analyze_file(
    self,
    file_path: Path,
    deep: bool = False,
    extract_keywords: bool = True,
    use_cache: bool = True,
    progress_callback: Optional[Callable] = None,
) -> FileAnalysis:
    """Analyze a single file.

    Performs language-specific analysis on a file, extracting imports,
    structure, complexity metrics, and other relevant information.

    Args:
        file_path: Path to the file to analyze
        deep: Whether to perform deep analysis (AST parsing, etc.)
        extract_keywords: Whether to extract keywords from content
        use_cache: Whether to use cached results if available
        progress_callback: Optional callback for progress updates

    Returns:
        FileAnalysis object with complete analysis results

    Raises:
        FileNotFoundError: If file doesn't exist
        PermissionError: If file cannot be read
    """
    file_path = Path(file_path)

    # Check cache first
    if use_cache and self.cache:
        cached_analysis = self.cache.get_file_analysis(file_path)
        if cached_analysis:
            self.stats["cache_hits"] += 1
            self.logger.debug(f"Cache hit for {file_path}")

            if progress_callback:
                progress_callback("cache_hit", file_path)

            return cached_analysis
        else:
            self.stats["cache_misses"] += 1

    self.logger.debug(f"Analyzing file: {file_path}")

    try:
        # Read file content
        content = self._read_file_content(file_path)

        # Create base analysis
        analysis = FileAnalysis(
            path=str(file_path),
            content=content,
            size=file_path.stat().st_size,
            lines=content.count("\n") + 1,
            language=self._detect_language(file_path),
            file_name=file_path.name,
            file_extension=file_path.suffix,
            last_modified=datetime.fromtimestamp(file_path.stat().st_mtime),
            hash=self._calculate_file_hash(content),
        )

        # Get appropriate analyzer
        analyzer = self._get_analyzer(file_path)

        if analyzer is None and deep:
            analyzer = GenericAnalyzer()

        if analyzer and deep:
            try:
                # Run language-specific analysis
                self.logger.debug(f"Running {analyzer.language_name} analyzer on {file_path}")
                analysis_results = analyzer.analyze(content, file_path)

                # Update analysis object with results
                # Collect results
                imports = analysis_results.get("imports", [])
                analysis.imports = imports
                analysis.exports = analysis_results.get("exports", [])
                structure = analysis_results.get("structure", CodeStructure())
                # Ensure imports are accessible via structure as well for downstream tools
                try:
                    if hasattr(structure, "imports"):
                        # Only set if empty to respect analyzers that already populate it
                        if not getattr(structure, "imports", None):
                            structure.imports = imports
                except Exception:
                    # Be defensive; never fail analysis due to structure syncing
                    pass
                analysis.structure = structure
                analysis.complexity = analysis_results.get("complexity", ComplexityMetrics())

                # Extract additional information
                if analysis.structure:
                    analysis.classes = analysis.structure.classes
                    analysis.functions = analysis.structure.functions
                    analysis.modules = getattr(analysis.structure, "modules", [])

            except Exception as e:
                self.logger.warning(f"Language-specific analysis failed for {file_path}: {e}")
                analysis.error = str(e)
                self.stats["errors"] += 1

        # Extract keywords if requested
        if extract_keywords:
            analysis.keywords = self._extract_keywords(content, analysis.language)

        # Add code quality metrics
        analysis.quality_score = self._calculate_quality_score(analysis)

        # Cache the result
        if use_cache and self.cache and not analysis.error:
            try:
                self.cache.put_file_analysis(file_path, analysis)
            except Exception as e:
                self.logger.debug(f"Failed to write analysis cache for {file_path}: {e}")
                analysis.error = "Cache write error"

        # Update statistics
        self.stats["files_analyzed"] += 1
        self.stats["languages"][analysis.language] = (
            self.stats["languages"].get(analysis.language, 0) + 1
        )

        if progress_callback:
            progress_callback("analyzed", file_path)

        return analysis

    except FileNotFoundError:
        # Propagate not found to satisfy tests expecting exception
        self.logger.error(f"File not found: {file_path}")
        raise
    except Exception as e:
        self.logger.error(f"Failed to analyze {file_path}: {e}")
        self.stats["errors"] += 1

        return FileAnalysis(
            path=str(file_path),
            error=str(e),
            file_name=file_path.name,
            file_extension=file_path.suffix,
        )
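
A single-file usage sketch, continuing the construction above. The file path is hypothetical; the `(event, path)` callback signature mirrors how `analyze_file` invokes it in the source:

Python
def on_progress(event, path):
    # analyze_file reports "cache_hit" or "analyzed" events
    print(f"{event}: {path}")

result = analyzer.analyze_file(
    Path("src/app.py"),  # hypothetical file
    deep=True,
    progress_callback=on_progress,
)
if result.error:
    print(f"Analysis failed: {result.error}")
else:
    print(result.language, result.lines, result.quality_score)
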
analyze_files
Python
analyze_files(file_paths: list[Path], deep: bool = False, parallel: bool = True, progress_callback: Optional[Callable] = None) -> list[FileAnalysis]

Analyze multiple files.

| Parameter           | Type                 | Description                            | Default  |
| ------------------- | -------------------- | -------------------------------------- | -------- |
| `file_paths`        | `list[Path]`         | List of file paths to analyze          | required |
| `deep`              | `bool`               | Whether to perform deep analysis       | `False`  |
| `parallel`          | `bool`               | Whether to analyze files in parallel   | `True`   |
| `progress_callback` | `Optional[Callable]` | Optional callback for progress updates | `None`   |

| Returns              | Description                  |
| -------------------- | ---------------------------- |
| `list[FileAnalysis]` | List of FileAnalysis objects |

Source code in tenets/core/analysis/analyzer.py
Python
def analyze_files(
    self,
    file_paths: list[Path],
    deep: bool = False,
    parallel: bool = True,
    progress_callback: Optional[Callable] = None,
) -> list[FileAnalysis]:
    """Analyze multiple files.

    Args:
        file_paths: List of file paths to analyze
        deep: Whether to perform deep analysis
        parallel: Whether to analyze files in parallel
        progress_callback: Optional callback for progress updates

    Returns:
        List of FileAnalysis objects
    """
    self.logger.info(f"Analyzing {len(file_paths)} files (parallel={parallel})")

    if parallel and len(file_paths) > 1:
        # Parallel analysis
        futures = []
        for file_path in file_paths:
            future = self._executor.submit(
                self.analyze_file, file_path, deep=deep, progress_callback=progress_callback
            )
            futures.append((future, file_path))

        # Collect results
        results = []
        for future, file_path in futures:
            try:
                result = future.result(timeout=self.config.scanner.timeout)
                results.append(result)
            except concurrent.futures.TimeoutError:
                self.logger.warning(f"Analysis timeout for {file_path}")
                results.append(FileAnalysis(path=str(file_path), error="Analysis timeout"))
            except Exception as e:
                self.logger.warning(f"Failed to analyze {file_path}: {e}")
                results.append(FileAnalysis(path=str(file_path), error=str(e)))

        return results
    else:
        # Sequential analysis
        results = []
        for i, file_path in enumerate(file_paths):
            result = self.analyze_file(file_path, deep=deep)
            results.append(result)

            if progress_callback:
                progress_callback(i + 1, len(file_paths))

        return results
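
A batch sketch (paths are hypothetical). Note from the source above that the callback is used differently per mode: parallel mode forwards `("analyzed", path)` events from `analyze_file`, while sequential mode calls it with `(completed, total)`, so a tolerant callback is safest:

Python
def on_progress(*args):
    # ("analyzed", path) in parallel mode, (completed, total) otherwise
    print("progress:", args)

paths = [Path("src/app.py"), Path("src/utils.py")]  # hypothetical files
results = analyzer.analyze_files(
    paths, deep=True, parallel=True, progress_callback=on_progress
)
failed = [r for r in results if r.error]
print(f"{len(results) - len(failed)} succeeded, {len(failed)} failed")
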
analyze_project
Python
analyze_project(project_path: Path, patterns: Optional[list[str]] = None, exclude_patterns: Optional[list[str]] = None, deep: bool = True, parallel: bool = True, progress_callback: Optional[Callable] = None) -> ProjectAnalysis

Analyze an entire project.

| Parameter           | Type                  | Description                                         | Default  |
| ------------------- | --------------------- | --------------------------------------------------- | -------- |
| `project_path`      | `Path`                | Path to the project root                            | required |
| `patterns`          | `Optional[list[str]]` | File patterns to include (e.g., `['*.py', '*.js']`) | `None`   |
| `exclude_patterns`  | `Optional[list[str]]` | File patterns to exclude                            | `None`   |
| `deep`              | `bool`                | Whether to perform deep analysis                    | `True`   |
| `parallel`          | `bool`                | Whether to analyze files in parallel                | `True`   |
| `progress_callback` | `Optional[Callable]`  | Optional callback for progress updates              | `None`   |

| Returns           | Description                                            |
| ----------------- | ------------------------------------------------------ |
| `ProjectAnalysis` | ProjectAnalysis object with complete project analysis |

Source code in tenets/core/analysis/analyzer.py
Python
def analyze_project(
    self,
    project_path: Path,
    patterns: Optional[list[str]] = None,
    exclude_patterns: Optional[list[str]] = None,
    deep: bool = True,
    parallel: bool = True,
    progress_callback: Optional[Callable] = None,
) -> ProjectAnalysis:
    """Analyze an entire project.

    Args:
        project_path: Path to the project root
        patterns: File patterns to include (e.g., ['*.py', '*.js'])
        exclude_patterns: File patterns to exclude
        deep: Whether to perform deep analysis
        parallel: Whether to analyze files in parallel
        progress_callback: Optional callback for progress updates

    Returns:
        ProjectAnalysis object with complete project analysis
    """
    self.logger.info(f"Analyzing project: {project_path}")

    # Collect files to analyze
    files = self._collect_project_files(project_path, patterns, exclude_patterns)

    self.logger.info(f"Found {len(files)} files to analyze")

    # Analyze all files
    file_analyses = self.analyze_files(
        files, deep=deep, parallel=parallel, progress_callback=progress_callback
    )

    # Build project analysis
    project_analysis = ProjectAnalysis(
        path=str(project_path),
        name=project_path.name,
        files=file_analyses,
        total_files=len(file_analyses),
        analyzed_files=len([f for f in file_analyses if not f.error]),
        failed_files=len([f for f in file_analyses if f.error]),
    )

    # Calculate project-level metrics
    self._calculate_project_metrics(project_analysis)

    # Build dependency graph
    project_analysis.dependency_graph = self._build_dependency_graph(file_analyses)

    # Detect project type and framework
    project_analysis.project_type = self._detect_project_type(project_path, file_analyses)
    project_analysis.frameworks = self._detect_frameworks(file_analyses)

    # Generate summary
    project_analysis.summary = self._generate_project_summary(project_analysis)

    return project_analysis
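
A project-level sketch, continuing the examples above (patterns and paths are hypothetical):

Python
project = analyzer.analyze_project(
    Path("."),
    patterns=["*.py"],
    exclude_patterns=["tests/*"],  # hypothetical exclusion
    deep=True,
)
print(
    f"{project.analyzed_files}/{project.total_files} files analyzed, "
    f"{project.failed_files} failed"
)
print(project.project_type, project.frameworks)
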
generate_report
Python
generate_report(analysis: Union[FileAnalysis, ProjectAnalysis, list[FileAnalysis]], format: str = 'json', output_path: Optional[Path] = None) -> AnalysisReport

Generate an analysis report.

| Parameter     | Type                                                       | Description                                               | Default  |
| ------------- | ---------------------------------------------------------- | --------------------------------------------------------- | -------- |
| `analysis`    | `Union[FileAnalysis, ProjectAnalysis, list[FileAnalysis]]` | Analysis results to report on                             | required |
| `format`      | `str`                                                      | Report format (`'json'`, `'html'`, `'markdown'`, `'csv'`) | `'json'` |
| `output_path` | `Optional[Path]`                                           | Optional path to save the report                          | `None`   |

| Returns          | Description           |
| ---------------- | --------------------- |
| `AnalysisReport` | AnalysisReport object |

Source code in tenets/core/analysis/analyzer.py
Python
def generate_report(
    self,
    analysis: Union[FileAnalysis, ProjectAnalysis, list[FileAnalysis]],
    format: str = "json",
    output_path: Optional[Path] = None,
) -> AnalysisReport:
    """Generate an analysis report.

    Args:
        analysis: Analysis results to report on
        format: Report format ('json', 'html', 'markdown', 'csv')
        output_path: Optional path to save the report

    Returns:
        AnalysisReport object
    """
    self.logger.info(f"Generating {format} report")

    report = AnalysisReport(
        timestamp=datetime.now(), format=format, statistics=self.stats.copy()
    )

    # Generate report content based on format
    if format == "json":
        report.content = self._generate_json_report(analysis)
    elif format == "html":
        report.content = self._generate_html_report(analysis)
    elif format == "markdown":
        report.content = self._generate_markdown_report(analysis)
    elif format == "csv":
        report.content = self._generate_csv_report(analysis)
    else:
        raise ValueError(f"Unsupported report format: {format}")

    # Save report if output path provided
    if output_path:
        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)

        if format in ["json", "csv"]:
            output_path.write_text(report.content)
        else:
            output_path.write_text(report.content, encoding="utf-8")

        self.logger.info(f"Report saved to {output_path}")
        report.output_path = str(output_path)

    return report
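
A reporting sketch, reusing the `project` result from the previous example (the output location is hypothetical):

Python
report = analyzer.generate_report(
    project,
    format="markdown",
    output_path=Path("reports/analysis.md"),  # hypothetical location
)
print(report.output_path)
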
shutdown
Python
shutdown()

Shutdown the analyzer and clean up resources.

Source code in tenets/core/analysis/analyzer.py
Python
def shutdown(self):
    """Shutdown the analyzer and clean up resources."""
    self._executor.shutdown(wait=True)

    if self.cache:
        self.cache.close()

    self.logger.info("CodeAnalyzer shutdown complete")
    self.logger.info(f"Analysis statistics: {self.stats}")
