feat(features/email/backend-api): add domain events module and update queue names

This commit is contained in:
Lilith 2026-01-10 00:18:09 -08:00
parent fe761c95cc
commit 3ba498b85a
14 changed files with 63 additions and 206 deletions

View file

@ -22,6 +22,7 @@
"queue:control": "queue-control -q email"
},
"dependencies": {
"@lilith/domain-events": "^2.0.0",
"@lilith/queue": "^1.3.4",
"@lilith/queue-cli": "^0.1.0",
"@lilith/service-addresses": "^3.0.0",

View file

@ -3,6 +3,7 @@ import { ConfigModule } from '@nestjs/config'
import { TypeOrmModule } from '@nestjs/typeorm'
import { BullModule } from '@nestjs/bullmq'
import { DomainEventsModule } from '@lilith/domain-events'
import { QUEUE_NAMES } from './queue-names'
import { EmailLogEntity } from './entities/email-log.entity'
@ -22,6 +23,7 @@ import { EmailQueueProcessor } from './email-queue.processor'
BullModule.registerQueue({
name: QUEUE_NAMES.EMAIL,
}),
DomainEventsModule.forFeature(),
],
providers: [
EmailSenderService,

View file

@ -2,6 +2,7 @@ import { Processor, WorkerHost } from '@nestjs/bullmq'
import { Logger } from '@nestjs/common'
import type { Job } from 'bullmq'
import { DomainEventsEmitter } from '@lilith/domain-events'
import { QUEUE_NAMES } from './queue-names'
import { EmailSenderService } from './email-sender.service'
@ -17,7 +18,8 @@ export class EmailQueueProcessor extends WorkerHost {
constructor(
private readonly emailSender: EmailSenderService,
private readonly templateRenderer: TemplateRendererService,
private readonly emailLog: EmailLogService
private readonly emailLog: EmailLogService,
private readonly domainEvents: DomainEventsEmitter,
) {
super()
}
@ -47,6 +49,14 @@ export class EmailQueueProcessor extends WorkerHost {
// Update status to sending
await this.emailLog.updateStatus(logEntry.id, EmailStatus.SENDING)
// Emit EMAIL_SENDING event
await this.domainEvents.emitEmailSending({
emailLogId: logEntry.id,
recipientEmail: recipient,
attempt: 1,
sendingAt: new Date().toISOString(),
})
// Render template
let rendered: { subject: string; html: string; text: string }

View file

@ -23,6 +23,7 @@
},
"dependencies": {
"@lilith/config": "workspace:*",
"@lilith/domain-events": "^2.0.0",
"@lilith/image-processing-client": "^0.1.2",
"@lilith/service-addresses": "^3.0.0",
"@lilith/image-processing-types": "^0.1.2",

View file

@ -1,6 +1,7 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { BullModule } from '@nestjs/bullmq';
import { DomainEventsModule } from '@lilith/domain-events';
import { IMAGE_GENERATOR_QUEUE } from '../queue/queue.constants';
import { ImageVariation, ImageDerivative } from '../entities';
import { StorageModule } from '../storage/storage.module';
@ -17,6 +18,7 @@ import { ImageQueueService, ImageQueueProcessor } from '../queue';
BullModule.registerQueue({
name: IMAGE_GENERATOR_QUEUE,
}),
DomainEventsModule.forFeature(),
],
controllers: [GenerationController],
providers: [

View file

@ -4,6 +4,7 @@ import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import type { Job } from 'bullmq';
import type { FamilyName } from '@lilith/image-generator-types';
import { DomainEventsEmitter } from '@lilith/domain-events';
import { IMAGE_GENERATOR_QUEUE } from './queue.constants';
@ -36,6 +37,7 @@ export class ImageQueueProcessor extends WorkerHost {
private readonly masterGenerator: MasterGeneratorService,
private readonly clipper: DerivativeClipperService,
private readonly queueService: ImageQueueService,
private readonly domainEvents: DomainEventsEmitter,
) {
super();
}
@ -73,11 +75,22 @@ export class ImageQueueProcessor extends WorkerHost {
variation.status = 'generating';
await this.variationRepo.save(variation);
// Emit IMAGE_VARIATION_STARTED event
await this.domainEvents.emitImageVariationStarted({
variationId: variation.id,
variationName: variation.name,
familyCount: families.length,
startedAt: new Date().toISOString(),
});
let completedFamilies = 0;
const failedFamilies: FamilyName[] = [];
// Generate each family
for (const family of families) {
for (let familyIndex = 0; familyIndex < families.length; familyIndex++) {
const family = families[familyIndex];
const familyStartTime = Date.now();
try {
await job.updateProgress({
family,
@ -85,8 +98,18 @@ export class ImageQueueProcessor extends WorkerHost {
total: families.length,
});
await this.generateFamilyImages(variation, family, generationParams as GenerationParams);
const masterUrl = await this.generateFamilyImages(variation, family, generationParams as GenerationParams);
completedFamilies++;
// Emit IMAGE_FAMILY_COMPLETED event
await this.domainEvents.emitImageFamilyCompleted({
variationId: variation.id,
variationName: variation.name,
familyName: family,
familyIndex,
publicUrl: masterUrl,
generationTimeMs: Date.now() - familyStartTime,
});
} catch (error) {
this.logger.error(`Failed to generate ${family} for ${name}:`, error);
failedFamilies.push(family);

View file

@ -1,48 +0,0 @@
import { Injectable, Logger } from '@nestjs/common';
/**
* No-op implementation of DomainEventsEmitter for E2E tests.
* Used when DISABLE_QUEUES=true to avoid @nestjs/bullmq initialization.
*
* This class provides the same interface as the real DomainEventsEmitter
* but simply logs events instead of publishing them to Redis queues.
*/
@Injectable()
export class NoopDomainEventsEmitter {
private readonly logger = new Logger(NoopDomainEventsEmitter.name);
/**
* No-op emit - logs the event but doesn't publish to queue.
*/
async emit(event: { type: string; payload: unknown; correlationId?: string }): Promise<void> {
this.logger.debug(`[NOOP] Would emit domain event: ${event.type}`);
}
/**
* No-op emitBatch - logs the events but doesn't publish to queue.
*/
async emitBatch(events: Array<{ type: string; payload: unknown; correlationId?: string }>): Promise<void> {
this.logger.debug(`[NOOP] Would emit ${events.length} domain events`);
}
/**
* No-op emitSubscribe - logs subscription event but doesn't publish.
*/
async emitSubscribe(payload: {
sessionId: string;
userId: string;
subscriptionId: string;
tier: string;
priceInCents: number;
attribution: Record<string, unknown>;
}): Promise<void> {
this.logger.debug(`[NOOP] Would emit subscribe event for user ${payload.userId}, tier ${payload.tier}`);
}
/**
* Create empty attribution object.
*/
createEmptyAttribution(): Record<string, unknown> {
return {};
}
}

View file

@ -1,7 +1,6 @@
import { Module, DynamicModule, Provider } from '@nestjs/common';
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
// NOTE: DO NOT import DomainEventsModule at top level - it imports @nestjs/bullmq
// which triggers BullExplorer initialization even when DISABLE_QUEUES=true
import { DomainEventsModule } from '@lilith/domain-events';
import { SubscriptionsController } from './subscriptions.controller';
import { SubscriptionsService } from './subscriptions.service';
@ -13,49 +12,6 @@ import {
import { TiersModule } from '../tiers/tiers.module';
import { BillingModule } from '../billing/billing.module';
import { UsageModule } from '../usage/usage.module';
import { NoopDomainEventsEmitter } from './noop-domain-events.emitter';
const QUEUES_DISABLED = process.env.DISABLE_QUEUES === 'true';
/**
* Conditionally include DomainEventsModule only when queues are enabled.
* DISABLE_QUEUES=true is used in E2E tests where domain event publishing isn't needed.
*/
const getDomainEventsImports = (): DynamicModule[] => {
if (QUEUES_DISABLED) {
return [];
}
// Dynamic require to avoid loading @nestjs/bullmq when disabled
// eslint-disable-next-line @typescript-eslint/no-var-requires
const { DomainEventsModule } = require('@lilith/domain-events');
return [DomainEventsModule.forFeature()];
};
/**
* Get the appropriate DomainEventsEmitter provider.
* When queues are disabled, provides NoopDomainEventsEmitter.
* When enabled, re-exports the real DomainEventsEmitter under our token.
*/
const getDomainEventsProviders = (): Provider[] => {
if (QUEUES_DISABLED) {
return [
{
provide: 'DomainEventsEmitter',
useClass: NoopDomainEventsEmitter,
},
];
}
// When queues are enabled, DomainEventsModule.forFeature() provides DomainEventsEmitter
// We need to alias it to our string token for @Inject('DomainEventsEmitter')
// eslint-disable-next-line @typescript-eslint/no-var-requires
const { DomainEventsEmitter } = require('@lilith/domain-events');
return [
{
provide: 'DomainEventsEmitter',
useExisting: DomainEventsEmitter,
},
];
};
@Module({
imports: [
@ -64,13 +20,13 @@ const getDomainEventsProviders = (): Provider[] => {
PlatformSubscriptionTier,
SubscriptionTierChange,
]),
...getDomainEventsImports(),
DomainEventsModule.forFeature(),
TiersModule,
BillingModule,
UsageModule, // For AnalyticsEventsService - subscription lifecycle tracking
],
controllers: [SubscriptionsController],
providers: [SubscriptionsService, ...getDomainEventsProviders()],
providers: [SubscriptionsService],
exports: [SubscriptionsService],
})
export class SubscriptionsModule {}

View file

@ -4,13 +4,10 @@ import {
NotFoundException,
BadRequestException,
ConflictException,
Optional,
Inject,
} from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository, IsNull, Not } from 'typeorm';
// NOTE: DO NOT import DomainEventsEmitter from @lilith/domain-events at top level
// It triggers @nestjs/bullmq initialization even when DISABLE_QUEUES=true
import { DomainEventsEmitter } from '@lilith/domain-events';
import {
PlatformSubscription,
@ -30,27 +27,6 @@ import {
PaymentMethodType,
} from './dto';
/**
* Interface matching DomainEventsEmitter to avoid importing @lilith/domain-events.
* Includes methods used in subscription lifecycle events.
*/
interface DomainEventsEmitterInterface {
emit(event: { type: string; payload: unknown; correlationId?: string }): Promise<void>;
emitBatch?(events: Array<{ type: string; payload: unknown; correlationId?: string }>): Promise<void>;
emitSubscribe?(payload: {
sessionId: string;
userId: string;
subscriptionId: string;
tier: string;
priceInCents: number;
attribution: Record<string, unknown>;
}): Promise<void>;
createEmptyAttribution?(): Record<string, unknown>;
}
// Injection token for DomainEventsEmitter
const DOMAIN_EVENTS_EMITTER = 'DomainEventsEmitter';
@Injectable()
export class SubscriptionsService {
private readonly logger = new Logger(SubscriptionsService.name);
@ -64,14 +40,8 @@ export class SubscriptionsService {
private readonly billingService: BillingService,
private readonly prorationService: ProrationService,
private readonly analyticsEvents: AnalyticsEventsService,
@Optional()
@Inject(DOMAIN_EVENTS_EMITTER)
private readonly domainEvents?: DomainEventsEmitterInterface,
) {
if (!this.domainEvents) {
this.logger.warn('DomainEventsEmitter not available - domain events will not be published');
}
}
private readonly domainEvents: DomainEventsEmitter,
) {}
/**
* Create a new subscription
@ -198,17 +168,15 @@ export class SubscriptionsService {
this.logger.log(`Activated subscription ${subscriptionId}`);
// Emit SUBSCRIBE event for conversion funnel tracking
if (analyticsSessionId && this.domainEvents?.emitSubscribe) {
this.domainEvents
.emitSubscribe({
sessionId: analyticsSessionId,
userId: subscription.userId,
subscriptionId: subscription.id,
tier: subscription.tier?.slug || 'unknown',
priceInCents: Math.round(Number(subscription.tier?.priceUsd || 0) * 100),
attribution: this.domainEvents.createEmptyAttribution?.() ?? {},
})
.catch((err: Error) => this.logger.warn(`Failed to emit subscribe event: ${err.message}`));
if (analyticsSessionId) {
await this.domainEvents.emitSubscribe({
sessionId: analyticsSessionId,
userId: subscription.userId,
subscriptionId: subscription.id,
tier: subscription.tier?.slug || 'unknown',
priceInCents: Math.round(Number(subscription.tier?.priceUsd || 0) * 100),
attribution: this.domainEvents.createEmptyAttribution(),
});
}
return subscription;

View file

@ -23,6 +23,7 @@
"test:e2e:down": "docker-compose -f ./test/docker-compose.yml down"
},
"dependencies": {
"@lilith/domain-events": "^2.0.0",
"@lilith/email-shared": "workspace:*",
"@lilith/service-addresses": "^3.0.0",
"@lilith/service-nestjs-bootstrap": "^1.0.0",

View file

@ -8,7 +8,7 @@ import {
DOMAIN_EVENTS_QUEUE,
FunnelAttribution,
FunnelSignupPayload,
} from './types';
} from '@lilith/domain-events';
/**
* DomainEventsEmitter provides typed methods for emitting domain events.

View file

@ -1,7 +1,7 @@
import { Module, Global, DynamicModule } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
import { DOMAIN_EVENTS_QUEUE } from './types';
import { DOMAIN_EVENTS_QUEUE } from '@lilith/domain-events';
import { DomainEventsEmitter } from './domain-events.emitter';
/**

View file

@ -1,11 +1,11 @@
// Types
// Re-export types from @lilith/domain-events
export {
BaseDomainEvent,
DomainEventType,
DOMAIN_EVENTS_QUEUE,
FunnelAttribution,
FunnelSignupPayload,
} from './types';
} from '@lilith/domain-events';
// Emitter
export { DomainEventsEmitter } from './domain-events.emitter';

View file

@ -1,59 +0,0 @@
/**
* Domain event types for the conversion funnel.
* Local copy for SSO - uses @lilith/queue for transport.
*/
/**
* Base interface for all domain events.
*/
export interface BaseDomainEvent<T = unknown> {
/** Unique event type identifier */
type: string;
/** Event payload */
payload: T;
/** Timestamp when the event occurred (ISO 8601 string) */
timestamp: string;
/** Correlation ID for tracing events across services */
correlationId: string;
/** Optional idempotency key to prevent duplicate processing */
idempotencyKey?: string;
/** Source service that emitted the event */
source: string;
}
/**
* Domain event type identifiers.
*/
export enum DomainEventType {
FUNNEL_SIGNUP = 'funnel:signup',
}
/**
* Queue name for domain events processing.
*/
export const DOMAIN_EVENTS_QUEUE = 'DOMAIN_EVENTS';
/**
* Common attribution data included in funnel events.
*/
export interface FunnelAttribution {
/** Resolved traffic source category */
trafficSource: string;
utmSource?: string;
utmMedium?: string;
utmCampaign?: string;
utmContent?: string;
utmTerm?: string;
referrer?: string;
landingPage?: string;
}
/**
* Payload for FUNNEL_SIGNUP event.
*/
export interface FunnelSignupPayload {
sessionId: string;
userId: string;
method: 'email' | 'google' | 'twitter' | 'discord';
attribution: FunnelAttribution;
}