279 lines
9.4 KiB
Python
279 lines
9.4 KiB
Python
"""Feedback analyzer for extracting training insights.

Analyzes correction logs, validation events, and search patterns to identify:
- Frequent correction patterns (e.g., "escort" → "creator")
- Knowledge gaps (low-confidence topics)
- Common validation failures
"""
|
|
|
|
from __future__ import annotations

import json
from collections import Counter, defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Any

from .storage import FeedbackStorage
|
|
|
|
|
|
@dataclass
class CorrectionPattern:
    """One (original -> replacement) correction that shows up repeatedly.

    Instances are produced by ``FeedbackAnalyzer.extract_frequent_corrections``.
    """

    # Text that was corrected (the analyzer stores it stripped and lowercased).
    original: str
    # Text it was corrected to (the analyzer stores it stripped and lowercased).
    replacement: str
    # Number of times this exact pattern was seen in the analyzed window.
    count: int
    # Value of the change's "type" field; "unknown" when the log omits it.
    change_type: str
    # Mean "confidence" of the correction events that contained the pattern.
    confidence_avg: float
|
|
|
|
|
|
@dataclass
class KnowledgeGap:
    """A topic that repeatedly scores low in validations or searches.

    Instances are produced by ``FeedbackAnalyzer.identify_knowledge_gaps``.
    """

    # Lowercased subject (validation gaps) or query text (search gaps).
    topic: str
    # How many low-scoring events mentioned the topic.
    event_count: int
    # Mean confidence of those events; None for search-derived gaps.
    avg_confidence: float | None
    # Where the gap was observed: "validation" or "search".
    event_type: str
