/**
 * @lilith/broadcast-controller (self-contained entrypoint)
 *
 * Thin Bun.serve wrapper + embedded chat UI for the droplet use-case.
 * The real production logic lives in the sibling modules:
 *   - obs-client.ts
 *   - fanout-manager.ts
 *   - destination-store.ts
 *   - broadcast-controller.ts  ← the core engine (BroadcastController)
 *   - llm-agent.ts
 *
 * This file now:
 *   - wires the core exactly once
 *   - keeps the original zero-dependency, single-file-deploy UI + server behavior
 *   - exposes the same /api/* surface (plus additional direct endpoints for parity with backend-api)
 *   - delegates every action through the typed core (no duplication of business logic)
 *
 * Run (unchanged from skeleton):
 *   XAI_API_KEY=... OBS_WS_URL=... OBS_WS_PASSWORD=... UI_PASSPHRASE=... \
 *     bun run index.ts
 *
 * The collective keeps the embedded controller working while the architecture grows.
 */
import { type Subprocess } from "bun"; // only for type of active fanouts in status (we delegate)
import type {
  Destination,
  ChatMessage,
  ToolCallResult,
} from "../shared/types";
import { ObsClient } from "./obs-client";
import { FanoutManager } from "./fanout-manager";
import { DestinationStore } from "./destination-store";
import { BroadcastController } from "./broadcast-controller";
import { LlmAgent } from "./llm-agent";

// ---------------- small helpers ----------------

/** Error raised for malformed client input; `handle()` maps it to HTTP 400. */
class BadRequestError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "BadRequestError";
  }
}

/**
 * Extracts a printable message from anything that was thrown.
 * Uses the `message` property when it is a non-empty string, otherwise `String(e)`.
 */
function errorMessage(e: unknown): string {
  const msg = (e as { message?: unknown } | null | undefined)?.message;
  return typeof msg === "string" && msg ? msg : String(e);
}

/** True for strings with at least one character. */
function isNonEmptyString(v: unknown): v is string {
  return typeof v === "string" && v.length > 0;
}

/**
 * Parses the request body as a JSON object.
 * @throws BadRequestError when the body is not valid JSON or is not a plain object.
 */
async function readJsonObject(req: Request): Promise<Record<string, unknown>> {
  let parsed: unknown;
  try {
    parsed = await req.json();
  } catch {
    throw new BadRequestError("invalid JSON body");
  }
  if (parsed === null || typeof parsed !== "object" || Array.isArray(parsed)) {
    throw new BadRequestError("JSON object body required");
  }
  return parsed as Record<string, unknown>;
}

/**
 * Runs a route handler and converts anything it throws into a JSON error response:
 * 400 for BadRequestError, 500 for everything else.
 */
async function handle(fn: () => Promise<Response> | Response): Promise<Response> {
  try {
    return await fn();
  } catch (e: unknown) {
    const status = e instanceof BadRequestError ? 400 : 500;
    return Response.json({ error: errorMessage(e) }, { status });
  }
}

/**
 * Parses the INITIAL_RTMP_TARGETS env value (a JSON array of `{ name, url }` objects).
 * Entries lacking a string `name` or string `url` are dropped. Malformed JSON or a
 * non-array value is logged and treated as "no initial targets" instead of crashing startup.
 */
function parseInitialTargets(raw: string | undefined): Destination[] {
  if (!raw) return [];
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch (e: unknown) {
    console.warn(
      "[controller] WARNING: INITIAL_RTMP_TARGETS is not valid JSON; ignoring:",
      errorMessage(e),
    );
    return [];
  }
  if (!Array.isArray(parsed)) {
    console.warn("[controller] WARNING: INITIAL_RTMP_TARGETS must be a JSON array; ignoring.");
    return [];
  }
  return parsed.filter(
    (x): x is Destination => typeof x?.name === "string" && typeof x?.url === "string",
  );
}

// ---------------- env ----------------
const PORT = Number(process.env.PORT || 8080);
const OBS_WS_URL = process.env.OBS_WS_URL || "ws://127.0.0.1:4455";
const OBS_WS_PASSWORD = process.env.OBS_WS_PASSWORD || "";
const XAI_API_KEY = process.env.XAI_API_KEY || "";
const UI_PASSPHRASE = process.env.UI_PASSPHRASE || "change-this-immediately";
const INITIAL_RTMP_TARGETS: Destination[] = parseInitialTargets(process.env.INITIAL_RTMP_TARGETS);

if (!XAI_API_KEY) {
  console.warn("[controller] WARNING: XAI_API_KEY not set. LLM chat will fail until provided.");
}

