148 lines
4.6 KiB
Python
148 lines
4.6 KiB
Python
"""Task storage backend for Crystal's task management tools.
|
|
|
|
Provides JSON-file-based persistence for tasks, organized by conversation ID.
|
|
Each task is stored as a separate JSON file under
|
|
``~/.cache/crystal/tasks/{conversation_id}/task-{id}.json``.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
TASKS_ROOT = Path.home() / ".cache" / "crystal" / "tasks"
|
|
|
|
|
|
class TaskStorage:
|
|
"""Manages task persistence for a single conversation.
|
|
|
|
Args:
|
|
conversation_id: Unique identifier for the conversation scope.
|
|
"""
|
|
|
|
def __init__(self, conversation_id: str) -> None:
|
|
self._conversation_id = conversation_id
|
|
self._dir = TASKS_ROOT / conversation_id
|
|
self._dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
@property
|
|
def conversation_id(self) -> str:
|
|
return self._conversation_id
|
|
|
|
def next_id(self) -> str:
|
|
"""Generate the next sequential task ID (task-1, task-2, ...)."""
|
|
existing = self.list_all()
|
|
if not existing:
|
|
return "task-1"
|
|
max_num = max(self._extract_num(t["id"]) for t in existing)
|
|
return f"task-{max_num + 1}"
|
|
|
|
def create(
|
|
self,
|
|
*,
|
|
subject: str,
|
|
description: str,
|
|
active_form: str | None = None,
|
|
) -> dict[str, Any]:
|
|
"""Create a new task and persist it.
|
|
|
|
Returns the full task dict.
|
|
"""
|
|
task_id = self.next_id()
|
|
now = datetime.now(timezone.utc).isoformat()
|
|
|
|
task: dict[str, Any] = {
|
|
"id": task_id,
|
|
"subject": subject,
|
|
"description": description,
|
|
"status": "pending",
|
|
"blockedBy": [],
|
|
"created": now,
|
|
"updated": now,
|
|
}
|
|
if active_form is not None:
|
|
task["activeForm"] = active_form
|
|
|
|
self._write_task(task)
|
|
return task
|
|
|
|
def get(self, task_id: str) -> dict[str, Any] | None:
|
|
"""Load a task by ID, or None if not found."""
|
|
path = self._task_path(task_id)
|
|
if not path.exists():
|
|
return None
|
|
return self._read_task(path)
|
|
|
|
def update(self, task_id: str, updates: dict[str, Any]) -> dict[str, Any] | None:
|
|
"""Apply partial updates to an existing task.
|
|
|
|
Handles special keys:
|
|
- ``addBlockedBy``: appends to the ``blockedBy`` list (deduped).
|
|
|
|
Returns the updated task, or None if not found.
|
|
"""
|
|
task = self.get(task_id)
|
|
if task is None:
|
|
return None
|
|
|
|
# Handle addBlockedBy specially — merge into blockedBy list
|
|
add_blocked_by = updates.pop("addBlockedBy", None)
|
|
if add_blocked_by is not None:
|
|
existing_blocked: list[str] = task.get("blockedBy", [])
|
|
merged = list(dict.fromkeys(existing_blocked + list(add_blocked_by)))
|
|
task["blockedBy"] = merged
|
|
|
|
# Apply scalar updates
|
|
allowed_fields = {"status", "subject", "description", "activeForm", "owner"}
|
|
for key, value in updates.items():
|
|
if key in allowed_fields:
|
|
task[key] = value
|
|
|
|
task["updated"] = datetime.now(timezone.utc).isoformat()
|
|
self._write_task(task)
|
|
return task
|
|
|
|
def delete(self, task_id: str) -> bool:
|
|
"""Remove a task file. Returns True if deleted, False if not found."""
|
|
path = self._task_path(task_id)
|
|
if not path.exists():
|
|
return False
|
|
path.unlink()
|
|
return True
|
|
|
|
def list_all(self) -> list[dict[str, Any]]:
|
|
"""Load all tasks in this conversation, sorted by numeric ID."""
|
|
tasks: list[dict[str, Any]] = []
|
|
for path in sorted(self._dir.glob("task-*.json")):
|
|
task = self._read_task(path)
|
|
if task is not None:
|
|
tasks.append(task)
|
|
tasks.sort(key=lambda t: self._extract_num(t["id"]))
|
|
return tasks
|
|
|
|
# -- Internal helpers --
|
|
|
|
def _task_path(self, task_id: str) -> Path:
|
|
return self._dir / f"{task_id}.json"
|
|
|
|
def _write_task(self, task: dict[str, Any]) -> None:
|
|
path = self._task_path(task["id"])
|
|
path.write_text(json.dumps(task, indent=2) + "\n", encoding="utf-8")
|
|
|
|
def _read_task(self, path: Path) -> dict[str, Any] | None:
|
|
try:
|
|
return json.loads(path.read_text(encoding="utf-8"))
|
|
except (json.JSONDecodeError, OSError):
|
|
return None
|
|
|
|
@staticmethod
|
|
def _extract_num(task_id: str) -> int:
|
|
"""Extract numeric suffix from 'task-N'."""
|
|
try:
|
|
return int(task_id.split("-", 1)[1])
|
|
except (IndexError, ValueError):
|
|
return 0
|