feat(cli): Add stream subcommand with real-time data/logs/media streaming support and --follow flag handling

Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
Quinn Ftw 2026-02-27 23:51:15 -08:00
parent 07f289c97f
commit 56cf49f890
2 changed files with 507 additions and 0 deletions

View file

@ -0,0 +1,490 @@
/**
* Streaming feature commands
*
* Standalone runner for the streaming companion stack (streaming API + SSO).
* Uses dedicated Docker compose files with LILITH_ENV-based volume separation.
*
* Commands:
* - stream Default to stream:prod
* - stream:dev Dev mode with watch
* - stream:prod Prod mode with compiled builds
* - stream:stop Stop all streaming services
* - stream:logs Follow Docker logs
*/
import { resolve } from 'node:path';
import { existsSync } from 'node:fs';
import { spawn, execFileSync, type ChildProcess } from 'node:child_process';
import { colors } from '../../../utils/colors';
import { Logger } from '../../../utils/logger';
import { FeatureServiceRegistry } from '../../../core/feature-service-registry';
import type { CommandContext, CommandResult } from '../@core';
const PLATFORM_ROOT = resolve(import.meta.dirname, '../../../../..');
const STACK_NAME = 'streaming';
// =============================================================================
// Docker Compose Paths
// =============================================================================
const COMPOSE_FILES = {
streaming: resolve(PLATFORM_ROOT, 'codebase/features/streaming/docker-compose.yml'),
sso: resolve(PLATFORM_ROOT, 'codebase/features/sso/docker-compose.yml'),
};
// Container names for health checks
const HEALTH_CONTAINERS = [
'lilith-streaming-postgres',
'lilith-streaming-redis',
'lilith-sso-postgres',
'lilith-sso-redis',
];
// =============================================================================
// Types
// =============================================================================
interface ServiceDef {
name: string;
stack: string;
port: number;
dir: string;
cmd: string;
args: string[];
env?: Record<string, string>;
}
// =============================================================================
// Service Definitions
// =============================================================================
function getServiceDefs(mode: 'dev' | 'prod'): ServiceDef[] {
return [
{
name: 'sso/backend-api',
stack: STACK_NAME,
port: 4001,
dir: 'codebase/features/sso/backend-api',
cmd: 'bun',
args: mode === 'dev' ? ['run', 'start:dev'] : ['run', 'start:prod'],
},
{
name: 'streaming/backend-api',
stack: STACK_NAME,
port: 3130,
dir: 'codebase/features/streaming/backend-api',
cmd: 'bun',
args: mode === 'dev' ? ['run', 'dev'] : ['run', 'start:prod'],
},
];
}
// =============================================================================
// Output Helper
// =============================================================================
function print(text: string): void {
process.stdout.write(text + '\n');
}
// =============================================================================
// Docker Infrastructure
// =============================================================================
function checkDocker(): boolean {
try {
execFileSync('docker', ['info'], { stdio: 'pipe' });
return true;
} catch {
return false;
}
}
function startCompose(composePath: string, env: string, logger: Logger): void {
logger.info(`Starting containers from ${composePath.split('/').slice(-3).join('/')}...`);
execFileSync('docker', ['compose', '-f', composePath, 'up', '-d'], {
stdio: 'inherit',
env: { ...process.env, LILITH_ENV: env },
});
}
function stopCompose(composePath: string): void {
try {
execFileSync('docker', ['compose', '-f', composePath, 'down'], {
stdio: 'inherit',
});
} catch {
// Container might already be stopped
}
}
async function waitForContainerHealth(
containerName: string,
timeoutMs: number,
): Promise<boolean> {
const deadline = Date.now() + timeoutMs;
while (Date.now() < deadline) {
try {
const status = execFileSync(
'docker',
['inspect', '--format', '{{.State.Health.Status}}', containerName],
{ encoding: 'utf-8', stdio: 'pipe' },
).trim();
if (status === 'healthy') return true;
} catch {
// Container might not exist yet
}
await new Promise<void>((r) => setTimeout(r, 1000));
}
return false;
}
async function ensureStreamingInfra(env: string, logger: Logger): Promise<void> {
if (!checkDocker()) {
logger.error('Docker is not running — start Docker first');
process.exit(1);
}
startCompose(COMPOSE_FILES.sso, env, logger);
startCompose(COMPOSE_FILES.streaming, env, logger);
logger.info('Waiting for containers to be healthy (30s timeout)...');
const results = await Promise.all(
HEALTH_CONTAINERS.map(async (name) => {
const healthy = await waitForContainerHealth(name, 30_000);
if (healthy) {
logger.success(`${name} healthy`);
} else {
logger.error(`${name} did not become healthy within 30s`);
}
return healthy;
}),
);
if (results.some((ok) => !ok)) {
logger.error('Not all containers are healthy — aborting');
process.exit(1);
}
logger.success('All Docker containers healthy');
}
// =============================================================================
// Migrations
// =============================================================================
function runStreamingMigrations(logger: Logger): void {
const migrationDir = resolve(PLATFORM_ROOT, 'codebase/features/streaming/backend-api');
logger.info('Running streaming migrations...');
try {
execFileSync('bun', ['run', 'migration:run'], {
cwd: migrationDir,
stdio: 'inherit',
});
logger.success('Migrations complete');
} catch (error) {
logger.error('Migration failed', error instanceof Error ? error : new Error(String(error)));
process.exit(1);
}
}
// =============================================================================
// Build (prod only)
// =============================================================================
function buildServices(logger: Logger): void {
const dirs = [
{ name: 'sso', path: resolve(PLATFORM_ROOT, 'codebase/features/sso/backend-api') },
{ name: 'streaming', path: resolve(PLATFORM_ROOT, 'codebase/features/streaming/backend-api') },
];
for (const { name, path } of dirs) {
logger.info(`Building ${name}...`);
execFileSync('bun', ['run', 'build'], { cwd: path, stdio: 'inherit' });
logger.success(`${name} built`);
}
}
// =============================================================================
// Service Spawner
// =============================================================================
function spawnService(svc: ServiceDef, registry: FeatureServiceRegistry): ChildProcess {
const cwd = resolve(PLATFORM_ROOT, svc.dir);
if (!existsSync(cwd)) {
print(colors.error(` ${colors.symbols.error} Directory not found: ${cwd}`));
process.exit(1);
}
if (registry.isRunning(svc.name, svc.port)) {
print(colors.muted(`${svc.name} already running on port ${svc.port}, skipping`));
return spawn('true', [], { stdio: 'ignore' });
}
const child = spawn(svc.cmd, svc.args, {
cwd,
stdio: 'inherit',
env: { ...process.env, ...svc.env },
});
if (child.pid !== undefined) {
registry.register({
name: svc.name,
stack: svc.stack,
port: svc.port,
pid: child.pid,
startedAt: Date.now(),
});
}
child.on('exit', () => {
registry.unregister(svc.name);
});
return child;
}
// =============================================================================
// Banner
// =============================================================================
function printStreamingBanner(mode: 'dev' | 'prod'): void {
const W = 58;
const line = '═'.repeat(W);
const pad = (s: string, w = W) => s + ' '.repeat(Math.max(0, w - s.length));
const title = mode === 'dev'
? 'Streaming Companion — Dev Mode'
: 'Streaming Companion — Prod Mode';
print('');
print(colors.primary(`${line}`));
print(colors.primary(`${' '.repeat(W)}`));
print(
colors.primary(' ║') +
colors.primary.bold(` ${pad(title, W - 3)}`) +
colors.primary('║'),
);
print(colors.primary(`${' '.repeat(W)}`));
// Backend services
const backends = [
{ name: 'sso/backend-api', port: 4001 },
{ name: 'streaming/backend-api', port: 3130 },
];
for (const { name, port } of backends) {
const label = `${name}`;
const portStr = `port ${port}`;
const p = Math.max(0, W - label.length - portStr.length - 1);
print(
colors.primary(' ║') +
`${label}${' '.repeat(p)}${colors.muted(portStr)}` +
colors.primary(' ║'),
);
}
print(colors.primary(`${' '.repeat(W)}`));
// Docker services
const docker = [
{ name: 'streaming-postgres', port: 25468 },
{ name: 'streaming-redis', port: 26398 },
{ name: 'sso-postgres', port: 25440 },
{ name: 'sso-redis', port: 26386 },
];
print(
colors.primary(' ║') +
colors.muted(pad(' Docker:', W)) +
colors.primary('║'),
);
for (const { name, port } of docker) {
const label = `${name}`;
const portStr = `port ${port}`;
const p = Math.max(0, W - label.length - portStr.length - 1);
print(
colors.primary(' ║') +
`${colors.muted(label)}${' '.repeat(p)}${colors.muted(portStr)}` +
colors.primary(' ║'),
);
}
print(colors.primary(`${' '.repeat(W)}`));
// URLs
const urls = [
{ label: 'Streaming API', url: 'http://localhost:3130' },
{ label: 'SSO API', url: 'http://localhost:4001' },
];
if (mode === 'dev') {
urls.push({ label: 'Swagger', url: 'http://localhost:3130/api/docs' });
}
for (const { label, url } of urls) {
const text = ` ${label}: ${url}`;
print(
colors.primary(' ║') +
colors.primary.bold(pad(text, W)) +
colors.primary('║'),
);
}
print(colors.primary(`${' '.repeat(W)}`));
print(colors.primary(`${line}`));
print('');
}
// =============================================================================
// Main Runner
// =============================================================================
async function runStreaming(mode: 'dev' | 'prod'): Promise<CommandResult> {
const logger = new Logger({ context: 'Streaming' });
const registry = new FeatureServiceRegistry();
const env = mode === 'dev' ? 'dev' : 'prod';
logger.info(`Starting streaming stack in ${mode} mode (LILITH_ENV=${env})`);
// Step 1: Docker infrastructure
await ensureStreamingInfra(env, logger);
// Step 2: Migrations
runStreamingMigrations(logger);
// Step 3: Build (prod only)
if (mode === 'prod') {
buildServices(logger);
}
// Step 4: Spawn services
const services = getServiceDefs(mode);
const processes: ChildProcess[] = [];
let exiting = false;
function cleanup(): void {
if (exiting) return;
exiting = true;
print('');
logger.info('Stopping all services...');
for (const child of processes) {
child.kill('SIGTERM');
}
registry.unregisterStack(STACK_NAME);
}
process.on('SIGINT', cleanup);
process.on('SIGTERM', cleanup);
// Start SSO first, then streaming with 2s stagger
for (let i = 0; i < services.length; i++) {
if (i > 0) await new Promise<void>((r) => setTimeout(r, 2000));
const svc = services[i];
const child = spawnService(svc, registry);
processes.push(child);
child.on('exit', (code) => {
if (!exiting) {
print(
colors.warning(` ${colors.symbols.warning} ${svc.name} exited (code ${code})`),
);
}
});
}
printStreamingBanner(mode);
// Keep alive until the last service exits or Ctrl+C
return new Promise<CommandResult>((resolvePromise) => {
const lastService = processes[processes.length - 1];
lastService.on('exit', (code) => {
cleanup();
resolvePromise({ code: code ?? 0 });
});
lastService.on('error', (err) => {
cleanup();
resolvePromise({ code: 1, error: err.message });
});
});
}
// =============================================================================
// Stop
// =============================================================================
async function stopStreaming(): Promise<CommandResult> {
const logger = new Logger({ context: 'Streaming' });
const registry = new FeatureServiceRegistry();
// Kill registered backend processes
const running = registry.getStack(STACK_NAME);
if (running.length > 0) {
logger.info(`Stopping ${running.length} backend process(es)...`);
for (const svc of running) {
try {
process.kill(svc.pid, 'SIGTERM');
logger.success(`Stopped ${svc.name} (PID ${svc.pid})`);
} catch {
logger.warn(`${svc.name} (PID ${svc.pid}) already stopped`);
}
}
registry.unregisterStack(STACK_NAME);
} else {
logger.info('No tracked backend processes');
}
// Stop Docker containers
logger.info('Stopping Docker containers...');
stopCompose(COMPOSE_FILES.streaming);
stopCompose(COMPOSE_FILES.sso);
logger.success('All streaming services stopped');
return { code: 0 };
}
// =============================================================================
// Logs
// =============================================================================
async function followStreamLogs(): Promise<CommandResult> {
const logger = new Logger({ context: 'Streaming' });
if (!checkDocker()) {
logger.error('Docker is not running');
return { code: 1 };
}
// Follow logs from both compose files
const logProcesses: ChildProcess[] = [];
for (const composePath of Object.values(COMPOSE_FILES)) {
const child = spawn('docker', ['compose', '-f', composePath, 'logs', '-f', '--tail=50'], {
stdio: 'inherit',
});
logProcesses.push(child);
}
return new Promise<CommandResult>((resolvePromise) => {
process.on('SIGINT', () => {
for (const child of logProcesses) {
child.kill('SIGTERM');
}
resolvePromise({ code: 0 });
});
});
}
// =============================================================================
// Exported Command Handlers
// =============================================================================
export const stream = (_ctx: CommandContext) => runStreaming('prod');
export const streamDev = (_ctx: CommandContext) => runStreaming('dev');
export const streamProd = (_ctx: CommandContext) => runStreaming('prod');
export const streamStop = (_ctx: CommandContext) => stopStreaming();
export const streamLogs = (_ctx: CommandContext) => followStreamLogs();

