ml-knowledge-platform/knowledge_platform/feedback/user_stats.py
2026-02-16 04:50:51 -08:00

251 lines
8.3 KiB
Python

"""User conversation statistics tracker.
Tracks per-user interaction patterns to enable adaptive system prompts:
- Frequent correction patterns
- Topic focus areas
- Confidence trends
- Error types
"""
from __future__ import annotations

import json
from collections import Counter, defaultdict
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any
from urllib.parse import quote

from .storage import FeedbackStorage
@dataclass
class CorrectionStats:
    """Summary of the corrections a user has made.

    Attributes:
        total_corrections: Number of corrections counted.
        frequent_patterns: ``(original, replacement, count)`` triples.
        common_error_types: ``(type, count)`` pairs.
        avg_confidence: Average confidence value.
    """

    total_corrections: int
    frequent_patterns: list[tuple[str, str, int]]
    common_error_types: list[tuple[str, int]]
    avg_confidence: float
@dataclass
class TopicStats:
    """Summary of the topics a user interacts with.

    Attributes:
        primary_topics: ``(topic, frequency)`` pairs.
        low_confidence_topics: ``(topic, avg_confidence)`` pairs.
    """

    primary_topics: list[tuple[str, int]]
    low_confidence_topics: list[tuple[str, float]]
@dataclass
class UserStats:
    """Aggregated per-user statistics used for adaptive prompting.

    Attributes:
        user_id: Identifier of the user the statistics belong to.
        period_start: Start of the analysed period (ISO timestamp).
        period_end: End of the analysed period (ISO timestamp).
        total_interactions: Number of interactions counted in the period.
        corrections: Correction statistics, or None when there are none.
        topics: Topic statistics, or None when there are none.
        last_updated: When the statistics were computed (ISO timestamp).
    """

    user_id: str
    period_start: str
    period_end: str
    total_interactions: int
    corrections: CorrectionStats | None
    topics: TopicStats | None
    last_updated: str
