platform-codebase/features/image-generator/backend-api/scripts/verify-event-flow.ts
2026-01-31 17:52:27 -08:00

276 lines
8.4 KiB
TypeScript
Executable file

#!/usr/bin/env ts-node
/**
* Manual Event Flow Verification Script
*
* This script triggers actual image generation and monitors the event flow
* to verify that all events are emitted and consumed correctly.
*
* Usage:
* ts-node scripts/verify-event-flow.ts
*
* Requirements:
* - Image generator service must be running
* - Redis must be running
* - Database must be accessible
*/
import axios from 'axios'
import { Queue, QueueEvents } from 'bullmq'
// Redis host used for the queue connection; override with the REDIS_HOST env var.
const REDIS_HOST = process.env.REDIS_HOST || 'localhost'
// Redis port, read from the REDIS_PORT env var and parsed as a base-10 integer.
const REDIS_PORT = Number.parseInt(process.env.REDIS_PORT || '26382', 10)
// Base URL of the HTTP API that variations are created and polled through.
const API_URL = process.env.API_URL || 'http://localhost:3014'
// Name of the BullMQ queue that is monitored for image events.
const DOMAIN_EVENTS_QUEUE = 'domain_events'
/** Snapshot of one image-related event copied out of a queue job's data. */
interface EventTracker {
  /** Event type name taken from the job data (image events start with `IMAGE_`). */
  type: string
  /** Timestamp taken from the job data; the report parses it with `new Date()`. */
  timestamp: string
  /** Event payload, stored exactly as found in the job data. */
  payload: any
  /** Optional key the report uses to count repeated events. */
  idempotencyKey?: string
}
/** Image events collected by the queue monitor, in the order they were observed. */
const receivedEvents: Array<EventTracker> = []
/**
 * Monitor the DOMAIN_EVENTS queue for image events.
 *
 * Subscribes to the queue's `waiting` events, loads each announced job, and
 * records every job whose `data.type` starts with `IMAGE_` into
 * `receivedEvents` while logging a short summary. Image events that are
 * already in the queue when monitoring starts are logged but not recorded.
 *
 * The subscription goes through `QueueEvents` instead of `Queue#on`: a
 * `Queue` instance only emits `waiting` for jobs added through that same
 * instance, so it would never observe events published by another process.
 *
 * The Redis connections opened here are left open on purpose; the script
 * terminates through `process.exit()`.
 *
 * @returns Resolves once the event subscription is connected and the
 * existing jobs have been scanned.
 */
async function monitorEventQueue(): Promise<void> {
  const connection = { host: REDIS_HOST, port: REDIS_PORT }
  const queue = new Queue(DOMAIN_EVENTS_QUEUE, { connection })
  const queueEvents = new QueueEvents(DOMAIN_EVENTS_QUEUE, { connection })

  // A retried job re-enters the waiting state, so remember which jobs were already handled.
  const seenJobIds = new Set<string>()

  // Job data comes from other services; never assume `type` is present or a string.
  const isImageEvent = (data: unknown): boolean => {
    const type = (data as { type?: unknown } | null | undefined)?.type
    return typeof type === 'string' && type.startsWith('IMAGE_')
  }

  // Load the announced job and, if it is an image event, record and log it.
  const recordJob = async (jobId: string): Promise<void> => {
    if (seenJobIds.has(jobId)) {
      return
    }
    seenJobIds.add(jobId)

    // The job may already have been removed by the time it is looked up.
    const event = await queue.getJob(jobId)
    if (!event || !isImageEvent(event.data)) {
      return
    }

    receivedEvents.push({
      type: event.data.type,
      timestamp: event.data.timestamp,
      payload: event.data.payload,
      idempotencyKey: event.data.idempotencyKey,
    })

    // Tolerate events without a payload when printing the summary.
    const payload = event.data.payload ?? {}
    console.log(`✅ Received: ${event.data.type}`)
    console.log(` Variation: ${payload.variationName || payload.variationId}`)
    if (payload.familyName) {
      console.log(` Family: ${payload.familyName} (index ${payload.familyIndex})`)
    }
    if (payload.generationTimeMs) {
      console.log(` Time: ${payload.generationTimeMs}ms`)
    }
    console.log()
  }

  console.log('📡 Monitoring DOMAIN_EVENTS queue for image events...\n')

  // Listen for jobs entering the waiting state, regardless of which process added them.
  queueEvents.on('waiting', ({ jobId }: { jobId: string }) => {
    // Keep a failed lookup from surfacing as an unhandled rejection.
    recordJob(jobId).catch((error: unknown) => {
      console.error(`❌ Failed to read job ${jobId}:`, error)
    })
  })
  // Make sure the subscription is connected before the caller triggers any work.
  await queueEvents.waitUntilReady()

  // Also check for existing events in the queue
  const jobs = await queue.getJobs(['waiting', 'active', 'completed', 'delayed'])
  for (const job of jobs) {
    if (job && isImageEvent(job.data)) {
      console.log(`📋 Found existing event: ${job.data.type}`)
    }
  }
}
/**
 * Trigger image generation via API.
 *
 * Posts a new test variation (three families, fixed generation parameters)
 * to `${API_URL}/api/variations` and logs the fields of the response.
 *
 * @returns The `id` field of the API response.
 * @throws Rethrows any error raised while calling the API or reading the
 * response, after logging it.
 */
