"""Source material scanner — discovers and reads platform content files.""" from __future__ import annotations import fnmatch from dataclasses import dataclass, field from pathlib import Path from typing import Protocol from rich.console import Console @dataclass(frozen=True) class SourceLocation: """A registered source of content to scan.""" name: str path: Path description: str file_patterns: list[str] = field(default_factory=lambda: ["*"]) @dataclass class Issue: """A detected inconsistency in platform content.""" severity: str # "critical", "high", "medium", "low" analyzer: str # Which analyzer found it message: str file: str line: int | None = None context: str | None = None # Surrounding text expected: str | None = None actual: str | None = None suggestion: str | None = None @property def severity_rank(self) -> int: return {"critical": 0, "high": 1, "medium": 2, "low": 3}.get(self.severity, 4) @dataclass class ScanResult: """Results from a full scan.""" issues: list[Issue] = field(default_factory=list) files_scanned: int = 0 sources_scanned: int = 0 @property def critical_count(self) -> int: return sum(1 for i in self.issues if i.severity == "critical") @property def high_count(self) -> int: return sum(1 for i in self.issues if i.severity == "high") @property def medium_count(self) -> int: return sum(1 for i in self.issues if i.severity == "medium") @property def low_count(self) -> int: return sum(1 for i in self.issues if i.severity == "low") def sorted_issues(self) -> list[Issue]: return sorted(self.issues, key=lambda i: i.severity_rank) class Analyzer(Protocol): """Protocol for content analyzers.""" name: str def analyze(self, sources: dict[str, SourceLocation]) -> list[Issue]: ... def discover_files(source: SourceLocation) -> list[Path]: """Discover all matching files for a source location. Args: source: Source location with path and file patterns. Returns: List of matching file paths. """ path = source.path if not path.exists(): return [] if path.is_file(): return [path] files: list[Path] = [] for pattern in source.file_patterns: for f in path.rglob(pattern): if f.is_file() and not any( part.startswith(".") or part == "node_modules" or part == "__pycache__" for part in f.parts ): files.append(f) return sorted(files) def read_file_lines(path: Path) -> list[str]: """Read a file and return lines with error handling.""" try: return path.read_text(encoding="utf-8", errors="replace").splitlines() except OSError: return [] def run_scan( analyzers: list[type[Analyzer]], sources: dict[str, SourceLocation] | None = None, source_filter: str | None = None, path_filter: Path | None = None, console: Console | None = None, ) -> ScanResult: """Run a scan with the given analyzers. Args: analyzers: List of analyzer classes to instantiate and run. sources: Dictionary of source locations to scan. If None, caller must provide. source_filter: Optional source name filter (e.g., "seo", "terminology"). path_filter: Optional path to restrict scanning to. console: Rich console for progress output. Returns: ScanResult with all found issues. """ if sources is None: raise ValueError("sources parameter is required") all_sources = dict(sources) # Apply source filter if source_filter: filtered = {} for name, source in all_sources.items(): if source_filter.lower() in name.lower(): filtered[name] = source if not filtered: if console: console.print(f"[yellow]No sources matching '{source_filter}'[/yellow]") return ScanResult() all_sources = filtered # Apply path filter if path_filter: path_filter = path_filter.expanduser().resolve() all_sources = { name: SourceLocation( name=source.name, path=path_filter, description=f"Custom path: {path_filter}", file_patterns=source.file_patterns, ) for name, source in all_sources.items() } result = ScanResult() result.sources_scanned = len(all_sources) # Count files for source in all_sources.values(): result.files_scanned += len(discover_files(source)) # Run each analyzer for analyzer_cls in analyzers: analyzer = analyzer_cls() if console: console.print(f" Running [cyan]{analyzer.name}[/cyan]...") issues = analyzer.analyze(all_sources) result.issues.extend(issues) return result