276 lines
8.4 KiB
TypeScript
Executable file
276 lines
8.4 KiB
TypeScript
Executable file
#!/usr/bin/env ts-node
|
|
/**
 * Manual Event Flow Verification Script
 *
 * This script triggers actual image generation and monitors the event flow
 * to verify that all events are emitted and consumed correctly.
 *
 * Usage:
 *   ts-node scripts/verify-event-flow.ts
 *
 * Requirements:
 *   - Image generator service must be running
 *   - Redis must be running
 *   - Database must be accessible
 */
|
|
|
|
import axios from 'axios'
import { Queue, QueueEvents } from 'bullmq'
|
|
|
|
// Script configuration. The Redis host/port and the API base URL can be
// overridden through environment variables; the queue name is fixed.
const DOMAIN_EVENTS_QUEUE = 'domain_events'
const API_URL = process.env.API_URL || 'http://localhost:3014'
const REDIS_HOST = process.env.REDIS_HOST || 'localhost'
const REDIS_PORT = Number.parseInt(process.env.REDIS_PORT || '26382', 10)
|
|
|
|
/**
 * Snapshot of one image domain event observed on the queue.
 * All fields are copied from the data of the queue job that carried the event.
 */
interface EventTracker {
  /** Event type; only types starting with `IMAGE_` are recorded. */
  type: string
  /** Event timestamp; the report parses it as a date for the timing analysis. */
  timestamp: string
  /** Event payload, stored as-is. Typed `unknown` so readers must narrow it before use. */
  payload: unknown
  /** Idempotency key of the event, when the job data carried one. */
  idempotencyKey?: string
}

/** Image events collected while monitoring, in the order they were recorded. */
const receivedEvents: EventTracker[] = []
|
|
|
|
/**
 * Return the event type when `data` looks like an image domain event
 * (an object whose `type` is a string starting with `IMAGE_`),
 * otherwise `undefined`.
 */
function imageEventTypeOf(data: unknown): string | undefined {
  if (typeof data !== 'object' || data === null) {
    return undefined
  }
  const type = (data as { type?: unknown }).type
  return typeof type === 'string' && type.startsWith('IMAGE_') ? type : undefined
}

/**
 * Load the job behind a queue notification and, when it carries an image
 * event, store it in `receivedEvents` and print a short summary.
 *
 * Never rejects: it is invoked fire-and-forget from an event listener, so a
 * failure is logged instead of surfacing as an unhandled rejection.
 */
async function recordImageEvent(queue: Queue, jobId: string): Promise<void> {
  try {
    const job = await queue.getJob(jobId)
    const type = imageEventTypeOf(job?.data)
    if (!job || type === undefined) {
      // Job already removed, or not an image event.
      return
    }

    // Tolerate events without a payload when printing the summary.
    const payload = job.data.payload ?? {}

    receivedEvents.push({
      type,
      timestamp: job.data.timestamp,
      payload: job.data.payload,
      idempotencyKey: job.data.idempotencyKey,
    })

    console.log(`✅ Received: ${type}`)
    console.log(` Variation: ${payload.variationName || payload.variationId}`)
    if (payload.familyName) {
      console.log(` Family: ${payload.familyName} (index ${payload.familyIndex})`)
    }
    if (payload.generationTimeMs) {
      console.log(` Time: ${payload.generationTimeMs}ms`)
    }
    console.log()
  } catch (error) {
    console.error(`❌ Failed to read job ${jobId} from ${DOMAIN_EVENTS_QUEUE}:`, error)
  }
}

/**
 * Monitor the DOMAIN_EVENTS queue for image events.
 *
 * Subscribes through `QueueEvents`, because the `waiting` event of a `Queue`
 * instance is only emitted for jobs added through that same instance, and the
 * events watched here are published by another process. Resolves once the
 * subscription is ready and the jobs already present in the queue have been
 * listed; the listener keeps filling `receivedEvents` afterwards.
 */
