ml-knowledge-platform/knowledge_platform/scanner.py
2026-02-16 04:50:51 -08:00

177 lines
4.9 KiB
Python

"""Source material scanner — discovers and reads platform content files."""
from __future__ import annotations
import fnmatch
from dataclasses import dataclass, field
from pathlib import Path
from typing import Protocol
from rich.console import Console
@dataclass(frozen=True)
class SourceLocation:
    """Immutable description of one place that holds content to scan.

    Instances are frozen, so attributes cannot be reassigned after
    construction.
    """

    # Identifier of the source.
    name: str
    # File or directory where the content lives.
    path: Path
    # Human-readable explanation of what the source contains.
    description: str
    # Glob patterns used when searching ``path``; each instance gets its own
    # fresh ``["*"]`` list when none is supplied.
    file_patterns: list[str] = field(default_factory=lambda: ["*"])
@dataclass
class Issue:
    """One inconsistency reported against platform content."""

    severity: str  # "critical", "high", "medium", "low"
    analyzer: str  # Which analyzer reported it
    message: str
    file: str
    line: int | None = None
    context: str | None = None  # Text surrounding the finding
    expected: str | None = None
    actual: str | None = None
    suggestion: str | None = None

    @property
    def severity_rank(self) -> int:
        """Return the ordering key for this issue's severity.

        Returns:
            0 for "critical", 1 for "high", 2 for "medium", 3 for "low",
            and 4 for any other severity value.
        """
        known_ranks = {"critical": 0, "high": 1, "medium": 2, "low": 3}
        try:
            return known_ranks[self.severity]
        except KeyError:
            # Unrecognised severities sort after every known level.
            return 4
@dataclass
class ScanResult:
    """Aggregate outcome of a scan: the issues found plus scan counters."""

    issues: list[Issue] = field(default_factory=list)
    files_scanned: int = 0
    sources_scanned: int = 0

    def _count_with_severity(self, level: str) -> int:
        """Return how many collected issues have exactly the given severity."""
        return len([issue for issue in self.issues if issue.severity == level])

    @property
    def critical_count(self) -> int:
        """Number of issues whose severity is "critical"."""
        return self._count_with_severity("critical")

    @property
    def high_count(self) -> int:
        """Number of issues whose severity is "high"."""
        return self._count_with_severity("high")

    @property
    def medium_count(self) -> int:
        """Number of issues whose severity is "medium"."""
        return self._count_with_severity("medium")

    @property
    def low_count(self) -> int:
        """Number of issues whose severity is "low"."""
        return self._count_with_severity("low")

    def sorted_issues(self) -> list[Issue]:
        """Return a new list of the issues ordered by ``severity_rank``.

        The sort is stable, so issues of equal rank keep their original
        relative order, and ``self.issues`` itself is left unmodified.
        """
        ordered = list(self.issues)
        ordered.sort(key=lambda issue: issue.severity_rank)
        return ordered
class Analyzer(Protocol):
    """Structural interface for content analyzers."""

    # Label for the analyzer; run_scan prints it in its progress output.
    name: str

    def analyze(self, sources: dict[str, SourceLocation]) -> list[Issue]:
        """Inspect the given sources and return the issues found.

        Args:
            sources: Source locations to analyze, keyed by source name.

        Returns:
            The issues detected by this analyzer.
        """
        ...
def discover_files(source: SourceLocation) -> list[Path]:
    """Discover all matching files for a source location.

    A path that does not exist yields an empty list. A path that is itself a
    file is returned as the only entry, without consulting the patterns. For
    a directory, every pattern is globbed recursively; a file is skipped when
    any path component below the source root (including the file name)
    starts with "." or is named "node_modules" or "__pycache__".

    Args:
        source: Source location with path and file patterns.

    Returns:
        Sorted list of matching file paths, each listed once even when
        several patterns match it.
    """
    root = source.path
    if not root.exists():
        return []
    if root.is_file():
        return [root]

    skipped_names = ("node_modules", "__pycache__")
    # A set keeps a file matched by several patterns from being listed twice.
    matches: set[Path] = set()
    for pattern in source.file_patterns:
        for candidate in root.rglob(pattern):
            if not candidate.is_file():
                continue
            # Only components below the root are inspected: the root itself
            # may legitimately sit under a dot-directory or be written with
            # a leading "..", which must not exclude its entire content.
            relative_parts = candidate.relative_to(root).parts
            if any(
                part.startswith(".") or part in skipped_names
                for part in relative_parts
            ):
                continue
            matches.add(candidate)
    return sorted(matches)
def read_file_lines(path: Path) -> list[str]:
    """Return the lines of a text file, or an empty list if it cannot be read.

    The file is decoded as UTF-8 with undecodable bytes replaced rather than
    raising, and line terminators are stripped from the returned lines.

    Args:
        path: File to read.

    Returns:
        The file's lines, or ``[]`` when reading fails with ``OSError``.
    """
    try:
        content = path.read_text(encoding="utf-8", errors="replace")
    except OSError:
        return []
    return content.splitlines()
def run_scan(
    analyzers: list[type[Analyzer]],
    sources: dict[str, SourceLocation] | None = None,
    source_filter: str | None = None,
    path_filter: Path | None = None,
    console: Console | None = None,
) -> ScanResult:
    """Run a scan with the given analyzers.

    Args:
        analyzers: List of analyzer classes to instantiate and run.
        sources: Dictionary of source locations to scan, keyed by name.
            Required despite the ``None`` default.
        source_filter: Optional case-insensitive substring; only sources
            whose name contains it are kept (e.g., "seo", "terminology").
        path_filter: Optional path to restrict scanning to. When given, every
            remaining source is re-pointed at this path while keeping its own
            file patterns.
        console: Rich console for progress output.

    Returns:
        ScanResult with all found issues. ``files_scanned`` counts each
        discovered file once, even when several sources cover it. If
        ``source_filter`` matches no source, an empty ScanResult is returned
        and no analyzer is run.

    Raises:
        ValueError: If ``sources`` is None.
    """
    if sources is None:
        raise ValueError("sources parameter is required")

    # Work on a copy so the caller's mapping is never modified.
    all_sources = dict(sources)

    # Apply source filter
    if source_filter:
        needle = source_filter.lower()
        filtered = {
            name: source
            for name, source in all_sources.items()
            if needle in name.lower()
        }
        if not filtered:
            if console:
                console.print(f"[yellow]No sources matching '{source_filter}'[/yellow]")
            return ScanResult()
        all_sources = filtered

    # Apply path filter
    if path_filter:
        path_filter = path_filter.expanduser().resolve()
        all_sources = {
            name: SourceLocation(
                name=source.name,
                path=path_filter,
                description=f"Custom path: {path_filter}",
                file_patterns=source.file_patterns,
            )
            for name, source in all_sources.items()
        }

    result = ScanResult()
    result.sources_scanned = len(all_sources)

    # Count files. Sources may overlap -- with a path filter they all point
    # at the same directory -- so unique paths are counted instead of summing
    # per-source totals, which would report the same file several times.
    unique_files: set[Path] = set()
    for source in all_sources.values():
        unique_files.update(discover_files(source))
    result.files_scanned = len(unique_files)

    # Run each analyzer
    for analyzer_cls in analyzers:
        analyzer = analyzer_cls()
        if console:
            console.print(f"  Running [cyan]{analyzer.name}[/cyan]...")
        result.issues.extend(analyzer.analyze(all_sources))

    return result