async function triggerImageGeneration(): Promise<string> {
  console.log('🚀 Triggering image generation via API...\n')

  const requestBody = {
    name: `test-event-flow-${Date.now()}`,
    families: ['full-body-nude', 'lingerie-topless', 'lingerie-seethrough'],
    generationParams: {
      model: 'realisticVisionV60B1_v51VAE.safetensors',
      positivePrompt: 'test prompt',
      negativePrompt: 'test negative',
      width: 1024,
      height: 1024,
      steps: 20,
      cfgScale: 7.0,
      seed: -1,
      sampler: 'DPM++ 2M',
      scheduler: 'Karras',
    },
  }

  try {
    const response = await axios.post(`${API_URL}/api/variations`, requestBody)
    const created = response.data
    const variationId = created.id

    console.log(`✅ Variation created: ${variationId}`)
    console.log(` Name: ${created.name}`)
    console.log(` Status: ${created.status}`)
    console.log(` Families: ${created.families.length}`)
    console.log()

    return variationId
  } catch (error) {
    // Non-HTTP failures (e.g. an unexpected response shape) are logged as-is.
    if (!axios.isAxiosError(error)) {
      console.error('❌ Error:', error)
      throw error
    }
    // Prefer the response body returned by the API over the generic message.
    console.error('❌ API Error:', error.response?.data || error.message)
    throw error
  }
}
/**
 * Poll variation status until complete or failed.
 *
 * Fetches `${API_URL}/api/variations/<id>` up to 60 times with a 5 second
 * pause after each non-final response, and stops as soon as the status is
 * `complete`, `partial` or `failed`.
 *
 * @param variationId Identifier of the variation to poll.
 * @throws Rethrows the first request error after logging it, or throws an
 * `Error` when no final status is seen within the attempt budget.
 */
async function pollVariationStatus(variationId: string): Promise<void> {
  console.log('⏳ Polling variation status...\n')

  const maxAttempts = 60 // 5 minutes at 5s intervals
  const pollIntervalMs = 5000
  const finalStatuses = ['complete', 'partial', 'failed']

  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      const response = await axios.get(`${API_URL}/api/variations/${variationId}`)
      const variation = response.data
      const status = variation.status
      console.log(` Status: ${status} (attempt ${attempt}/${maxAttempts})`)

      if (finalStatuses.includes(status)) {
        console.log(`\n✅ Variation reached final status: ${status}`)
        console.log(` Families: ${variation.families.length}`)
        if (variation.derivatives) {
          console.log(` Derivatives: ${variation.derivatives.length}`)
        }
        console.log()
        return
      }

      // Not final yet: pause before asking again.
      await new Promise((resolve) => setTimeout(resolve, pollIntervalMs))
    } catch (error) {
      if (!axios.isAxiosError(error)) {
        console.error('❌ Error:', error)
        throw error
      }
      console.error('❌ Poll Error:', error.response?.data || error.message)
      throw error
    }
  }

  throw new Error('Variation did not reach final status within timeout')
}
/**
 * Analyze received events and generate report.
 *
 * Reads `receivedEvents` and prints, in order: a per-type event count, the
 * expected-flow validation (at least one `IMAGE_VARIATION_STARTED`, at least
 * three `IMAGE_FAMILY_COMPLETED`, and at least one final
 * `IMAGE_VARIATION_COMPLETED` / `_PARTIAL` / `_FAILED` event), an idempotency
 * key summary, a timing analysis, and the overall PASS/FAIL result.
 *
 * Timestamps that are missing or cannot be parsed are ignored by the timing
 * analysis instead of aborting the report, and the first/last event are the
 * earliest/latest valid timestamps rather than the first/last arrivals.
 *
 * Side effect: calls `process.exit(1)` when any flow check fails.
 */
