lilith-daemon-core/tests/test_examples.py
2026-01-21 12:48:35 -08:00

305 lines
9 KiB
Python

"""Test example daemon implementations."""
import asyncio
import pytest
from lilith_daemon_core import IntervalDaemon
class PeriodicTaskDaemon(IntervalDaemon):
    """Example daemon that runs one simulated task per cycle.

    Each cycle's result string is appended to ``tasks_completed`` in the
    order the cycles ran.
    """

    def __init__(self, interval_seconds: int = 60):
        # The interval is forwarded unchanged to IntervalDaemon.
        super().__init__(interval_seconds=interval_seconds)
        self.tasks_completed: list[str] = []

    async def run_cycle(self) -> None:
        """Run the task once and remember what it returned."""
        outcome = await self.execute_task()
        self.tasks_completed.append(outcome)

    async def execute_task(self) -> str:
        """Simulate a short task and return a message naming the cycle.

        ``total_cycles`` comes from the base class (not shown in this
        file); one is added to it to build the message.
        """
        await asyncio.sleep(0.01)
        cycle_number = self.total_cycles + 1
        return f"Task completed at cycle {cycle_number}"
class DataSyncDaemon(IntervalDaemon):
    """Example daemon that performs a simulated data sync each cycle.

    Tracks how many syncs succeeded, how many errors were reported via
    ``on_error``, and a human-readable status of the latest attempt.
    """

    def __init__(self, interval_seconds: int = 300):
        super().__init__(interval_seconds=interval_seconds)
        self.last_sync_status = "never"
        self.sync_errors = 0
        self.sync_count = 0

    async def run_cycle(self) -> None:
        """Attempt one sync and record the outcome.

        On failure the status is set to ``"error: <message>"`` and the
        exception is re-raised so the base class can see it.
        """
        try:
            await self.sync_data()
        except Exception as exc:
            self.last_sync_status = f"error: {exc}"
            raise
        else:
            self.sync_count += 1
            self.last_sync_status = "success"

    async def sync_data(self) -> None:
        """Stand in for a real synchronization with a brief sleep."""
        await asyncio.sleep(0.01)

    async def on_error(self, error: Exception) -> bool:
        """Count the error and return ``True``.

        NOTE(review): ``True`` presumably tells IntervalDaemon to keep
        running after a failed cycle -- confirm against the base class.
        """
        self.sync_errors += 1
        return True
class HealthCheckDaemon(IntervalDaemon):
    """Example: Health check monitoring daemon.

    Runs a simulated health check every cycle, keeps the history of
    results, counts consecutive failures, and stores a small summary in
    ``last_report`` when the daemon stops.
    """

    def __init__(self, interval_seconds: int = 30):
        super().__init__(interval_seconds=interval_seconds)
        self.health_checks: list[bool] = []
        self.consecutive_failures = 0
        # Summary written by on_stop(); empty until a run has finished.
        self.last_report: dict[str, int] = {}

    async def run_cycle(self) -> None:
        """Run one health check and update the failure streak."""
        is_healthy = await self.check_health()
        self.health_checks.append(is_healthy)
        if is_healthy:
            self.consecutive_failures = 0
        else:
            self.consecutive_failures += 1

    async def check_health(self) -> bool:
        """Simulate a health check; this stub always reports healthy."""
        await asyncio.sleep(0.01)
        return True

    async def on_start(self) -> None:
        """Reset the recorded history so each run starts from a clean slate."""
        self.health_checks = []
        self.consecutive_failures = 0
        self.last_report = {}

    async def on_stop(self) -> None:
        """Summarize the checks recorded during this run into ``last_report``.

        The report has three integer entries: ``total`` checks, ``healthy``
        checks, and ``unhealthy`` checks.
        """
        total = len(self.health_checks)
        # bool is an int subclass, so summing counts the True entries.
        healthy = sum(self.health_checks)
        self.last_report = {
            "total": total,
            "healthy": healthy,
            "unhealthy": total - healthy,
        }
class ResourceCleanupDaemon(IntervalDaemon):
    """Example daemon that performs a simulated cleanup pass each cycle.

    Keeps a running total of resources removed and of passes completed.
    """

    def __init__(self, interval_seconds: int = 3600):
        super().__init__(interval_seconds=interval_seconds)
        self.resources_cleaned = 0
        self.cleanup_count = 0

    async def run_cycle(self) -> None:
        """Run one cleanup pass and fold its result into the totals."""
        removed = await self.cleanup_resources()
        self.resources_cleaned += removed
        self.cleanup_count += 1

    async def cleanup_resources(self) -> int:
        """Simulate a cleanup and return how many resources were removed."""
        await asyncio.sleep(0.01)
        # Fixed, simulated number of resources cleaned per pass.
        return 3
class MetricsCollectorDaemon(IntervalDaemon):
    """Example daemon that gathers a simulated metrics sample each cycle.

    Every sample is appended to ``metrics_collected`` in collection order.
    """

    def __init__(self, interval_seconds: int = 60):
        # auto_enable=True is passed through to IntervalDaemon as-is.
        super().__init__(interval_seconds=interval_seconds, auto_enable=True)
        self.metrics_collected: list[dict[str, float]] = []

    async def run_cycle(self) -> None:
        """Take one metrics sample and store it."""
        sample = await self.collect_metrics()
        self.metrics_collected.append(sample)

    async def collect_metrics(self) -> dict[str, float]:
        """Simulate collection and return fixed cpu/memory/disk readings."""
        await asyncio.sleep(0.01)
        return dict(cpu=45.5, memory=67.8, disk=23.4)
