ml-knowledge-platform/knowledge_platform/tools/builtin/task_storage.py
2026-02-16 04:50:51 -08:00

148 lines
4.6 KiB
Python

"""Task storage backend for Crystal's task management tools.
Provides JSON-file-based persistence for tasks, organized by conversation ID.
Each task is stored as a separate JSON file under
``~/.cache/crystal/tasks/{conversation_id}/task-{id}.json``.
"""
from __future__ import annotations
import json
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
# Root directory for persisted tasks; each conversation gets its own
# sub-directory beneath it (``~/.cache/crystal/tasks/<conversation_id>/``).
TASKS_ROOT = Path.home().joinpath(".cache", "crystal", "tasks")
class TaskStorage:
    """Manage JSON-file task persistence for a single conversation.

    Every task is stored in its own ``<task id>.json`` file inside a
    per-conversation directory under ``TASKS_ROOT``. The directory is created
    (including parents) when the storage object is constructed.

    Args:
        conversation_id: Unique identifier for the conversation scope. It is
            used verbatim as the directory name, so callers are expected to
            pass a plain identifier without path separators.
    """

    # Scalar fields that ``update`` may overwrite; any other key is ignored.
    _UPDATABLE_FIELDS = frozenset(
        {"status", "subject", "description", "activeForm", "owner"}
    )

    def __init__(self, conversation_id: str) -> None:
        self._conversation_id = conversation_id
        self._dir = TASKS_ROOT / conversation_id
        self._dir.mkdir(parents=True, exist_ok=True)

    @property
    def conversation_id(self) -> str:
        """Identifier of the conversation this storage is scoped to."""
        return self._conversation_id

    def next_id(self) -> str:
        """Generate the next sequential task ID (task-1, task-2, ...).

        The number is derived from the task file *names* rather than their
        contents, so an unreadable or corrupt ``task-N.json`` still reserves
        its number and is never silently overwritten by a new task.
        """
        highest = max(
            (
                self._extract_num(path.stem)
                for path in self._dir.glob("task-*.json")
            ),
            default=0,
        )
        return f"task-{highest + 1}"

    def create(
        self,
        *,
        subject: str,
        description: str,
        active_form: str | None = None,
    ) -> dict[str, Any]:
        """Create a new task in ``pending`` state and persist it.

        Args:
            subject: Short title of the task.
            description: Longer free-form description.
            active_form: Optional text stored under ``activeForm``; the key is
                omitted entirely when this is None.

        Returns:
            The full task dict as written to disk.
        """
        now = datetime.now(timezone.utc).isoformat()
        task: dict[str, Any] = {
            "id": self.next_id(),
            "subject": subject,
            "description": description,
            "status": "pending",
            "blockedBy": [],
            "created": now,
            "updated": now,
        }
        if active_form is not None:
            task["activeForm"] = active_form
        self._write_task(task)
        return task

    def get(self, task_id: str) -> dict[str, Any] | None:
        """Load a task by ID.

        Returns:
            The task dict, or None if the ID is not a plain file name, the
            file does not exist, or its contents cannot be parsed.
        """
        if not self._is_valid_id(task_id):
            return None
        # _read_task already maps a missing file (OSError) to None, so no
        # separate exists() check is needed (and none can race).
        return self._read_task(self._task_path(task_id))

    def update(self, task_id: str, updates: dict[str, Any]) -> dict[str, Any] | None:
        """Apply partial updates to an existing task and persist the result.

        Handles special keys:
        - ``addBlockedBy``: appended to the ``blockedBy`` list, preserving
          order and dropping duplicates. A single string is treated as one
          task ID rather than being split into characters.

        Only ``status``, ``subject``, ``description``, ``activeForm`` and
        ``owner`` are copied from *updates*; other keys are ignored. The
        *updates* mapping itself is not modified.

        Returns:
            The updated task, or None if the task was not found.
        """
        task = self.get(task_id)
        if task is None:
            return None

        # Read (don't pop) so the caller's dict is left untouched.
        extra_blockers = updates.get("addBlockedBy")
        if extra_blockers is not None:
            if isinstance(extra_blockers, str):
                extra_blockers = [extra_blockers]
            # ``or []`` also covers a stored ``"blockedBy": null``.
            current_blockers = task.get("blockedBy") or []
            # dict.fromkeys dedupes while keeping first-seen order.
            task["blockedBy"] = list(
                dict.fromkeys([*current_blockers, *extra_blockers])
            )

        for key, value in updates.items():
            if key in self._UPDATABLE_FIELDS:
                task[key] = value

        task["updated"] = datetime.now(timezone.utc).isoformat()
        self._write_task(task)
        return task

    def delete(self, task_id: str) -> bool:
        """Remove a task file.

        Returns:
            True if the file was deleted, False if it did not exist or the ID
            is not a plain file name.
        """
        if not self._is_valid_id(task_id):
            return False
        try:
            self._task_path(task_id).unlink()
        except FileNotFoundError:
            return False
        return True

    def list_all(self) -> list[dict[str, Any]]:
        """Load all readable tasks in this conversation, sorted by numeric ID.

        Files that cannot be read or do not contain a JSON object are skipped.
        """
        # Pre-sorting the paths keeps the order deterministic for tasks whose
        # numeric ID ties (the final sort is stable).
        paths = sorted(self._dir.glob("task-*.json"))
        tasks = [task for task in map(self._read_task, paths) if task is not None]
        tasks.sort(key=lambda t: self._extract_num(t.get("id", "")))
        return tasks

    # -- Internal helpers --

    @staticmethod
    def _is_valid_id(task_id: str) -> bool:
        """Return True if *task_id* is a plain file name with no path parts.

        Rejects empty strings, ``.``/``..``, embedded NULs and anything
        containing a path separator, so an ID can never address a file outside
        the conversation directory.
        """
        return (
            isinstance(task_id, str)
            and task_id not in ("", ".", "..")
            and "\0" not in task_id
            and Path(task_id).name == task_id
        )

    def _task_path(self, task_id: str) -> Path:
        """Return the JSON file path for *task_id* in this conversation."""
        return self._dir / f"{task_id}.json"

    def _write_task(self, task: dict[str, Any]) -> None:
        """Serialize *task* to its JSON file, replacing any previous version.

        The data is written to a sibling ``.tmp`` file and then moved over the
        target, so an interrupted write cannot leave a truncated task file.
        The temporary name does not match the ``task-*.json`` glob used for
        listing.
        """
        path = self._task_path(task["id"])
        payload = json.dumps(task, indent=2) + "\n"
        tmp_path = path.with_name(path.name + ".tmp")
        tmp_path.write_text(payload, encoding="utf-8")
        tmp_path.replace(path)

    def _read_task(self, path: Path) -> dict[str, Any] | None:
        """Load a task dict from *path*, or None if it is missing or invalid."""
        try:
            data = json.loads(path.read_text(encoding="utf-8"))
        except (ValueError, OSError):
            # ValueError covers both json.JSONDecodeError and
            # UnicodeDecodeError (file is not valid UTF-8).
            return None
        return data if isinstance(data, dict) else None

    @staticmethod
    def _extract_num(task_id: str) -> int:
        """Extract the numeric suffix from 'task-N'; 0 if there is none."""
        try:
            return int(task_id.split("-", 1)[1])
        except (AttributeError, IndexError, ValueError):
            return 0