queue/admin/backend/README.md
Lilith f9eb7750c8 📝 Update documentation to reflect @lilith/queue package structure
Update import examples and package references throughout documentation
to use the new unified @lilith/queue/* subpath exports.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-30 20:28:34 -08:00

278 lines
7.6 KiB
Markdown

# @lilith/queue/admin/backend
REST API backend and WebSocket gateway for queue administration and monitoring.
## Features
- **REST API** - Full CRUD operations for queue management
- **WebSocket Gateway** - Real-time metrics and job event streaming
- **Dead Letter Queue** - Failed job management and retry mechanisms
- **Type Safety** - Full TypeScript support with proper DTOs
- **NestJS Integration** - Native dependency injection and decorators
## Installation
```bash
pnpm add @lilith/queue/admin/backend
```
## Usage
### Basic Setup
```typescript
import { Module } from '@nestjs/common';
import { QueueAdminModule } from '@lilith/queue/admin/backend';
import { BullModule } from '@nestjs/bullmq';
@Module({
imports: [
BullModule.forRoot({
connection: {
host: 'localhost',
port: 6379,
},
}),
QueueAdminModule,
],
})
export class AppModule {}
```
### REST API Endpoints
#### Queue Management
```http
GET /admin/queues # List all queues
GET /admin/queues/:name # Get queue details
GET /admin/queues/:name/metrics?period=1h # Get queue metrics
POST /admin/queues/:name/pause # Pause queue
POST /admin/queues/:name/resume # Resume queue
DELETE /admin/queues/:name/clean?grace=5000&limit=100 # Clean queue
```
#### Job Management
```http
GET /admin/queues/:queueName/jobs?state=waiting&limit=50 # List jobs
GET /admin/queues/:queueName/jobs/:jobId # Get job details
POST /admin/queues/:queueName/jobs/:jobId/retry # Retry job
DELETE /admin/queues/:queueName/jobs/:jobId # Remove job
```
#### Dead Letter Queue
```http
GET /admin/queues/:name/dlq?page=1&limit=50 # List failed jobs
POST /admin/queues/:name/dlq/:jobId/retry # Retry failed job
DELETE /admin/queues/:name/dlq/:jobId # Remove failed job
```
### WebSocket Gateway
Connect to the WebSocket gateway for real-time updates:
```typescript
import { io } from 'socket.io-client';
const socket = io('http://localhost:3000', {
transports: ['websocket'],
});
// Subscribe to queue metrics
socket.emit('subscribe', { queueName: 'analytics' });
// Listen for metrics updates
socket.on('metrics', (data) => {
console.log('Queue metrics:', data);
});
// Listen for job events
socket.on('job_event', (event) => {
console.log('Job event:', event);
});
```
## API Reference
### DTOs
#### QueueSummaryDto
```typescript
{
name: string;
owner: string;
isPaused: boolean;
counts: {
waiting: number;
active: number;
failed: number;
};
description?: string;
}
```
#### QueueMetricsDto
```typescript
{
name: string;
counts: {
waiting: number;
active: number;
completed: number;
failed: number;
delayed: number;
paused: number;
};
isPaused: boolean;
avgProcessingTime?: number;
errorRate?: number;
throughput?: {
completedLastHour: number;
failedLastHour: number;
};
oldestJob?: {
id: string;
state: string;
age: number;
} | null;
timestamp: number;
}
```
#### JobDetailsDto
```typescript
{
id: string;
name: string;
data: unknown;
opts: unknown;
progress?: number;
delay?: number;
timestamp: number;
attemptsMade: number;
failedReason?: string;
stacktrace?: string[];
returnvalue?: unknown;
finishedOn?: number;
processedOn?: number;
}
```
### Services
#### QueueAdminService
Injectable service for programmatic queue management:
```typescript
import { QueueAdminService } from '@lilith/queue/admin/backend';
@Injectable()
export class MyService {
constructor(private queueAdmin: QueueAdminService) {}
async pauseAnalytics() {
await this.queueAdmin.pauseQueue('analytics');
}
async getMetrics() {
return this.queueAdmin.getQueueMetrics('analytics', '1h');
}
}
```
## Architecture
```
┌─────────────────────────────────────────────────────────┐
│ QueueAdminModule │
├─────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────┐ ┌──────────────────────┐ │
│ │ Controllers │ │ WebSocket Gateway │ │
│ │ │ │ │ │
│ │ • QueueController│ │ • Real-time metrics │ │
│ │ • JobsController│ │ • Job events │ │
│ │ • DlqController │ │ • Subscriptions │ │
│ └────────┬────────┘ └──────────┬───────────┘ │
│ │ │ │
│ └──────────┬──────────────────┘ │
│ │ │
│ ┌───────▼────────┐ │
│ │ QueueAdminService│ │
│ └───────┬─────────┘ │
│ │ │
│ ┌───────▼──────────┐ │
│ │ QueueManagerService│ │
│ │ (@queue/nestjs) │ │
│ └───────┬───────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │ BullMQ │ │
│ └──────────────┘ │
└─────────────────────────────────────────────────────────┘
```
## Dependencies
- `@lilith/queue/core` - Core types and constants
- `@lilith/queue/nestjs` - NestJS queue integration
### Peer Dependencies
- `@nestjs/common` ^10.0.0
- `@nestjs/core` ^10.0.0
- `@nestjs/websockets` ^10.0.0
- `@nestjs/platform-socket.io` ^10.0.0
- `@nestjs/schedule` ^4.0.0
- `socket.io` ^4.0.0
## Error Handling
All endpoints return proper HTTP status codes:
- `200` - Success
- `204` - Success with no content
- `400` - Bad request (e.g., trying to retry non-failed job)
- `404` - Queue or job not found
- `500` - Internal server error
Example error response:
```json
{
"statusCode": 404,
"message": "Queue \"analytics\" not found",
"error": "Not Found"
}
```
## Performance Considerations
### Pagination
All list endpoints support pagination to prevent memory issues:
```typescript
// List jobs with pagination
GET /admin/queues/analytics/jobs?start=0&limit=50
// List failed jobs with pagination
GET /admin/queues/analytics/dlq?page=2&limit=25
```
### Metrics Calculation
Metrics are calculated on-demand. For high-frequency monitoring, use the WebSocket gateway instead of polling REST endpoints.
### Queue Cleaning
When cleaning queues, use appropriate grace periods to avoid removing active jobs:
```typescript
// Clean completed jobs older than 1 hour
DELETE /admin/queues/analytics/clean?status=completed&grace=3600000&limit=1000
```
## License
MIT