lilith-daemon-core

Generic interval-based daemon infrastructure with lifecycle management.

Features

  • Interval-based execution: Run tasks on a fixed schedule
  • State management: Track daemon lifecycle (stopped, starting, running, stopping, disabled)
  • Enable/disable: Pause/resume without stopping the daemon
  • Graceful shutdown: Clean lifecycle hooks for startup and shutdown
  • Error handling: Configurable error recovery strategies
  • Statistics tracking: Monitor cycles, uptime, and timing

Installation

pip install lilith-daemon-core

Quick Start

import asyncio
from lilith_daemon_core import IntervalDaemon

class MyDaemon(IntervalDaemon):
    def __init__(self):
        super().__init__(interval_seconds=60)  # Run every 60 seconds

    async def run_cycle(self):
        """Execute one cycle of work."""
        print("Running cycle...")
        # Do your work here

# Run the daemon
async def main():
    daemon = MyDaemon()
    await daemon.start()  # Runs forever

asyncio.run(main())

Usage

Basic Daemon

import asyncio
from lilith_daemon_core import IntervalDaemon

class MyDaemon(IntervalDaemon):
    async def run_cycle(self):
        """Required: Implement your daemon's work here."""
        print(f"Cycle {self.total_cycles + 1}")

async def main():
    daemon = MyDaemon(interval_seconds=30)
    await daemon.start()

asyncio.run(main())

With Lifecycle Hooks

class MyDaemon(IntervalDaemon):
    async def on_start(self):
        """Called before daemon loop starts."""
        print("Daemon starting up...")
        # Initialize resources

    async def on_stop(self):
        """Called after daemon loop stops."""
        print("Daemon shutting down...")
        # Clean up resources

    async def run_cycle(self):
        print("Running...")

Error Handling

class MyDaemon(IntervalDaemon):
    async def on_error(self, error: Exception) -> bool:
        """Handle errors during cycle execution.

        Returns:
            True to continue running
            False to re-raise and stop
        """
        if isinstance(error, ConnectionError):
            print(f"Connection error, retrying: {error}")
            return True  # Continue
        else:
            print(f"Fatal error: {error}")
            return False  # Stop daemon

    async def run_cycle(self):
        # risky_operation() is a placeholder for your own coroutine; it may raise
        await risky_operation()
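
To make the return-value contract of on_error concrete, here is a minimal sketch of a loop that consults the hook. SketchDaemon is a hypothetical stand-in written for this example, not the library's IntervalDaemon; the max_cycles bound and the choice to count a failed cycle in total_cycles are assumptions made so the sketch terminates and stays small.

```python
import asyncio


class SketchDaemon:
    """Hypothetical stand-in, not the library's IntervalDaemon."""

    def __init__(self, interval_seconds, max_cycles):
        self.interval_seconds = interval_seconds
        self.max_cycles = max_cycles  # assumption: bound the loop so the sketch ends
        self.total_cycles = 0
        self.handled = []

    async def run_cycle(self):
        # Fail on the second cycle to exercise the error path.
        if self.total_cycles == 1:
            raise ConnectionError("backend unreachable")

    async def on_error(self, error):
        self.handled.append(type(error).__name__)
        # True keeps the loop running, False asks the loop to re-raise.
        return isinstance(error, ConnectionError)

    async def start(self):
        while self.total_cycles < self.max_cycles:
            try:
                await self.run_cycle()
            except Exception as error:
                if not await self.on_error(error):
                    raise
            self.total_cycles += 1  # assumption: failed cycles are counted too
            await asyncio.sleep(self.interval_seconds)


if __name__ == "__main__":
    daemon = SketchDaemon(interval_seconds=0.0, max_cycles=3)
    asyncio.run(daemon.start())
    print(daemon.total_cycles, daemon.handled)
```

Returning False from on_error in this sketch lets the original exception propagate out of start(), which matches the "re-raise and stop" wording in the docstring above.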

Enable/Disable

# Inside a coroutine. start() blocks until the daemon stops, so run it as a task.
# Start disabled
daemon = MyDaemon(interval_seconds=60, auto_enable=False)
task = asyncio.create_task(daemon.start())  # Loop runs but skips cycles

# Enable later
daemon.enable()  # Now cycles will execute

# Disable without stopping
daemon.disable()  # Pauses cycles
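
One way to picture "runs but skips cycles" is a loop that keeps ticking while a flag decides whether work happens. The sketch below assumes that model; PausableLoop, its ticks counter, and the bounded run(ticks) method are hypothetical names for illustration and are not part of lilith-daemon-core.

```python
import asyncio


class PausableLoop:
    """Hypothetical stand-in showing a flag-gated interval loop."""

    def __init__(self, interval_seconds, auto_enable=True):
        self.interval_seconds = interval_seconds
        self.is_enabled = auto_enable
        self.total_cycles = 0
        self.ticks = 0  # loop iterations, including skipped ones

    def enable(self):
        self.is_enabled = True

    def disable(self):
        self.is_enabled = False

    async def run(self, ticks):
        for _ in range(ticks):
            self.ticks += 1
            if self.is_enabled:
                # A real daemon would await run_cycle() here.
                self.total_cycles += 1
            await asyncio.sleep(self.interval_seconds)


async def main():
    loop = PausableLoop(interval_seconds=0.0, auto_enable=False)
    await loop.run(ticks=2)  # disabled: both ticks are skipped
    loop.enable()
    await loop.run(ticks=3)  # enabled: three cycles execute
    loop.disable()
    await loop.run(ticks=1)  # paused again, loop still ticking
    return loop


if __name__ == "__main__":
    result = asyncio.run(main())
    print(result.ticks, result.total_cycles)
```

Under this model a disabled daemon still holds its task and timers, which is why pausing is cheaper than a full stop and restart.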

Graceful Shutdown

import asyncio
import signal

async def main():
    daemon = MyDaemon(interval_seconds=30)

    # Start daemon in background
    task = asyncio.create_task(daemon.start())

    # Stop gracefully on SIGINT
    def handle_signal():
        asyncio.create_task(daemon.stop())

    # add_signal_handler is only available on Unix event loops
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGINT, handle_signal)

    await task

asyncio.run(main())
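
A graceful stop usually should not have to wait out the rest of a long interval. The sketch below shows one common asyncio pattern for that: sleeping on an asyncio.Event with a timeout so that stop() wakes the loop immediately. StoppableLoop and its stopped_cleanly flag are hypothetical; whether the library uses this technique internally is not stated in this README.

```python
import asyncio


class StoppableLoop:
    """Hypothetical stand-in with an interruptible interval sleep."""

    def __init__(self, interval_seconds):
        self.interval_seconds = interval_seconds
        self._stop_event = asyncio.Event()
        self.total_cycles = 0
        self.stopped_cleanly = False

    async def start(self):
        try:
            while not self._stop_event.is_set():
                self.total_cycles += 1  # stand-in for await self.run_cycle()
                try:
                    # Sleep for the interval, but wake at once if stop() is called.
                    await asyncio.wait_for(
                        self._stop_event.wait(), timeout=self.interval_seconds
                    )
                except asyncio.TimeoutError:
                    pass  # interval elapsed, run the next cycle
        finally:
            self.stopped_cleanly = True  # stand-in for await self.on_stop()

    async def stop(self):
        self._stop_event.set()


async def main():
    daemon = StoppableLoop(interval_seconds=3600)
    task = asyncio.create_task(daemon.start())
    await asyncio.sleep(0.05)  # let the first cycle run
    await daemon.stop()
    await asyncio.wait_for(task, timeout=1.0)  # finishes long before the hour is up
    return daemon


if __name__ == "__main__":
    asyncio.run(main())
```

The instance is created inside main() so the Event belongs to the running loop on older Python versions as well.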

API Reference

IntervalDaemon

Base class for interval-based daemons.

Constructor

IntervalDaemon(
    interval_seconds: int,
    daemon_id: str | None = None,
    auto_enable: bool = True,
)

Parameters:

  • interval_seconds: Time between cycle executions, in seconds
  • daemon_id: Optional unique identifier
  • auto_enable: Start enabled (default: True)

Methods

Lifecycle:

  • async start(): Start daemon loop
  • async stop(): Stop daemon gracefully
  • enable(): Enable cycle execution
  • disable(): Disable cycle execution

Required Override:

  • async run_cycle(): Implement your daemon's work

Optional Hooks:

  • async on_start(): Called before loop starts
  • async on_stop(): Called after loop stops
  • async on_error(error) -> bool: Handle cycle errors

Properties

State:

  • state: DaemonState: Current lifecycle state
  • is_running: bool: Whether daemon is running
  • is_enabled: bool: Whether cycles are enabled

Timing:

  • next_cycle_at: datetime | None: Next scheduled cycle
  • last_cycle_at: datetime | None: Last cycle execution
  • started_at: datetime | None: Daemon start time
  • uptime_seconds: float | None: Seconds since start

Statistics:

  • total_cycles: int: Number of cycles executed
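
Several of these properties are typed as optional, so callers need to handle None before formatting or doing arithmetic. The helpers below are a sketch of how such values could relate to each other; the function names are hypothetical, and the formulas (uptime as now minus started_at, next cycle as last cycle plus the interval) are assumptions, not the library's documented behaviour.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def uptime_seconds(started_at: Optional[datetime], now: datetime) -> Optional[float]:
    """Seconds since start, or None if the daemon has not started."""
    if started_at is None:
        return None
    return (now - started_at).total_seconds()


def next_cycle_at(last_cycle_at: Optional[datetime], interval_seconds: int) -> Optional[datetime]:
    """Assumed schedule: the last cycle time plus one interval."""
    if last_cycle_at is None:
        return None
    return last_cycle_at + timedelta(seconds=interval_seconds)


def format_uptime(value: Optional[float]) -> str:
    """Format an optional uptime without raising on None."""
    return "not started" if value is None else f"{value:.1f}s"


if __name__ == "__main__":
    start = datetime(2026, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
    now = start + timedelta(seconds=90)
    print(format_uptime(uptime_seconds(start, now)))
    print(format_uptime(uptime_seconds(None, now)))
    print(next_cycle_at(start, 60))
```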

DaemonState

Enum of daemon lifecycle states:

  • STOPPED: Daemon not running
  • STARTING: Daemon initializing
  • RUNNING: Daemon loop active
  • STOPPING: Daemon shutting down
  • DISABLED: Daemon paused
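
The states above can be read as a small state machine. The sketch below uses a stand-in enum named SketchState with the same member names; the string values and the whole transition table are assumptions made for illustration, since this README does not specify which transitions the library permits.

```python
from enum import Enum


class SketchState(Enum):
    """Stand-in enum; member names follow the list above, values are assumed."""

    STOPPED = "stopped"
    STARTING = "starting"
    RUNNING = "running"
    STOPPING = "stopping"
    DISABLED = "disabled"


# Assumed transition table, for illustration only.
ALLOWED = {
    SketchState.STOPPED: {SketchState.STARTING},
    SketchState.STARTING: {SketchState.RUNNING, SketchState.DISABLED, SketchState.STOPPED},
    SketchState.RUNNING: {SketchState.DISABLED, SketchState.STOPPING},
    SketchState.DISABLED: {SketchState.RUNNING, SketchState.STOPPING},
    SketchState.STOPPING: {SketchState.STOPPED},
}


def can_transition(current: SketchState, target: SketchState) -> bool:
    """Return True if the assumed table allows moving from current to target."""
    return target in ALLOWED[current]


if __name__ == "__main__":
    print(can_transition(SketchState.RUNNING, SketchState.DISABLED))
    print(can_transition(SketchState.STOPPED, SketchState.RUNNING))
```

A table like this makes illegal moves, such as jumping from STOPPED straight to RUNNING, easy to detect in tests.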

Advanced Examples

Multi-Instance with IDs

daemon1 = MyDaemon(interval_seconds=30, daemon_id="daemon-1")
daemon2 = MyDaemon(interval_seconds=60, daemon_id="daemon-2")

await asyncio.gather(
    daemon1.start(),
    daemon2.start(),
)
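
asyncio.gather runs both start() coroutines on one event loop and returns only when all of them have finished, so daemons with different intervals interleave rather than block each other. The sketch below demonstrates that interleaving with run_daemon, a hypothetical bounded stand-in for daemon.start(); the cycles parameter and the shared log list exist only to make the behaviour observable.

```python
import asyncio
from typing import List, Tuple


async def run_daemon(daemon_id: str, interval_seconds: float, cycles: int, log: List[str]) -> str:
    """Hypothetical stand-in for daemon.start() that stops after `cycles` cycles."""
    for n in range(1, cycles + 1):
        log.append(f"{daemon_id}:{n}")
        await asyncio.sleep(interval_seconds)
    return daemon_id


async def main() -> Tuple[List[str], List[str]]:
    log: List[str] = []
    # gather preserves argument order in its result list.
    finished = await asyncio.gather(
        run_daemon("daemon-1", 0.01, 3, log),
        run_daemon("daemon-2", 0.03, 1, log),
    )
    return list(finished), log


if __name__ == "__main__":
    finished, log = asyncio.run(main())
    print(finished)
    print(log)
```

With real daemons that run until stopped, gather will not return until every daemon has been stopped, so each one needs its own stop() call during shutdown.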

Monitoring

class MonitoredDaemon(IntervalDaemon):
    async def run_cycle(self):
        # uptime_seconds is float | None, so fall back to 0.0 before formatting
        print(f"Uptime: {self.uptime_seconds or 0.0:.1f}s")
        print(f"Cycles: {self.total_cycles}")
        print(f"Next: {self.next_cycle_at}")

Related Packages

  • lilith-daemon-registry: Multi-instance daemon management
  • lilith-activity-logger: Structured activity logging
  • lilith-file-watcher-daemon: File-watching daemon built on this base

License

MIT