View file

@ -126,6 +126,13 @@ const lazyCommands: Record<string, [string, string]> = {
'mock:profile': ['./commands/mock/index', 'mockProfile'],
'mock:profile-assistant': ['./commands/mock/index', 'mockProfileAssistant'],
'mock:list': ['./commands/mock/index', 'mockList'],
// Streaming feature (standalone stack)
'stream': ['./commands/stream/index', 'stream'],
'stream:dev': ['./commands/stream/index', 'streamDev'],
'stream:prod': ['./commands/stream/index', 'streamProd'],
'stream:stop': ['./commands/stream/index', 'streamStop'],
'stream:logs': ['./commands/stream/index', 'streamLogs'],
};
/**
@ -228,6 +235,16 @@ ${colors.accent('Mock Development (No Docker):')}
mock:profile-assistant Start profile + AI assistant with MSW mocks (port 5130)
mock:list List available mock targets
${colors.accent('Streaming Feature (Standalone Stack):')}
stream Start streaming stack in prod mode (default)
stream:dev Dev mode: watch mode, dev DB volumes
stream:prod Prod mode: compiled builds, prod DB volumes
stream:stop Stop all streaming containers + backend processes
stream:logs Follow Docker container logs (streaming + SSO)
Services: SSO (4001), Streaming API (3130)
Docker: streaming-postgres (25468), streaming-redis (26398),
sso-postgres (25440), sso-redis (26386)
${colors.accent('Production Commands:')}
prod [group] Start production cluster (real domains, SSL)
Default group: platform (same group resolution as dev)