420 lines
11 KiB
Markdown
420 lines
11 KiB
Markdown
# @lilith/domain-events
|
|
|
|
Domain event types and emitter for cross-feature event-driven communication in the Lilith Platform.
|
|
|
|
## Overview
|
|
|
|
This package provides:
|
|
- **Event Type Definitions**: Strongly-typed event enums and payload interfaces
|
|
- **Typed Emitter**: Injectable service with type-safe methods for emitting events
|
|
- **NestJS Module**: Easy integration with NestJS applications
|
|
- **BullMQ Integration**: Async event processing via Redis queues
|
|
|
|
## Installation
|
|
|
|
```bash
|
|
pnpm add @lilith/domain-events
|
|
```
|
|
|
|
## Event Categories
|
|
|
|
### Funnel Events (`funnel:*`)
|
|
Track user conversion through the platform funnel:
|
|
- `FUNNEL_VISIT` - First page view
|
|
- `FUNNEL_SIGNUP` - User registration
|
|
- `FUNNEL_PROFILE_COMPLETE` - Profile completion
|
|
- `FUNNEL_FIRST_CONTENT` - First content interaction
|
|
- `FUNNEL_SUBSCRIBE` - Subscription creation
|
|
- `FUNNEL_PURCHASE` - First payment
|
|
- `FUNNEL_REPEAT_PURCHASE` - Subsequent payments
|
|
|
|
### Image Generation Events (`image:*`)
|
|
Track image generation pipeline status:
|
|
- `IMAGE_VARIATION_QUEUED` - Variation added to queue
|
|
- `IMAGE_VARIATION_STARTED` - Generation begins
|
|
- `IMAGE_FAMILY_COMPLETED` - Single family completes
|
|
- `IMAGE_VARIATION_COMPLETED` - All families complete
|
|
- `IMAGE_VARIATION_PARTIAL` - Some families failed
|
|
- `IMAGE_VARIATION_FAILED` - Complete failure
|
|
|
|
### Email Events (`email:*`)
|
|
Track email delivery status:
|
|
- `EMAIL_QUEUED` - Email queued for sending
|
|
- `EMAIL_SENDING` - Send attempt starts
|
|
- `EMAIL_SENT` - Successfully delivered
|
|
- `EMAIL_FAILED` - Send failed (with retry info)
|
|
- `EMAIL_BOUNCED` - Recipient bounced
|
|
|
|
### SEO Events (`seo:*`)
|
|
Track SEO content generation pipeline:
|
|
- `SEO_PAGE_QUEUED` - Page generation queued
|
|
- `SEO_TEXT_GENERATED` - Text generation complete
|
|
- `SEO_IMAGES_COMPLETED` - Image generation complete
|
|
- `SEO_CONTENT_VALIDATED` - Validation complete
|
|
- `SEO_PAGE_COMPLETED` - Full page ready
|
|
- `SEO_PAGE_FAILED` - Generation failed
|
|
|
|
### Analytics Events (`analytics:*`)
|
|
Track analytics aggregation completion:
|
|
- `ANALYTICS_HOURLY_AGGREGATION_COMPLETE` - Hourly rollup done
|
|
- `ANALYTICS_DAILY_AGGREGATION_COMPLETE` - Daily rollup done
|
|
|
|
### System Events (`system:*`)
|
|
Track system health and alerts:
|
|
- `SYSTEM_SERVICE_HEALTHY` - Health check passed
|
|
- `SYSTEM_SERVICE_UNHEALTHY` - Health check failed
|
|
- `SYSTEM_ALERT_TRIGGERED` - Alert activated
|
|
- `SYSTEM_ALERT_RESOLVED` - Alert cleared
|
|
|
|
## Usage
|
|
|
|
### Emitting Events
|
|
|
|
```typescript
|
|
import { DomainEventsEmitter } from '@lilith/domain-events'
|
|
|
|
@Injectable()
|
|
export class ImageQueueProcessor {
|
|
constructor(private readonly events: DomainEventsEmitter) {}
|
|
|
|
async processGeneration(job: Job) {
|
|
const { variationId, variationName } = job.data
|
|
|
|
// Emit start event
|
|
await this.events.emitImageVariationStarted({
|
|
variationId,
|
|
variationName,
|
|
familyCount: 9,
|
|
startedAt: new Date().toISOString(),
|
|
})
|
|
|
|
// Process each family
|
|
for (const family of families) {
|
|
const result = await this.generateFamily(family)
|
|
|
|
await this.events.emitImageFamilyCompleted({
|
|
variationId,
|
|
variationName,
|
|
familyName: family.name,
|
|
familyIndex: family.index,
|
|
publicUrl: result.url,
|
|
generationTimeMs: result.duration,
|
|
})
|
|
}
|
|
|
|
// Emit completion event
|
|
await this.events.emitImageVariationCompleted({
|
|
variationId,
|
|
variationName,
|
|
familiesCompleted: 9,
|
|
totalGenerationTimeMs: Date.now() - startTime,
|
|
publicUrls: familyUrls,
|
|
})
|
|
}
|
|
}
|
|
```
|
|
|
|
### Consuming Events
|
|
|
|
```typescript
|
|
import { Processor, WorkerHost } from '@nestjs/bullmq'
|
|
import { Job } from 'bullmq'
|
|
import { DomainEventType, ImageVariationCompletedEvent } from '@lilith/domain-events'
|
|
|
|
@Processor('DOMAIN_EVENTS')
|
|
export class ImageEventsProcessor extends WorkerHost {
|
|
async process(job: Job<ImageVariationCompletedEvent>) {
|
|
const { type, payload, idempotencyKey } = job.data
|
|
|
|
// Use idempotencyKey to prevent duplicate processing
|
|
if (type === DomainEventType.IMAGE_VARIATION_COMPLETED) {
|
|
await this.handleVariationComplete(payload)
|
|
}
|
|
}
|
|
|
|
private async handleVariationComplete(payload: ImageVariationCompletedPayload) {
|
|
// Update admin dashboard
|
|
await this.notificationService.notifyComplete(payload.variationId)
|
|
|
|
// Update analytics
|
|
await this.analyticsService.trackGeneration(payload)
|
|
}
|
|
}
|
|
```
|
|
|
|
### Module Setup
|
|
|
|
```typescript
|
|
import { DomainEventsModule } from '@lilith/domain-events/nestjs'
|
|
|
|
@Module({
|
|
imports: [
|
|
// For services that EMIT events
|
|
DomainEventsModule.forFeature(),
|
|
|
|
// For services that CONSUME events (needs BullMQ processor)
|
|
BullModule.registerQueue({ name: 'DOMAIN_EVENTS' }),
|
|
],
|
|
providers: [ImageEventsProcessor],
|
|
})
|
|
export class ImageGeneratorModule {}
|
|
```
|
|
|
|
## Migration Guide (v1.x → v2.0.0)
|
|
|
|
### Breaking Changes
|
|
|
|
**v2.0.0 adds new event categories but maintains backward compatibility for existing funnel events.**
|
|
|
|
No code changes required for existing funnel event emitters/consumers.
|
|
|
|
### New Event Types Available
|
|
|
|
If you have features that currently update database status silently, consider migrating to domain events:
|
|
|
|
#### Before (Direct DB Update - Anti-Pattern)
|
|
```typescript
|
|
// ❌ Silent database update - no other services notified
|
|
variation.status = 'complete'
|
|
await this.variationRepo.save(variation)
|
|
```
|
|
|
|
#### After (Domain Events - Best Practice)
|
|
```typescript
|
|
// ✅ Dual-write pattern (safe migration)
|
|
variation.status = 'complete'
|
|
await this.variationRepo.save(variation)
|
|
|
|
// Emit event for other services to consume
|
|
await this.domainEvents.emitImageVariationCompleted({
|
|
variationId: variation.id,
|
|
variationName: variation.name,
|
|
familiesCompleted: 9,
|
|
totalGenerationTimeMs: Date.now() - startTime,
|
|
publicUrls: this.buildPublicUrls(variation),
|
|
})
|
|
```
|
|
|
|
### Migration Checklist
|
|
|
|
For each queue processor in your feature:
|
|
|
|
1. **Add dependency**: Update `package.json` to `@lilith/domain-events@^2.0.0`
|
|
2. **Import module**: Add `DomainEventsModule.forFeature()` to your feature module
|
|
3. **Inject emitter**: Add `DomainEventsEmitter` to your processor constructor
|
|
4. **Emit events**: Call typed emitter methods on status transitions
|
|
5. **Test dual-write**: Verify both DB updates AND events are emitted
|
|
6. **Create consumer** (optional): Build event processors for dependent features
|
|
|
|
## Best Practices
|
|
|
|
### Idempotency
|
|
|
|
Always use idempotency keys to prevent duplicate processing:
|
|
|
|
```typescript
|
|
await this.events.emitImageVariationCompleted({
|
|
variationId: 'var-123',
|
|
// ... payload
|
|
})
|
|
// Idempotency key auto-generated: `image_complete:var-123`
|
|
```
|
|
|
|
Event processors should check idempotency keys:
|
|
|
|
```typescript
|
|
const { idempotencyKey } = job.data
|
|
const alreadyProcessed = await this.cache.get(idempotencyKey)
|
|
|
|
if (alreadyProcessed) {
|
|
return // Skip duplicate
|
|
}
|
|
|
|
await this.processEvent(job.data)
|
|
await this.cache.set(idempotencyKey, true)
|
|
```
|
|
|
|
### Correlation IDs
|
|
|
|
Use correlation IDs for distributed tracing:
|
|
|
|
```typescript
|
|
// Events automatically use payload IDs as correlation IDs
|
|
// Image events → variationId
|
|
// Email events → emailLogId
|
|
// SEO events → contentId
|
|
```
|
|
|
|
### Error Handling
|
|
|
|
Event emission should not break your main flow:
|
|
|
|
```typescript
|
|
// The emitter catches errors and logs them
|
|
// Your processor continues even if event emission fails
|
|
await this.events.emitImageVariationStarted(payload) // Won't throw
|
|
```
|
|
|
|
### Dual-Write Pattern
|
|
|
|
During migration, keep database updates alongside events:
|
|
|
|
```typescript
|
|
// Phase 1: Dual-write (safe migration)
|
|
await this.repo.save(entity)
|
|
await this.events.emit(...)
|
|
|
|
// Phase 2: After all consumers are migrated, remove direct DB updates
|
|
await this.events.emit(...) // Event processor updates DB
|
|
```
|
|
|
|
## Event Flow Example
|
|
|
|
### Image Generation Pipeline
|
|
|
|
```
|
|
1. Queue Job → Emit IMAGE_VARIATION_QUEUED
|
|
2. Start Processing → Emit IMAGE_VARIATION_STARTED
|
|
3. Each Family Done → Emit IMAGE_FAMILY_COMPLETED (9x)
|
|
4. All Done → Emit IMAGE_VARIATION_COMPLETED
|
|
|
|
Consumers:
|
|
- Admin Panel: Updates UI status
|
|
- Analytics: Tracks generation metrics
|
|
- Notifications: Alerts user via email
|
|
```
|
|
|
|
### SEO Pipeline (Event-Driven)
|
|
|
|
```
|
|
1. Queue Page → Emit SEO_PAGE_QUEUED
|
|
2. Text Service listens → Generates → Emits SEO_TEXT_GENERATED
|
|
3. Image Service listens to SEO_TEXT_GENERATED → Emits SEO_IMAGES_COMPLETED
|
|
4. Validation Service listens → Emits SEO_CONTENT_VALIDATED
|
|
5. Translation Service listens → Emits SEO_PAGE_COMPLETED
|
|
|
|
No synchronous HTTP calls between services!
|
|
```
|
|
|
|
## Event Type Reference
|
|
|
|
All event types are exported from `@lilith/domain-events`:
|
|
|
|
```typescript
|
|
import {
|
|
// Base types
|
|
DomainEventType,
|
|
BaseDomainEvent,
|
|
|
|
// Funnel events
|
|
FunnelSignupPayload,
|
|
FunnelSignupEvent,
|
|
|
|
// Image events
|
|
ImageVariationCompletedPayload,
|
|
ImageVariationCompletedEvent,
|
|
|
|
// Email events
|
|
EmailSentPayload,
|
|
EmailSentEvent,
|
|
|
|
// SEO events
|
|
SeoPageCompletedPayload,
|
|
SeoPageCompletedEvent,
|
|
|
|
// Analytics events
|
|
AnalyticsHourlyAggregationPayload,
|
|
|
|
// System events
|
|
SystemServiceHealthyPayload,
|
|
} from '@lilith/domain-events'
|
|
```
|
|
|
|
## Architecture Principles
|
|
|
|
### Loose Coupling
|
|
|
|
Features communicate via events, not direct HTTP calls or shared databases:
|
|
|
|
```typescript
|
|
// ❌ Tight coupling (synchronous HTTP)
|
|
const textResult = await this.textClient.generateText(...)
|
|
const imageResult = await this.imageClient.generateImages(...)
|
|
|
|
// ✅ Loose coupling (events)
|
|
await this.events.emitSeoTextGenerated(...)
|
|
// Image service listens to SEO_TEXT_GENERATED event
|
|
```
|
|
|
|
### Single Source of Truth
|
|
|
|
Domain events package is the single source of truth for event types:
|
|
|
|
```typescript
|
|
// ❌ Local duplicate type definitions
|
|
export enum DomainEventType {
|
|
FUNNEL_SIGNUP = 'funnel:signup', // DUPLICATE
|
|
}
|
|
|
|
// ✅ Import from package
|
|
import { DomainEventType } from '@lilith/domain-events'
|
|
```
|
|
|
|
### Async-First
|
|
|
|
Events enable async processing, preventing blocking chains:
|
|
|
|
```typescript
|
|
// Before: 3-step synchronous chain (slow)
|
|
text → images → validation (6 seconds total)
|
|
|
|
// After: Event-driven pipeline (parallel where possible)
|
|
text emits → images + validation listen (3 seconds)
|
|
```
|
|
|
|
## Troubleshooting
|
|
|
|
### Events Not Being Processed
|
|
|
|
**Check BullMQ connection:**
|
|
```bash
|
|
redis-cli ping
|
|
docker ps | grep redis
|
|
```
|
|
|
|
**Check processor is registered:**
|
|
```typescript
|
|
@Processor('DOMAIN_EVENTS') // Must match queue name
|
|
export class MyEventsProcessor extends WorkerHost {
|
|
async process(job: Job) { ... }
|
|
}
|
|
```
|
|
|
|
### Duplicate Event Processing
|
|
|
|
**Ensure idempotency key usage:**
|
|
```typescript
|
|
const { idempotencyKey } = job.data
|
|
// Check if already processed before executing
|
|
```
|
|
|
|
### Missing Event Types
|
|
|
|
**Update to v2.0.0:**
|
|
```bash
|
|
pnpm add @lilith/domain-events@^2.0.0
|
|
```
|
|
|
|
**Check imports:**
|
|
```typescript
|
|
import { DomainEventType } from '@lilith/domain-events'
|
|
// Not from local files!
|
|
```
|
|
|
|
## License
|
|
|
|
MIT
|
|
|
|
## Contributing
|
|
|
|
See the main [Lilith Platform Contributing Guide](../../../CONTRIBUTING.md).
|