# Lilith Platform (egirl-platform) Queue Integration Guide

This guide covers integrating the `@lilith/queue` packages into the Lilith Platform services.

## Table of Contents

1. [Package Overview](#package-overview)
2. [Installation](#installation)
3. [Module Registration](#module-registration)
4. [Example Implementations](#example-implementations)
   - [Chatbot Service (ML Processor)](#chatbot-service---ml-processor)
   - [Content Publisher (Scheduler)](#content-publisher---scheduler)
   - [Fulfillment Service (Standard Processor)](#fulfillment-service---standard-processor)
5. [Reporting Module Integration](#reporting-module-integration)
6. [Admin Dashboard Integration](#admin-dashboard-integration)
7. [Migration from Existing Queue Setup](#migration-from-existing-queue-setup)
8. [Service Integration Matrix](#service-integration-matrix)

---

## Package Overview

| Package | Purpose | Key Exports |
|---------|---------|-------------|
| `@lilith/queue/core` | Base types, priority enum, peak-hour utilities | `JobPriority`, `isPeakHour`, `generateCorrelationId`, `BaseJobData`, `QueueRegistration` |
| `@lilith/queue/nestjs` | NestJS module with base classes | `QueueModule`, `BaseProcessor`, `BaseScheduler`, `BaseJobService`, `QueueManagerService` |
| `@lilith/queue/ml` | ML batching strategies, 60s timeout | `BaseMLProcessor`, `RequestBatchingStrategy`, `PipelineBatcher` |
| `@lilith/queue/reporting` | TypeORM entities for job lifecycle persistence | `ReportingModule`, `JobReporterService`, `JobAnalyticsService`, `JobEvent` |
| `@lilith/queue/admin` | Backend API + React dashboard | `QueueAdminModule`, `QueueAdminService`, `QueueAdminGateway` |

---

## Installation

```bash
# All modules ship in one package and are imported through its subpath exports
pnpm add @lilith/queue

# Subpaths used in this guide:
#   @lilith/queue/core, @lilith/queue/nestjs   core (required)
#   @lilith/queue/ml                           ML batching (for chatbot, TTS, translation services)
#   @lilith/queue/reporting                    job analytics and persistence (recommended for production)
#   @lilith/queue/admin                        admin dashboard (optional, for queue management UI)
```

---

## Module Registration

### Step 1: Replace Existing QueueModule

Replace the current `/var/home/lilith/Code/@applications/@egirl/egirl-platform/@services/platform/src/shared/queue/queue.module.ts`:

```typescript
// src/shared/queue/queue.module.ts
import { Module, Global } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { QueueModule as TransquinnQueueModule } from '@lilith/queue/nestjs';
import { ReportingModule } from '@lilith/queue/reporting';

// Import queue registrations from features
import { ANALYTICS_QUEUE } from '../../features/analytics/queue/analytics.queue';
import { CHATBOT_QUEUE } from '../../features/chatbot/queue/chatbot.queue';
import { FULFILLMENT_QUEUE } from '../../features/fulfillment/queue/fulfillment.queue';
import { CONTENT_QUEUE } from '../../features/content/queue/content.queue';

@Global()
@Module({
  imports: [
    // Root queue configuration
    TransquinnQueueModule.forRootAsync({
      inject: [ConfigService],
      useFactory: (config: ConfigService) => ({
        connection: {
          host: config.get('REDISHOST', 'localhost'),
          port: config.get('REDISPORT', 6379),
          password: config.get('REDISPASSWORD'),
        },
        defaultJobOptions: {
          removeOnComplete: { count: 1000, age: 3600 * 24 }, // Keep 1000 or 24h
          removeOnFail: false, // Keep failed jobs for debugging
        },
        enableScheduling: true,
      }),
    }),

    // Job lifecycle persistence (optional but recommended)
    ReportingModule.forRoot(),
  ],
  exports: [TransquinnQueueModule],
})
export class QueueModule {}
```

### Step 2: Define Queue Registrations

Create queue registration files in each feature that needs a queue:

```typescript
// src/features/analytics/queue/analytics.queue.ts
import { QueueRegistration, JobPriority } from '@lilith/queue/core';

export const ANALYTICS_QUEUE: QueueRegistration = {
  name: 'analytics',
  owner: 'features/analytics',
  jobTypes: [
    'process-view',
    'process-engagement',
    'process-revenue',
    'aggregate-hourly',
    'aggregate-daily',
  ],
  description: 'Analytics event processing and aggregation',
  config: {
    concurrency: 10,
    peakAvoidance: {
      enabled: true,
      peakHoursUtc: [16, 17, 18, 19, 20, 21], // 4pm-9pm UTC
      bypassPriority: JobPriority.HIGH,
    },
    defaultJobOptions: {
      attempts: 3,
      backoff: { type: 'exponential', delay: 1000 },
    },
  },
};
```
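
A registration's `jobTypes` list only helps if producers and processors actually agree on it. The following is a minimal sketch of a guard that could run before a job is enqueued. It uses a simplified local stand-in for `QueueRegistration` (the real type from `@lilith/queue/core` may carry more fields), and `assertRegisteredJobType` is a hypothetical helper, not part of the package.

```typescript
// Simplified stand-in for QueueRegistration; the real type may differ.
interface QueueRegistrationSketch {
  name: string;
  owner: string;
  jobTypes: readonly string[];
}

const ANALYTICS_QUEUE_SKETCH: QueueRegistrationSketch = {
  name: 'analytics',
  owner: 'features/analytics',
  jobTypes: [
    'process-view',
    'process-engagement',
    'process-revenue',
    'aggregate-hourly',
    'aggregate-daily',
  ],
};

// Hypothetical helper: throws if a job type is not declared on the registration.
function assertRegisteredJobType(
  registration: QueueRegistrationSketch,
  jobType: string,
): void {
  if (!registration.jobTypes.includes(jobType)) {
    throw new Error(
      `Job type "${jobType}" is not registered on queue "${registration.name}"`,
    );
  }
}

// A declared job type passes silently.
assertRegisteredJobType(ANALYTICS_QUEUE_SKETCH, 'process-view');
```

A check like this would catch a scheduler or service enqueuing a job name that the registration never declared.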

### Step 3: Register Feature Queues

In each feature module that uses queues:

```typescript
// src/features/analytics/analytics.module.ts
import { Module } from '@nestjs/common';
import { QueueModule } from '@lilith/queue/nestjs';
import { ANALYTICS_QUEUE } from './queue/analytics.queue';
import { AnalyticsProcessor } from './queue/analytics.processor';
import { AnalyticsScheduler } from './queue/analytics.scheduler';
import { AnalyticsJobService } from './queue/analytics-job.service';

@Module({
  imports: [
    QueueModule.forFeature({
      registration: ANALYTICS_QUEUE,
      processor: AnalyticsProcessor,
      scheduler: AnalyticsScheduler,
      jobService: AnalyticsJobService,
    }),
  ],
  providers: [AnalyticsProcessor, AnalyticsScheduler, AnalyticsJobService],
  exports: [AnalyticsJobService],
})
export class AnalyticsModule {}
```

---

## Example Implementations

### Chatbot Service - ML Processor

For LLM inference batching and TTS generation with a 60s timeout:

```typescript
// src/features/chatbot/queue/chatbot.queue.ts
import { QueueRegistration, JobPriority } from '@lilith/queue/core';

export const CHATBOT_QUEUE: QueueRegistration = {
  name: 'chatbot',
  owner: 'features/chatbot',
  jobTypes: ['llm-inference', 'tts-generation', 'conversation-context'],
  description: 'Chatbot LLM and TTS processing',
  config: {
    concurrency: 5, // Limited by ML service capacity
    peakAvoidance: { enabled: false }, // Real-time user interaction
    defaultJobOptions: {
      attempts: 2,
      timeout: 60000, // 60s ML timeout
    },
  },
};
```

```typescript
// src/features/chatbot/queue/chatbot.processor.ts
import { Processor } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import type { Job } from 'bullmq';
import { BaseMLProcessor, MLJobData } from '@lilith/queue/ml';
import { JobReporterService } from '@lilith/queue/reporting';

interface ChatbotJobData extends MLJobData {
  conversationId: string;
  message: string;
  userId: string;
  voice?: string; // For TTS
}

interface LLMResponse {
  response: string;
  tokensUsed: number;
}

interface TTSResponse {
  audioUrl: string;
  durationMs: number;
}

@Processor('chatbot')
export class ChatbotProcessor extends BaseMLProcessor<ChatbotJobData, LLMResponse | TTSResponse> {
  protected readonly logger = new Logger(ChatbotProcessor.name);
  protected readonly queueName = 'chatbot';

  constructor(reporterService: JobReporterService) {
    super({
      endpoint: process.env.ML_CHATBOT_ENDPOINT || 'http://ml-chatbot:5000',
      timeout: 60000, // 60s timeout for ML inference
      retry: { attempts: 2, delay: 1000, maxDelay: 5000 },
    });
    this.reporter = reporterService;
  }

  protected async handleMLJob(job: Job<ChatbotJobData>): Promise<LLMResponse | TTSResponse> {
    const { conversationId, message, voice } = job.data;

    if (job.name === 'llm-inference') {
      await this.updateProgress(job, 10, 'Sending to LLM');

      const response = await this.callMLService<
        { conversationId: string; message: string },
        LLMResponse
      >('/v1/chat', { conversationId, message });

      await this.updateProgress(job, 90, 'LLM response received');

      return response;
    }

    if (job.name === 'tts-generation') {
      await this.updateProgress(job, 10, 'Generating TTS audio');

      const response = await this.callMLService<
        { text: string; voice: string },
        TTSResponse
      >('/v1/tts', { text: message, voice: voice || 'default' });

      await this.updateProgress(job, 90, 'TTS audio generated');

      return response;
    }

    throw new Error(`Unknown job type: ${job.name}`);
  }
}
```
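
How `BaseMLProcessor` interprets the `retry` option is not shown in this guide. One common reading of `{ attempts, delay, maxDelay }` is exponential backoff capped at `maxDelay`, with `attempts` counting the first try. The sketch below illustrates that assumption only; `retryDelays` and `RetryOptionsSketch` are hypothetical names, not exports of `@lilith/queue/ml`.

```typescript
// Assumed shape of the `retry` option passed to the ML processor.
interface RetryOptionsSketch {
  attempts: number; // total tries, including the first (assumption)
  delay: number;    // base delay in milliseconds
  maxDelay: number; // upper bound for any single delay, in milliseconds
}

// Returns the wait before each retry: base * 2^n, capped at maxDelay.
function retryDelays({ attempts, delay, maxDelay }: RetryOptionsSketch): number[] {
  const delays: number[] = [];
  for (let retry = 0; retry < attempts - 1; retry++) {
    delays.push(Math.min(delay * 2 ** retry, maxDelay));
  }
  return delays;
}

console.log(retryDelays({ attempts: 2, delay: 1000, maxDelay: 5000 })); // [ 1000 ]
console.log(retryDelays({ attempts: 5, delay: 1000, maxDelay: 5000 })); // [ 1000, 2000, 4000, 5000 ]
```

Under this reading, the values configured in `ChatbotProcessor` would retry a failed ML call once, after roughly one second.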

```typescript
// src/features/chatbot/queue/chatbot-job.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import type { Queue } from 'bullmq';
import { BaseJobService, AddJobResult } from '@lilith/queue/nestjs';
import { JobPriority, BaseJobData } from '@lilith/queue/core';

interface ChatbotJobData extends BaseJobData {
  conversationId: string;
  message: string;
  userId: string;
  voice?: string;
  input: unknown;
}

@Injectable()
export class ChatbotJobService extends BaseJobService<ChatbotJobData> {
  protected readonly logger = new Logger(ChatbotJobService.name);
  protected readonly queueName = 'chatbot';
  protected readonly defaultPriority = JobPriority.HIGH; // User-facing, high priority

  constructor(@InjectQueue('chatbot') queue: Queue) {
    super(queue);
  }

  /**
   * Queue an LLM inference request.
   */
  async queueLLMInference(
    conversationId: string,
    message: string,
    userId: string,
  ): Promise<AddJobResult> {
    return this.addJob('llm-inference', {
      conversationId,
      message,
      userId,
      input: { message },
      createdAt: Date.now(),
    }, {
      priority: JobPriority.URGENT, // Real-time user interaction
      applyPeakAvoidance: false,
    });
  }

  /**
   * Queue a TTS generation request.
   */
  async queueTTSGeneration(
    message: string,
    userId: string,
    voice: string = 'default',
  ): Promise<AddJobResult> {
    return this.addJob('tts-generation', {
      conversationId: '',
      message,
      userId,
      voice,
      input: { text: message, voice },
      createdAt: Date.now(),
    }, {
      priority: JobPriority.HIGH,
    });
  }
}
```

---

### Content Publisher - Scheduler

Replace `@Cron` with `BaseScheduler` for content publishing:

```typescript
// src/features/content/queue/content.queue.ts
import { QueueRegistration, JobPriority } from '@lilith/queue/core';

export const CONTENT_QUEUE: QueueRegistration = {
  name: 'content',
  owner: 'features/content',
  // 'regenerate-seo' is scheduled and handled below, so it is declared here too
  jobTypes: ['publish-scheduled', 'generate-thumbnails', 'process-upload', 'regenerate-seo'],
  description: 'Content publishing and processing',
  config: {
    concurrency: 5,
    peakAvoidance: {
      enabled: true,
      bypassPriority: JobPriority.HIGH,
    },
  },
};
```

```typescript
// src/features/content/queue/content.scheduler.ts
import { Injectable, Logger } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { SchedulerRegistry } from '@nestjs/schedule';
import type { Queue } from 'bullmq';
import { BaseScheduler } from '@lilith/queue/nestjs';

@Injectable()
export class ContentScheduler extends BaseScheduler {
  protected readonly logger = new Logger(ContentScheduler.name);

  constructor(
    schedulerRegistry: SchedulerRegistry,
    @InjectQueue('content') private readonly contentQueue: Queue,
  ) {
    super(schedulerRegistry);
    this.registerQueue('content', contentQueue);
  }

  override onModuleInit(): void {
    super.onModuleInit();

    // Publish scheduled content every 5 minutes
    this.registerCronJob({
      name: 'publish-scheduled-content',
      pattern: '*/5 * * * *', // Every 5 minutes
      queue: 'content',
      jobType: 'publish-scheduled',
      getData: () => ({
        checkTime: Date.now(),
        source: 'scheduler',
        createdAt: Date.now(),
      }),
      skipDuringPeak: false, // Publishing should happen on schedule
    });

    // Generate thumbnails for pending content (daily, off-peak)
    this.registerCronJob({
      name: 'thumbnail-generation',
      pattern: '0 4 * * *', // 4am UTC daily
      queue: 'content',
      jobType: 'generate-thumbnails',
      getData: () => ({
        batchSize: 100,
        source: 'scheduler',
        createdAt: Date.now(),
      }),
      skipDuringPeak: true, // Skip during peak hours
    });

    // SEO content regeneration (weekly)
    this.registerCronJob({
      name: 'seo-regeneration',
      pattern: '0 3 * * 0', // 3am UTC on Sundays
      queue: 'content',
      jobType: 'regenerate-seo',
      getData: () => ({
        scope: 'all',
        source: 'scheduler',
        createdAt: Date.now(),
      }),
      skipDuringPeak: true,
    });
  }
}
```

```typescript
// src/features/content/queue/content.processor.ts
import { Processor } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import type { Job } from 'bullmq';
import { BaseProcessor } from '@lilith/queue/nestjs';
import { BaseJobData } from '@lilith/queue/core';
import { JobReporterService } from '@lilith/queue/reporting';

interface ContentJobData extends BaseJobData {
  checkTime?: number;
  batchSize?: number;
  scope?: string;
}

@Processor('content')
export class ContentProcessor extends BaseProcessor<ContentJobData, void> {
  protected readonly logger = new Logger(ContentProcessor.name);
  protected readonly queueName = 'content';

  constructor(
    private readonly reporterService: JobReporterService,
    // Inject your content services here
  ) {
    super();
    this.reporter = reporterService;
  }

  protected async handleJob(job: Job<ContentJobData>): Promise<void> {
    switch (job.name) {
      case 'publish-scheduled':
        await this.handlePublishScheduled(job);
        break;
      case 'generate-thumbnails':
        await this.handleGenerateThumbnails(job);
        break;
      case 'regenerate-seo':
        await this.handleRegenerateSeo(job);
        break;
      default:
        this.logger.warn(`Unknown job type: ${job.name}`);
    }
  }

  private async handlePublishScheduled(job: Job<ContentJobData>): Promise<void> {
    await this.updateProgress(job, 0, 'Finding scheduled content');

    // Find content scheduled for publication
    // const scheduledItems = await this.contentService.findScheduledForPublish();

    await this.updateProgress(job, 50, 'Publishing content');

    // for (const item of scheduledItems) {
    //   await this.contentService.publish(item.id);
    // }

    await this.updateProgress(job, 100, 'Publication complete');
  }

  private async handleGenerateThumbnails(job: Job<ContentJobData>): Promise<void> {
    const { batchSize = 100 } = job.data;
    await this.updateProgress(job, 0, `Processing batch of ${batchSize}`);

    // Generate thumbnails for pending content
    // await this.thumbnailService.generatePending(batchSize);

    await this.updateProgress(job, 100, 'Thumbnail generation complete');
  }

  private async handleRegenerateSeo(job: Job<ContentJobData>): Promise<void> {
    await this.updateProgress(job, 0, 'Regenerating SEO content');

    // Regenerate SEO metadata
    // await this.seoService.regenerateAll();

    await this.updateProgress(job, 100, 'SEO regeneration complete');
  }
}
```

---

### Fulfillment Service - Standard Processor

For S3 presigned URLs and digital delivery:

```typescript
// src/features/fulfillment/queue/fulfillment.queue.ts
import { QueueRegistration, JobPriority } from '@lilith/queue/core';

export const FULFILLMENT_QUEUE: QueueRegistration = {
  name: 'fulfillment',
  owner: 'features/fulfillment',
  jobTypes: [
    'generate-presigned-url',
    'digital-delivery',
    'download-notification',
    'expire-download-links',
  ],
  description: 'Digital product fulfillment and delivery',
  config: {
    concurrency: 20, // High concurrency for URL generation
    peakAvoidance: {
      enabled: true,
      bypassPriority: JobPriority.HIGH,
    },
    defaultJobOptions: {
      attempts: 3,
      backoff: { type: 'exponential', delay: 2000 },
    },
  },
};
```

```typescript
// src/features/fulfillment/queue/fulfillment.processor.ts
import { Processor } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import type { Job } from 'bullmq';
import { BaseProcessor } from '@lilith/queue/nestjs';
import { BaseJobData } from '@lilith/queue/core';
import { JobReporterService } from '@lilith/queue/reporting';

interface FulfillmentJobData extends BaseJobData {
  orderId: string;
  productId: string;
  userId: string;
  assetKey?: string;
  expiresIn?: number;
}

interface PresignedUrlResult {
  url: string;
  expiresAt: number;
}

@Processor('fulfillment')
export class FulfillmentProcessor extends BaseProcessor<FulfillmentJobData, PresignedUrlResult | void> {
  protected readonly logger = new Logger(FulfillmentProcessor.name);
  protected readonly queueName = 'fulfillment';

  constructor(
    private readonly reporterService: JobReporterService,
    // Inject S3 and notification services
  ) {
    super();
    this.reporter = reporterService;
  }

  protected async handleJob(job: Job<FulfillmentJobData>): Promise<PresignedUrlResult | void> {
    switch (job.name) {
      case 'generate-presigned-url':
        return this.handleGeneratePresignedUrl(job);
      case 'digital-delivery':
        return this.handleDigitalDelivery(job);
      case 'download-notification':
        return this.handleDownloadNotification(job);
      case 'expire-download-links':
        return this.handleExpireLinks(job);
      default:
        throw new Error(`Unknown job type: ${job.name}`);
    }
  }

  private async handleGeneratePresignedUrl(
    job: Job<FulfillmentJobData>,
  ): Promise<PresignedUrlResult> {
    const { assetKey, expiresIn = 3600 } = job.data;

    await this.updateProgress(job, 10, 'Generating presigned URL');

    // Generate S3 presigned URL
    // const url = await this.s3Service.getPresignedUrl(assetKey, expiresIn);

    const url = `https://cdn.example.com/presigned/${assetKey}`;
    const expiresAt = Date.now() + expiresIn * 1000;

    await this.updateProgress(job, 100, 'URL generated');

    return { url, expiresAt };
  }

  private async handleDigitalDelivery(job: Job<FulfillmentJobData>): Promise<void> {
    const { orderId, userId, productId } = job.data;

    await this.updateProgress(job, 10, 'Processing digital delivery');

    // Create download record
    // await this.deliveryService.createDelivery(orderId, productId, userId);

    await this.updateProgress(job, 50, 'Sending notification');

    // Send email with download link
    // await this.notificationService.sendDeliveryEmail(userId, orderId);

    await this.updateProgress(job, 100, 'Delivery complete');
  }

  private async handleDownloadNotification(job: Job<FulfillmentJobData>): Promise<void> {
    // Track that user downloaded the file
    this.logger.log(`Download tracked for order ${job.data.orderId}`);
  }

  private async handleExpireLinks(job: Job<FulfillmentJobData>): Promise<void> {
    // Expire old download links
    // await this.deliveryService.expireOldLinks();
    this.logger.log('Expired old download links');
  }
}
```

---

## Reporting Module Integration

### Step 1: Add to Root Module

Add `ReportingModule` to your root queue module, as shown in [Module Registration](#module-registration).

### Step 2: Database Migration

Run the TypeORM migration for the `job_events` table:

```bash
# Run pending migrations (if using the TypeORM CLI)
pnpm typeorm migration:run

# Or use the provided migration
# Copy from: @lilith/queue/reporting/migrations/1700000000000-CreateJobEventsTable.ts
```

### Step 3: Inject JobReporterService in Processors

```typescript
import { Processor } from '@nestjs/bullmq';
import { BaseProcessor } from '@lilith/queue/nestjs';
import { JobReporterService } from '@lilith/queue/reporting';

// MyJobData is your own payload type (extending BaseJobData)
@Processor('my-queue')
export class MyProcessor extends BaseProcessor<MyJobData> {
  constructor(private readonly reporterService: JobReporterService) {
    super();
    this.reporter = reporterService; // Enables automatic lifecycle logging
  }

  // handleJob() omitted here; see the processors above
}
```

### Step 4: Query Analytics

```typescript
// In a controller or service
import { Controller, Get, Param, Query } from '@nestjs/common';
import { JobAnalyticsService } from '@lilith/queue/reporting';

@Controller('admin/queue-analytics')
export class QueueAnalyticsController {
  constructor(private readonly analytics: JobAnalyticsService) {}

  @Get('health/:queue')
  async getQueueHealth(@Param('queue') queue: string) {
    return this.analytics.getQueueHealth(queue);
  }

  @Get('events/:queue')
  async getEvents(
    @Param('queue') queue: string,
    @Query('limit') limit = 100,
    @Query('jobType') jobType?: string,
  ) {
    return this.analytics.getEventsByQueue({
      queue,
      limit,
      jobType,
    });
  }
}
```
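
Query-string values arrive as strings unless a pipe converts them, so `limit` in `getEvents` may be `'50'` rather than `50` at runtime. The sketch below shows one way to normalise it before calling the analytics service. `parseLimit` and the cap of 1000 are illustrative assumptions, not part of `@lilith/queue/reporting`.

```typescript
// Hypothetical helper: coerce a raw query value into a bounded positive integer.
function parseLimit(raw: unknown, fallback = 100, max = 1000): number {
  const parsed = typeof raw === 'number' ? raw : Number.parseInt(String(raw), 10);
  if (!Number.isFinite(parsed) || parsed < 1) {
    return fallback; // missing, non-numeric, or non-positive input
  }
  return Math.min(Math.floor(parsed), max);
}

console.log(parseLimit('50'));      // 50
console.log(parseLimit(undefined)); // 100
console.log(parseLimit('5000'));    // 1000
```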

---

## Admin Dashboard Integration

### Step 1: Add QueueAdminModule

```typescript
// src/app.module.ts (or a dedicated admin module)
import { Module } from '@nestjs/common';
import { QueueAdminModule } from '@lilith/queue/admin';

@Module({
  imports: [
    // ... other imports
    QueueAdminModule, // Adds REST API + WebSocket gateway
  ],
})
export class AppModule {}
```

### Step 2: Available Endpoints

The `QueueAdminModule` provides:

**REST API (prefix: `/admin`)**
- `GET /admin/queues` - List all queues
- `GET /admin/queues/:name` - Get queue details
- `GET /admin/queues/:name/metrics` - Get queue metrics
- `POST /admin/queues/:name/pause` - Pause queue
- `POST /admin/queues/:name/resume` - Resume queue
- `DELETE /admin/queues/:name/clean` - Clean queue

**Job Management**
- `GET /admin/queues/:name/jobs` - List jobs
- `GET /admin/queues/:name/jobs/:jobId` - Get job details
- `POST /admin/queues/:name/jobs/:jobId/retry` - Retry job
- `DELETE /admin/queues/:name/jobs/:jobId` - Remove job

**Dead Letter Queue**
- `GET /admin/queues/:name/dlq` - List failed jobs
- `POST /admin/queues/:name/dlq/:jobId/retry` - Retry failed job
- `DELETE /admin/queues/:name/dlq/:jobId` - Remove failed job
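
The routes above follow one pattern, so a client can build them from a queue name and an optional job ID. The sketch below assumes exactly the paths listed here; `queueAdminPath` is a hypothetical helper, not something exported by `@lilith/queue/admin`.

```typescript
type QueueAction = 'metrics' | 'pause' | 'resume' | 'clean' | 'jobs' | 'dlq';

// Builds a path under the `/admin` prefix, URL-encoding each dynamic segment.
function queueAdminPath(
  queue: string,
  action?: QueueAction,
  jobId?: string,
  jobAction?: 'retry',
): string {
  const segments = ['admin', 'queues', encodeURIComponent(queue)];
  if (action) segments.push(action);
  if (jobId) segments.push(encodeURIComponent(jobId));
  if (jobAction) segments.push(jobAction);
  return '/' + segments.join('/');
}

console.log(queueAdminPath('analytics'));                     // /admin/queues/analytics
console.log(queueAdminPath('analytics', 'pause'));            // /admin/queues/analytics/pause
console.log(queueAdminPath('chatbot', 'dlq', '42', 'retry')); // /admin/queues/chatbot/dlq/42/retry
```

The HTTP method still has to match the list above, for example `POST` for `pause` and `DELETE` for `clean`.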

### Step 3: WebSocket Real-Time Updates

Connect to the WebSocket gateway at `/queue-admin`:

```typescript
// Frontend example using socket.io-client
import { io } from 'socket.io-client';

const socket = io('/queue-admin');

// Subscribe to queue metrics
socket.emit('subscribeQueue', 'analytics');

// Listen for real-time updates
socket.on('queueMetrics', (metrics) => {
  console.log('Queue metrics:', metrics);
});

socket.on('jobEvent', (event) => {
  console.log('Job event:', event);
});
```

### Step 4: Frontend Dashboard

The `@lilith/queue/admin` package includes React components:

```tsx
// Import dashboard components
import {
  QueueDashboard,
  QueueList,
  QueueCard,
  JobsTable,
  DLQManager,
  JobDetailsModal,
} from '@lilith/queue/admin/frontend';

// Use in your admin UI
function AdminPage() {
  return (
    <QueueDashboard
      apiBaseUrl="/admin"
      wsEndpoint="/queue-admin"
    />
  );
}
```

---

## Migration from Existing Queue Setup

### Current State Analysis

The existing setup at `@services/platform/src/shared/queue/` uses:
- `BullModule.forRootAsync` for Redis connection
- `BullModule.registerQueue` for individual queues
- Custom `QueueService` wrapper
- `QUEUENAMES` and `JOBTYPES` constants

### Migration Steps

1. **Migrate existing constants** - Move `QUEUENAMES` and `JOBTYPES` into `QueueRegistration` objects; the old names can still be re-exported for backward compatibility
2. **Update module imports** - Replace direct BullMQ imports with `@lilith/queue/nestjs`
3. **Convert processors** - Extend `BaseProcessor` instead of `WorkerHost`
4. **Add schedulers** - Replace `@Cron` decorators with `BaseScheduler`

### Example Migration

**Before:**
```typescript
// queue.constants.ts
export const QUEUENAMES = {
  ANALYTICS: 'analytics',
};

export const JOBTYPES = {
  PROCESSVIEW: 'process-view',
};
```

**After:**
```typescript
// features/analytics/queue/analytics.queue.ts
import { QueueRegistration } from '@lilith/queue/core';

export const ANALYTICS_QUEUE: QueueRegistration = {
  name: 'analytics',
  owner: 'features/analytics',
  jobTypes: ['process-view', 'process-engagement'],
  config: { concurrency: 10 },
};

// Constants can still be exported for backward compatibility
export const QUEUENAMES = { ANALYTICS: ANALYTICS_QUEUE.name };
export const JOBTYPES = { PROCESSVIEW: 'process-view' };
```
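
To avoid repeating job-type strings in both the registration and the legacy constants, the list can be declared once with `as const` and a union type derived from it, so a typo in a constant fails at compile time. This is a sketch only: it assumes `QueueRegistration['jobTypes']` would accept a readonly string array, which is not confirmed here, and the names `ANALYTICS_JOB_TYPES`, `AnalyticsJobType`, `JOBTYPES_SKETCH`, and `isAnalyticsJobType` are illustrative.

```typescript
// Single source of truth for the queue's job names.
const ANALYTICS_JOB_TYPES = ['process-view', 'process-engagement'] as const;

// Union of the literal job names: 'process-view' | 'process-engagement'
type AnalyticsJobType = (typeof ANALYTICS_JOB_TYPES)[number];

// Legacy-style constant map; each value must be a declared job type.
const JOBTYPES_SKETCH: {
  PROCESSVIEW: AnalyticsJobType;
  PROCESSENGAGEMENT: AnalyticsJobType;
} = {
  PROCESSVIEW: 'process-view',
  PROCESSENGAGEMENT: 'process-engagement',
};

// Runtime guard for job names that arrive as plain strings.
function isAnalyticsJobType(value: string): value is AnalyticsJobType {
  return (ANALYTICS_JOB_TYPES as readonly string[]).includes(value);
}

console.log(isAnalyticsJobType(JOBTYPES_SKETCH.PROCESSVIEW)); // true
console.log(isAnalyticsJobType('process-revenue'));           // false
```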

---

## Service Integration Matrix

| Service | Package | Priority | Use Case |
|---------|---------|----------|----------|
| **Chatbot** | `@lilith/queue/ml` | HIGH | LLM inference batching, TTS generation |
| **Fulfillment** | `@lilith/queue/nestjs` | HIGH | S3 presigned URLs, digital delivery |
| **Payments** | `@lilith/queue/nestjs` | HIGH | Payment gateway retries, status tracking |
| **Crypto** | `@lilith/queue/nestjs` | HIGH | Blockchain monitoring, confirmation polling |
| **Events** | `@lilith/queue/nestjs` | HIGH | Chaturbate webhook fan-out |
| **Notifications** | `@lilith/queue/nestjs` | HIGH | WebSocket broadcast decoupling |
| **Content Publisher** | `@lilith/queue/nestjs` | MEDIUM | Replace @Cron with BaseScheduler |
| **Analytics** | `@lilith/queue/nestjs` | NORMAL | Event aggregation, daily reports |
| **Translations** | `@lilith/queue/ml` | NORMAL | ML translation batching |
| **SEO Generation** | `@lilith/queue/ml` | LOW | Background SEO content generation |

### Priority Guidelines

| Priority | Value | Use Case |
|----------|-------|----------|
| `URGENT` | 1 | User-blocking, real-time operations |
| `HIGH` | 5 | Important user-facing, payment processing |
| `NORMAL` | 10 | Standard background processing |
| `LOW` | 20 | Can wait, non-critical |
| `BATCH` | 50 | Bulk operations, always deferred in peak |

### Peak Hour Behavior

- **Peak Hours**: 4pm-9pm UTC (configurable per queue)
- **URGENT/HIGH**: Always processed immediately
- **NORMAL/LOW/BATCH**: Deferred during peak hours
- **Override**: Set `applyPeakAvoidance: false` when adding jobs
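
The rules above can be read as a small decision function. The sketch below is one interpretation, not the `@lilith/queue/core` implementation: the enum values come from the Priority Guidelines table, while `JobPrioritySketch`, `shouldDefer`, its parameters, and the assumption that each listed hour covers the full clock hour are illustrative.

```typescript
// Values copied from the Priority Guidelines table; lower means more urgent.
enum JobPrioritySketch {
  URGENT = 1,
  HIGH = 5,
  NORMAL = 10,
  LOW = 20,
  BATCH = 50,
}

const PEAK_HOURS_UTC: readonly number[] = [16, 17, 18, 19, 20, 21];

// Returns true when a job should wait until peak hours are over.
function shouldDefer(
  priority: JobPrioritySketch,
  now: Date,
  applyPeakAvoidance = true,
  bypassPriority = JobPrioritySketch.HIGH,
  peakHoursUtc: readonly number[] = PEAK_HOURS_UTC,
): boolean {
  if (!applyPeakAvoidance) return false;         // explicit per-job override
  if (priority <= bypassPriority) return false;  // URGENT/HIGH bypass deferral
  return peakHoursUtc.includes(now.getUTCHours());
}

const peak = new Date(Date.UTC(2024, 0, 15, 17, 30));   // 17:30 UTC
const offPeak = new Date(Date.UTC(2024, 0, 15, 3, 0));  // 03:00 UTC

console.log(shouldDefer(JobPrioritySketch.NORMAL, peak));     // true
console.log(shouldDefer(JobPrioritySketch.HIGH, peak));       // false
console.log(shouldDefer(JobPrioritySketch.BATCH, offPeak));   // false
console.log(shouldDefer(JobPrioritySketch.LOW, peak, false)); // false
```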

---

## Next Steps

1. Install packages in `@services/platform`
2. Create queue registrations for each feature
3. Migrate existing processors to extend `BaseProcessor`
4. Replace `@Cron` jobs with `BaseScheduler`
5. Add `ReportingModule` for job analytics
6. Optionally add admin dashboard for queue management

For questions or issues, refer to the package source at `/var/home/lilith/Code/@packages/@queue/`.