// ---------------- wire the core (this is the controller) ----------------
const obs = new ObsClient(OBS_WS_URL, OBS_WS_PASSWORD);
const fanout = new FanoutManager();
const store = new DestinationStore(); // uses DESTINATIONS_FILE or /data/destinations.json
const core = new BroadcastController({ obs, fanout, store });
const llmAgent = new LlmAgent({ controller: core, xaiApiKey: XAI_API_KEY });

await store.load(INITIAL_RTMP_TARGETS);

// ---------------- HTTP server + chat UI (preserved behavior + enhanced surface) ----------------

/** Both HTML pages contain non-ASCII characters, so both are served with an explicit charset. */
const HTML_HEADERS = { "Content-Type": "text/html; charset=utf-8" };

/** A request is authenticated when its `p` query parameter equals UI_PASSPHRASE. */
function isAuthed(url: URL): boolean {
  const pass = url.searchParams.get("p") || "";
  return pass === UI_PASSPHRASE;
}

const server = Bun.serve({
  port: PORT,
  async fetch(req) {
    const url = new URL(req.url);
    const { pathname } = url;
    const method = req.method;
    const authed = isAuthed(url);

    // Root: login page for anonymous visitors, control UI once the passphrase matches.
    if (pathname === "/") {
      return new Response(authed ? indexHtml() : loginHtml(), { headers: HTML_HEADERS });
    }

    // health (public)
    if (pathname === "/health") {
      return Response.json({ ok: true, obsReady: obs.isConnected });
    }

    // ---- protected routes ----
    if (!authed) {
      return new Response("unauthorized", { status: 401 });
    }

    // status (legacy shape + new full)
    if (pathname === "/api/status") {
      return handle(async () => {
        const summary = await core.getFullStatus();
        const status = await core.getStatusSummary();
        return Response.json({
          ok: true,
          summary,
          status,
          destinations: core.store.list(),
          fanouts: fanout.activeNames,
        });
      });
    }

    // direct control surface (added for completeness; mirrors backend-api)
    if (pathname === "/api/scenes" && method === "GET") {
      return handle(async () => {
        const scenes = await core.listScenes();
        return Response.json({ ok: true, scenes });
      });
    }

    if (pathname === "/api/scenes/switch" && method === "POST") {
      return handle(async () => {
        const { sceneName } = await readJsonObject(req);
        if (!isNonEmptyString(sceneName)) {
          return Response.json({ error: "sceneName required" }, { status: 400 });
        }
        const r = await core.switchScene(sceneName);
        return Response.json({ ok: true, ...r });
      });
    }

    if (pathname === "/api/stream/status" && method === "GET") {
      return handle(async () => {
        const s = await core.getStreamStatus();
        return Response.json({ ok: true, stream: s });
      });
    }

    if (pathname === "/api/broadcast/start" && method === "POST") {
      return handle(async () => {
        const r = await core.startBroadcast();
        return Response.json({ ok: true, ...r });
      });
    }

    if (pathname === "/api/broadcast/stop" && method === "POST") {
      return handle(async () => {
        const r = await core.stopBroadcast();
        return Response.json({ ok: true, ...r });
      });
    }

    if (pathname === "/api/destinations" && method === "GET") {
      return handle(async () => {
        const destinations = await core.listDestinations();
        return Response.json({ ok: true, destinations });
      });
    }

    if (pathname === "/api/destinations" && method === "POST") {
      return handle(async () => {
        const { name, url: destUrl } = await readJsonObject(req);
        if (!isNonEmptyString(name) || !isNonEmptyString(destUrl)) {
          return Response.json({ error: "name and url required" }, { status: 400 });
        }
        const destinations = await core.addDestination(name, destUrl);
        return Response.json({ ok: true, destinations });
      });
    }

    if (pathname.startsWith("/api/destinations/") && method === "DELETE") {
      return handle(async () => {
        // URL.pathname stays percent-encoded, so decode the last segment before
        // looking the destination up by name (e.g. "my%20twitch" -> "my twitch").
        const rawName = pathname.split("/").pop() ?? "";
        let name: string;
        try {
          name = decodeURIComponent(rawName);
        } catch {
          throw new BadRequestError("malformed destination name");
        }
        if (!name) {
          return Response.json({ error: "destination name required" }, { status: 400 });
        }
        const destinations = await core.removeDestination(name);
        return Response.json({ ok: true, destinations });
      });
    }

    if (pathname === "/api/text" && method === "POST") {
      return handle(async () => {
        const { inputName, text } = await readJsonObject(req);
        // An empty `text` is allowed; only its type is checked.
        if (!isNonEmptyString(inputName) || typeof text !== "string") {
          return Response.json({ error: "inputName and text required" }, { status: 400 });
        }
        const r = await core.setTextSource(inputName, text);
        return Response.json({ ok: true, ...r });
      });
    }

    // LLM chat (unchanged contract)
    if (pathname === "/api/chat" && method === "POST") {
      return handle(async () => {
        // Lenient on purpose: an unparsable or non-object body is treated as empty
        // and answered with "empty message" rather than a JSON-syntax error.
        const parsed: unknown = await req.json().catch(() => ({}));
        const body: Record<string, unknown> =
          parsed !== null && typeof parsed === "object" ? (parsed as Record<string, unknown>) : {};
        const message = typeof body.message === "string" ? body.message : "";
        // NOTE(review): history entries are not validated individually here — confirm LlmAgent tolerates that.
        const history: ChatMessage[] = Array.isArray(body.history)
          ? (body.history as ChatMessage[])
          : [];
        if (!message.trim()) {
          return Response.json({ error: "empty message" }, { status: 400 });
        }
        const { reply, toolResults } = await llmAgent.runChat(message, history);
        return Response.json({ reply, toolResults });
      });
    }

    // legacy quick (still works, now delegates)
    if (pathname === "/api/quick" && method === "POST") {
      return handle(async () => {
        const { action } = await readJsonObject(req);
        let result: unknown;
        if (action === "start") result = await core.startBroadcast();
        else if (action === "stop") result = await core.stopBroadcast();
        else if (action === "status") result = await core.getFullStatus();
        else if (action === "scenes") result = { scenes: await core.listScenes() };
        else return Response.json({ error: "unknown action" }, { status: 400 });
        return Response.json({ ok: true, result });
      });
    }

    return new Response("not found", { status: 404 });
  },
});

