"""Feedback analyzer for extracting training insights.

Analyzes correction logs, validation events, and search patterns to identify:

- Frequent correction patterns (e.g., "escort" → "creator")
- Knowledge gaps (low-confidence topics)
- Common validation failures
"""

from __future__ import annotations

import json
from collections import Counter, defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Any

from .storage import FeedbackStorage


def _as_float(value: Any, default: float = 0.0) -> float:
    """Coerce *value* to ``float``, returning *default* if it is not numeric.

    Logged events may carry ``None`` or malformed values; treating them like a
    missing key keeps one bad record from aborting the whole analysis.
    """
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def _as_text(value: Any) -> str:
    """Return *value* stripped of surrounding whitespace, or ``""`` if not a str."""
    return value.strip() if isinstance(value, str) else ""


@dataclass
class CorrectionPattern:
    """A frequently occurring correction pattern."""

    original: str  # Text that was replaced (lower-cased by the analyzer)
    replacement: str  # Text it was replaced with (lower-cased by the analyzer)
    count: int  # Number of times this exact pattern was seen
    change_type: str  # Value of the change's "type" field, or "unknown"
    confidence_avg: float  # Mean event confidence across all occurrences


@dataclass
class KnowledgeGap:
    """A topic with consistently low confidence or failed searches."""

    topic: str  # Normalized (stripped, lower-cased) subject or search query
    event_count: int  # Number of low-confidence / low-score events
    avg_confidence: float | None  # Mean confidence; None for search gaps
    event_type: str  # "validation" or "search"


