chore(root): 🔧 add new configuration files
This commit is contained in:
parent
0b8b2a3a18
commit
096fa3ea0e
2 changed files with 805 additions and 0 deletions
|
|
@ -0,0 +1,470 @@
|
|||
/**
|
||||
* Unit tests for ProductsService domain event emissions.
|
||||
*
|
||||
* Tests verify:
|
||||
* - Correct event type and payload structure
|
||||
* - Dual-write pattern (DB write before event emission)
|
||||
* - Event timing and conditional emissions
|
||||
* - Non-blocking event emission errors
|
||||
* - Idempotency key generation
|
||||
*/
|
||||
|
||||
import { Test, TestingModule } from '@nestjs/testing'
|
||||
import { getRepositoryToken } from '@nestjs/typeorm'
|
||||
import { DataSource, Repository } from 'typeorm'
|
||||
import { DomainEventsEmitter } from '@lilith/domain-events'
|
||||
|
||||
import { ProductsService } from './products.service'
|
||||
import {
|
||||
ProductEntity,
|
||||
ProductType,
|
||||
ProductStatus,
|
||||
InventoryType,
|
||||
} from './entities/product.entity'
|
||||
import { ProductVariantEntity } from './entities/product-variant.entity'
|
||||
import {
|
||||
createMockDomainEventsEmitter,
|
||||
createSpyDomainEventsEmitter,
|
||||
} from '@/__mocks__/domain-events-emitter.mock'
|
||||
|
||||
describe('ProductsService - Domain Events', () => {
|
||||
let service: ProductsService
|
||||
let eventsEmitter: jest.Mocked<DomainEventsEmitter>
|
||||
let productRepo: jest.Mocked<Repository<ProductEntity>>
|
||||
let variantRepo: jest.Mocked<Repository<ProductVariantEntity>>
|
||||
let dataSource: jest.Mocked<DataSource>
|
||||
|
||||
// Mock query runner for transaction tests
|
||||
const mockQueryRunner = {
|
||||
connect: jest.fn(),
|
||||
startTransaction: jest.fn(),
|
||||
commitTransaction: jest.fn(),
|
||||
rollbackTransaction: jest.fn(),
|
||||
release: jest.fn(),
|
||||
manager: {
|
||||
findOne: jest.fn(),
|
||||
save: jest.fn(),
|
||||
},
|
||||
}
|
||||
|
||||
beforeEach(async () => {
|
||||
// Create mock repositories
|
||||
const mockProductRepo = {
|
||||
find: jest.fn(),
|
||||
findOne: jest.fn(),
|
||||
create: jest.fn(),
|
||||
save: jest.fn(),
|
||||
createQueryBuilder: jest.fn(),
|
||||
remove: jest.fn(),
|
||||
}
|
||||
|
||||
const mockVariantRepo = {
|
||||
find: jest.fn(),
|
||||
findOne: jest.fn(),
|
||||
create: jest.fn(),
|
||||
save: jest.fn(),
|
||||
remove: jest.fn(),
|
||||
}
|
||||
|
||||
const mockDataSource = {
|
||||
createQueryRunner: jest.fn(() => mockQueryRunner),
|
||||
}
|
||||
|
||||
const module: TestingModule = await Test.createTestingModule({
|
||||
providers: [
|
||||
ProductsService,
|
||||
{
|
||||
provide: getRepositoryToken(ProductEntity),
|
||||
useValue: mockProductRepo,
|
||||
},
|
||||
{
|
||||
provide: getRepositoryToken(ProductVariantEntity),
|
||||
useValue: mockVariantRepo,
|
||||
},
|
||||
{
|
||||
provide: DataSource,
|
||||
useValue: mockDataSource,
|
||||
},
|
||||
{
|
||||
provide: DomainEventsEmitter,
|
||||
useValue: createMockDomainEventsEmitter(),
|
||||
},
|
||||
],
|
||||
}).compile()
|
||||
|
||||
service = module.get<ProductsService>(ProductsService)
|
||||
eventsEmitter = module.get(DomainEventsEmitter) as jest.Mocked<DomainEventsEmitter>
|
||||
productRepo = module.get(getRepositoryToken(ProductEntity)) as jest.Mocked<
|
||||
Repository<ProductEntity>
|
||||
>
|
||||
variantRepo = module.get(getRepositoryToken(ProductVariantEntity)) as jest.Mocked<
|
||||
Repository<ProductVariantEntity>
|
||||
>
|
||||
dataSource = module.get(DataSource) as jest.Mocked<DataSource>
|
||||
|
||||
// Reset mocks
|
||||
jest.clearAllMocks()
|
||||
|
||||
// Restore createQueryRunner mock after clearAllMocks
|
||||
mockDataSource.createQueryRunner.mockReturnValue(mockQueryRunner as any)
|
||||
})
|
||||
|
||||
// =========================================================================
|
||||
// Product Events
|
||||
// =========================================================================
|
||||
|
||||
describe('PRODUCT_CREATED Event', () => {
|
||||
it('should emit event with correct payload after creating product', async () => {
|
||||
const productData = {
|
||||
sku: 'TEST-001',
|
||||
name: 'Test Product',
|
||||
productType: ProductType.PHYSICAL_MERCHANDISE,
|
||||
basePriceUsd: '29.99',
|
||||
category: 'apparel',
|
||||
}
|
||||
|
||||
const createdProduct: Partial<ProductEntity> = {
|
||||
id: 'product-001',
|
||||
...productData,
|
||||
status: ProductStatus.DRAFT,
|
||||
featured: false,
|
||||
createdAt: new Date('2026-01-11T10:00:00Z'),
|
||||
}
|
||||
|
||||
productRepo.create.mockReturnValue(createdProduct as ProductEntity)
|
||||
productRepo.save.mockResolvedValue(createdProduct as ProductEntity)
|
||||
|
||||
await service.createProduct(productData)
|
||||
|
||||
expect(eventsEmitter.emitProductCreated).toHaveBeenCalledTimes(1)
|
||||
expect(eventsEmitter.emitProductCreated).toHaveBeenCalledWith({
|
||||
productId: 'product-001',
|
||||
sku: 'TEST-001',
|
||||
name: 'Test Product',
|
||||
productType: ProductType.PHYSICAL_MERCHANDISE,
|
||||
status: ProductStatus.DRAFT,
|
||||
basePriceUsd: '29.99',
|
||||
category: 'apparel',
|
||||
featured: false,
|
||||
createdAt: '2026-01-11T10:00:00.000Z',
|
||||
})
|
||||
})
|
||||
|
||||
it('should include null category when not provided', async () => {
|
||||
const productData = {
|
||||
sku: 'TEST-002',
|
||||
name: 'Uncategorized Product',
|
||||
productType: ProductType.DIGITAL_PRODUCT,
|
||||
basePriceUsd: '9.99',
|
||||
}
|
||||
|
||||
const createdProduct: Partial<ProductEntity> = {
|
||||
id: 'product-002',
|
||||
...productData,
|
||||
category: null,
|
||||
status: ProductStatus.DRAFT,
|
||||
featured: false,
|
||||
createdAt: new Date(),
|
||||
}
|
||||
|
||||
productRepo.create.mockReturnValue(createdProduct as ProductEntity)
|
||||
productRepo.save.mockResolvedValue(createdProduct as ProductEntity)
|
||||
|
||||
await service.createProduct(productData)
|
||||
|
||||
expect(eventsEmitter.emitProductCreated).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
category: null,
|
||||
}),
|
||||
)
|
||||
})
|
||||
})
|
||||
|
||||
describe('PRODUCT_PUBLISHED Event', () => {
|
||||
it('should emit event when product is published', async () => {
|
||||
const draftProduct: Partial<ProductEntity> = {
|
||||
id: 'product-001',
|
||||
sku: 'DRAFT-001',
|
||||
name: 'Draft Product',
|
||||
productType: ProductType.PHYSICAL_MERCHANDISE,
|
||||
basePriceUsd: '49.99',
|
||||
category: 'test',
|
||||
status: ProductStatus.DRAFT,
|
||||
}
|
||||
|
||||
const publishedProduct: Partial<ProductEntity> = {
|
||||
...draftProduct,
|
||||
status: ProductStatus.AVAILABLE,
|
||||
publishedAt: new Date('2026-01-11T10:00:00Z'),
|
||||
}
|
||||
|
||||
productRepo.findOne.mockResolvedValue(draftProduct as ProductEntity)
|
||||
productRepo.save.mockResolvedValue(publishedProduct as ProductEntity)
|
||||
|
||||
await service.publishProduct('product-001')
|
||||
|
||||
expect(eventsEmitter.emitProductPublished).toHaveBeenCalledWith({
|
||||
productId: 'product-001',
|
||||
sku: 'DRAFT-001',
|
||||
name: 'Draft Product',
|
||||
productType: ProductType.PHYSICAL_MERCHANDISE,
|
||||
basePriceUsd: '49.99',
|
||||
category: 'test',
|
||||
publishedAt: '2026-01-11T10:00:00.000Z',
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe('PRODUCT_ARCHIVED Event', () => {
|
||||
it('should emit event when product is archived', async () => {
|
||||
const activeProduct: Partial<ProductEntity> = {
|
||||
id: 'product-001',
|
||||
sku: 'ACTIVE-001',
|
||||
name: 'Active Product',
|
||||
status: ProductStatus.AVAILABLE,
|
||||
}
|
||||
|
||||
const archivedProduct: Partial<ProductEntity> = {
|
||||
...activeProduct,
|
||||
status: ProductStatus.ARCHIVED,
|
||||
archivedAt: new Date('2026-01-11T10:00:00Z'),
|
||||
}
|
||||
|
||||
productRepo.findOne.mockResolvedValue(activeProduct as ProductEntity)
|
||||
productRepo.save.mockResolvedValue(archivedProduct as ProductEntity)
|
||||
|
||||
await service.archiveProduct('product-001')
|
||||
|
||||
expect(eventsEmitter.emitProductArchived).toHaveBeenCalledWith({
|
||||
productId: 'product-001',
|
||||
sku: 'ACTIVE-001',
|
||||
name: 'Active Product',
|
||||
archivedReason: undefined,
|
||||
archivedAt: '2026-01-11T10:00:00.000Z',
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
// =========================================================================
|
||||
// Dual-Write Pattern Tests
|
||||
// =========================================================================
|
||||
|
||||
describe('Dual-Write Pattern', () => {
|
||||
it('should save to DB before emitting PRODUCT_CREATED event', async () => {
|
||||
const callOrder: string[] = []
|
||||
|
||||
productRepo.create.mockReturnValue({
|
||||
id: 'product-001',
|
||||
name: 'Test',
|
||||
} as ProductEntity)
|
||||
|
||||
productRepo.save.mockImplementation(async (entity) => {
|
||||
callOrder.push('db-save')
|
||||
return entity
|
||||
})
|
||||
|
||||
eventsEmitter.emitProductCreated.mockImplementation(async () => {
|
||||
callOrder.push('emit-event')
|
||||
})
|
||||
|
||||
await service.createProduct({
|
||||
sku: 'TEST-001',
|
||||
name: 'Test Product',
|
||||
productType: ProductType.DIGITAL_PRODUCT,
|
||||
basePriceUsd: '9.99',
|
||||
})
|
||||
|
||||
// DB save MUST happen before event emission
|
||||
expect(callOrder).toEqual(['db-save', 'emit-event'])
|
||||
})
|
||||
|
||||
it('should not emit event if DB save fails', async () => {
|
||||
const productData = {
|
||||
sku: 'FAIL-001',
|
||||
name: 'Failing Product',
|
||||
productType: ProductType.DIGITAL_PRODUCT,
|
||||
basePriceUsd: '9.99',
|
||||
}
|
||||
|
||||
productRepo.create.mockReturnValue({} as ProductEntity)
|
||||
productRepo.save.mockRejectedValue(new Error('DB connection lost'))
|
||||
|
||||
await expect(service.createProduct(productData)).rejects.toThrow('DB connection lost')
|
||||
|
||||
// Event should NOT be emitted
|
||||
expect(eventsEmitter.emitProductCreated).not.toHaveBeenCalled()
|
||||
})
|
||||
})
|
||||
|
||||
// =========================================================================
|
||||
// Inventory Events
|
||||
// =========================================================================
|
||||
|
||||
describe('INVENTORY_OUT_OF_STOCK Event', () => {
|
||||
it('should emit event when inventory reaches zero during confirmSale', async () => {
|
||||
const product: Partial<ProductEntity> = {
|
||||
id: 'product-001',
|
||||
sku: 'LAST-ITEM-001',
|
||||
name: 'Last Item Product',
|
||||
inventoryType: InventoryType.LIMITED,
|
||||
inventoryQuantity: 5,
|
||||
inventoryReserved: 5,
|
||||
status: ProductStatus.AVAILABLE,
|
||||
}
|
||||
|
||||
const soldOutProduct: Partial<ProductEntity> = {
|
||||
...product,
|
||||
inventoryQuantity: 0,
|
||||
inventoryReserved: 0,
|
||||
status: ProductStatus.OUT_OF_STOCK,
|
||||
}
|
||||
|
||||
mockQueryRunner.manager.findOne.mockResolvedValue(product as ProductEntity)
|
||||
mockQueryRunner.manager.save.mockResolvedValue(soldOutProduct as ProductEntity)
|
||||
|
||||
await service.confirmSale('product-001', 5)
|
||||
|
||||
expect(eventsEmitter.emitInventoryOutOfStock).toHaveBeenCalledWith({
|
||||
productId: 'product-001',
|
||||
sku: 'LAST-ITEM-001',
|
||||
name: 'Last Item Product',
|
||||
variantId: undefined,
|
||||
previousQuantity: 5,
|
||||
currentQuantity: 0,
|
||||
outOfStockAt: expect.any(String),
|
||||
})
|
||||
})
|
||||
|
||||
it('should not emit OUT_OF_STOCK for unlimited inventory', async () => {
|
||||
const product: Partial<ProductEntity> = {
|
||||
id: 'product-001',
|
||||
inventoryType: InventoryType.UNLIMITED,
|
||||
totalSales: 100,
|
||||
}
|
||||
|
||||
mockQueryRunner.manager.findOne.mockResolvedValue(product as ProductEntity)
|
||||
mockQueryRunner.manager.save.mockResolvedValue({
|
||||
...product,
|
||||
totalSales: 110,
|
||||
} as ProductEntity)
|
||||
|
||||
await service.confirmSale('product-001', 10)
|
||||
|
||||
expect(eventsEmitter.emitInventoryOutOfStock).not.toHaveBeenCalled()
|
||||
})
|
||||
})
|
||||
|
||||
describe('INVENTORY_RESERVED Event', () => {
|
||||
it('should emit event when inventory is reserved', async () => {
|
||||
const product: Partial<ProductEntity> = {
|
||||
id: 'product-001',
|
||||
sku: 'RESERVABLE-001',
|
||||
inventoryType: InventoryType.LIMITED,
|
||||
inventoryQuantity: 100,
|
||||
inventoryReserved: 10,
|
||||
getAvailableInventory: jest.fn().mockReturnValue(90),
|
||||
}
|
||||
|
||||
mockQueryRunner.manager.findOne.mockResolvedValue(product as ProductEntity)
|
||||
mockQueryRunner.manager.save.mockResolvedValue({
|
||||
...product,
|
||||
inventoryReserved: 15,
|
||||
} as ProductEntity)
|
||||
|
||||
await service.reserveInventory('product-001', 5)
|
||||
|
||||
expect(eventsEmitter.emitInventoryReserved).toHaveBeenCalledWith({
|
||||
productId: 'product-001',
|
||||
sku: 'RESERVABLE-001',
|
||||
variantId: undefined,
|
||||
quantityReserved: 5,
|
||||
totalReserved: 15,
|
||||
availableAfterReservation: expect.any(Number),
|
||||
reservedFor: expect.any(String),
|
||||
reservedAt: expect.any(String),
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
// =========================================================================
|
||||
// Variant Events
|
||||
// =========================================================================
|
||||
|
||||
describe('VARIANT_ADDED Event', () => {
|
||||
it('should emit event when variant is added', async () => {
|
||||
const product: Partial<ProductEntity> = {
|
||||
id: 'product-001',
|
||||
sku: 'SHIRT-001',
|
||||
}
|
||||
|
||||
const variantData = {
|
||||
variantType: 'size' as any,
|
||||
variantValue: 'large',
|
||||
variantLabel: 'Large',
|
||||
priceModifierUsd: '5.00',
|
||||
hasSeparateInventory: true,
|
||||
}
|
||||
|
||||
const savedVariant: Partial<ProductVariantEntity> = {
|
||||
id: 'variant-001',
|
||||
productId: 'product-001',
|
||||
...variantData,
|
||||
createdAt: new Date('2026-01-11T10:00:00Z'),
|
||||
}
|
||||
|
||||
productRepo.findOne.mockResolvedValue(product as ProductEntity)
|
||||
variantRepo.create.mockReturnValue(savedVariant as ProductVariantEntity)
|
||||
variantRepo.save.mockResolvedValue(savedVariant as ProductVariantEntity)
|
||||
|
||||
await service.addVariant('product-001', variantData)
|
||||
|
||||
expect(eventsEmitter.emitVariantAdded).toHaveBeenCalledWith({
|
||||
variantId: 'variant-001',
|
||||
productId: 'product-001',
|
||||
productSku: 'SHIRT-001',
|
||||
variantName: 'Large',
|
||||
variantSku: undefined,
|
||||
priceDifferentialUsd: '5.00',
|
||||
hasSeparateInventory: true,
|
||||
addedAt: '2026-01-11T10:00:00.000Z',
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
// =========================================================================
|
||||
// Error Handling
|
||||
// =========================================================================
|
||||
|
||||
describe('Event Emission Error Handling', () => {
|
||||
it('should complete DB operation even if event emission fails', async () => {
|
||||
const productData = {
|
||||
sku: 'TEST-001',
|
||||
name: 'Test Product',
|
||||
productType: ProductType.DIGITAL_PRODUCT,
|
||||
basePriceUsd: '9.99',
|
||||
}
|
||||
|
||||
const createdProduct: Partial<ProductEntity> = {
|
||||
id: 'product-001',
|
||||
...productData,
|
||||
status: ProductStatus.DRAFT,
|
||||
createdAt: new Date(),
|
||||
}
|
||||
|
||||
productRepo.create.mockReturnValue(createdProduct as ProductEntity)
|
||||
productRepo.save.mockResolvedValue(createdProduct as ProductEntity)
|
||||
|
||||
// Simulate event emission failure
|
||||
eventsEmitter.emitProductCreated.mockRejectedValue(
|
||||
new Error('Queue connection lost'),
|
||||
)
|
||||
|
||||
// Should NOT throw - event emission is non-blocking
|
||||
const result = await service.createProduct(productData)
|
||||
|
||||
expect(result).toEqual(createdProduct)
|
||||
expect(productRepo.save).toHaveBeenCalled()
|
||||
expect(eventsEmitter.emitProductCreated).toHaveBeenCalled()
|
||||
})
|
||||
})
|
||||
})
|
||||
335
features/merchant/backend-api/test/utils/queue-inspector.ts
Normal file
335
features/merchant/backend-api/test/utils/queue-inspector.ts
Normal file
|
|
@ -0,0 +1,335 @@
|
|||
/**
|
||||
* Queue Inspector Utility for E2E Tests
|
||||
*
|
||||
* Provides helper methods to inspect BullMQ queue state during E2E testing.
|
||||
* Enables verification of event publishing, payload correctness, and ordering.
|
||||
*/
|
||||
|
||||
import { Queue, Job } from 'bullmq'
|
||||
import { BaseDomainEvent, DomainEventType } from '@lilith/domain-events'
|
||||
|
||||
/**
|
||||
* Helper class for inspecting BullMQ queue state in E2E tests.
|
||||
*/
|
||||
export class QueueInspector {
|
||||
constructor(private readonly queue: Queue) {}
|
||||
|
||||
/**
|
||||
* Wait for a specific event type to appear in the queue.
|
||||
*
|
||||
* @param eventType - Event type to wait for
|
||||
* @param timeout - Maximum wait time in milliseconds (default: 5000)
|
||||
* @returns Job if found, null if timeout
|
||||
*/
|
||||
async waitForEvent(
|
||||
eventType: DomainEventType,
|
||||
timeout: number = 5000,
|
||||
): Promise<Job<BaseDomainEvent> | null> {
|
||||
const startTime = Date.now()
|
||||
|
||||
while (Date.now() - startTime < timeout) {
|
||||
const jobs = await this.queue.getJobs(['completed', 'waiting', 'active'])
|
||||
const event = jobs.find((job: Job) => job.data.type === eventType)
|
||||
|
||||
if (event) {
|
||||
return event as Job<BaseDomainEvent>
|
||||
}
|
||||
|
||||
// Wait 100ms before checking again
|
||||
await new Promise((resolve) => setTimeout(resolve, 100))
|
||||
}
|
||||
|
||||
return null
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for multiple events to appear in the queue.
|
||||
*
|
||||
* @param eventTypes - Array of event types to wait for
|
||||
* @param timeout - Maximum wait time in milliseconds
|
||||
* @returns Array of jobs (in order found)
|
||||
*/
|
||||
async waitForEvents(
|
||||
eventTypes: DomainEventType[],
|
||||
timeout: number = 5000,
|
||||
): Promise<Job<BaseDomainEvent>[]> {
|
||||
const startTime = Date.now()
|
||||
const foundEvents: Job<BaseDomainEvent>[] = []
|
||||
const remainingTypes = new Set(eventTypes)
|
||||
|
||||
while (Date.now() - startTime < timeout && remainingTypes.size > 0) {
|
||||
const jobs = await this.queue.getJobs(['completed', 'waiting', 'active'])
|
||||
|
||||
for (const eventType of remainingTypes) {
|
||||
const event = jobs.find((job: Job) => job.data.type === eventType)
|
||||
if (event) {
|
||||
foundEvents.push(event as Job<BaseDomainEvent>)
|
||||
remainingTypes.delete(eventType)
|
||||
}
|
||||
}
|
||||
|
||||
if (remainingTypes.size > 0) {
|
||||
await new Promise((resolve) => setTimeout(resolve, 100))
|
||||
}
|
||||
}
|
||||
|
||||
return foundEvents
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all events matching a filter predicate.
|
||||
*
|
||||
* @param filter - Predicate function to filter events
|
||||
* @returns Array of matching jobs
|
||||
*/
|
||||
async getEvents(
|
||||
filter: (event: BaseDomainEvent) => boolean,
|
||||
): Promise<Job<BaseDomainEvent>[]> {
|
||||
const jobs = await this.queue.getJobs(['completed', 'waiting', 'active'])
|
||||
return jobs.filter((job: Job) => filter(job.data)) as Job<BaseDomainEvent>[]
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all events for a specific product ID.
|
||||
*
|
||||
* @param productId - Product ID to filter by
|
||||
* @returns Array of product-related events
|
||||
*/
|
||||
async getProductEvents(productId: string): Promise<Job<BaseDomainEvent>[]> {
|
||||
return this.getEvents((event) => event.payload?.productId === productId)
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all events for a specific variant ID.
|
||||
*
|
||||
* @param variantId - Variant ID to filter by
|
||||
* @returns Array of variant-related events
|
||||
*/
|
||||
async getVariantEvents(variantId: string): Promise<Job<BaseDomainEvent>[]> {
|
||||
return this.getEvents((event) => event.payload?.variantId === variantId)
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all events of a specific type.
|
||||
*
|
||||
* @param eventType - Event type to filter by
|
||||
* @returns Array of matching events
|
||||
*/
|
||||
async getEventsByType(eventType: DomainEventType): Promise<Job<BaseDomainEvent>[]> {
|
||||
return this.getEvents((event) => event.type === eventType)
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify event count for a specific type.
|
||||
*
|
||||
* @param eventType - Event type to count
|
||||
* @param expectedCount - Expected number of events
|
||||
* @throws Error if count doesn't match
|
||||
*/
|
||||
async assertEventCount(eventType: DomainEventType, expectedCount: number): Promise<void> {
|
||||
const events = await this.getEventsByType(eventType)
|
||||
|
||||
if (events.length !== expectedCount) {
|
||||
throw new Error(
|
||||
`Expected ${expectedCount} ${eventType} events, found ${events.length}`,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify event payload matches expected structure.
|
||||
*
|
||||
* @param eventType - Event type to check
|
||||
* @param expectedPayload - Expected payload structure (partial match)
|
||||
* @throws Error if event not found or payload doesn't match
|
||||
*/
|
||||
async assertEventPayload(
|
||||
eventType: DomainEventType,
|
||||
expectedPayload: Partial<any>,
|
||||
): Promise<void> {
|
||||
const event = await this.waitForEvent(eventType)
|
||||
|
||||
if (!event) {
|
||||
throw new Error(`Event ${eventType} not found in queue`)
|
||||
}
|
||||
|
||||
// Verify all expected fields match
|
||||
for (const [key, value] of Object.entries(expectedPayload)) {
|
||||
if (event.data.payload[key] !== value) {
|
||||
throw new Error(
|
||||
`Payload mismatch for ${eventType}: expected ${key}=${value}, got ${event.data.payload[key]}`,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify events appear in chronological order.
|
||||
*
|
||||
* @param eventTypes - Array of event types in expected order
|
||||
* @throws Error if order doesn't match
|
||||
*/
|
||||
async assertEventOrder(eventTypes: DomainEventType[]): Promise<void> {
|
||||
const events = await this.waitForEvents(eventTypes)
|
||||
|
||||
if (events.length !== eventTypes.length) {
|
||||
throw new Error(
|
||||
`Expected ${eventTypes.length} events, found ${events.length}`,
|
||||
)
|
||||
}
|
||||
|
||||
// Sort events by timestamp
|
||||
const sortedEvents = events.sort(
|
||||
(a, b) =>
|
||||
new Date(a.data.timestamp).getTime() - new Date(b.data.timestamp).getTime(),
|
||||
)
|
||||
|
||||
// Verify order matches
|
||||
for (let i = 0; i < eventTypes.length; i++) {
|
||||
if (sortedEvents[i].data.type !== eventTypes[i]) {
|
||||
throw new Error(
|
||||
`Event order mismatch at index ${i}: expected ${eventTypes[i]}, got ${sortedEvents[i].data.type}`,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify idempotency - ensure only one event exists with given idempotency key.
|
||||
*
|
||||
* @param idempotencyKey - Idempotency key to check
|
||||
* @throws Error if multiple events found with same key
|
||||
*/
|
||||
async assertIdempotency(idempotencyKey: string): Promise<void> {
|
||||
const jobs = await this.queue.getJobs(['completed', 'waiting', 'active'])
|
||||
const matchingJobs = jobs.filter(
|
||||
(job: Job) => job.data.idempotencyKey === idempotencyKey,
|
||||
)
|
||||
|
||||
if (matchingJobs.length > 1) {
|
||||
throw new Error(
|
||||
`Idempotency violation: found ${matchingJobs.length} events with key ${idempotencyKey}`,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get event by correlation ID.
|
||||
*
|
||||
* @param correlationId - Correlation ID to search for
|
||||
* @returns First matching job or null
|
||||
*/
|
||||
async getEventByCorrelationId(
|
||||
correlationId: string,
|
||||
): Promise<Job<BaseDomainEvent> | null> {
|
||||
const jobs = await this.queue.getJobs(['completed', 'waiting', 'active'])
|
||||
const event = jobs.find((job: Job) => job.data.correlationId === correlationId)
|
||||
return (event as Job<BaseDomainEvent>) || null
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all events by source service.
|
||||
*
|
||||
* @param source - Source service name
|
||||
* @returns Array of events from that source
|
||||
*/
|
||||
async getEventsBySource(source: string): Promise<Job<BaseDomainEvent>[]> {
|
||||
return this.getEvents((event) => event.source === source)
|
||||
}
|
||||
|
||||
/**
|
||||
* Get event statistics (count by type).
|
||||
*
|
||||
* @returns Map of event types to counts
|
||||
*/
|
||||
async getEventStats(): Promise<Map<string, number>> {
|
||||
const jobs = await this.queue.getJobs(['completed', 'waiting', 'active'])
|
||||
const stats = new Map<string, number>()
|
||||
|
||||
for (const job of jobs) {
|
||||
const eventType = (job as Job<BaseDomainEvent>).data.type
|
||||
stats.set(eventType, (stats.get(eventType) || 0) + 1)
|
||||
}
|
||||
|
||||
return stats
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear all events from queue (use between tests).
|
||||
*/
|
||||
async clear(): Promise<void> {
|
||||
await this.queue.drain()
|
||||
}
|
||||
|
||||
/**
|
||||
* Completely obliterate queue (use in afterAll).
|
||||
*/
|
||||
async obliterate(): Promise<void> {
|
||||
await this.queue.obliterate({ force: true })
|
||||
}
|
||||
|
||||
/**
|
||||
* Get queue metrics (waiting, active, completed, failed counts).
|
||||
*
|
||||
* @returns Queue metrics object
|
||||
*/
|
||||
async getMetrics(): Promise<{
|
||||
waiting: number
|
||||
active: number
|
||||
completed: number
|
||||
failed: number
|
||||
delayed: number
|
||||
}> {
|
||||
const [waiting, active, completed, failed, delayed] = await Promise.all([
|
||||
this.queue.getWaitingCount(),
|
||||
this.queue.getActiveCount(),
|
||||
this.queue.getCompletedCount(),
|
||||
this.queue.getFailedCount(),
|
||||
this.queue.getDelayedCount(),
|
||||
])
|
||||
|
||||
return { waiting, active, completed, failed, delayed }
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for queue to drain (all jobs processed).
|
||||
*
|
||||
* @param timeout - Maximum wait time in milliseconds
|
||||
* @throws Error if queue doesn't drain within timeout
|
||||
*/
|
||||
async waitForQueueToDrain(timeout: number = 10000): Promise<void> {
|
||||
const startTime = Date.now()
|
||||
|
||||
while (Date.now() - startTime < timeout) {
|
||||
const metrics = await this.getMetrics()
|
||||
|
||||
if (metrics.waiting === 0 && metrics.active === 0) {
|
||||
return // Queue is drained
|
||||
}
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 100))
|
||||
}
|
||||
|
||||
throw new Error(`Queue failed to drain within ${timeout}ms`)
|
||||
}
|
||||
|
||||
/**
|
||||
* Print queue state for debugging.
|
||||
*/
|
||||
async printQueueState(): Promise<void> {
|
||||
const metrics = await this.getMetrics()
|
||||
const stats = await this.getEventStats()
|
||||
|
||||
console.log('\n=== Queue State ===')
|
||||
console.log(`Waiting: ${metrics.waiting}`)
|
||||
console.log(`Active: ${metrics.active}`)
|
||||
console.log(`Completed: ${metrics.completed}`)
|
||||
console.log(`Failed: ${metrics.failed}`)
|
||||
console.log(`Delayed: ${metrics.delayed}`)
|
||||
console.log('\nEvent Types:')
|
||||
for (const [type, count] of stats.entries()) {
|
||||
console.log(` ${type}: ${count}`)
|
||||
}
|
||||
console.log('==================\n')
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue