"""User conversation statistics tracker. Tracks per-user interaction patterns to enable adaptive system prompts: - Frequent correction patterns - Topic focus areas - Confidence trends - Error types """ from __future__ import annotations import json from collections import Counter, defaultdict from dataclasses import asdict, dataclass from datetime import datetime, timedelta from pathlib import Path from typing import Any from .storage import FeedbackStorage @dataclass class CorrectionStats: """Statistics about user corrections.""" total_corrections: int frequent_patterns: list[tuple[str, str, int]] # (original, replacement, count) common_error_types: list[tuple[str, int]] # (type, count) avg_confidence: float @dataclass class TopicStats: """Statistics about topics the user interacts with.""" primary_topics: list[tuple[str, int]] # (topic, frequency) low_confidence_topics: list[tuple[str, float]] # (topic, avg_confidence) @dataclass class UserStats: """Aggregated user statistics for adaptive prompting.""" user_id: str period_start: str # ISO timestamp period_end: str # ISO timestamp total_interactions: int corrections: CorrectionStats | None topics: TopicStats | None last_updated: str # ISO timestamp class UserStatsTracker: """Tracks and analyzes per-user conversation statistics.""" def __init__(self, storage_dir: Path, cache_ttl_hours: int = 24) -> None: """Initialize user stats tracker. Args: storage_dir: Directory containing feedback logs cache_ttl_hours: Hours to cache user stats before recomputing """ self.storage = FeedbackStorage(storage_dir) self.cache_dir = storage_dir / "user-stats" self.cache_dir.mkdir(exist_ok=True) self.cache_ttl = timedelta(hours=cache_ttl_hours) def get_user_stats( self, user_id: str, days: int = 30, min_correction_count: int = 3, ) -> UserStats: """Get statistics for a specific user. Args: user_id: User identifier days: Number of days to analyze min_correction_count: Minimum occurrences to count as frequent Returns: UserStats with aggregated metrics """ # Check cache first cache_file = self.cache_dir / f"{user_id}.json" if cache_file.exists(): cache_data = json.loads(cache_file.read_text()) cache_time = datetime.fromisoformat(cache_data["last_updated"]) if datetime.now() - cache_time < self.cache_ttl: return self._stats_from_dict(cache_data) # Compute stats from feedback logs period_start = datetime.now() - timedelta(days=days) period_end = datetime.now() corrections_stats = self._compute_correction_stats( user_id, days, min_correction_count ) topic_stats = self._compute_topic_stats(user_id, days) total_interactions = ( (corrections_stats.total_corrections if corrections_stats else 0) + sum(count for _, count in (topic_stats.primary_topics if topic_stats else [])) ) stats = UserStats( user_id=user_id, period_start=period_start.isoformat(), period_end=period_end.isoformat(), total_interactions=total_interactions, corrections=corrections_stats, topics=topic_stats, last_updated=datetime.now().isoformat(), ) # Cache for future requests cache_file.write_text(json.dumps(asdict(stats), indent=2)) return stats def _compute_correction_stats( self, user_id: str, days: int, min_count: int ) -> CorrectionStats | None: """Compute correction statistics for a user.""" corrections = [ event for event in self.storage.read_events("corrections", days) if event.get("conversation_id", "").startswith(user_id) ] if not corrections: return None # Count patterns pattern_counts: Counter[tuple[str, str]] = Counter() type_counts: Counter[str] = Counter() confidences: list[float] = [] for event in corrections: for change in event.get("changes", []): original = change.get("original", "").strip().lower() replacement = change.get("replacement", "").strip().lower() change_type = change.get("type", "unknown") if original and replacement: pattern_counts[(original, replacement)] += 1 type_counts[change_type] += 1 confidence = event.get("confidence", 0.0) if confidence > 0: confidences.append(confidence) # Filter by min count and format frequent_patterns = [ (orig, repl, count) for (orig, repl), count in pattern_counts.most_common(10) if count >= min_count ] return CorrectionStats( total_corrections=len(corrections), frequent_patterns=frequent_patterns, common_error_types=type_counts.most_common(5), avg_confidence=sum(confidences) / len(confidences) if confidences else 0.0, ) def _compute_topic_stats(self, user_id: str, days: int) -> TopicStats | None: """Compute topic statistics for a user.""" validations = [ event for event in self.storage.read_events("validations", days) if event.get("conversation_id", "").startswith(user_id) ] searches = [ event for event in self.storage.read_events("searches", days) if event.get("conversation_id", "").startswith(user_id) ] if not validations and not searches: return None # Count topics from validations topic_counts: Counter[str] = Counter() topic_confidences: defaultdict[str, list[float]] = defaultdict(list) for event in validations: subjects = event.get("subjects", []) confidence = event.get("confidence", 0.0) for subject in subjects: if subject: subject = str(subject).strip().lower() topic_counts[subject] += 1 topic_confidences[subject].append(confidence) # Count topics from searches for event in searches: query = event.get("query", "").strip().lower() if query: # Extract first 2-3 words as topic topic = " ".join(query.split()[:3]) topic_counts[topic] += 1 # Compute averages low_confidence_topics = [ (topic, sum(confs) / len(confs)) for topic, confs in topic_confidences.items() if confs and sum(confs) / len(confs) < 0.5 ] low_confidence_topics.sort(key=lambda x: x[1]) # Lowest confidence first return TopicStats( primary_topics=topic_counts.most_common(10), low_confidence_topics=low_confidence_topics[:5], ) def _stats_from_dict(self, data: dict[str, Any]) -> UserStats: """Reconstruct UserStats from cached dict.""" corrections_data = data.get("corrections") corrections = ( CorrectionStats(**corrections_data) if corrections_data else None ) topics_data = data.get("topics") topics = TopicStats(**topics_data) if topics_data else None return UserStats( user_id=data["user_id"], period_start=data["period_start"], period_end=data["period_end"], total_interactions=data["total_interactions"], corrections=corrections, topics=topics, last_updated=data["last_updated"], ) def clear_cache(self, user_id: str | None = None) -> None: """Clear cached stats for a user or all users. Args: user_id: Specific user to clear, or None for all users """ if user_id: cache_file = self.cache_dir / f"{user_id}.json" if cache_file.exists(): cache_file.unlink() else: for cache_file in self.cache_dir.glob("*.json"): cache_file.unlink()