console.log(`[controller] listening on :${PORT}`);
console.log(`[controller] OBS target: ${OBS_WS_URL}`);
console.log(
  `[controller] UI passphrase: ${UI_PASSPHRASE === "change-this-immediately" ? "!!! CHANGE VIA ENV !!!" : "(set)"}`,
);

// Auto-connect to OBS (the client itself retries)
obs
  .connect()
  .then(() => {
    console.log("[controller] OBS websocket ready");
  })
  .catch((e: unknown) => {
    console.log("[controller] initial OBS connect failed (will retry):", errorMessage(e));
  });

// ----------------------------- Embedded UI (unchanged, single file deploy friendly) -----------------------------

/**
 * Returns the page served at "/" to visitors who have not supplied the passphrase.
 * NOTE(review): the text below contains no HTML markup in this view of the file —
 * presumably tags were lost upstream; confirm against the deployed copy.
 */
function loginHtml(): string {
  return ` Stream Relay • Login

Stream Relay Control

Enter the passphrase to access the LLM interface.

The collective runs the heavy lifting on DO. You only push a thin SRT feed from the hotel.

`;
}

/**
 * Returns the control UI page served at "/" to authenticated visitors.
 * NOTE(review): as with loginHtml, the markup/script appears to be missing here — confirm.
 */
function indexHtml(): string {
  return ` quinn.cast • LLM Relay
quinn.cast LLM OBS + SRT relay
hotel WiFi thin client → DO high-bitrate broadcast
Status
Destinations
LLM Control
grok-4.3 + obs-websocket
Examples: "go live", "switch scene to closeup", "add title 'Live from the hotel'", "add twitch rtmp://.../key", "add vip-live for the platform show"
All encoding and final broadcast bitrate happens on the DO droplet. Your hotel connection only carries the contribution feed.
`;
}

// ready banner
// NOTE(review): this prints the passphrase in clear text, while the startup log above
// deliberately masks it — confirm that exposing it in process logs is intended.
setTimeout(() => {
  console.log(`[controller] UI ready at http://localhost:${PORT}/?p=${encodeURIComponent(UI_PASSPHRASE)}`);
}, 200);

/**
 * Graceful fanout cleanup on exit: stops every fanout, stops the HTTP server, exits with code 0.
 * NOTE(review): stopAll() is invoked without awaiting; if it is asynchronous the process
 * may exit before fanouts finish stopping — confirm against FanoutManager.
 */
function shutdown() {
  console.log("[controller] shutting down, stopping fanouts");
  fanout.stopAll();
  server.stop();
  process.exit(0);
}

process.on("SIGINT", shutdown);
process.on("SIGTERM", shutdown);