|
|
|
|
|
|
class FeedbackAnalyzer:
    """Analyzes feedback logs to extract training insights.

    Events are read through ``FeedbackStorage`` from three logs:
    "corrections", "validations" and "searches". They are summarized as
    frequent correction patterns and knowledge gaps.

    Log fields are handled defensively: a field that is missing, ``None`` or
    of an unexpected type is skipped (text) or scored as 0.0 (confidence /
    score) instead of raising.
    NOTE(review): events are assumed to be dicts, as the original annotation
    ``list[dict[str, Any]]`` suggests; confirm against ``FeedbackStorage``.
    """

    # Limits applied by generate_training_report().
    _REPORT_LIMIT = 20  # patterns / gaps listed in the report
    _TRAINING_PAIR_LIMIT = 50  # patterns exported as JSONL training pairs

    def __init__(self, storage_dir: Path) -> None:
        """Initialize analyzer with feedback storage directory.

        Args:
            storage_dir: Directory containing feedback logs
        """
        self.storage = FeedbackStorage(storage_dir)

    @staticmethod
    def _as_score(value: Any) -> float:
        """Return a logged confidence/score as float; non-numeric counts as 0.0."""
        if isinstance(value, (int, float)):
            return float(value)
        # Missing keys already defaulted to 0.0; treat null / junk the same way.
        return 0.0

    @staticmethod
    def _as_text(value: Any) -> str:
        """Return ``value`` stripped if it is a string, otherwise ""."""
        return value.strip() if isinstance(value, str) else ""

    def extract_frequent_corrections(
        self,
        min_count: int = 5,
        days: int = 30,
    ) -> list[CorrectionPattern]:
        """Find correction patterns that occur frequently.

        Patterns are grouped case-insensitively on
        ``(original, replacement, type)``. Changes whose original or
        replacement text is empty or not a string are ignored.

        Args:
            min_count: Minimum occurrences to be considered frequent
            days: Number of days to analyze

        Returns:
            List of frequent correction patterns, sorted by count descending
        """
        # One confidence is recorded per occurrence, so len(list) is the count.
        confidences_by_pattern: defaultdict[tuple[str, str, str], list[float]] = (
            defaultdict(list)
        )

        for event in self.storage.read_events("corrections", days):
            changes = event.get("changes")
            if not isinstance(changes, (list, tuple)):
                continue
            confidence = self._as_score(event.get("confidence"))

            for change in changes:
                if not isinstance(change, dict):
                    continue

                original = self._as_text(change.get("original"))
                replacement = self._as_text(change.get("replacement"))
                if not original or not replacement:
                    continue

                change_type = change.get("type", "unknown")
                if not isinstance(change_type, str):
                    change_type = "unknown"

                # Normalize for comparison (case-insensitive)
                key = (original.lower(), replacement.lower(), change_type)
                confidences_by_pattern[key].append(confidence)

        frequent = [
            CorrectionPattern(
                original=original,
                replacement=replacement,
                count=len(confidences),
                change_type=change_type,
                confidence_avg=sum(confidences) / len(confidences),
            )
            for (original, replacement, change_type), confidences
            in confidences_by_pattern.items()
            if len(confidences) >= min_count
        ]

        # Stable sort: equally frequent patterns keep first-seen order.
        frequent.sort(key=lambda p: p.count, reverse=True)
        return frequent

    def identify_knowledge_gaps(
        self,
        days: int = 30,
        confidence_threshold: float = 0.5,
        min_occurrences: int = 3,
    ) -> list[KnowledgeGap]:
        """Identify topics with consistently low confidence.

        Combines low-confidence validation events (grouped by subject) with
        low-scoring searches (grouped by query text).

        Args:
            days: Number of days to analyze
            confidence_threshold: Max confidence to consider a gap
            min_occurrences: Minimum occurrences to report as a gap

        Returns:
            List of knowledge gaps, sorted by event count descending
        """
        gaps = self._validation_gaps(days, confidence_threshold, min_occurrences)
        gaps.extend(self._search_gaps(days, confidence_threshold, min_occurrences))

        # Stable sort: on ties, validation gaps stay ahead of search gaps.
        gaps.sort(key=lambda g: g.event_count, reverse=True)
        return gaps

    def _event_topics(self, event: dict[str, Any]) -> list[str]:
        """Return the normalized (stripped, lowercased) topics of a validation event."""
        subjects = event.get("subjects")
        if isinstance(subjects, str):
            # A bare string is one subject, not a sequence of characters.
            subjects = [subjects]
        elif not isinstance(subjects, (list, tuple)):
            subjects = []

        if not subjects:
            # Try to infer topic from content (first few words)
            content = event.get("content")
            first_words = content.split()[:3] if isinstance(content, str) else []
            subjects = [first_words]

        topics: list[str] = []
        for subject in subjects:
            if isinstance(subject, list):
                subject = " ".join(str(part) for part in subject)
            topic = str(subject).strip().lower()
            if topic:
                topics.append(topic)
        return topics

    def _validation_gaps(
        self,
        days: int,
        confidence_threshold: float,
        min_occurrences: int,
    ) -> list[KnowledgeGap]:
        """Build gaps from validation events at or below the confidence threshold."""
        confidences_by_topic: defaultdict[str, list[float]] = defaultdict(list)

        for event in self.storage.read_events("validations", days):
            confidence = self._as_score(event.get("confidence"))
            if confidence > confidence_threshold:
                continue
            for topic in self._event_topics(event):
                confidences_by_topic[topic].append(confidence)

        return [
            KnowledgeGap(
                topic=topic,
                event_count=len(confidences),
                avg_confidence=sum(confidences) / len(confidences),
                event_type="validation",
            )
            for topic, confidences in confidences_by_topic.items()
            if len(confidences) >= min_occurrences
        ]

    def _search_gaps(
        self,
        days: int,
        confidence_threshold: float,
        min_occurrences: int,
    ) -> list[KnowledgeGap]:
        """Build gaps from searches whose best score is at or below the threshold."""
        low_score_queries: Counter[str] = Counter()

        for event in self.storage.read_events("searches", days):
            if self._as_score(event.get("max_score")) > confidence_threshold:
                continue
            query = self._as_text(event.get("query")).lower()
            if query:
                low_score_queries[query] += 1

        return [
            KnowledgeGap(
                topic=query,
                event_count=count,
                avg_confidence=None,  # Searches don't have confidence
                event_type="search",
            )
            for query, count in low_score_queries.items()
            if count >= min_occurrences
        ]

    def _write_training_pairs(
        self,
        corrections: list[CorrectionPattern],
        output_file: Path,
    ) -> None:
        """Write the top correction patterns to ``output_file`` as JSONL pairs."""
        output_file.parent.mkdir(parents=True, exist_ok=True)
        with open(output_file, "w", encoding="utf-8") as f:
            for pattern in corrections[: self._TRAINING_PAIR_LIMIT]:
                pair = {
                    "premise": f"The text says: {pattern.original}",
                    "hypothesis": f"The text says: {pattern.replacement}",
                    "label": "contradiction",  # Original contradicts knowledge
                    "source": "user_corrections",
                    "count": pattern.count,
                }
                f.write(json.dumps(pair) + "\n")

    def generate_training_report(
        self,
        days: int = 30,
        output_file: Path | None = None,
    ) -> dict[str, Any]:
        """Generate a comprehensive training report from feedback.

        Args:
            days: Number of days to analyze
            output_file: Optional path to write JSONL training pairs. Parent
                directories are created; the file is overwritten.

        Returns:
            Report dictionary with keys "analysis_period_days",
            "event_counts", "frequent_corrections" (top 20) and
            "knowledge_gaps" (top 20).
        """
        corrections = self.extract_frequent_corrections(min_count=5, days=days)
        gaps = self.identify_knowledge_gaps(days=days, min_occurrences=3)

        report: dict[str, Any] = {
            "analysis_period_days": days,
            "event_counts": {
                "corrections": self.storage.count_events("corrections", days),
                "validations": self.storage.count_events("validations", days),
                "searches": self.storage.count_events("searches", days),
            },
            "frequent_corrections": [
                {
                    "original": p.original,
                    "replacement": p.replacement,
                    "count": p.count,
                    "type": p.change_type,
                    "avg_confidence": round(p.confidence_avg, 2),
                }
                for p in corrections[: self._REPORT_LIMIT]
            ],
            "knowledge_gaps": [
                {
                    "topic": g.topic,
                    "count": g.event_count,
                    # Compare against None: an average of 0.0 is a real value
                    # and must not be reported as "no confidence".
                    "avg_confidence": (
                        round(g.avg_confidence, 2)
                        if g.avg_confidence is not None
                        else None
                    ),
                    "source": g.event_type,
                }
                for g in gaps[: self._REPORT_LIMIT]
            ],
        }

        # Write training pairs if requested
        if output_file is not None:
            self._write_training_pairs(corrections, output_file)

        return report
|