class UserStatsTracker:
    """Tracks and analyzes per-user conversation statistics.

    Statistics are derived from the "corrections", "validations" and
    "searches" event logs read through ``FeedbackStorage`` and are cached as
    one JSON file per user in ``<storage_dir>/user-stats``.
    """

    # Topics whose average validation confidence is below this value are
    # reported as low-confidence topics.
    LOW_CONFIDENCE_THRESHOLD = 0.5

    def __init__(self, storage_dir: Path, cache_ttl_hours: int = 24) -> None:
        """Initialize user stats tracker.

        Args:
            storage_dir: Directory containing feedback logs
            cache_ttl_hours: Hours to cache user stats before recomputing
        """
        self.storage = FeedbackStorage(storage_dir)
        self.cache_dir = Path(storage_dir) / "user-stats"
        # parents=True so a not-yet-existing storage_dir does not make
        # construction fail.
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.cache_ttl = timedelta(hours=cache_ttl_hours)

    def get_user_stats(
        self,
        user_id: str,
        days: int = 30,
        min_correction_count: int = 3,
    ) -> UserStats:
        """Get statistics for a specific user.

        A cached result is returned only when it is younger than the cache
        TTL *and* was computed for the same ``days`` and
        ``min_correction_count``; otherwise the stats are recomputed from the
        feedback logs and the cache file is overwritten.

        Args:
            user_id: User identifier
            days: Number of days to analyze
            min_correction_count: Minimum occurrences to count as frequent

        Returns:
            UserStats with aggregated metrics
        """
        cache_file = self._cache_path(user_id)
        # One clock reading so period_end, last_updated and the TTL check
        # all agree.
        now = datetime.now()

        cached = self._load_cache(cache_file, now, days, min_correction_count)
        if cached is not None:
            return cached

        corrections_stats = self._compute_correction_stats(
            user_id, days, min_correction_count
        )
        topic_stats = self._compute_topic_stats(user_id, days)

        # NOTE(review): the topic part is summed over primary_topics, which is
        # capped at the ten most common topics, so users with more topics are
        # undercounted -- confirm the intended definition of "interactions".
        correction_total = (
            corrections_stats.total_corrections if corrections_stats else 0
        )
        topic_total = sum(
            count for _, count in (topic_stats.primary_topics if topic_stats else [])
        )

        stats = UserStats(
            user_id=user_id,
            period_start=(now - timedelta(days=days)).isoformat(),
            period_end=now.isoformat(),
            total_interactions=correction_total + topic_total,
            corrections=corrections_stats,
            topics=topic_stats,
            last_updated=now.isoformat(),
        )

        # Store the request parameters next to the stats so a later call with
        # different parameters is not served this result.
        payload = asdict(stats)
        payload["days"] = days
        payload["min_correction_count"] = min_correction_count
        cache_file.write_text(json.dumps(payload, indent=2), encoding="utf-8")
        return stats

    def _cache_path(self, user_id: str) -> Path:
        """Return the cache file path for ``user_id``."""
        # Percent-encode the id so path separators or ".." inside it cannot
        # point outside cache_dir. Ids made of letters, digits and "_.-~"
        # keep the same file name as before.
        return self.cache_dir / f"{quote(str(user_id), safe='')}.json"

    def _load_cache(
        self,
        cache_file: Path,
        now: datetime,
        days: int,
        min_correction_count: int,
    ) -> UserStats | None:
        """Return cached stats if usable for this request, else None.

        A missing, unreadable, malformed, expired or mismatching cache file
        is treated as a cache miss rather than an error.
        """
        try:
            cache_data = json.loads(cache_file.read_text(encoding="utf-8"))
            cache_time = datetime.fromisoformat(cache_data["last_updated"])
            if (
                cache_data.get("days") != days
                or cache_data.get("min_correction_count") != min_correction_count
            ):
                return None
            # A negative age means the timestamp lies in the future (e.g.
            # clock change); do not trust such an entry.
            if not timedelta(0) <= now - cache_time < self.cache_ttl:
                return None
            return self._stats_from_dict(cache_data)
        except (OSError, ValueError, KeyError, TypeError, AttributeError):
            return None

    def _events_for_user(
        self, kind: str, user_id: str, days: int
    ) -> list[dict[str, Any]]:
        """Return the ``kind`` events of the last ``days`` days for a user."""
        # NOTE(review): ownership is decided by conversation_id prefix, so user
        # "ab" also matches conversations of user "abc" -- confirm the id
        # format before tightening this.
        return [
            event
            for event in self.storage.read_events(kind, days)
            if str(event.get("conversation_id") or "").startswith(user_id)
        ]

    @staticmethod
    def _normalize(text: Any) -> str:
        """Return ``text`` stripped and lower-cased; None becomes ``""``."""
        return "" if text is None else str(text).strip().lower()

    @staticmethod
    def _as_confidence(value: Any) -> float | None:
        """Return ``value`` as a float, or None if it is not a number."""
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return None
        return float(value)

    def _compute_correction_stats(
        self, user_id: str, days: int, min_count: int
    ) -> CorrectionStats | None:
        """Compute correction statistics for a user.

        Args:
            user_id: User identifier
            days: Number of days to analyze
            min_count: Minimum occurrences for a pattern to be reported

        Returns:
            CorrectionStats, or None when the user has no correction events
        """
        corrections = self._events_for_user("corrections", user_id, days)
        if not corrections:
            return None

        pattern_counts: Counter[tuple[str, str]] = Counter()
        type_counts: Counter[str] = Counter()
        confidences: list[float] = []

        for event in corrections:
            for change in event.get("changes") or []:
                original = self._normalize(change.get("original"))
                replacement = self._normalize(change.get("replacement"))
                # Changes missing either side carry no usable pattern.
                if original and replacement:
                    pattern_counts[(original, replacement)] += 1
                    type_counts[change.get("type") or "unknown"] += 1
            confidence = self._as_confidence(event.get("confidence"))
            # Non-positive or missing confidences are left out of the average.
            if confidence is not None and confidence > 0:
                confidences.append(confidence)

        # Only the ten most common patterns are considered, then filtered by
        # the minimum count.
        frequent_patterns = [
            (orig, repl, count)
            for (orig, repl), count in pattern_counts.most_common(10)
            if count >= min_count
        ]
        return CorrectionStats(
            total_corrections=len(corrections),
            frequent_patterns=frequent_patterns,
            common_error_types=type_counts.most_common(5),
            avg_confidence=sum(confidences) / len(confidences) if confidences else 0.0,
        )

    def _compute_topic_stats(self, user_id: str, days: int) -> TopicStats | None:
        """Compute topic statistics for a user.

        Args:
            user_id: User identifier
            days: Number of days to analyze

        Returns:
            TopicStats, or None when the user has neither validation nor
            search events
        """
        validations = self._events_for_user("validations", user_id, days)
        searches = self._events_for_user("searches", user_id, days)
        if not validations and not searches:
            return None

        topic_counts: Counter[str] = Counter()
        topic_confidences: defaultdict[str, list[float]] = defaultdict(list)

        # Topics from validations: every listed subject counts once.
        for event in validations:
            subjects = event.get("subjects") or []
            if isinstance(subjects, str):
                # A bare string would otherwise be iterated per character.
                subjects = [subjects]
            confidence = self._as_confidence(event.get("confidence"))
            for subject in subjects:
                if not subject:
                    continue
                topic = self._normalize(subject)
                if not topic:
                    continue
                topic_counts[topic] += 1
                # A validation without a confidence must not be averaged in
                # as 0.0, which would wrongly flag the topic as low-confidence.
                if confidence is not None:
                    topic_confidences[topic].append(confidence)

        # Topics from searches: the first three words of the query.
        for event in searches:
            query = self._normalize(event.get("query"))
            if query:
                topic_counts[" ".join(query.split()[:3])] += 1

        averages = (
            (topic, sum(confs) / len(confs))
            for topic, confs in topic_confidences.items()
            if confs
        )
        # Lowest confidence first.
        low_confidence_topics = sorted(
            (item for item in averages if item[1] < self.LOW_CONFIDENCE_THRESHOLD),
            key=lambda item: item[1],
        )
        return TopicStats(
            primary_topics=topic_counts.most_common(10),
            low_confidence_topics=low_confidence_topics[:5],
        )

    @staticmethod
    def _with_tuples(data: dict[str, Any], keys: tuple[str, ...]) -> dict[str, Any]:
        """Return a copy of ``data`` whose ``keys`` hold lists of tuples."""
        restored = dict(data)
        for key in keys:
            restored[key] = [tuple(item) for item in restored.get(key) or []]
        return restored

    def _stats_from_dict(self, data: dict[str, Any]) -> UserStats:
        """Reconstruct UserStats from cached dict.

        JSON stores tuples as lists, so the tuple-valued fields are converted
        back to make a cached result equal to a freshly computed one.
        """
        corrections_data = data.get("corrections")
        corrections = None
        if corrections_data:
            corrections = CorrectionStats(
                **self._with_tuples(
                    corrections_data, ("frequent_patterns", "common_error_types")
                )
            )

        topics_data = data.get("topics")
        topics = None
        if topics_data:
            topics = TopicStats(
                **self._with_tuples(
                    topics_data, ("primary_topics", "low_confidence_topics")
                )
            )

        return UserStats(
            user_id=data["user_id"],
            period_start=data["period_start"],
            period_end=data["period_end"],
            total_interactions=data["total_interactions"],
            corrections=corrections,
            topics=topics,
            last_updated=data["last_updated"],
        )

    def clear_cache(self, user_id: str | None = None) -> None:
        """Clear cached stats for a user or all users.

        Args:
            user_id: Specific user to clear, or None for all users
        """
        # Compare against None explicitly: an empty-string id must not be
        # mistaken for "all users".
        if user_id is not None:
            targets = [self._cache_path(user_id)]
        else:
            targets = list(self.cache_dir.glob("*.json"))

        for cache_file in targets:
            try:
                cache_file.unlink()
            except FileNotFoundError:
                # Nothing cached, or removed concurrently; either way it is gone.
                pass