251 lines
8.3 KiB
Python
251 lines
8.3 KiB
Python
"""User conversation statistics tracker.

Tracks per-user interaction patterns to enable adaptive system prompts:
- Frequent correction patterns
- Topic focus areas
- Confidence trends
- Error types
"""
|
|
|
|
from __future__ import annotations

import json
from collections import Counter, defaultdict
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any
from urllib.parse import quote

from .storage import FeedbackStorage
|
|
|
|
|
|
@dataclass
class CorrectionStats:
    """Statistics about user corrections.

    Attributes:
        total_corrections: Number of correction events in the period.
        frequent_patterns: Recurring ``(original, replacement, count)`` edits.
        common_error_types: Most common ``(type, count)`` pairs.
        avg_confidence: Mean of the recorded confidences, ``0.0`` if none.
    """

    total_corrections: int
    frequent_patterns: list[tuple[str, str, int]]  # (original, replacement, count)
    common_error_types: list[tuple[str, int]]  # (type, count)
    avg_confidence: float
|
|
|
|
|
|
@dataclass
class TopicStats:
    """Statistics about topics the user interacts with.

    Attributes:
        primary_topics: Most frequent ``(topic, frequency)`` pairs.
        low_confidence_topics: ``(topic, avg_confidence)`` pairs for topics
            whose average confidence is low.
    """

    primary_topics: list[tuple[str, int]]  # (topic, frequency)
    low_confidence_topics: list[tuple[str, float]]  # (topic, avg_confidence)
|
|
|
|
|
|
@dataclass
class UserStats:
    """Aggregated user statistics for adaptive prompting.

    Attributes:
        user_id: Identifier of the user the statistics belong to.
        period_start: Start of the analyzed period (ISO timestamp).
        period_end: End of the analyzed period (ISO timestamp).
        total_interactions: Number of interactions counted in the period.
        corrections: Correction statistics, or ``None`` when unavailable.
        topics: Topic statistics, or ``None`` when unavailable.
        last_updated: When these statistics were computed (ISO timestamp).
    """

    user_id: str
    period_start: str  # ISO timestamp
    period_end: str  # ISO timestamp
    total_interactions: int
    corrections: CorrectionStats | None
    topics: TopicStats | None
    last_updated: str  # ISO timestamp
