# @lilith/queue/reporting
Job lifecycle database persistence using TypeORM.
## Overview
The reporting package provides database persistence for job lifecycle events, enabling:
- Historical analysis of job processing
- Failure rate monitoring
- Performance metrics and percentiles
- Queue health dashboards
- Debug tracing for failed jobs
## Installation
```bash
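# Install the package; the reporting module is then imported from the
# @lilith/queue/reporting subpath export
pnpm add @lilith/queue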
```
## Setup
### 1. Configure TypeORM
First, set up TypeORM in your application:
```typescript
// app.module.ts
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';

@Module({
  imports: [
    TypeOrmModule.forRoot({
      type: 'postgres',
      host: 'localhost',
      port: 5432,
      username: 'user',
      password: 'password',
      database: 'queue_db',
      autoLoadEntities: true,
      synchronize: false, // Use migrations in production
    }),
  ],
})
export class AppModule {}
```
### 2. Import ReportingModule
```typescript
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { ReportingModule } from '@lilith/queue/reporting';

@Module({
  imports: [
    TypeOrmModule.forRoot({ /* ... */ }),
    ReportingModule.forRoot(),
  ],
})
export class AppModule {}
```
### 3. Run Migrations
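Generate and run the migration for the `job_events` table. The `-n` flag shown here is TypeORM 0.2.x syntax; from TypeORM 0.3.0 onward, `migration:generate` takes the migration path as a positional argument and requires a data source passed with `-d`.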
```bash
# Generate migration
pnpm typeorm migration:generate -n CreateJobEventsTable
# Run migration
pnpm typeorm migration:run
```
## Usage
### Automatic Event Logging
Integrate with `BaseProcessor` from `@lilith/queue/nestjs`:
```typescript
import { Logger } from '@nestjs/common';
// Assumption: `Processor` and `Job` come from your queue integration
// (@nestjs/bullmq and bullmq here); adjust to match your setup.
import { Processor } from '@nestjs/bullmq';
import { Job } from 'bullmq';
import { BaseProcessor } from '@lilith/queue/nestjs';
import { JobReporterService } from '@lilith/queue/reporting';

// AnalyticsJobData is your own job payload type.
@Processor('analytics')
export class AnalyticsProcessor extends BaseProcessor<AnalyticsJobData> {
  protected readonly logger = new Logger(AnalyticsProcessor.name);
  protected readonly queueName = 'analytics';

  constructor(private readonly reporterService: JobReporterService) {
    super();
    this.reporter = reporterService; // Enable automatic logging
  }

  protected async handleJob(job: Job<AnalyticsJobData>): Promise<void> {
    // Job events (started, progress, completed, failed) are logged automatically
    await this.updateProgress(job, 50, 'Halfway done');
    // Process job...
  }
}
```
### Manual Event Logging
```typescript
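import { Injectable } from '@nestjs/common';
import { JobReporterService } from '@lilith/queue/reporting';

@Injectable()
export class CustomService {
  constructor(private readonly reporter: JobReporterService) {}

  async processCustomJob(jobId: string): Promise<void> {
    const startedAt = Date.now();
    await this.reporter.logJobStarted(jobId, 'custom', 'process-data');
    try {
      // Process...
      // Report the measured duration rather than a hard-coded value
      await this.reporter.logJobCompleted(jobId, 'custom', 'process-data', Date.now() - startedAt);
    } catch (error) {
      // `error` is `unknown` under strict TypeScript, so narrow before reading `.message`
      const message = error instanceof Error ? error.message : String(error);
      await this.reporter.logJobFailed(jobId, 'custom', 'process-data', message, Date.now() - startedAt);
      throw error; // Rethrow so the caller still sees the failure
    }
  }
}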
```
### Analytics Queries
```typescript
import { Controller, Get, Param } from '@nestjs/common';
import { JobAnalyticsService } from '@lilith/queue/reporting';

@Controller('metrics')
export class MetricsController {
  constructor(private readonly analytics: JobAnalyticsService) {}

  @Get('health/:queue')
  async getQueueHealth(@Param('queue') queue: string) {
    return this.analytics.getQueueHealth(queue, 3600000); // Last hour (period in ms)
  }

  @Get('failures/:queue')
  async getFailureRate(@Param('queue') queue: string) {
    const rate = await this.analytics.getFailureRate(queue, 3600000); // 0-1
    return { queue, failureRate: (rate * 100).toFixed(2) + '%' };
  }

  @Get('performance/:queue')
  async getPerformance(@Param('queue') queue: string) {
    // The second argument is a sample size, not a time period
    const [avgTime, percentiles] = await Promise.all([
      this.analytics.getAverageProcessingTime(queue, 100),
      this.analytics.getProcessingTimePercentiles(queue, 1000),
    ]);
    return { avgTime, percentiles };
  }

  @Get('errors/:queue')
  async getTopErrors(@Param('queue') queue: string) {
    return this.analytics.getTopErrors(queue, 10, 86400000); // Top 10, last 24h
  }
}
```
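
The period arguments in the controller (`3600000`, `86400000`) are millisecond counts, which are easy to mistype. A small constants object keeps call sites readable. `Period` and `formatRate` below are hypothetical helpers written for illustration; they are not exports of `@lilith/queue/reporting`.

```typescript
// Hypothetical helpers; not part of @lilith/queue/reporting.
const Period = {
  minute: 60 * 1000,
  hour: 60 * 60 * 1000,
  day: 24 * 60 * 60 * 1000,
} as const;

// Format a 0-1 rate (the range getFailureRate is documented to return)
// as a percentage string with two decimals.
function formatRate(rate: number): string {
  return `${(rate * 100).toFixed(2)}%`;
}
```

With these in place, a call such as `this.analytics.getQueueHealth(queue, Period.hour)` reads the same as the literal version above.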
### Scheduled Cleanup
```typescript
import { Injectable } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
import { JobReporterService } from '@lilith/queue/reporting';

// Requires ScheduleModule.forRoot() from @nestjs/schedule in your root module
@Injectable()
export class CleanupService {
  constructor(private readonly reporter: JobReporterService) {}

  @Cron('0 2 * * *') // Daily at 2 AM
  async cleanupOldEvents() {
    const deleted = await this.reporter.cleanupOldEvents(30); // Keep 30 days
    console.log(`Cleaned up ${deleted} old job events`);
  }
}
```
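
How `cleanupOldEvents` derives its cutoff is not documented here. One plausible reading, sketched below under that assumption, is that events whose timestamp is older than "now minus `daysToKeep` days" are deleted. `retentionCutoff` is a hypothetical helper, useful mainly for checking which rows a given retention setting would affect before scheduling the job.

```typescript
// Hypothetical helper; an assumption about the cutoff semantics,
// not the package's actual implementation.
const MS_PER_DAY = 24 * 60 * 60 * 1000;

// Return the instant before which events would be considered expired.
function retentionCutoff(daysToKeep: number, now: Date = new Date()): Date {
  if (!Number.isFinite(daysToKeep) || daysToKeep < 0) {
    throw new RangeError('daysToKeep must be a non-negative number');
  }
  return new Date(now.getTime() - daysToKeep * MS_PER_DAY);
}
```

Passing `now` explicitly keeps the helper deterministic in tests.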
## API Reference
### JobReporterService
- `logEvent(event: JobEvent): Promise<void>` - Log a job lifecycle event
- `logJobQueued(jobId, queue, jobType, metadata?): Promise<void>` - Log queued event
- `logJobStarted(jobId, queue, jobType, metadata?): Promise<void>` - Log started event
- `logJobCompleted(jobId, queue, jobType, durationMs, metadata?): Promise<void>` - Log completion
- `logJobFailed(jobId, queue, jobType, error, durationMs, metadata?): Promise<void>` - Log failure
- `cleanupOldEvents(daysToKeep: number): Promise<number>` - Delete old events
### JobAnalyticsService
- `getJobEvents(jobId: string): Promise<JobEvent[]>` - Get all events for a job
- `getEventsByQueue(queue, options): Promise<JobEvent[]>` - Get events for a queue
- `getFailureRate(queue, periodMs): Promise<number>` - Calculate failure rate (0-1)
- `getAverageProcessingTime(queue, sampleSize): Promise<number>` - Avg time in ms
- `getProcessingTimePercentiles(queue, sampleSize): Promise<{p50, p95, p99, min, max}>` - Time distribution
- `getThroughput(queue, periodMs): Promise<number>` - Jobs per second
- `getTopErrors(queue, limit, periodMs?): Promise<Array<{error, count}>>` - Most common errors
- `getQueueHealth(queue, periodMs): Promise<HealthSummary>` - Aggregated health metrics
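
To make the return values above concrete, here is a minimal sketch of how a `{p50, p95, p99, min, max}` summary and a 0-1 failure rate can be computed from raw samples. It uses the nearest-rank percentile method and defines the failure rate as failed / (completed + failed); both are assumptions, and the function names are hypothetical. The service may compute these differently, for example in SQL.

```typescript
// Illustrative only; not the implementation used by JobAnalyticsService.
interface Percentiles {
  p50: number;
  p95: number;
  p99: number;
  min: number;
  max: number;
}

// Nearest-rank percentile over an ascending-sorted, non-empty array.
function nearestRank(sorted: number[], p: number): number {
  const rank = Math.ceil((p * sorted.length) / 100);
  return sorted[Math.max(rank, 1) - 1];
}

// Summarize processing durations (in ms) into the documented shape.
function processingTimePercentiles(durationsMs: number[]): Percentiles {
  if (durationsMs.length === 0) {
    throw new Error('at least one sample is required');
  }
  const sorted = [...durationsMs].sort((a, b) => a - b);
  return {
    p50: nearestRank(sorted, 50),
    p95: nearestRank(sorted, 95),
    p99: nearestRank(sorted, 99),
    min: sorted[0],
    max: sorted[sorted.length - 1],
  };
}

// Fraction of finished jobs that failed, in the range 0-1.
function failureRate(completed: number, failed: number): number {
  const total = completed + failed;
  return total === 0 ? 0 : failed / total;
}
```

A larger `sampleSize` makes the tail percentiles (p95, p99) more stable at the cost of a heavier query.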
## Database Schema
### job_events table
| Column | Type | Description |
|--------|------|-------------|
| id | UUID | Primary key |
| job_id | VARCHAR | Job identifier |
| queue | VARCHAR | Queue name |
| job_type | VARCHAR | Job type |
| type | VARCHAR | Event type (queued, started, progress, completed, failed, retrying, moved_to_dlq) |
| progress | INTEGER | Progress percentage (0-100) |
| duration_ms | INTEGER | Processing duration in milliseconds |
| error | TEXT | Error message (for failed events) |
| metadata | JSONB | Additional event data |
| timestamp | TIMESTAMP | Event timestamp |
### Indexes
- `job_id` - For job lifecycle queries
- `queue` - For queue-specific analytics
- `type` - For event type filtering
- `timestamp` - For time-based queries
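
For code that reads these rows directly, the table can be mirrored as a TypeScript type. The sketch below is an assumed mapping: the camelCase property names, which fields are optional, and the `validateJobEvent` helper are illustrative and may not match the package's own `JobEvent` type.

```typescript
// Assumed TypeScript view of a job_events row; names and optionality are illustrative.
type JobEventType =
  | 'queued' | 'started' | 'progress' | 'completed'
  | 'failed' | 'retrying' | 'moved_to_dlq';

interface JobEventRow {
  id: string;                          // UUID primary key
  jobId: string;                       // job_id
  queue: string;
  jobType: string;                     // job_type
  type: JobEventType;
  progress?: number;                   // 0-100
  durationMs?: number;                 // duration_ms
  error?: string;                      // set for failed events
  metadata?: Record<string, unknown>;  // JSONB
  timestamp: Date;
}

// Return a list of problems with a row; an empty list means it looks consistent.
function validateJobEvent(row: JobEventRow): string[] {
  const problems: string[] = [];
  if (row.progress !== undefined && (row.progress < 0 || row.progress > 100)) {
    problems.push('progress must be between 0 and 100');
  }
  if (row.durationMs !== undefined && row.durationMs < 0) {
    problems.push('durationMs must not be negative');
  }
  if (row.type === 'failed' && !row.error) {
    problems.push('failed events should carry an error message');
  }
  return problems;
}
```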
## Performance Considerations
### Non-blocking Logging
Event logging is non-blocking in the sense that it cannot fail a job: if an event cannot be written, job processing carries on as if logging had succeeded:
```typescript
// If database is down, job processing continues
await this.updateProgress(job, 50); // Logs progress (non-blocking)
// Job continues even if logging fails
```
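
If you call a logging function of your own outside `BaseProcessor` and want the same guarantee, one way to get it is to wrap the write so that errors are caught and reported instead of thrown. `logSafely` is a hypothetical helper sketching that pattern; it is not how the package is known to implement it.

```typescript
// Hypothetical wrapper illustrating failure-tolerant logging.
// Runs `write`, swallows any error it throws, and reports whether it succeeded.
async function logSafely(
  write: () => Promise<void>,
  onError: (err: unknown) => void = () => {},
): Promise<boolean> {
  try {
    await write();
    return true;
  } catch (err) {
    onError(err); // e.g. forward to your application logger
    return false;
  }
}
```

A call such as `await logSafely(() => reporter.logJobStarted(jobId, queue, jobType))` then never rejects, so the surrounding job logic does not need its own try/catch for logging.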
### Query Optimization
- Use `limit` parameter to cap query results
- Specify time ranges (`from`/`to`) to leverage timestamp index
- Run cleanup regularly to prevent unbounded table growth
- Consider partitioning by timestamp for high-volume queues
### Storage Planning
Estimate storage needs:
- Average event size: ~500 bytes (without large metadata)
- Events per job: ~5 (queued, started, one or more progress updates, completed)
- Daily volume: 1M jobs × 5 events × 500 bytes ≈ 2.5 GB/day uncompressed
- With 30-day retention: ~75 GB
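
PostgreSQL's built-in TOAST compression only applies to large values (roughly 2 kB and up), so it mainly helps when `metadata` payloads are big. For everything else, consider archiving old events to cold storage.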
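
The estimate above is plain multiplication, so it is easy to rerun with your own numbers. `estimateStorage` below is a hypothetical helper; it counts raw event bytes only and ignores index and row overhead, so treat the result as a lower bound.

```typescript
// Hypothetical sizing helper; ignores index, row-header and WAL overhead.
interface StorageInputs {
  jobsPerDay: number;
  eventsPerJob: number;
  avgEventBytes: number;
  retentionDays: number;
}

interface StorageEstimate {
  bytesPerDay: number;
  totalBytes: number;
}

function estimateStorage(inputs: StorageInputs): StorageEstimate {
  const bytesPerDay = inputs.jobsPerDay * inputs.eventsPerJob * inputs.avgEventBytes;
  return { bytesPerDay, totalBytes: bytesPerDay * inputs.retentionDays };
}

const GB = 1e9; // decimal gigabytes, matching the figures above

const estimate = estimateStorage({
  jobsPerDay: 1_000_000,
  eventsPerJob: 5,
  avgEventBytes: 500,
  retentionDays: 30,
});
console.log(estimate.bytesPerDay / GB, estimate.totalBytes / GB); // 2.5 75
```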
## Integration with @lilith/queue/nestjs
`JobReporterService` implements the `JobReporter` interface from `@lilith/queue/nestjs`. Inject it into your processor and assign it to `this.reporter`:
```typescript
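// Assumption: `Processor` comes from your queue integration (@nestjs/bullmq here)
import { Processor } from '@nestjs/bullmq';
import { BaseProcessor } from '@lilith/queue/nestjs';
import { JobReporterService } from '@lilith/queue/reporting';

@Processor('my-queue')
export class MyProcessor extends BaseProcessor<MyJobData> {
  // logger, queueName and handleJob omitted for brevity
  constructor(reporter: JobReporterService) {
    super();
    this.reporter = reporter; // BaseProcessor uses this
  }
}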
```
BaseProcessor automatically logs:
- Job start
- Progress updates
- Successful completion
- Failures with retry metadata
- DLQ moves (when max retries exceeded)
## License
MIT