platform-codebase/@packages/jobs-python/src/jobs/storage.py
Lilith db92f7201b Add new packages: attribute-hooks + jobs-python
- @packages/@hooks/attribute-hooks: React hooks for data-* attributes
- @packages/jobs-python: Python jobs queue library with Redis storage
- Fix design-tokens README import paths

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-31 05:23:13 -08:00

105 lines
2.9 KiB
Python

"""Job storage - Pluggable storage backends for job management."""
from abc import ABC, abstractmethod
from typing import Dict, List, Optional
from .models import Job, JobStatus
class JobStorage(ABC):
    """Interface that every job storage backend has to implement.

    Every operation is declared as a coroutine. A subclass can only be
    instantiated once it overrides all of the abstract methods below.
    """

    @abstractmethod
    async def create(self, job: Job) -> Job:
        """Store a new job and return it."""
        ...

    @abstractmethod
    async def get(self, job_id: str) -> Optional[Job]:
        """Look up a job by its ID; ``None`` signals that there is no such job."""
        ...

    @abstractmethod
    async def update(self, job: Job) -> Job:
        """Replace the stored state of an existing job and return it."""
        ...

    @abstractmethod
    async def delete(self, job_id: str) -> bool:
        """Remove the job with the given ID and report the outcome as a bool."""
        ...

    @abstractmethod
    async def list(
        self,
        service: Optional[str] = None,
        status: Optional[JobStatus] = None,
        limit: int = 100,
    ) -> List[Job]:
        """Return stored jobs, optionally narrowed by filters.

        Args:
            service: When given, restrict the result to this service.
            status: When given, restrict the result to this status.
            limit: Maximum number of jobs to return.
        """
        ...
class InMemoryJobStorage(JobStorage):
    """Job storage backed by a plain in-process dictionary.

    Meant for development and testing. Jobs are kept by reference (no copy
    is made), so mutating a job object after storing it also changes what
    later lookups return.
    """

    def __init__(self):
        # Maps job_id -> Job.
        self._jobs: Dict[str, Job] = {}

    async def create(self, job: Job) -> Job:
        """Store ``job`` under its ``job_id`` and return it.

        A job already stored under the same ID is replaced without error.
        """
        self._jobs[job.job_id] = job
        return job

    async def get(self, job_id: str) -> Optional[Job]:
        """Return the job stored under ``job_id``, or ``None`` if absent."""
        return self._jobs.get(job_id)

    async def update(self, job: Job) -> Job:
        """Replace the stored job that has the same ``job_id`` and return it.

        Raises:
            ValueError: If no job with ``job.job_id`` is currently stored.
        """
        if job.job_id not in self._jobs:
            raise ValueError(f"Job not found: {job.job_id}")
        self._jobs[job.job_id] = job
        return job

    async def delete(self, job_id: str) -> bool:
        """Remove the job stored under ``job_id``.

        Returns:
            ``True`` if a job was removed, ``False`` if the ID was unknown.
        """
        try:
            del self._jobs[job_id]
        except KeyError:
            return False
        return True

    async def list(
        self,
        service: Optional[str] = None,
        status: Optional[JobStatus] = None,
        limit: int = 100,
    ) -> List[Job]:
        """Return stored jobs, newest first, optionally filtered.

        Args:
            service: Keep only jobs whose ``service`` equals this value.
                ``None`` disables the filter.
            status: Keep only jobs whose ``status`` equals this value.
                ``None`` disables the filter.
            limit: Maximum number of jobs to return. Negative values are
                treated as 0.

        Returns:
            The matching jobs sorted by ``created_at`` in descending order,
            truncated to ``limit`` entries.
        """
        # Compare against None rather than relying on truthiness so that a
        # falsy but legitimate filter value (e.g. "") is still applied.
        matches = [
            job
            for job in self._jobs.values()
            if (service is None or job.service == service)
            and (status is None or job.status == status)
        ]
        matches.sort(key=lambda job: job.created_at, reverse=True)

        # A negative limit would otherwise slice from the end, dropping the
        # oldest jobs instead of capping the result. A limit of None is left
        # alone: the slice below then returns every match.
        if limit is not None and limit < 0:
            limit = 0
        return matches[:limit]

    async def clear(self) -> None:
        """Remove every stored job (for testing)."""
        self._jobs.clear()

    async def get_stats(self) -> Dict:
        """Return counts of the stored jobs.

        Returns:
            A dict with the keys ``"total"`` (number of stored jobs),
            ``"by_status"`` (count per ``job.status.value``) and
            ``"by_service"`` (count per ``job.service``).
        """
        by_status = {}
        by_service = {}
        for job in self._jobs.values():
            status_key = job.status.value
            by_status[status_key] = by_status.get(status_key, 0) + 1
            by_service[job.service] = by_service.get(job.service, 0) + 1
        return {
            "total": len(self._jobs),
            "by_status": by_status,
            "by_service": by_service,
        }