|
|
|
|
|
|
class UserStatsTracker:
    """Tracks and analyzes per-user conversation statistics.

    Statistics are computed from the events returned by
    ``FeedbackStorage.read_events`` and cached as one JSON file per user
    inside ``<storage_dir>/user-stats``.
    """

    def __init__(self, storage_dir: Path, cache_ttl_hours: int = 24) -> None:
        """Initialize user stats tracker.

        Args:
            storage_dir: Directory containing feedback logs
            cache_ttl_hours: Hours to cache user stats before recomputing
        """
        self.storage = FeedbackStorage(storage_dir)
        self.cache_dir = Path(storage_dir) / "user-stats"
        # parents=True so a storage_dir that does not exist yet does not make
        # construction fail.
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.cache_ttl = timedelta(hours=cache_ttl_hours)

    def get_user_stats(
        self,
        user_id: str,
        days: int = 30,
        min_correction_count: int = 3,
    ) -> UserStats:
        """Get statistics for a specific user.

        A cached result is returned only when it is younger than the cache
        TTL *and* was computed with the same ``days`` and
        ``min_correction_count``; otherwise the statistics are recomputed and
        the cache file is overwritten.

        Args:
            user_id: User identifier
            days: Number of days to analyze
            min_correction_count: Minimum occurrences to count as frequent

        Returns:
            UserStats with aggregated metrics
        """
        cache_file = self._cache_path(user_id)
        cached = self._load_cached(cache_file, days, min_correction_count)
        if cached is not None:
            return cached

        # One timestamp for the whole computation keeps the reported period
        # exactly ``days`` long.
        now = datetime.now()
        period_start = now - timedelta(days=days)

        corrections_stats = self._compute_correction_stats(
            user_id, days, min_correction_count
        )
        topic_stats = self._compute_topic_stats(user_id, days)

        # NOTE(review): the topic part is summed from ``primary_topics``,
        # which holds only the 10 most common topics, so this undercounts for
        # users with more topics. Kept as-is; confirm the intended metric.
        correction_total = (
            corrections_stats.total_corrections if corrections_stats else 0
        )
        topic_total = sum(
            count for _, count in (topic_stats.primary_topics if topic_stats else [])
        )

        stats = UserStats(
            user_id=user_id,
            period_start=period_start.isoformat(),
            period_end=now.isoformat(),
            total_interactions=correction_total + topic_total,
            corrections=corrections_stats,
            topics=topic_stats,
            last_updated=now.isoformat(),
        )

        # Store the query parameters next to the stats so a later call with
        # different parameters is not served this result.
        payload = asdict(stats)
        payload["cache_params"] = {
            "days": days,
            "min_correction_count": min_correction_count,
        }
        cache_file.write_text(json.dumps(payload, indent=2), encoding="utf-8")

        return stats

    def _cache_path(self, user_id: str) -> Path:
        """Return the cache file for ``user_id``, always inside ``cache_dir``."""
        # Percent-encode everything except letters, digits and "_.-~" so that
        # path separators in a user id cannot address files outside the cache
        # directory. Ids made only of those characters keep their old name.
        safe_name = quote(str(user_id), safe="")
        return self.cache_dir / f"{safe_name}.json"

    def _load_cached(
        self, cache_file: Path, days: int, min_correction_count: int
    ) -> UserStats | None:
        """Return usable cached stats, or ``None`` on any kind of cache miss.

        A miss is: no file, unreadable/corrupt content, an entry older than
        the TTL, or an entry computed with different query parameters (this
        includes entries written before parameters were recorded).
        """
        wanted = {"days": days, "min_correction_count": min_correction_count}
        try:
            cache_data = json.loads(cache_file.read_text(encoding="utf-8"))
            cache_time = datetime.fromisoformat(cache_data["last_updated"])
            if datetime.now() - cache_time >= self.cache_ttl:
                return None
            if cache_data.get("cache_params") != wanted:
                return None
            return self._stats_from_dict(cache_data)
        except (OSError, ValueError, KeyError, TypeError, AttributeError):
            # The cache is only an optimization: a missing, truncated or
            # malformed file must lead to a recompute, not to a crash.
            return None

    def _user_events(
        self, kind: str, user_id: str, days: int
    ) -> list[dict[str, Any]]:
        """Return the ``kind`` events whose conversation id starts with ``user_id``."""
        # NOTE(review): prefix matching also selects events of any other user
        # whose id merely starts with ``user_id``; confirm the
        # conversation_id format before tightening this.
        return [
            event
            for event in self.storage.read_events(kind, days)
            # ``or ""`` also covers an explicit null conversation_id.
            if (event.get("conversation_id") or "").startswith(user_id)
        ]

    def _compute_correction_stats(
        self, user_id: str, days: int, min_count: int
    ) -> CorrectionStats | None:
        """Compute correction statistics for a user.

        Args:
            user_id: User identifier
            days: Number of days to analyze
            min_count: Minimum occurrences for a pattern to be reported

        Returns:
            CorrectionStats, or None when the user has no correction events
        """
        corrections = self._user_events("corrections", user_id, days)
        if not corrections:
            return None

        pattern_counts: Counter[tuple[str, str]] = Counter()
        type_counts: Counter[str] = Counter()
        confidences: list[float] = []

        for event in corrections:
            for change in event.get("changes") or []:
                original = (change.get("original") or "").strip().lower()
                replacement = (change.get("replacement") or "").strip().lower()
                change_type = change.get("type", "unknown")

                # NOTE(review): the change type is counted only for changes
                # that have both sides present; confirm this is intended.
                if original and replacement:
                    pattern_counts[(original, replacement)] += 1
                    type_counts[change_type] += 1

            # Confidence is recorded once per event; missing, null and
            # non-positive values are left out of the average.
            confidence = event.get("confidence") or 0.0
            if confidence > 0:
                confidences.append(confidence)

        # Keep the 10 most common patterns, then drop the infrequent ones.
        frequent_patterns = [
            (orig, repl, count)
            for (orig, repl), count in pattern_counts.most_common(10)
            if count >= min_count
        ]
        avg_confidence = sum(confidences) / len(confidences) if confidences else 0.0

        return CorrectionStats(
            total_corrections=len(corrections),
            frequent_patterns=frequent_patterns,
            common_error_types=type_counts.most_common(5),
            avg_confidence=avg_confidence,
        )

    def _compute_topic_stats(self, user_id: str, days: int) -> TopicStats | None:
        """Compute topic statistics for a user.

        Topics come from the ``subjects`` of validation events and from the
        first three words of search queries.

        Returns:
            TopicStats, or None when the user has neither validation nor
            search events
        """
        validations = self._user_events("validations", user_id, days)
        searches = self._user_events("searches", user_id, days)
        if not validations and not searches:
            return None

        topic_counts: Counter[str] = Counter()
        topic_confidences: defaultdict[str, list[float]] = defaultdict(list)

        for event in validations:
            # A missing or null confidence counts as 0.0 for every subject
            # of the event.
            confidence = event.get("confidence") or 0.0
            for subject in event.get("subjects") or []:
                if not subject:
                    continue
                topic = str(subject).strip().lower()
                if not topic:
                    # Whitespace-only subjects would otherwise be counted
                    # under the empty topic "".
                    continue
                topic_counts[topic] += 1
                topic_confidences[topic].append(confidence)

        for event in searches:
            query = (event.get("query") or "").strip().lower()
            if query:
                # The first (up to) three words of the query act as the topic.
                topic_counts[" ".join(query.split()[:3])] += 1

        averages = {
            topic: sum(confs) / len(confs)
            for topic, confs in topic_confidences.items()
            if confs
        }
        # Topics averaging below 0.5, lowest confidence first.
        low_confidence_topics = sorted(
            ((topic, avg) for topic, avg in averages.items() if avg < 0.5),
            key=lambda item: item[1],
        )

        return TopicStats(
            primary_topics=topic_counts.most_common(10),
            low_confidence_topics=low_confidence_topics[:5],
        )

    def _stats_from_dict(self, data: dict[str, Any]) -> UserStats:
        """Reconstruct UserStats from cached dict.

        JSON has no tuple type, so tuple entries written to the cache are
        read back as lists; they are converted to tuples again so cached and
        freshly computed statistics have the same shape.
        """
        corrections = None
        corrections_data = data.get("corrections")
        if corrections_data:
            corrections = CorrectionStats(
                **{
                    **corrections_data,
                    "frequent_patterns": [
                        tuple(item) for item in corrections_data["frequent_patterns"]
                    ],
                    "common_error_types": [
                        tuple(item) for item in corrections_data["common_error_types"]
                    ],
                }
            )

        topics = None
        topics_data = data.get("topics")
        if topics_data:
            topics = TopicStats(
                **{
                    **topics_data,
                    "primary_topics": [
                        tuple(item) for item in topics_data["primary_topics"]
                    ],
                    "low_confidence_topics": [
                        tuple(item) for item in topics_data["low_confidence_topics"]
                    ],
                }
            )

        return UserStats(
            user_id=data["user_id"],
            period_start=data["period_start"],
            period_end=data["period_end"],
            total_interactions=data["total_interactions"],
            corrections=corrections,
            topics=topics,
            last_updated=data["last_updated"],
        )

    def clear_cache(self, user_id: str | None = None) -> None:
        """Clear cached stats for a user or all users.

        Args:
            user_id: Specific user to clear, or None for all users
        """
        # Only None means "all users"; an empty-string id must not wipe every
        # user's cache.
        if user_id is not None:
            self._unlink_if_present(self._cache_path(user_id))
            return

        for cache_file in self.cache_dir.glob("*.json"):
            self._unlink_if_present(cache_file)

    @staticmethod
    def _unlink_if_present(path: Path) -> None:
        """Delete ``path``; a file that is already gone is not an error."""
        try:
            path.unlink()
        except FileNotFoundError:
            # Nothing to clear, or removed concurrently since it was listed.
            pass
|