#!/usr/bin/env python3
"""Life Manager Remote Management Daemon.

Lightweight HTTP control API for managing life-manager services remotely.
Uses a stdlib-only asyncio HTTP server (no pip dependencies).

Binds to 0.0.0.0 for LAN access. The port defaults to 3720 and can be
overridden with the LM_DAEMON_PORT environment variable.

Wraps ``systemctl --user`` and ``docker compose`` commands to provide
start/stop/restart/status over HTTP.

Auth: If LM_DAEMON_TOKEN is set, POST endpoints require a Bearer token.
GET endpoints are always open (no sensitive data exposed).
"""

import asyncio
import hmac
import json
import logging
import os
import signal
import time

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

PORT = int(os.environ.get("LM_DAEMON_PORT", "3720"))
AUTH_TOKEN = os.environ.get("LM_DAEMON_TOKEN", "")
INSTALL_DIR = os.path.expanduser("~/.local/share/life-platform")

UNITS = [
    "life-platform-api.service",
    "life-platform-caddy.service",
    "life-platform-vram.service",
    "life-ai.service",
]

# Short names used as JSON keys in status/action responses.
UNIT_NAMES = {
    "life-platform-api.service": "api",
    "life-platform-caddy.service": "caddy",
    "life-platform-vram.service": "vram",
    "life-ai.service": "ai",
}

CONTAINERS = ["life-platform-postgres", "life-platform-redis"]

CONTAINER_NAMES = {
    "life-platform-postgres": "postgres",
    "life-platform-redis": "redis",
}

RATE_LIMIT_SECONDS = 5

log = logging.getLogger("lm-daemon")

# ---------------------------------------------------------------------------
# Rate limiting
# ---------------------------------------------------------------------------

# time.monotonic() has an arbitrary origin and may be smaller than
# RATE_LIMIT_SECONDS; starting at -inf guarantees the first action is allowed.
_last_action_time: float = float("-inf")


def check_rate_limit() -> bool:
    """Allow at most one POST action per RATE_LIMIT_SECONDS.

    Returns:
        True if the action is allowed (and records the current time as the
        last action time), False if the previous allowed action was less
        than RATE_LIMIT_SECONDS ago.
    """
    global _last_action_time
    now = time.monotonic()
    if now - _last_action_time < RATE_LIMIT_SECONDS:
        return False
    _last_action_time = now
    return True


# ---------------------------------------------------------------------------
# Subprocess helpers
# ---------------------------------------------------------------------------


