# @lilith/queue/reporting

Job lifecycle database persistence using TypeORM.

## Overview

The reporting package provides database persistence for job lifecycle events, enabling:

- Historical analysis of job processing
- Failure rate monitoring
- Performance metrics and percentiles
- Queue health dashboards
- Debug tracing for failed jobs

## Installation

The reporting module is a subpath export of the `@lilith/queue` package, so install the parent package:

```bash
pnpm add @lilith/queue
```

## Setup

### 1. Configure TypeORM

First, set up TypeORM in your application:

```typescript
// app.module.ts
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';

@Module({
  imports: [
    TypeOrmModule.forRoot({
      type: 'postgres',
      host: 'localhost',
      port: 5432,
      username: 'user',
      password: 'password',
      database: 'queue_db',
      autoLoadEntities: true,
      synchronize: false, // Use migrations in production
    }),
  ],
})
export class AppModule {}
```

### 2. Import ReportingModule

```typescript
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { ReportingModule } from '@lilith/queue/reporting';

@Module({
  imports: [
    TypeOrmModule.forRoot({ /* ... */ }),
    ReportingModule.forRoot(),
  ],
})
export class AppModule {}
```

### 3. Run Migrations

Generate and run the migration for the `job_events` table:

```bash
# Generate migration
pnpm typeorm migration:generate -n CreateJobEventsTable

# Run migration
pnpm typeorm migration:run
```

## Usage

### Automatic Event Logging

Integrate with `BaseProcessor` from `@lilith/queue/nestjs`:

```typescript
import { Logger } from '@nestjs/common';
import { BaseProcessor } from '@lilith/queue/nestjs';
import { JobReporterService } from '@lilith/queue/reporting';
// Imports for `Processor` and `Job` are omitted here

@Processor('analytics')
export class AnalyticsProcessor extends BaseProcessor<AnalyticsJobData> {
  protected readonly logger = new Logger(AnalyticsProcessor.name);
  protected readonly queueName = 'analytics';

  constructor(private readonly reporterService: JobReporterService) {
    super();
    this.reporter = reporterService; // Enable automatic logging
  }

  protected async handleJob(job: Job<AnalyticsJobData>): Promise<void> {
    // Job events (started, progress, completed, failed) are logged automatically
    await this.updateProgress(job, 50, 'Halfway done');
    // Process job...
  }
}
```

### Manual Event Logging

```typescript
import { Injectable } from '@nestjs/common';
import { JobReporterService } from '@lilith/queue/reporting';

@Injectable()
export class CustomService {
  constructor(private readonly reporter: JobReporterService) {}

  async processCustomJob(jobId: string) {
    const startedAt = Date.now();
    await this.reporter.logJobStarted(jobId, 'custom', 'process-data');

    try {
      // Process...
      await this.reporter.logJobCompleted(jobId, 'custom', 'process-data', Date.now() - startedAt);
    } catch (error) {
      // `error` is `unknown` under strict TypeScript, so narrow it before reading `.message`
      const message = error instanceof Error ? error.message : String(error);
      await this.reporter.logJobFailed(jobId, 'custom', 'process-data', message, Date.now() - startedAt);
    }
  }
}
```

### Analytics Queries

```typescript
import { Controller, Get, Param } from '@nestjs/common';
import { JobAnalyticsService } from '@lilith/queue/reporting';

@Controller('metrics')
export class MetricsController {
  constructor(private readonly analytics: JobAnalyticsService) {}

  @Get('health/:queue')
  async getQueueHealth(@Param('queue') queue: string) {
    return this.analytics.getQueueHealth(queue, 3600000); // Last hour
  }

  @Get('failures/:queue')
  async getFailureRate(@Param('queue') queue: string) {
    const rate = await this.analytics.getFailureRate(queue, 3600000); // Last hour
    return { queue, failureRate: (rate * 100).toFixed(2) + '%' };
  }

  @Get('performance/:queue')
  async getPerformance(@Param('queue') queue: string) {
    const [avgTime, percentiles] = await Promise.all([
      this.analytics.getAverageProcessingTime(queue, 100), // Sample size of 100
      this.analytics.getProcessingTimePercentiles(queue, 1000), // Sample size of 1000
    ]);

    return { avgTime, percentiles };
  }

  @Get('errors/:queue')
  async getTopErrors(@Param('queue') queue: string) {
    return this.analytics.getTopErrors(queue, 10, 86400000); // Top 10, last 24h
  }
}
```

### Scheduled Cleanup

```typescript
import { Injectable, Logger } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
import { JobReporterService } from '@lilith/queue/reporting';

@Injectable()
export class CleanupService {
  private readonly logger = new Logger(CleanupService.name);

  constructor(private readonly reporter: JobReporterService) {}

  @Cron('0 2 * * *') // Daily at 2 AM
  async cleanupOldEvents() {
    const deleted = await this.reporter.cleanupOldEvents(30); // Keep 30 days
    this.logger.log(`Cleaned up ${deleted} old job events`);
  }
}
```

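To make the retention window concrete, the sketch below shows how a `daysToKeep` value could translate into a cutoff timestamp. `retentionCutoff` is a hypothetical helper written for illustration; how `cleanupOldEvents` computes its cutoff internally is an assumption, not something this README specifies.

```typescript
const MS_PER_DAY = 24 * 60 * 60 * 1000;

/**
 * Hypothetical helper: returns the instant before which events would be
 * treated as expired for a given retention window.
 */
function retentionCutoff(now: Date, daysToKeep: number): Date {
  if (!Number.isFinite(daysToKeep) || daysToKeep < 0) {
    throw new RangeError('daysToKeep must be a non-negative number');
  }
  return new Date(now.getTime() - daysToKeep * MS_PER_DAY);
}

// A cleanup run at 02:00 UTC on 31 March with 30 days of retention
// would treat everything before 02:00 UTC on 1 March as expired.
const runAt = new Date('2024-03-31T02:00:00Z');
const cutoff = retentionCutoff(runAt, 30);
const cutoffIso = cutoff.toISOString(); // "2024-03-01T02:00:00.000Z"
```

Working in UTC milliseconds keeps the window a fixed length regardless of daylight-saving changes in the server's local time zone.
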
## API Reference

### JobReporterService

- `logEvent(event: JobEvent): Promise<void>` - Log a job lifecycle event
- `logJobQueued(jobId, queue, jobType, metadata?): Promise<void>` - Log queued event
- `logJobStarted(jobId, queue, jobType, metadata?): Promise<void>` - Log started event
- `logJobCompleted(jobId, queue, jobType, durationMs, metadata?): Promise<void>` - Log completion
- `logJobFailed(jobId, queue, jobType, error, durationMs, metadata?): Promise<void>` - Log failure
- `cleanupOldEvents(daysToKeep: number): Promise<number>` - Delete old events

### JobAnalyticsService

- `getJobEvents(jobId: string): Promise<JobEvent[]>` - Get all events for a job
- `getEventsByQueue(queue, options): Promise<JobEvent[]>` - Get events for a queue
- `getFailureRate(queue, periodMs): Promise<number>` - Calculate failure rate (0-1)
- `getAverageProcessingTime(queue, sampleSize): Promise<number>` - Avg time in ms
- `getProcessingTimePercentiles(queue, sampleSize): Promise<{p50, p95, p99, min, max}>` - Time distribution
- `getThroughput(queue, periodMs): Promise<number>` - Jobs per second
- `getTopErrors(queue, limit, periodMs?): Promise<Array<{error, count}>>` - Most common errors
- `getQueueHealth(queue, periodMs): Promise<HealthSummary>` - Aggregated health metrics

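This README does not document how the service computes these metrics, so the following is only an in-memory sketch of what a 0-1 failure rate and a `{p50, p95, p99, min, max}` summary mean. It assumes failure rate is failed events divided by all finished events and uses nearest-rank percentiles; `FinishedEvent`, `failureRate`, `percentile`, and `processingTimePercentiles` are hypothetical names, and the real service may calculate these differently (for example in SQL).

```typescript
// Hypothetical, simplified event shape for illustration only.
interface FinishedEvent {
  type: 'completed' | 'failed';
  durationMs: number;
}

/** Fraction of finished jobs that failed, in the range 0-1 (0 when there are no events). */
function failureRate(events: FinishedEvent[]): number {
  if (events.length === 0) return 0;
  const failed = events.filter((e) => e.type === 'failed').length;
  return failed / events.length;
}

/** Nearest-rank percentile of an ascending-sorted, non-empty array. */
function percentile(sorted: number[], p: number): number {
  const rank = Math.ceil((p / 100) * sorted.length);
  return sorted[Math.max(rank, 1) - 1];
}

/** Summary of processing times, or null when there is nothing to summarize. */
function processingTimePercentiles(events: FinishedEvent[]) {
  const sorted = events.map((e) => e.durationMs).sort((a, b) => a - b);
  if (sorted.length === 0) return null;
  return {
    p50: percentile(sorted, 50),
    p95: percentile(sorted, 95),
    p99: percentile(sorted, 99),
    min: sorted[0],
    max: sorted[sorted.length - 1],
  };
}

const sample: FinishedEvent[] = [
  { type: 'completed', durationMs: 100 },
  { type: 'completed', durationMs: 200 },
  { type: 'failed', durationMs: 300 },
  { type: 'completed', durationMs: 400 },
];
const rate = failureRate(sample); // 0.25 (1 failed out of 4)
const stats = processingTimePercentiles(sample); // p50 = 200, p95 = p99 = max = 400, min = 100
```

With small samples the upper percentiles collapse onto the maximum, which is one reason to pass a reasonably large `sampleSize` when querying percentiles.
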
## Database Schema

### job_events table

| Column | Type | Description |
|--------|------|-------------|
| id | UUID | Primary key |
| job_id | VARCHAR | Job identifier |
| queue | VARCHAR | Queue name |
| job_type | VARCHAR | Job type |
| type | VARCHAR | Event type (queued, started, progress, completed, failed, retrying, moved_to_dlq) |
| progress | INTEGER | Progress percentage (0-100) |
| duration_ms | INTEGER | Processing duration in milliseconds |
| error | TEXT | Error message (for failed events) |
| metadata | JSONB | Additional event data |
| timestamp | TIMESTAMP | Event timestamp |

### Indexes

- `job_id` - For job lifecycle queries
- `queue` - For queue-specific analytics
- `type` - For event type filtering
- `timestamp` - For time-based queries

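For code that reads `job_events` rows directly, the columns above could be mirrored in a TypeScript type. This is a sketch: `JobEventRow`, `JobEventType`, and `isJobEventType` are hypothetical names, the nullability of `progress`, `duration_ms`, `error`, and `metadata` is an assumption based on their descriptions, and the package's actual entity may differ.

```typescript
// Event types listed in the `type` column description.
const JOB_EVENT_TYPES = [
  'queued',
  'started',
  'progress',
  'completed',
  'failed',
  'retrying',
  'moved_to_dlq',
] as const;

type JobEventType = (typeof JOB_EVENT_TYPES)[number];

// Hypothetical row shape mirroring the table above.
interface JobEventRow {
  id: string; // UUID
  job_id: string;
  queue: string;
  job_type: string;
  type: JobEventType;
  progress: number | null; // assumed to be set only on progress events
  duration_ms: number | null; // assumed to be set only on completed/failed events
  error: string | null; // assumed to be set only on failed events
  metadata: Record<string, unknown> | null;
  timestamp: Date;
}

/** Type guard for validating an untrusted string before using it as an event type. */
function isJobEventType(value: string): value is JobEventType {
  return (JOB_EVENT_TYPES as readonly string[]).includes(value);
}

const exampleRow: JobEventRow = {
  id: '00000000-0000-0000-0000-000000000001',
  job_id: 'job-1',
  queue: 'analytics',
  job_type: 'process-data',
  type: 'completed',
  progress: null,
  duration_ms: 1500,
  error: null,
  metadata: null,
  timestamp: new Date('2024-03-31T02:00:00Z'),
};
```
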
## Performance Considerations

### Non-blocking Logging

All event logging is non-blocking: a failure to log an event (for example, because the database is down) does not affect job processing.

```typescript
// If the database is down, job processing continues
await this.updateProgress(job, 50); // Logs progress (non-blocking)
// Job continues even if logging fails
```

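One way such failure-tolerant logging could be implemented is to catch errors from the logging call and report them out of band. The sketch below is not `BaseProcessor`'s actual code; `safeLog`, its `onError` callback, and the simulated `failingReporter` are hypothetical names used only for illustration.

```typescript
/**
 * Runs a logging operation and swallows any error so the caller
 * (the job handler) is never affected by a logging failure.
 * Resolves to true when the event was logged, false otherwise.
 */
async function safeLog(
  log: () => Promise<void>,
  onError: (error: unknown) => void = () => {},
): Promise<boolean> {
  try {
    await log();
    return true;
  } catch (error) {
    onError(error);
    return false;
  }
}

// Simulated reporter whose database is unavailable.
const failingReporter = {
  logJobStarted: async (_jobId: string): Promise<void> => {
    throw new Error('database unavailable');
  },
};

async function runJob(): Promise<string> {
  const loggingErrors: unknown[] = [];
  const logged = await safeLog(
    () => failingReporter.logJobStarted('job-1'),
    (error) => loggingErrors.push(error),
  );
  // The job still produces its result although logging failed.
  return `job finished (logged: ${logged}, logging errors: ${loggingErrors.length})`;
}
```

Collecting logging errors separately (here in an array, in practice perhaps a logger or metric) keeps them visible without letting them fail the job.
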
### Query Optimization

- Use `limit` parameter to cap query results
- Specify time ranges (`from`/`to`) to leverage timestamp index
- Run cleanup regularly to prevent unbounded table growth
- Consider partitioning by timestamp for high-volume queues

### Storage Planning

Estimate storage needs:

- Average event size: ~500 bytes (without large metadata)
- Events per job: ~5 (for example queued, started, one or two progress updates, completed)
- Daily jobs: 1M jobs × 5 events × 500 bytes = ~2.5 GB/day uncompressed
- With 30-day retention: ~75 GB

Enable PostgreSQL compression and consider archiving old events to cold storage.

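The same arithmetic can be parameterized for other workloads. This is a back-of-the-envelope sketch; `estimateStorage` and `StorageEstimate` are hypothetical names, and the result covers raw event payloads only.

```typescript
interface StorageEstimate {
  bytesPerDay: number;
  bytesRetained: number;
}

/** Rough estimate of uncompressed event storage for a given workload. */
function estimateStorage(
  jobsPerDay: number,
  eventsPerJob: number,
  bytesPerEvent: number,
  retentionDays: number,
): StorageEstimate {
  const bytesPerDay = jobsPerDay * eventsPerJob * bytesPerEvent;
  return { bytesPerDay, bytesRetained: bytesPerDay * retentionDays };
}

const GB = 1e9; // decimal gigabytes, matching the figures above

// 1M jobs/day, ~5 events per job, ~500 bytes per event, 30-day retention.
const estimate = estimateStorage(1_000_000, 5, 500, 30);
const dailyGb = estimate.bytesPerDay / GB; // 2.5
const retainedGb = estimate.bytesRetained / GB; // 75
```

Indexes and per-row overhead add to these numbers, so treat the result as a lower bound when sizing disks.
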
## Integration with @lilith/queue/nestjs

The `JobReporter` interface from `@lilith/queue/nestjs` is implemented by `JobReporterService`. Simply inject it into your processors:

```typescript
@Processor('my-queue')
export class MyProcessor extends BaseProcessor<MyJobData> {
  constructor(reporter: JobReporterService) {
    super();
    this.reporter = reporter; // BaseProcessor uses this
  }
}
```

`BaseProcessor` automatically logs:

- Job start
- Progress updates
- Successful completion
- Failures with retry metadata
- DLQ moves (when max retries exceeded)

## License

MIT