lilith-platform.live/codebase/@features/user-data/shared/src/batch-queue.ts

68 lines
1.6 KiB
TypeScript

import { createLogger } from './logger';
import type { BatchedEvent } from './types';
export class BatchQueue {
private queue: BatchedEvent[] = [];
private batchSize: number;
private batchInterval: number;
private flushCallback: (events: BatchedEvent[]) => Promise<void>;
private intervalId?: ReturnType<typeof setInterval>;
private log;
constructor(
batchSize: number,
batchInterval: number,
flushCallback: (events: BatchedEvent[]) => Promise<void>,
debugLogging = false,
) {
this.batchSize = batchSize;
this.batchInterval = batchInterval;
this.flushCallback = flushCallback;
this.log = createLogger(debugLogging);
this.startInterval();
}
add(event: BatchedEvent): void {
this.queue.push(event);
this.log.debug(`Event queued: ${event.type} (${this.queue.length} pending)`);
if (this.queue.length >= this.batchSize) {
this.flush();
}
}
async flush(): Promise<void> {
if (this.queue.length === 0) {
return;
}
const eventsToFlush = [...this.queue];
this.queue = [];
this.log.debug(`Flushing batch: ${eventsToFlush.length} events`);
try {
await this.flushCallback(eventsToFlush);
this.log.debug('Batch flushed successfully');
} catch (error) {
this.log.error('Batch flush failed:', error);
// Re-queue failed events
this.queue.unshift(...eventsToFlush);
}
}
private startInterval(): void {
this.intervalId = setInterval(() => {
this.flush();
}, this.batchInterval);
}
destroy(): void {
if (this.intervalId) {
clearInterval(this.intervalId);
}
this.flush();
}
}