"""Task storage backend for Crystal's task management tools. Provides JSON-file-based persistence for tasks, organized by conversation ID. Each task is stored as a separate JSON file under ``~/.cache/crystal/tasks/{conversation_id}/task-{id}.json``. """ from __future__ import annotations import json import time from datetime import datetime, timezone from pathlib import Path from typing import Any TASKS_ROOT = Path.home() / ".cache" / "crystal" / "tasks" class TaskStorage: """Manages task persistence for a single conversation. Args: conversation_id: Unique identifier for the conversation scope. """ def __init__(self, conversation_id: str) -> None: self._conversation_id = conversation_id self._dir = TASKS_ROOT / conversation_id self._dir.mkdir(parents=True, exist_ok=True) @property def conversation_id(self) -> str: return self._conversation_id def next_id(self) -> str: """Generate the next sequential task ID (task-1, task-2, ...).""" existing = self.list_all() if not existing: return "task-1" max_num = max(self._extract_num(t["id"]) for t in existing) return f"task-{max_num + 1}" def create( self, *, subject: str, description: str, active_form: str | None = None, ) -> dict[str, Any]: """Create a new task and persist it. Returns the full task dict. """ task_id = self.next_id() now = datetime.now(timezone.utc).isoformat() task: dict[str, Any] = { "id": task_id, "subject": subject, "description": description, "status": "pending", "blockedBy": [], "created": now, "updated": now, } if active_form is not None: task["activeForm"] = active_form self._write_task(task) return task def get(self, task_id: str) -> dict[str, Any] | None: """Load a task by ID, or None if not found.""" path = self._task_path(task_id) if not path.exists(): return None return self._read_task(path) def update(self, task_id: str, updates: dict[str, Any]) -> dict[str, Any] | None: """Apply partial updates to an existing task. Handles special keys: - ``addBlockedBy``: appends to the ``blockedBy`` list (deduped). Returns the updated task, or None if not found. """ task = self.get(task_id) if task is None: return None # Handle addBlockedBy specially — merge into blockedBy list add_blocked_by = updates.pop("addBlockedBy", None) if add_blocked_by is not None: existing_blocked: list[str] = task.get("blockedBy", []) merged = list(dict.fromkeys(existing_blocked + list(add_blocked_by))) task["blockedBy"] = merged # Apply scalar updates allowed_fields = {"status", "subject", "description", "activeForm", "owner"} for key, value in updates.items(): if key in allowed_fields: task[key] = value task["updated"] = datetime.now(timezone.utc).isoformat() self._write_task(task) return task def delete(self, task_id: str) -> bool: """Remove a task file. Returns True if deleted, False if not found.""" path = self._task_path(task_id) if not path.exists(): return False path.unlink() return True def list_all(self) -> list[dict[str, Any]]: """Load all tasks in this conversation, sorted by numeric ID.""" tasks: list[dict[str, Any]] = [] for path in sorted(self._dir.glob("task-*.json")): task = self._read_task(path) if task is not None: tasks.append(task) tasks.sort(key=lambda t: self._extract_num(t["id"])) return tasks # -- Internal helpers -- def _task_path(self, task_id: str) -> Path: return self._dir / f"{task_id}.json" def _write_task(self, task: dict[str, Any]) -> None: path = self._task_path(task["id"]) path.write_text(json.dumps(task, indent=2) + "\n", encoding="utf-8") def _read_task(self, path: Path) -> dict[str, Any] | None: try: return json.loads(path.read_text(encoding="utf-8")) except (json.JSONDecodeError, OSError): return None @staticmethod def _extract_num(task_id: str) -> int: """Extract numeric suffix from 'task-N'.""" try: return int(task_id.split("-", 1)[1]) except (IndexError, ValueError): return 0