From 3ba498b85a19155fb4d61d72d4bd67c8cb44fcca Mon Sep 17 00:00:00 2001 From: Lilith Date: Sat, 10 Jan 2026 00:18:09 -0800 Subject: [PATCH] =?UTF-8?q?feat(features/email/backend-api):=20=E2=9C=A8?= =?UTF-8?q?=20add=20domain=20events=20module=20and=20update=20queue=20name?= =?UTF-8?q?s?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- features/email/backend-api/package.json | 1 + .../email/backend-api/src/core/core.module.ts | 2 + .../src/core/email-queue.processor.ts | 12 +++- .../image-generator/backend-api/package.json | 1 + .../src/generation/generation.module.ts | 2 + .../src/queue/image-queue.processor.ts | 27 ++++++++- .../noop-domain-events.emitter.ts | 48 --------------- .../src/subscriptions/subscriptions.module.ts | 52 ++-------------- .../subscriptions/subscriptions.service.ts | 56 ++++-------------- features/sso/backend-api/package.json | 1 + .../domain-events/domain-events.emitter.ts | 2 +- .../domain-events/domain-events.module.ts | 2 +- .../src/common/domain-events/index.ts | 4 +- .../src/common/domain-events/types.ts | 59 ------------------- 14 files changed, 63 insertions(+), 206 deletions(-) delete mode 100644 features/marketplace/backend-api/src/subscriptions/noop-domain-events.emitter.ts delete mode 100644 features/sso/backend-api/src/common/domain-events/types.ts diff --git a/features/email/backend-api/package.json b/features/email/backend-api/package.json index e6e7c3f0c..7c10938fe 100644 --- a/features/email/backend-api/package.json +++ b/features/email/backend-api/package.json @@ -22,6 +22,7 @@ "queue:control": "queue-control -q email" }, "dependencies": { + "@lilith/domain-events": "^2.0.0", "@lilith/queue": "^1.3.4", "@lilith/queue-cli": "^0.1.0", "@lilith/service-addresses": "^3.0.0", diff --git a/features/email/backend-api/src/core/core.module.ts b/features/email/backend-api/src/core/core.module.ts index 86620d694..ef74ef25c 100644 --- a/features/email/backend-api/src/core/core.module.ts +++ b/features/email/backend-api/src/core/core.module.ts @@ -3,6 +3,7 @@ import { ConfigModule } from '@nestjs/config' import { TypeOrmModule } from '@nestjs/typeorm' import { BullModule } from '@nestjs/bullmq' +import { DomainEventsModule } from '@lilith/domain-events' import { QUEUE_NAMES } from './queue-names' import { EmailLogEntity } from './entities/email-log.entity' @@ -22,6 +23,7 @@ import { EmailQueueProcessor } from './email-queue.processor' BullModule.registerQueue({ name: QUEUE_NAMES.EMAIL, }), + DomainEventsModule.forFeature(), ], providers: [ EmailSenderService, diff --git a/features/email/backend-api/src/core/email-queue.processor.ts b/features/email/backend-api/src/core/email-queue.processor.ts index c7c101153..4351ecd66 100644 --- a/features/email/backend-api/src/core/email-queue.processor.ts +++ b/features/email/backend-api/src/core/email-queue.processor.ts @@ -2,6 +2,7 @@ import { Processor, WorkerHost } from '@nestjs/bullmq' import { Logger } from '@nestjs/common' import type { Job } from 'bullmq' +import { DomainEventsEmitter } from '@lilith/domain-events' import { QUEUE_NAMES } from './queue-names' import { EmailSenderService } from './email-sender.service' @@ -17,7 +18,8 @@ export class EmailQueueProcessor extends WorkerHost { constructor( private readonly emailSender: EmailSenderService, private readonly templateRenderer: TemplateRendererService, - private readonly emailLog: EmailLogService + private readonly emailLog: EmailLogService, + private readonly domainEvents: DomainEventsEmitter, ) { super() } @@ -47,6 +49,14 @@ export class EmailQueueProcessor extends WorkerHost { // Update status to sending await this.emailLog.updateStatus(logEntry.id, EmailStatus.SENDING) + // Emit EMAIL_SENDING event + await this.domainEvents.emitEmailSending({ + emailLogId: logEntry.id, + recipientEmail: recipient, + attempt: 1, + sendingAt: new Date().toISOString(), + }) + // Render template let rendered: { subject: string; html: string; text: string } diff --git a/features/image-generator/backend-api/package.json b/features/image-generator/backend-api/package.json index 4f4ea90c5..00fbad460 100644 --- a/features/image-generator/backend-api/package.json +++ b/features/image-generator/backend-api/package.json @@ -23,6 +23,7 @@ }, "dependencies": { "@lilith/config": "workspace:*", + "@lilith/domain-events": "^2.0.0", "@lilith/image-processing-client": "^0.1.2", "@lilith/service-addresses": "^3.0.0", "@lilith/image-processing-types": "^0.1.2", diff --git a/features/image-generator/backend-api/src/generation/generation.module.ts b/features/image-generator/backend-api/src/generation/generation.module.ts index c983bd099..067c4fd75 100644 --- a/features/image-generator/backend-api/src/generation/generation.module.ts +++ b/features/image-generator/backend-api/src/generation/generation.module.ts @@ -1,6 +1,7 @@ import { Module } from '@nestjs/common'; import { TypeOrmModule } from '@nestjs/typeorm'; import { BullModule } from '@nestjs/bullmq'; +import { DomainEventsModule } from '@lilith/domain-events'; import { IMAGE_GENERATOR_QUEUE } from '../queue/queue.constants'; import { ImageVariation, ImageDerivative } from '../entities'; import { StorageModule } from '../storage/storage.module'; @@ -17,6 +18,7 @@ import { ImageQueueService, ImageQueueProcessor } from '../queue'; BullModule.registerQueue({ name: IMAGE_GENERATOR_QUEUE, }), + DomainEventsModule.forFeature(), ], controllers: [GenerationController], providers: [ diff --git a/features/image-generator/backend-api/src/queue/image-queue.processor.ts b/features/image-generator/backend-api/src/queue/image-queue.processor.ts index 4562df77a..a38436baf 100644 --- a/features/image-generator/backend-api/src/queue/image-queue.processor.ts +++ b/features/image-generator/backend-api/src/queue/image-queue.processor.ts @@ -4,6 +4,7 @@ import { InjectRepository } from '@nestjs/typeorm'; import { Repository } from 'typeorm'; import type { Job } from 'bullmq'; import type { FamilyName } from '@lilith/image-generator-types'; +import { DomainEventsEmitter } from '@lilith/domain-events'; import { IMAGE_GENERATOR_QUEUE } from './queue.constants'; @@ -36,6 +37,7 @@ export class ImageQueueProcessor extends WorkerHost { private readonly masterGenerator: MasterGeneratorService, private readonly clipper: DerivativeClipperService, private readonly queueService: ImageQueueService, + private readonly domainEvents: DomainEventsEmitter, ) { super(); } @@ -73,11 +75,22 @@ export class ImageQueueProcessor extends WorkerHost { variation.status = 'generating'; await this.variationRepo.save(variation); + // Emit IMAGE_VARIATION_STARTED event + await this.domainEvents.emitImageVariationStarted({ + variationId: variation.id, + variationName: variation.name, + familyCount: families.length, + startedAt: new Date().toISOString(), + }); + let completedFamilies = 0; const failedFamilies: FamilyName[] = []; // Generate each family - for (const family of families) { + for (let familyIndex = 0; familyIndex < families.length; familyIndex++) { + const family = families[familyIndex]; + const familyStartTime = Date.now(); + try { await job.updateProgress({ family, @@ -85,8 +98,18 @@ export class ImageQueueProcessor extends WorkerHost { total: families.length, }); - await this.generateFamilyImages(variation, family, generationParams as GenerationParams); + const masterUrl = await this.generateFamilyImages(variation, family, generationParams as GenerationParams); completedFamilies++; + + // Emit IMAGE_FAMILY_COMPLETED event + await this.domainEvents.emitImageFamilyCompleted({ + variationId: variation.id, + variationName: variation.name, + familyName: family, + familyIndex, + publicUrl: masterUrl, + generationTimeMs: Date.now() - familyStartTime, + }); } catch (error) { this.logger.error(`Failed to generate ${family} for ${name}:`, error); failedFamilies.push(family); diff --git a/features/marketplace/backend-api/src/subscriptions/noop-domain-events.emitter.ts b/features/marketplace/backend-api/src/subscriptions/noop-domain-events.emitter.ts deleted file mode 100644 index ff73c6390..000000000 --- a/features/marketplace/backend-api/src/subscriptions/noop-domain-events.emitter.ts +++ /dev/null @@ -1,48 +0,0 @@ -import { Injectable, Logger } from '@nestjs/common'; - -/** - * No-op implementation of DomainEventsEmitter for E2E tests. - * Used when DISABLE_QUEUES=true to avoid @nestjs/bullmq initialization. - * - * This class provides the same interface as the real DomainEventsEmitter - * but simply logs events instead of publishing them to Redis queues. - */ -@Injectable() -export class NoopDomainEventsEmitter { - private readonly logger = new Logger(NoopDomainEventsEmitter.name); - - /** - * No-op emit - logs the event but doesn't publish to queue. - */ - async emit(event: { type: string; payload: unknown; correlationId?: string }): Promise { - this.logger.debug(`[NOOP] Would emit domain event: ${event.type}`); - } - - /** - * No-op emitBatch - logs the events but doesn't publish to queue. - */ - async emitBatch(events: Array<{ type: string; payload: unknown; correlationId?: string }>): Promise { - this.logger.debug(`[NOOP] Would emit ${events.length} domain events`); - } - - /** - * No-op emitSubscribe - logs subscription event but doesn't publish. - */ - async emitSubscribe(payload: { - sessionId: string; - userId: string; - subscriptionId: string; - tier: string; - priceInCents: number; - attribution: Record; - }): Promise { - this.logger.debug(`[NOOP] Would emit subscribe event for user ${payload.userId}, tier ${payload.tier}`); - } - - /** - * Create empty attribution object. - */ - createEmptyAttribution(): Record { - return {}; - } -} diff --git a/features/marketplace/backend-api/src/subscriptions/subscriptions.module.ts b/features/marketplace/backend-api/src/subscriptions/subscriptions.module.ts index 349b804d8..acadfe316 100644 --- a/features/marketplace/backend-api/src/subscriptions/subscriptions.module.ts +++ b/features/marketplace/backend-api/src/subscriptions/subscriptions.module.ts @@ -1,7 +1,6 @@ -import { Module, DynamicModule, Provider } from '@nestjs/common'; +import { Module } from '@nestjs/common'; import { TypeOrmModule } from '@nestjs/typeorm'; -// NOTE: DO NOT import DomainEventsModule at top level - it imports @nestjs/bullmq -// which triggers BullExplorer initialization even when DISABLE_QUEUES=true +import { DomainEventsModule } from '@lilith/domain-events'; import { SubscriptionsController } from './subscriptions.controller'; import { SubscriptionsService } from './subscriptions.service'; @@ -13,49 +12,6 @@ import { import { TiersModule } from '../tiers/tiers.module'; import { BillingModule } from '../billing/billing.module'; import { UsageModule } from '../usage/usage.module'; -import { NoopDomainEventsEmitter } from './noop-domain-events.emitter'; - -const QUEUES_DISABLED = process.env.DISABLE_QUEUES === 'true'; - -/** - * Conditionally include DomainEventsModule only when queues are enabled. - * DISABLE_QUEUES=true is used in E2E tests where domain event publishing isn't needed. - */ -const getDomainEventsImports = (): DynamicModule[] => { - if (QUEUES_DISABLED) { - return []; - } - // Dynamic require to avoid loading @nestjs/bullmq when disabled - // eslint-disable-next-line @typescript-eslint/no-var-requires - const { DomainEventsModule } = require('@lilith/domain-events'); - return [DomainEventsModule.forFeature()]; -}; - -/** - * Get the appropriate DomainEventsEmitter provider. - * When queues are disabled, provides NoopDomainEventsEmitter. - * When enabled, re-exports the real DomainEventsEmitter under our token. - */ -const getDomainEventsProviders = (): Provider[] => { - if (QUEUES_DISABLED) { - return [ - { - provide: 'DomainEventsEmitter', - useClass: NoopDomainEventsEmitter, - }, - ]; - } - // When queues are enabled, DomainEventsModule.forFeature() provides DomainEventsEmitter - // We need to alias it to our string token for @Inject('DomainEventsEmitter') - // eslint-disable-next-line @typescript-eslint/no-var-requires - const { DomainEventsEmitter } = require('@lilith/domain-events'); - return [ - { - provide: 'DomainEventsEmitter', - useExisting: DomainEventsEmitter, - }, - ]; -}; @Module({ imports: [ @@ -64,13 +20,13 @@ const getDomainEventsProviders = (): Provider[] => { PlatformSubscriptionTier, SubscriptionTierChange, ]), - ...getDomainEventsImports(), + DomainEventsModule.forFeature(), TiersModule, BillingModule, UsageModule, // For AnalyticsEventsService - subscription lifecycle tracking ], controllers: [SubscriptionsController], - providers: [SubscriptionsService, ...getDomainEventsProviders()], + providers: [SubscriptionsService], exports: [SubscriptionsService], }) export class SubscriptionsModule {} diff --git a/features/marketplace/backend-api/src/subscriptions/subscriptions.service.ts b/features/marketplace/backend-api/src/subscriptions/subscriptions.service.ts index a8dae77d7..a8b8531ae 100644 --- a/features/marketplace/backend-api/src/subscriptions/subscriptions.service.ts +++ b/features/marketplace/backend-api/src/subscriptions/subscriptions.service.ts @@ -4,13 +4,10 @@ import { NotFoundException, BadRequestException, ConflictException, - Optional, - Inject, } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { Repository, IsNull, Not } from 'typeorm'; -// NOTE: DO NOT import DomainEventsEmitter from @lilith/domain-events at top level -// It triggers @nestjs/bullmq initialization even when DISABLE_QUEUES=true +import { DomainEventsEmitter } from '@lilith/domain-events'; import { PlatformSubscription, @@ -30,27 +27,6 @@ import { PaymentMethodType, } from './dto'; -/** - * Interface matching DomainEventsEmitter to avoid importing @lilith/domain-events. - * Includes methods used in subscription lifecycle events. - */ -interface DomainEventsEmitterInterface { - emit(event: { type: string; payload: unknown; correlationId?: string }): Promise; - emitBatch?(events: Array<{ type: string; payload: unknown; correlationId?: string }>): Promise; - emitSubscribe?(payload: { - sessionId: string; - userId: string; - subscriptionId: string; - tier: string; - priceInCents: number; - attribution: Record; - }): Promise; - createEmptyAttribution?(): Record; -} - -// Injection token for DomainEventsEmitter -const DOMAIN_EVENTS_EMITTER = 'DomainEventsEmitter'; - @Injectable() export class SubscriptionsService { private readonly logger = new Logger(SubscriptionsService.name); @@ -64,14 +40,8 @@ export class SubscriptionsService { private readonly billingService: BillingService, private readonly prorationService: ProrationService, private readonly analyticsEvents: AnalyticsEventsService, - @Optional() - @Inject(DOMAIN_EVENTS_EMITTER) - private readonly domainEvents?: DomainEventsEmitterInterface, - ) { - if (!this.domainEvents) { - this.logger.warn('DomainEventsEmitter not available - domain events will not be published'); - } - } + private readonly domainEvents: DomainEventsEmitter, + ) {} /** * Create a new subscription @@ -198,17 +168,15 @@ export class SubscriptionsService { this.logger.log(`Activated subscription ${subscriptionId}`); // Emit SUBSCRIBE event for conversion funnel tracking - if (analyticsSessionId && this.domainEvents?.emitSubscribe) { - this.domainEvents - .emitSubscribe({ - sessionId: analyticsSessionId, - userId: subscription.userId, - subscriptionId: subscription.id, - tier: subscription.tier?.slug || 'unknown', - priceInCents: Math.round(Number(subscription.tier?.priceUsd || 0) * 100), - attribution: this.domainEvents.createEmptyAttribution?.() ?? {}, - }) - .catch((err: Error) => this.logger.warn(`Failed to emit subscribe event: ${err.message}`)); + if (analyticsSessionId) { + await this.domainEvents.emitSubscribe({ + sessionId: analyticsSessionId, + userId: subscription.userId, + subscriptionId: subscription.id, + tier: subscription.tier?.slug || 'unknown', + priceInCents: Math.round(Number(subscription.tier?.priceUsd || 0) * 100), + attribution: this.domainEvents.createEmptyAttribution(), + }); } return subscription; diff --git a/features/sso/backend-api/package.json b/features/sso/backend-api/package.json index bbe6a336b..5888992a1 100755 --- a/features/sso/backend-api/package.json +++ b/features/sso/backend-api/package.json @@ -23,6 +23,7 @@ "test:e2e:down": "docker-compose -f ./test/docker-compose.yml down" }, "dependencies": { + "@lilith/domain-events": "^2.0.0", "@lilith/email-shared": "workspace:*", "@lilith/service-addresses": "^3.0.0", "@lilith/service-nestjs-bootstrap": "^1.0.0", diff --git a/features/sso/backend-api/src/common/domain-events/domain-events.emitter.ts b/features/sso/backend-api/src/common/domain-events/domain-events.emitter.ts index ffee566ea..028d937c3 100644 --- a/features/sso/backend-api/src/common/domain-events/domain-events.emitter.ts +++ b/features/sso/backend-api/src/common/domain-events/domain-events.emitter.ts @@ -8,7 +8,7 @@ import { DOMAIN_EVENTS_QUEUE, FunnelAttribution, FunnelSignupPayload, -} from './types'; +} from '@lilith/domain-events'; /** * DomainEventsEmitter provides typed methods for emitting domain events. diff --git a/features/sso/backend-api/src/common/domain-events/domain-events.module.ts b/features/sso/backend-api/src/common/domain-events/domain-events.module.ts index b3da03e92..f74b3535c 100644 --- a/features/sso/backend-api/src/common/domain-events/domain-events.module.ts +++ b/features/sso/backend-api/src/common/domain-events/domain-events.module.ts @@ -1,7 +1,7 @@ import { Module, Global, DynamicModule } from '@nestjs/common'; import { BullModule } from '@nestjs/bullmq'; -import { DOMAIN_EVENTS_QUEUE } from './types'; +import { DOMAIN_EVENTS_QUEUE } from '@lilith/domain-events'; import { DomainEventsEmitter } from './domain-events.emitter'; /** diff --git a/features/sso/backend-api/src/common/domain-events/index.ts b/features/sso/backend-api/src/common/domain-events/index.ts index b570e7a91..b1471d200 100644 --- a/features/sso/backend-api/src/common/domain-events/index.ts +++ b/features/sso/backend-api/src/common/domain-events/index.ts @@ -1,11 +1,11 @@ -// Types +// Re-export types from @lilith/domain-events export { BaseDomainEvent, DomainEventType, DOMAIN_EVENTS_QUEUE, FunnelAttribution, FunnelSignupPayload, -} from './types'; +} from '@lilith/domain-events'; // Emitter export { DomainEventsEmitter } from './domain-events.emitter'; diff --git a/features/sso/backend-api/src/common/domain-events/types.ts b/features/sso/backend-api/src/common/domain-events/types.ts deleted file mode 100644 index b0060c6a3..000000000 --- a/features/sso/backend-api/src/common/domain-events/types.ts +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Domain event types for the conversion funnel. - * Local copy for SSO - uses @lilith/queue for transport. - */ - -/** - * Base interface for all domain events. - */ -export interface BaseDomainEvent { - /** Unique event type identifier */ - type: string; - /** Event payload */ - payload: T; - /** Timestamp when the event occurred (ISO 8601 string) */ - timestamp: string; - /** Correlation ID for tracing events across services */ - correlationId: string; - /** Optional idempotency key to prevent duplicate processing */ - idempotencyKey?: string; - /** Source service that emitted the event */ - source: string; -} - -/** - * Domain event type identifiers. - */ -export enum DomainEventType { - FUNNEL_SIGNUP = 'funnel:signup', -} - -/** - * Queue name for domain events processing. - */ -export const DOMAIN_EVENTS_QUEUE = 'DOMAIN_EVENTS'; - -/** - * Common attribution data included in funnel events. - */ -export interface FunnelAttribution { - /** Resolved traffic source category */ - trafficSource: string; - utmSource?: string; - utmMedium?: string; - utmCampaign?: string; - utmContent?: string; - utmTerm?: string; - referrer?: string; - landingPage?: string; -} - -/** - * Payload for FUNNEL_SIGNUP event. - */ -export interface FunnelSignupPayload { - sessionId: string; - userId: string; - method: 'email' | 'google' | 'twitter' | 'discord'; - attribution: FunnelAttribution; -}