No description
|
Some checks failed
Publish / publish (push) Failing after 0s
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com> |
||
|---|---|---|
| .forgejo/workflows | ||
| dist | ||
| src/lilith_daemon_core | ||
| tests | ||
| pyproject.toml | ||
| README.md | ||
lilith-daemon-core
Generic interval-based daemon infrastructure with lifecycle management.
Features
- Interval-based execution: Run tasks on a fixed schedule
- State management: Track daemon lifecycle (stopped, starting, running, stopping)
- Enable/disable: Pause/resume without stopping the daemon
- Graceful shutdown: Clean lifecycle hooks for startup and shutdown
- Error handling: Configurable error recovery strategies
- Statistics tracking: Monitor cycles, uptime, and timing
Installation
pip install lilith-daemon-core
Quick Start
import asyncio
from lilith_daemon_core import IntervalDaemon
class MyDaemon(IntervalDaemon):
def __init__(self):
super().__init__(interval_seconds=60) # Run every 60 seconds
async def run_cycle(self):
"""Execute one cycle of work."""
print("Running cycle...")
# Do your work here
# Run the daemon
async def main():
daemon = MyDaemon()
await daemon.start() # Runs forever
asyncio.run(main())
Usage
Basic Daemon
from lilith_daemon_core import IntervalDaemon
class MyDaemon(IntervalDaemon):
async def run_cycle(self):
"""Required: Implement your daemon's work here."""
print(f"Cycle {self.total_cycles + 1}")
daemon = MyDaemon(interval_seconds=30)
await daemon.start()
With Lifecycle Hooks
class MyDaemon(IntervalDaemon):
async def on_start(self):
"""Called before daemon loop starts."""
print("Daemon starting up...")
# Initialize resources
async def on_stop(self):
"""Called after daemon loop stops."""
print("Daemon shutting down...")
# Clean up resources
async def run_cycle(self):
print("Running...")
Error Handling
class MyDaemon(IntervalDaemon):
async def on_error(self, error: Exception) -> bool:
"""Handle errors during cycle execution.
Returns:
True to continue running
False to re-raise and stop
"""
if isinstance(error, ConnectionError):
print(f"Connection error, retrying: {error}")
return True # Continue
else:
print(f"Fatal error: {error}")
return False # Stop daemon
async def run_cycle(self):
# May raise exceptions
await risky_operation()
Enable/Disable
daemon = MyDaemon(interval_seconds=60)
# Start disabled
daemon = MyDaemon(interval_seconds=60, auto_enable=False)
await daemon.start() # Runs but skips cycles
# Enable later
daemon.enable() # Now cycles will execute
# Disable without stopping
daemon.disable() # Pauses cycles
Graceful Shutdown
import signal
daemon = MyDaemon(interval_seconds=30)
# Start daemon in background
task = asyncio.create_task(daemon.start())
# Stop gracefully on SIGINT
def handle_signal():
asyncio.create_task(daemon.stop())
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, handle_signal)
await task
API Reference
IntervalDaemon
Base class for interval-based daemons.
Constructor
IntervalDaemon(
interval_seconds: int,
daemon_id: str | None = None,
auto_enable: bool = True,
)
Parameters:
interval_seconds: Time between cycle executionsdaemon_id: Optional unique identifierauto_enable: Start enabled (default: True)
Methods
Lifecycle:
async start(): Start daemon loopasync stop(): Stop daemon gracefullyenable(): Enable cycle executiondisable(): Disable cycle execution
Required Override:
async run_cycle(): Implement your daemon's work
Optional Hooks:
async on_start(): Called before loop startsasync on_stop(): Called after loop stopsasync on_error(error) -> bool: Handle cycle errors
Properties
State:
state: DaemonState: Current lifecycle stateis_running: bool: Whether daemon is runningis_enabled: bool: Whether cycles are enabled
Timing:
next_cycle_at: datetime | None: Next scheduled cyclelast_cycle_at: datetime | None: Last cycle executionstarted_at: datetime | None: Daemon start timeuptime_seconds: float | None: Seconds since start
Statistics:
total_cycles: int: Number of cycles executed
DaemonState
Enum of daemon lifecycle states:
STOPPED: Daemon not runningSTARTING: Daemon initializingRUNNING: Daemon loop activeSTOPPING: Daemon shutting downDISABLED: Daemon paused
Advanced Examples
Multi-Instance with IDs
daemon1 = MyDaemon(interval_seconds=30, daemon_id="daemon-1")
daemon2 = MyDaemon(interval_seconds=60, daemon_id="daemon-2")
await asyncio.gather(
daemon1.start(),
daemon2.start(),
)
Monitoring
class MonitoredDaemon(IntervalDaemon):
async def run_cycle(self):
print(f"Uptime: {self.uptime_seconds:.1f}s")
print(f"Cycles: {self.total_cycles}")
print(f"Next: {self.next_cycle_at}")
Related Packages
- lilith-daemon-registry: Multi-instance daemon management
- lilith-activity-logger: Structured activity logging
- lilith-file-watcher-daemon: File-watching daemon built on this base
License
MIT