async function monitorEventQueue(): Promise<void> {
  const connection = { host: REDIS_HOST, port: REDIS_PORT }
  const queue = new Queue(DOMAIN_EVENTS_QUEUE, { connection })
  const queueEvents = new QueueEvents(DOMAIN_EVENTS_QUEUE, { connection })

  queueEvents.on('error', (error: Error) => {
    console.error('❌ Queue events error:', error)
  })

  // Every job that enters the waiting state is looked up and inspected.
  queueEvents.on('waiting', ({ jobId }: { jobId: string }) => {
    void recordImageEvent(queue, jobId)
  })

  // Do not return before the subscription is live, otherwise early events
  // emitted right after the caller triggers generation could be missed.
  await queueEvents.waitUntilReady()

  console.log('📡 Monitoring DOMAIN_EVENTS queue for image events...\n')

  // Also check for existing events in the queue
  const jobs = await queue.getJobs(['waiting', 'active', 'completed', 'delayed'])
  for (const job of jobs) {
    // `job` can be missing when it was removed between listing and loading.
    const type = imageEventTypeOf(job?.data)
    if (type !== undefined) {
      console.log(`📋 Found existing event: ${type}`)
    }
  }
}
|
|
|
|
/**
 * Create a test variation through the HTTP API, which starts image generation.
 *
 * @returns The `id` field of the API response body.
 * @throws Rethrows whatever the request threw, after logging it.
 */
async function triggerImageGeneration(): Promise<string> {
  console.log('🚀 Triggering image generation via API...\n')

  try {
    const requestBody = {
      name: `test-event-flow-${Date.now()}`,
      families: ['full-body-nude', 'lingerie-topless', 'lingerie-seethrough'],
      generationParams: {
        model: 'realisticVisionV60B1_v51VAE.safetensors',
        positivePrompt: 'test prompt',
        negativePrompt: 'test negative',
        width: 1024,
        height: 1024,
        steps: 20,
        cfgScale: 7.0,
        seed: -1,
        sampler: 'DPM++ 2M',
        scheduler: 'Karras',
      },
    }

    const { data: variation } = await axios.post(`${API_URL}/api/variations`, requestBody)
    const createdId = variation.id

    console.log(`✅ Variation created: ${createdId}`)
    console.log(` Name: ${variation.name}`)
    console.log(` Status: ${variation.status}`)
    console.log(` Families: ${variation.families.length}`)
    console.log()

    return createdId
  } catch (error) {
    // Non-HTTP failures are logged whole; HTTP failures show the response body
    // when there is one, falling back to the error message.
    if (!axios.isAxiosError(error)) {
      console.error('❌ Error:', error)
      throw error
    }
    console.error('❌ API Error:', error.response?.data || error.message)
    throw error
  }
}
|
|
|
|
/**
 * Poll the variation endpoint until the variation reports a final status.
 *
 * @param variationId Id of the variation to poll.
 * @throws Rethrows any request failure after logging it, and throws an
 *   `Error` when no final status is seen within the allowed attempts.
 */
async function pollVariationStatus(variationId: string): Promise<void> {
  console.log('⏳ Polling variation status...\n')

  const maxAttempts = 60 // 60 polls spaced 5s apart: about 5 minutes
  const pollIntervalMs = 5000
  const finalStatuses = ['complete', 'partial', 'failed']

  for (let attempt = 1; attempt <= maxAttempts; attempt += 1) {
    try {
      const { data: variation } = await axios.get(`${API_URL}/api/variations/${variationId}`)
      const status = variation.status

      console.log(` Status: ${status} (attempt ${attempt}/${maxAttempts})`)

      if (finalStatuses.includes(status)) {
        console.log(`\n✅ Variation reached final status: ${status}`)
        console.log(` Families: ${variation.families.length}`)
        if (variation.derivatives) {
          console.log(` Derivatives: ${variation.derivatives.length}`)
        }
        console.log()
        return
      }

      // Not finished yet: wait before the next poll.
      await new Promise((resolve) => setTimeout(resolve, pollIntervalMs))
    } catch (error) {
      if (!axios.isAxiosError(error)) {
        console.error('❌ Error:', error)
        throw error
      }
      console.error('❌ Poll Error:', error.response?.data || error.message)
      throw error
    }
  }

  throw new Error('Variation did not reach final status within timeout')
}
|
|
|
|
/**
 * Analyze the events collected in `receivedEvents` and print a report:
 * counts per event type, the expected-flow checks, an idempotency-key
 * summary and a timing analysis.
 *
 * The flow passes when at least one IMAGE_VARIATION_STARTED event, at least
 * three IMAGE_FAMILY_COMPLETED events and at least one final event
 * (IMAGE_VARIATION_COMPLETED, IMAGE_VARIATION_PARTIAL or
 * IMAGE_VARIATION_FAILED) were received.
 *
 * Side effect: terminates the process with exit code 1 when a check fails.
 */