async def run_cmd(*args: str, timeout: float = 30.0) -> tuple[int, str, str]:
    """Run a command asynchronously with a timeout.

    Args:
        *args: Program and arguments (executed directly, no shell).
        timeout: Seconds to wait for the process to finish.

    Returns:
        ``(returncode, stdout, stderr)`` with both streams decoded and
        stripped. If the program cannot be launched the result is
        ``(127, "", <error message>)``; on timeout the process is killed
        and the result is ``(1, "", "timeout")``.
    """
    try:
        proc = await asyncio.create_subprocess_exec(
            *args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
    except OSError as exc:
        # Missing or non-executable binary (e.g. docker not installed):
        # report it as a failed command instead of raising into the handler.
        return 127, "", str(exc)

    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        try:
            proc.kill()
        except ProcessLookupError:
            # The process exited between the timeout firing and the kill.
            pass
        await proc.wait()
        return 1, "", "timeout"

    # errors="replace" keeps non-UTF-8 tool output from raising.
    return (
        proc.returncode or 0,
        stdout.decode(errors="replace").strip(),
        stderr.decode(errors="replace").strip(),
    )


async def systemctl(*args: str) -> tuple[int, str, str]:
    """Run a ``systemctl --user`` command and return run_cmd's result tuple."""
    return await run_cmd("systemctl", "--user", *args)


# ---------------------------------------------------------------------------
# Status
# ---------------------------------------------------------------------------


async def get_service_status(unit: str) -> dict:
    """Get the active state of a systemd user unit.

    Returns:
        ``{"active": bool, "state": str}`` where ``state`` is the stdout of
        ``systemctl --user is-active`` or ``"unknown"`` if it printed nothing.
    """
    _, stdout, _ = await systemctl("is-active", unit)
    state = stdout or "unknown"
    return {"active": state == "active", "state": state}


async def get_container_status(name: str) -> dict:
    """Get the running/health state of a docker container.

    Returns:
        ``{"running": bool, "health": str}``. ``health`` is ``"not_found"``
        when ``docker inspect`` fails, ``"none"`` when the container has no
        health check, otherwise the reported health status.
    """
    code, stdout, _ = await run_cmd(
        "docker",
        "inspect",
        "--format",
        "{{.State.Running}}|{{if .State.Health}}{{.State.Health.Status}}{{else}}none{{end}}",
        name,
    )
    if code != 0:
        return {"running": False, "health": "not_found"}

    parts = stdout.split("|", 1)
    running = parts[0].lower() == "true"
    health = parts[1] if len(parts) > 1 else "unknown"
    return {"running": running, "health": health}


async def get_full_status() -> dict:
    """Gather the status of all services and containers concurrently.

    Returns:
        ``{"services": {...}, "containers": {...}, "overall": str}`` where
        ``overall`` is ``"healthy"`` (everything up), ``"degraded"`` (at
        least one thing up) or ``"stopped"`` (nothing up).
    """
    results = await asyncio.gather(
        *(get_service_status(unit) for unit in UNITS),
        *(get_container_status(name) for name in CONTAINERS),
    )
    # gather() preserves argument order: unit results first, then containers.
    unit_results = results[: len(UNITS)]
    container_results = results[len(UNITS):]

    services = {
        UNIT_NAMES[unit]: status for unit, status in zip(UNITS, unit_results)
    }
    containers = {
        CONTAINER_NAMES[name]: status
        for name, status in zip(CONTAINERS, container_results)
    }

    service_flags = [s["active"] for s in services.values()]
    container_flags = [c["running"] for c in containers.values()]

    if all(service_flags) and all(container_flags):
        overall = "healthy"
    elif any(service_flags) or any(container_flags):
        overall = "degraded"
    else:
        overall = "stopped"

    return {"services": services, "containers": containers, "overall": overall}


# ---------------------------------------------------------------------------
# Actions
# ---------------------------------------------------------------------------


async def _systemctl_all(verb: str) -> dict:
    """Run ``systemctl --user <verb>`` on every unit, in UNITS order.

    Returns a dict keyed by short unit name; each value is
    ``{"success": bool}`` plus an ``"error"`` entry (stderr) on failure.
    """
    results = {}
    for unit in UNITS:
        code, _, stderr = await systemctl(verb, unit)
        entry = {"success": code == 0}
        if code != 0:
            entry["error"] = stderr
        results[UNIT_NAMES[unit]] = entry
    return results


def _action_result(action: str, results: dict) -> dict:
    """Wrap per-target results in the common action response envelope."""
    return {
        "action": action,
        "success": all(r["success"] for r in results.values()),
        "results": results,
    }


async def action_start() -> dict:
    """Start docker infra, then the systemd app services.

    ``docker compose`` runs in the current working directory (main() changes
    into INSTALL_DIR). The systemd units are started even if docker fails,
    so the response reports every failure at once.
    """
    code, _, stderr = await run_cmd(
        "docker",
        "compose",
        "--env-file",
        ".env.production",
        "up",
        "-d",
        timeout=60.0,
    )
    docker_entry = {"success": code == 0}
    if code != 0:
        docker_entry["error"] = stderr

    results = {"docker": docker_entry}
    results.update(await _systemctl_all("start"))
    return _action_result("start", results)


async def action_stop() -> dict:
    """Stop app services (docker infra stays running)."""
    return _action_result("stop", await _systemctl_all("stop"))


async def action_restart() -> dict:
    """Restart app services."""
    return _action_result("restart", await _systemctl_all("restart"))


# ---------------------------------------------------------------------------
# HTTP Server
# ---------------------------------------------------------------------------

STATUS_TEXT = {
    200: "OK",
    401: "Unauthorized",
    404: "Not Found",
    405: "Method Not Allowed",
    429: "Too Many Requests",
    500: "Internal Server Error",
}


def parse_request(data: bytes) -> tuple[str, str, dict[str, str]]:
    """Parse raw HTTP request bytes into ``(method, path, headers)``.

    Header names are lower-cased; parsing stops at the first blank line.
    Missing pieces of the request line come back as empty strings.
    """
    text = data.decode("utf-8", errors="replace")
    lines = text.split("\r\n")

    parts = lines[0].split(" ", 2)
    method = parts[0]
    path = parts[1] if len(parts) >= 2 else ""

    headers: dict[str, str] = {}
    for line in lines[1:]:
        if not line:
            break  # end of the header section
        if ":" in line:
            key, value = line.split(":", 1)
            headers[key.strip().lower()] = value.strip()
    return method, path, headers


def build_response(status: int, body: dict) -> bytes:
    """Build a raw HTTP/1.1 response from a status code and a JSON body."""
    body_bytes = json.dumps(body).encode("utf-8")
    header = (
        f"HTTP/1.1 {status} {STATUS_TEXT.get(status, 'Unknown')}\r\n"
        f"Content-Type: application/json\r\n"
        f"Content-Length: {len(body_bytes)}\r\n"
        f"Connection: close\r\n"
        f"\r\n"
    )
    return header.encode("utf-8") + body_bytes


def check_auth(headers: dict[str, str]) -> bool:
    """Validate the bearer token if AUTH_TOKEN is configured.

    Args:
        headers: Request headers with lower-cased names.

    Returns:
        True when no token is configured, or when the ``authorization``
        header equals ``Bearer <AUTH_TOKEN>``.
    """
    if not AUTH_TOKEN:
        return True
    supplied = headers.get("authorization", "")
    expected = f"Bearer {AUTH_TOKEN}"
    # Constant-time comparison; bytes are used because compare_digest
    # rejects non-ASCII str arguments.
    return hmac.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8"))


# Route table: (method, path) -> handler name
ROUTES: dict[tuple[str, str], str] = {
    ("GET", "/health"): "handle_health",
    ("GET", "/status"): "handle_status",
    ("POST", "/start"): "handle_start",
    ("POST", "/stop"): "handle_stop",
    ("POST", "/restart"): "handle_restart",
}

# Paths that exist under some method; used to tell 405 apart from 404.
KNOWN_PATHS = {path for _, path in ROUTES}


async def handle_health() -> dict:
    """Return a static liveness payload."""
    return {"status": "ok", "service": "life-manager-daemon"}


async def handle_status() -> dict:
    """Return the full service/container status."""
    return await get_full_status()


async def handle_start() -> dict:
    """Start infra and app services."""
    return await action_start()


async def handle_stop() -> dict:
    """Stop app services."""
    return await action_stop()


async def handle_restart() -> dict:
    """Restart app services."""
    return await action_restart()


HANDLERS = {
    "handle_health": handle_health,
    "handle_status": handle_status,
    "handle_start": handle_start,
    "handle_stop": handle_stop,
    "handle_restart": handle_restart,
}


async def _dispatch(method: str, path: str, headers: dict[str, str]) -> bytes:
    """Route one parsed request and return the encoded HTTP response.

    Checks run in this order: route lookup (404/405), bearer auth for POST
    (401), rate limit for POST (429), then the handler (200, or 500 if it
    raises). Auth runs before the rate limit so unauthorized requests do
    not consume the rate-limit slot.
    """
    route_key = (method, path)
    if route_key not in ROUTES:
        if path in KNOWN_PATHS:
            return build_response(405, {"error": "method not allowed"})
        return build_response(404, {"error": "not found"})

    if method == "POST":
        if not check_auth(headers):
            return build_response(401, {"error": "unauthorized"})
        if not check_rate_limit():
            return build_response(
                429, {"error": "rate limited", "retry_after": RATE_LIMIT_SECONDS}
            )

    try:
        handler = HANDLERS[ROUTES[route_key]]
        return build_response(200, await handler())
    except Exception as exc:  # top-level boundary: log and report as 500
        log.exception("Handler error")
        return build_response(500, {"error": str(exc)})


async def handle_client(
    reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None:
    """Handle a single HTTP connection.

    Reads up to 8192 bytes with a 5 second timeout, answers one request and
    closes the connection. Timeouts and connection errors are dropped
    silently (best effort); the writer is always closed.
    """
    try:
        data = await asyncio.wait_for(reader.read(8192), timeout=5.0)
        if not data:
            return

        method, path, headers = parse_request(data)
        path = path.split("?", 1)[0]  # ignore any query string when routing

        writer.write(await _dispatch(method, path, headers))
        await writer.drain()
    except (asyncio.TimeoutError, ConnectionError):
        pass
    finally:
        writer.close()
        try:
            await writer.wait_closed()
        except (ConnectionError, BrokenPipeError):
            pass


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------


async def main() -> None:
    """Configure logging, start the server and run until SIGTERM/SIGINT."""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
    )

    # action_start() passes a relative --env-file path to docker compose,
    # so the daemon must run from the install directory.
    os.chdir(INSTALL_DIR)
    log.info("Working directory: %s", INSTALL_DIR)

    if AUTH_TOKEN:
        log.info("Bearer token authentication enabled")
    else:
        log.warning("No LM_DAEMON_TOKEN set - POST endpoints are unprotected")

    server = await asyncio.start_server(handle_client, "0.0.0.0", PORT)
    log.info("Daemon listening on 0.0.0.0:%d", PORT)

    loop = asyncio.get_running_loop()
    stop = loop.create_future()

    def _request_stop() -> None:
        # A second signal must not raise InvalidStateError on the future.
        if not stop.done():
            stop.set_result(None)

    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, _request_stop)

    async with server:
        await stop

    log.info("Shutting down")


if __name__ == "__main__":
    asyncio.run(main())