class FeedbackAnalyzer:
    """Analyzes feedback logs to extract training insights."""

    def __init__(self, storage_dir: Path) -> None:
        """Initialize analyzer with feedback storage directory.

        Args:
            storage_dir: Directory containing feedback logs
        """
        self.storage = FeedbackStorage(storage_dir)

    def extract_frequent_corrections(
        self,
        min_count: int = 5,
        days: int = 30,
    ) -> list[CorrectionPattern]:
        """Find correction patterns that occur frequently.

        Patterns are grouped case-insensitively on (original, replacement)
        together with the change type. Malformed changes (non-dict entries,
        missing or non-string original/replacement) are skipped.

        Args:
            min_count: Minimum occurrences to be considered frequent
            days: Number of days to analyze

        Returns:
            List of frequent correction patterns, sorted by count descending
        """
        # (original, replacement, type) -> confidence of every occurrence.
        # The occurrence count is simply the length of each list.
        confidences_by_pattern: defaultdict[tuple[str, str, str], list[float]] = (
            defaultdict(list)
        )

        for event in self.storage.read_events("corrections", days):
            changes = event.get("changes")
            if not isinstance(changes, (list, tuple)):
                continue
            confidence = _as_float(event.get("confidence"))

            for change in changes:
                if not isinstance(change, dict):
                    continue

                original = _as_text(change.get("original"))
                replacement = _as_text(change.get("replacement"))
                if not original or not replacement:
                    continue

                change_type = change.get("type", "unknown")
                if not isinstance(change_type, str):
                    # Keep the grouping key hashable and string-typed.
                    change_type = "unknown" if change_type is None else str(change_type)

                # Normalize for comparison (case-insensitive)
                key = (original.lower(), replacement.lower(), change_type)
                confidences_by_pattern[key].append(confidence)

        frequent = [
            CorrectionPattern(
                original=original,
                replacement=replacement,
                count=len(confidences),
                change_type=change_type,
                confidence_avg=sum(confidences) / len(confidences),
            )
            for (original, replacement, change_type), confidences
            in confidences_by_pattern.items()
            if len(confidences) >= min_count
        ]

        # Stable sort: ties keep first-seen order.
        frequent.sort(key=lambda p: p.count, reverse=True)
        return frequent

    @staticmethod
    def _validation_topics(event: dict[str, Any]) -> list[str]:
        """Return the normalized topics a validation event refers to.

        Uses the event's "subjects"; when there are none, falls back to the
        first three words of its "content".
        """
        subjects = event.get("subjects")
        if isinstance(subjects, str):
            # A bare string is one subject, not a sequence of characters.
            subjects = [subjects]

        if not subjects:
            content = event.get("content")
            words = content.split()[:3] if isinstance(content, str) else []
            subjects = [words]

        topics: list[str] = []
        for subject in subjects:
            if subject is None:
                continue
            if isinstance(subject, (list, tuple)):
                subject = " ".join(str(part) for part in subject)
            topic = str(subject).strip().lower()
            if topic:
                topics.append(topic)
        return topics

    def identify_knowledge_gaps(
        self,
        days: int = 30,
        confidence_threshold: float = 0.5,
        min_occurrences: int = 3,
    ) -> list[KnowledgeGap]:
        """Identify topics with consistently low confidence.

        Two sources are combined: validation events whose confidence is at or
        below the threshold (grouped by topic), and search events whose
        "max_score" is at or below the same threshold (grouped by query).
        Missing or non-numeric confidence/score values count as 0.0.

        Args:
            days: Number of days to analyze
            confidence_threshold: Max confidence to consider a gap
            min_occurrences: Minimum occurrences to report as a gap

        Returns:
            List of knowledge gaps, sorted by event count descending
        """
        # Analyze low-confidence validations
        validation_topics: defaultdict[str, list[float]] = defaultdict(list)
        for event in self.storage.read_events("validations", days):
            confidence = _as_float(event.get("confidence"))
            if confidence > confidence_threshold:
                continue
            for topic in self._validation_topics(event):
                validation_topics[topic].append(confidence)

        gaps = [
            KnowledgeGap(
                topic=topic,
                event_count=len(confidences),
                avg_confidence=sum(confidences) / len(confidences),
                event_type="validation",
            )
            for topic, confidences in validation_topics.items()
            if len(confidences) >= min_occurrences
        ]

        # Analyze low-relevance searches
        search_topics: Counter[str] = Counter()
        for event in self.storage.read_events("searches", days):
            if _as_float(event.get("max_score")) > confidence_threshold:
                continue
            query = _as_text(event.get("query")).lower()
            if query:
                search_topics[query] += 1

        gaps.extend(
            KnowledgeGap(
                topic=query,
                event_count=count,
                avg_confidence=None,  # Searches don't have confidence
                event_type="search",
            )
            for query, count in search_topics.items()
            if count >= min_occurrences
        )

        # Stable sort: on ties, validation gaps stay ahead of search gaps.
        gaps.sort(key=lambda g: g.event_count, reverse=True)
        return gaps

    @staticmethod
    def _write_training_pairs(
        output_file: Path,
        patterns: list[CorrectionPattern],
    ) -> None:
        """Write one JSONL training pair per correction pattern to *output_file*."""
        output_file.parent.mkdir(parents=True, exist_ok=True)
        with output_file.open("w", encoding="utf-8") as f:
            f.writelines(
                json.dumps(
                    {
                        "premise": f"The text says: {pattern.original}",
                        "hypothesis": f"The text says: {pattern.replacement}",
                        "label": "contradiction",  # Original contradicts knowledge
                        "source": "user_corrections",
                        "count": pattern.count,
                    }
                )
                + "\n"
                for pattern in patterns
            )

    def generate_training_report(
        self,
        days: int = 30,
        output_file: Path | None = None,
        *,
        min_count: int = 5,
        min_occurrences: int = 3,
        max_report_items: int = 20,
        max_training_pairs: int = 50,
    ) -> dict[str, Any]:
        """Generate a comprehensive training report from feedback.

        Args:
            days: Number of days to analyze
            output_file: Optional path to write JSONL training pairs. The file
                is created (possibly empty) whenever a path is given.
            min_count: Minimum occurrences for a correction to be reported
            min_occurrences: Minimum occurrences for a knowledge gap
            max_report_items: Cap on corrections and on gaps listed in the report
            max_training_pairs: Cap on training pairs written to output_file

        Returns:
            Report dictionary with corrections, gaps, and statistics
        """
        corrections = self.extract_frequent_corrections(min_count=min_count, days=days)
        gaps = self.identify_knowledge_gaps(days=days, min_occurrences=min_occurrences)

        # Negative limits would otherwise slice from the end of the list.
        report_limit = max(0, max_report_items)
        pair_limit = max(0, max_training_pairs)

        report: dict[str, Any] = {
            "analysis_period_days": days,
            "event_counts": {
                "corrections": self.storage.count_events("corrections", days),
                "validations": self.storage.count_events("validations", days),
                "searches": self.storage.count_events("searches", days),
            },
            "frequent_corrections": [
                {
                    "original": p.original,
                    "replacement": p.replacement,
                    "count": p.count,
                    "type": p.change_type,
                    "avg_confidence": round(p.confidence_avg, 2),
                }
                for p in corrections[:report_limit]
            ],
            "knowledge_gaps": [
                {
                    "topic": g.topic,
                    "count": g.event_count,
                    # Compare against None explicitly: an average of 0.0 is a
                    # real (and the most severe) value, not "no confidence".
                    "avg_confidence": (
                        round(g.avg_confidence, 2)
                        if g.avg_confidence is not None
                        else None
                    ),
                    "source": g.event_type,
                }
                for g in gaps[:report_limit]
            ],
        }

        # Write training pairs if requested
        if output_file is not None:
            self._write_training_pairs(Path(output_file), corrections[:pair_limit])

        return report