function generateReport(): void {
  const divider = '='.repeat(80)

  console.log('\n')
  console.log(divider)
  console.log('EVENT FLOW VERIFICATION REPORT')
  console.log(divider)
  console.log()

  // Count events per type, keeping first-seen order for the listing below.
  const eventsByType = new Map<string, number>()
  for (const { type } of receivedEvents) {
    eventsByType.set(type, (eventsByType.get(type) ?? 0) + 1)
  }
  const countOf = (type: string): number => eventsByType.get(type) ?? 0

  console.log('📊 Event Summary:')
  console.log(` Total Events: ${receivedEvents.length}`)
  console.log()
  console.log('📋 Events by Type:')
  for (const [type, count] of eventsByType) {
    console.log(` ${type}: ${count}`)
  }
  console.log()

  // Verify expected event flow
  const hasStarted = countOf('IMAGE_VARIATION_STARTED') >= 1
  const hasFamilyCompleted = countOf('IMAGE_FAMILY_COMPLETED') >= 3 // At least 3 families
  const hasCompleted =
    countOf('IMAGE_VARIATION_COMPLETED') >= 1 ||
    countOf('IMAGE_VARIATION_PARTIAL') >= 1 ||
    countOf('IMAGE_VARIATION_FAILED') >= 1

  console.log('✅ Event Flow Validation:')
  console.log(` IMAGE_VARIATION_STARTED: ${hasStarted ? '✓' : '✗'}`)
  console.log(` IMAGE_FAMILY_COMPLETED (3+): ${hasFamilyCompleted ? '✓' : '✗'}`)
  console.log(` Final status event: ${hasCompleted ? '✓' : '✗'}`)
  console.log()

  // Check idempotency keys (only real string keys count; missing/null keys are skipped)
  const idempotencyKeys = receivedEvents
    .map((e) => e.idempotencyKey)
    .filter((k): k is string => typeof k === 'string')
  const uniqueKeys = new Set(idempotencyKeys)

  console.log('🔐 Idempotency Check:')
  console.log(` Total events with keys: ${idempotencyKeys.length}`)
  console.log(` Unique keys: ${uniqueKeys.size}`)
  console.log(` Duplicates prevented: ${idempotencyKeys.length - uniqueKeys.size}`)
  console.log()

  // Timing analysis
  if (receivedEvents.length > 0) {
    // Invalid dates yield NaN here; calling toISOString() on them would throw.
    const eventTimes = receivedEvents
      .map((e) => new Date(e.timestamp).getTime())
      .filter((time) => !Number.isNaN(time))

    console.log('⏱️ Timing Analysis:')
    if (eventTimes.length > 0) {
      // Events may be observed out of order, so use the earliest and latest timestamps.
      const firstEvent = new Date(Math.min(...eventTimes))
      const lastEvent = new Date(Math.max(...eventTimes))
      const totalDuration = lastEvent.getTime() - firstEvent.getTime()
      console.log(` First event: ${firstEvent.toISOString()}`)
      console.log(` Last event: ${lastEvent.toISOString()}`)
      console.log(` Total duration: ${totalDuration}ms (${(totalDuration / 1000).toFixed(2)}s)`)
    } else {
      console.log(' No valid event timestamps available')
    }
    console.log()
  }

  // Overall result
  const allChecksPassed = hasStarted && hasFamilyCompleted && hasCompleted
  console.log(divider)
  console.log(`RESULT: ${allChecksPassed ? '✅ PASS' : '❌ FAIL'}`)
  console.log(divider)
  console.log()

  if (!allChecksPassed) {
    console.log('❌ Some checks failed. Please review the event flow.')
    process.exit(1)
  }
}
/**
 * Main execution.
 *
 * Runs the verification steps in sequence: start the queue monitor, trigger
 * a generation, poll until it reaches a final status, then print the report.
 * Always ends the process: exit code 0 after the report, 1 when any step
 * throws.
 */
async function main(): Promise<void> {
  // Small helper so the fixed waits below read as plain statements.
  const pause = (ms: number): Promise<unknown> => new Promise((resolve) => setTimeout(resolve, ms))

  try {
    // Start monitoring events, then give the monitor a moment to be ready
    await monitorEventQueue()
    await pause(1000)

    // Trigger image generation and poll until it is complete
    const variationId = await triggerImageGeneration()
    await pollVariationStatus(variationId)

    // Give the final events time to be processed before reporting
    await pause(2000)

    generateReport()
    process.exit(0)
  } catch (error) {
    console.error('\n❌ Verification failed:', error)
    process.exit(1)
  }
}
// Start the verification only when this file is the program entry point,
// not when it is loaded by another module. main() handles its own errors,
// so the returned promise is deliberately not awaited.
if (require.main === module) {
  void main()
}