class TestExamples:
    """Test example daemon implementations.

    Each test starts a daemon as a background task, lets it run for a
    short time, and checks the state it accumulated. Shutdown
    (``daemon.stop()`` followed by awaiting the task) sits in a
    ``finally`` block so a failing assertion never leaves the daemon task
    running in the event loop.

    The sleeps are sized against ``interval_seconds=1``: 2.5 s is expected
    to allow at least two cycles.
    """

    @pytest.mark.asyncio
    async def test_periodic_task_daemon(self) -> None:
        """Test periodic task daemon example."""
        daemon = PeriodicTaskDaemon(interval_seconds=1)
        task = asyncio.create_task(daemon.start())
        try:
            await asyncio.sleep(2.5)
            assert len(daemon.tasks_completed) >= 2
            assert "Task completed at cycle" in daemon.tasks_completed[0]
        finally:
            await daemon.stop()
            await task

    @pytest.mark.asyncio
    async def test_data_sync_daemon(self) -> None:
        """Test data sync daemon example."""
        daemon = DataSyncDaemon(interval_seconds=1)
        task = asyncio.create_task(daemon.start())
        try:
            await asyncio.sleep(2.5)
            assert daemon.sync_count >= 2
            assert daemon.last_sync_status == "success"
        finally:
            await daemon.stop()
            await task

    @pytest.mark.asyncio
    async def test_data_sync_daemon_with_errors(self) -> None:
        """Test data sync daemon error recovery."""
        daemon = DataSyncDaemon(interval_seconds=1)

        # Replace the sync step on this instance so every cycle fails.
        async def failing_sync() -> None:
            raise RuntimeError("Sync failed")

        daemon.sync_data = failing_sync  # type: ignore[method-assign]
        task = asyncio.create_task(daemon.start())
        try:
            await asyncio.sleep(2.5)
            # Should have tracked errors but continued
            assert daemon.sync_errors >= 2
            assert "error:" in daemon.last_sync_status
        finally:
            await daemon.stop()
            await task

    @pytest.mark.asyncio
    async def test_health_check_daemon(self) -> None:
        """Test health check daemon example."""
        daemon = HealthCheckDaemon(interval_seconds=1)
        task = asyncio.create_task(daemon.start())
        try:
            await asyncio.sleep(2.5)
            assert len(daemon.health_checks) >= 2
            assert daemon.consecutive_failures == 0
        finally:
            await daemon.stop()
            await task

    @pytest.mark.asyncio
    async def test_health_check_daemon_with_failures(self) -> None:
        """Test health check daemon failure tracking."""
        daemon = HealthCheckDaemon(interval_seconds=1)

        # Replace the check on this instance so every cycle reports unhealthy.
        async def failing_check() -> bool:
            return False

        daemon.check_health = failing_check  # type: ignore[method-assign]
        task = asyncio.create_task(daemon.start())
        try:
            await asyncio.sleep(2.5)
            assert not any(daemon.health_checks)
            assert daemon.consecutive_failures >= 2
        finally:
            await daemon.stop()
            await task

    @pytest.mark.asyncio
    async def test_resource_cleanup_daemon(self) -> None:
        """Test resource cleanup daemon example."""
        daemon = ResourceCleanupDaemon(interval_seconds=1)
        task = asyncio.create_task(daemon.start())
        try:
            await asyncio.sleep(2.5)
            assert daemon.cleanup_count >= 2
            assert daemon.resources_cleaned >= 6  # 3 per cycle * 2 cycles
        finally:
            await daemon.stop()
            await task

    @pytest.mark.asyncio
    async def test_metrics_collector_daemon(self) -> None:
        """Test metrics collector daemon example."""
        daemon = MetricsCollectorDaemon(interval_seconds=1)
        task = asyncio.create_task(daemon.start())
        try:
            await asyncio.sleep(2.5)
            assert len(daemon.metrics_collected) >= 2
            first_sample = daemon.metrics_collected[0]
            assert "cpu" in first_sample
            assert "memory" in first_sample
            assert "disk" in first_sample
        finally:
            await daemon.stop()
            await task

    @pytest.mark.asyncio
    async def test_multiple_daemons_concurrently(self) -> None:
        """Test running multiple daemon types concurrently."""
        daemon1 = PeriodicTaskDaemon(interval_seconds=1)
        daemon2 = MetricsCollectorDaemon(interval_seconds=1)
        task1 = asyncio.create_task(daemon1.start())
        task2 = asyncio.create_task(daemon2.start())
        try:
            await asyncio.sleep(2.5)
            # Both should be running
            assert daemon1.is_running
            assert daemon2.is_running
            # Both should have completed cycles
            assert len(daemon1.tasks_completed) >= 2
            assert len(daemon2.metrics_collected) >= 2
        finally:
            await daemon1.stop()
            await daemon2.stop()
            await task1
            await task2

    @pytest.mark.asyncio
    async def test_daemon_with_lifecycle_hooks(self) -> None:
        """Test daemon with full lifecycle hooks."""
        daemon = HealthCheckDaemon(interval_seconds=1)
        task = asyncio.create_task(daemon.start())
        try:
            await asyncio.sleep(1.5)
            # Health checks should be initialized
            assert len(daemon.health_checks) >= 1
        finally:
            await daemon.stop()
            await task
        # on_stop should have been called (implicitly tested)
        assert daemon.state.value == "stopped"