# @lilith/queue/admin/backend REST API backend and WebSocket gateway for queue administration and monitoring. ## Features - **REST API** - Full CRUD operations for queue management - **WebSocket Gateway** - Real-time metrics and job event streaming - **Dead Letter Queue** - Failed job management and retry mechanisms - **Type Safety** - Full TypeScript support with proper DTOs - **NestJS Integration** - Native dependency injection and decorators ## Installation ```bash pnpm add @lilith/queue/admin/backend ``` ## Usage ### Basic Setup ```typescript import { Module } from '@nestjs/common'; import { QueueAdminModule } from '@lilith/queue/admin/backend'; import { BullModule } from '@nestjs/bullmq'; @Module({ imports: [ BullModule.forRoot({ connection: { host: 'localhost', port: 6379, }, }), QueueAdminModule, ], }) export class AppModule {} ``` ### REST API Endpoints #### Queue Management ```http GET /admin/queues # List all queues GET /admin/queues/:name # Get queue details GET /admin/queues/:name/metrics?period=1h # Get queue metrics POST /admin/queues/:name/pause # Pause queue POST /admin/queues/:name/resume # Resume queue DELETE /admin/queues/:name/clean?grace=5000&limit=100 # Clean queue ``` #### Job Management ```http GET /admin/queues/:queueName/jobs?state=waiting&limit=50 # List jobs GET /admin/queues/:queueName/jobs/:jobId # Get job details POST /admin/queues/:queueName/jobs/:jobId/retry # Retry job DELETE /admin/queues/:queueName/jobs/:jobId # Remove job ``` #### Dead Letter Queue ```http GET /admin/queues/:name/dlq?page=1&limit=50 # List failed jobs POST /admin/queues/:name/dlq/:jobId/retry # Retry failed job DELETE /admin/queues/:name/dlq/:jobId # Remove failed job ``` ### WebSocket Gateway Connect to the WebSocket gateway for real-time updates: ```typescript import { io } from 'socket.io-client'; const socket = io('http://localhost:3000', { transports: ['websocket'], }); // Subscribe to queue metrics socket.emit('subscribe', { queueName: 'analytics' }); // Listen for metrics updates socket.on('metrics', (data) => { console.log('Queue metrics:', data); }); // Listen for job events socket.on('job_event', (event) => { console.log('Job event:', event); }); ``` ## API Reference ### DTOs #### QueueSummaryDto ```typescript { name: string; owner: string; isPaused: boolean; counts: { waiting: number; active: number; failed: number; }; description?: string; } ``` #### QueueMetricsDto ```typescript { name: string; counts: { waiting: number; active: number; completed: number; failed: number; delayed: number; paused: number; }; isPaused: boolean; avgProcessingTime?: number; errorRate?: number; throughput?: { completedLastHour: number; failedLastHour: number; }; oldestJob?: { id: string; state: string; age: number; } | null; timestamp: number; } ``` #### JobDetailsDto ```typescript { id: string; name: string; data: unknown; opts: unknown; progress?: number; delay?: number; timestamp: number; attemptsMade: number; failedReason?: string; stacktrace?: string[]; returnvalue?: unknown; finishedOn?: number; processedOn?: number; } ``` ### Services #### QueueAdminService Injectable service for programmatic queue management: ```typescript import { QueueAdminService } from '@lilith/queue/admin/backend'; @Injectable() export class MyService { constructor(private queueAdmin: QueueAdminService) {} async pauseAnalytics() { await this.queueAdmin.pauseQueue('analytics'); } async getMetrics() { return this.queueAdmin.getQueueMetrics('analytics', '1h'); } } ``` ## Architecture ``` ┌─────────────────────────────────────────────────────────┐ │ QueueAdminModule │ ├─────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────────┐ ┌──────────────────────┐ │ │ │ Controllers │ │ WebSocket Gateway │ │ │ │ │ │ │ │ │ │ • QueueController│ │ • Real-time metrics │ │ │ │ • JobsController│ │ • Job events │ │ │ │ • DlqController │ │ • Subscriptions │ │ │ └────────┬────────┘ └──────────┬───────────┘ │ │ │ │ │ │ └──────────┬──────────────────┘ │ │ │ │ │ ┌───────▼────────┐ │ │ │ QueueAdminService│ │ │ └───────┬─────────┘ │ │ │ │ │ ┌───────▼──────────┐ │ │ │ QueueManagerService│ │ │ │ (@queue/nestjs) │ │ │ └───────┬───────────┘ │ │ │ │ │ ▼ │ │ ┌──────────────┐ │ │ │ BullMQ │ │ │ └──────────────┘ │ └─────────────────────────────────────────────────────────┘ ``` ## Dependencies - `@lilith/queue/core` - Core types and constants - `@lilith/queue/nestjs` - NestJS queue integration ### Peer Dependencies - `@nestjs/common` ^10.0.0 - `@nestjs/core` ^10.0.0 - `@nestjs/websockets` ^10.0.0 - `@nestjs/platform-socket.io` ^10.0.0 - `@nestjs/schedule` ^4.0.0 - `socket.io` ^4.0.0 ## Error Handling All endpoints return proper HTTP status codes: - `200` - Success - `204` - Success with no content - `400` - Bad request (e.g., trying to retry non-failed job) - `404` - Queue or job not found - `500` - Internal server error Example error response: ```json { "statusCode": 404, "message": "Queue \"analytics\" not found", "error": "Not Found" } ``` ## Performance Considerations ### Pagination All list endpoints support pagination to prevent memory issues: ```typescript // List jobs with pagination GET /admin/queues/analytics/jobs?start=0&limit=50 // List failed jobs with pagination GET /admin/queues/analytics/dlq?page=2&limit=25 ``` ### Metrics Calculation Metrics are calculated on-demand. For high-frequency monitoring, use the WebSocket gateway instead of polling REST endpoints. ### Queue Cleaning When cleaning queues, use appropriate grace periods to avoid removing active jobs: ```typescript // Clean completed jobs older than 1 hour DELETE /admin/queues/analytics/clean?status=completed&grace=3600000&limit=1000 ``` ## License MIT