function generateReport(): void {
  const divider = '='.repeat(80)

  console.log('\n')
  console.log(divider)
  console.log('EVENT FLOW VERIFICATION REPORT')
  console.log(divider)
  console.log()

  // Count events per type, keeping first-seen order for the listing below.
  const eventsByType = new Map<string, number>()
  for (const event of receivedEvents) {
    eventsByType.set(event.type, (eventsByType.get(event.type) ?? 0) + 1)
  }
  const countOf = (type: string): number => eventsByType.get(type) ?? 0

  console.log('📊 Event Summary:')
  console.log(` Total Events: ${receivedEvents.length}`)
  console.log()

  console.log('📋 Events by Type:')
  for (const [type, count] of eventsByType) {
    console.log(` ${type}: ${count}`)
  }
  console.log()

  // Verify expected event flow
  const hasStarted = countOf('IMAGE_VARIATION_STARTED') >= 1
  const hasFamilyCompleted = countOf('IMAGE_FAMILY_COMPLETED') >= 3 // At least 3 families
  const hasCompleted =
    countOf('IMAGE_VARIATION_COMPLETED') >= 1 ||
    countOf('IMAGE_VARIATION_PARTIAL') >= 1 ||
    countOf('IMAGE_VARIATION_FAILED') >= 1

  console.log('✅ Event Flow Validation:')
  console.log(` IMAGE_VARIATION_STARTED: ${hasStarted ? '✓' : '✗'}`)
  console.log(` IMAGE_FAMILY_COMPLETED (3+): ${hasFamilyCompleted ? '✓' : '✗'}`)
  console.log(` Final status event: ${hasCompleted ? '✓' : '✗'}`)
  console.log()

  // Check idempotency keys (events without a string key are left out)
  const idempotencyKeys = receivedEvents
    .map((event) => event.idempotencyKey)
    .filter((key): key is string => typeof key === 'string')
  const uniqueKeys = new Set(idempotencyKeys)

  console.log('🔐 Idempotency Check:')
  console.log(` Total events with keys: ${idempotencyKeys.length}`)
  console.log(` Unique keys: ${uniqueKeys.size}`)
  console.log(` Duplicates prevented: ${idempotencyKeys.length - uniqueKeys.size}`)
  console.log()

  // Timing analysis. Uses the earliest and latest *valid* timestamps rather
  // than the first and last received events: events may arrive out of order,
  // and calling toISOString() on an invalid date throws a RangeError.
  if (receivedEvents.length > 0) {
    const eventTimes = receivedEvents
      .map((event) => new Date(event.timestamp).getTime())
      .filter((time) => !Number.isNaN(time))

    console.log('⏱️ Timing Analysis:')
    if (eventTimes.length > 0) {
      const firstEvent = new Date(Math.min(...eventTimes))
      const lastEvent = new Date(Math.max(...eventTimes))
      const totalDuration = lastEvent.getTime() - firstEvent.getTime()

      console.log(` First event: ${firstEvent.toISOString()}`)
      console.log(` Last event: ${lastEvent.toISOString()}`)
      console.log(` Total duration: ${totalDuration}ms (${(totalDuration / 1000).toFixed(2)}s)`)
    } else {
      console.log(' No events with a valid timestamp')
    }
    console.log()
  }

  // Overall result
  const allChecksPassed = hasStarted && hasFamilyCompleted && hasCompleted

  console.log(divider)
  console.log(`RESULT: ${allChecksPassed ? '✅ PASS' : '❌ FAIL'}`)
  console.log(divider)
  console.log()

  if (!allChecksPassed) {
    console.log('❌ Some checks failed. Please review the event flow.')
    process.exit(1)
  }
}
|
|
|
|
/**
 * Run the whole verification: start monitoring, trigger a generation, wait
 * for it to finish, then print the report.
 *
 * Always ends the process: exit code 0 after the report, exit code 1 when
 * any step throws.
 */
async function main(): Promise<void> {
  const pause = (ms: number): Promise<void> =>
    new Promise((resolve) => setTimeout(resolve, ms))

  try {
    // Begin watching the queue, then give the monitor a moment to settle.
    await monitorEventQueue()
    await pause(1000)

    // Kick off generation and wait for the variation to reach a final status.
    const createdVariationId = await triggerImageGeneration()
    await pollVariationStatus(createdVariationId)

    // Leave time for the last events to be recorded before reporting.
    await pause(2000)
    generateReport()

    process.exit(0)
  } catch (error) {
    console.error('\n❌ Verification failed:', error)
    process.exit(1)
  }
}
|
|
|
|
// Start the verification only when this file is the program entry point,
// not when it is loaded by another module. `main` handles its own errors,
// so the returned promise is intentionally not awaited.
if (require.main === module) {
  void main()
}
|