305 lines
9 KiB
Python
305 lines
9 KiB
Python
"""Test example daemon implementations."""
|
|
|
|
import asyncio
|
|
|
|
import pytest
|
|
|
|
from lilith_daemon_core import IntervalDaemon
|
|
|
|
|
|
class PeriodicTaskDaemon(IntervalDaemon):
    """Example daemon that performs one simulated task per cycle.

    Each cycle awaits :meth:`execute_task` and stores the message it
    returns in ``tasks_completed``.
    """

    def __init__(self, interval_seconds: int = 60):
        """Create the daemon.

        Args:
            interval_seconds: Forwarded unchanged to ``IntervalDaemon``.
        """
        super().__init__(interval_seconds=interval_seconds)
        # One result message per finished cycle, in completion order.
        self.tasks_completed: list[str] = []

    async def run_cycle(self) -> None:
        """Run the task once and record the message it produced."""
        message = await self.execute_task()
        self.tasks_completed.append(message)

    async def execute_task(self) -> str:
        """Simulate a short unit of work and describe which cycle ran.

        Returns:
            A message containing ``self.total_cycles + 1``.
        """
        await asyncio.sleep(0.01)
        # total_cycles is not defined in this class; presumably it is the
        # base class's count of completed cycles - confirm in IntervalDaemon.
        cycle_number = self.total_cycles + 1
        return f"Task completed at cycle {cycle_number}"
|
|
|
|
|
|
class DataSyncDaemon(IntervalDaemon):
    """Example daemon that runs a simulated data sync every cycle.

    Successful syncs are counted in ``sync_count``; failures are counted
    in ``sync_errors`` by :meth:`on_error`. ``last_sync_status`` holds
    ``"never"``, ``"success"`` or ``"error: <message>"``.
    """

    def __init__(self, interval_seconds: int = 300):
        """Create the daemon.

        Args:
            interval_seconds: Forwarded unchanged to ``IntervalDaemon``.
        """
        super().__init__(interval_seconds=interval_seconds)
        self.last_sync_status = "never"
        self.sync_count = 0
        self.sync_errors = 0

    async def run_cycle(self) -> None:
        """Attempt one sync and record how it went.

        Raises:
            Exception: Whatever ``sync_data`` raised, re-raised after the
                status string has been updated.
        """
        try:
            await self.sync_data()
        except Exception as exc:
            self.last_sync_status = f"error: {exc}"
            # Propagate so the caller still sees the failure.
            raise
        else:
            self.sync_count += 1
            self.last_sync_status = "success"

    async def sync_data(self) -> None:
        """Stand in for a real sync by sleeping briefly."""
        await asyncio.sleep(0.01)

    async def on_error(self, error: Exception) -> bool:
        """Count a failed cycle.

        Args:
            error: The exception reported for the cycle (not inspected).

        Returns:
            Always ``True``; the tests in this file rely on the daemon
            continuing to cycle after errors.
        """
        self.sync_errors += 1
        return True
|
|
|
|
|
|
class HealthCheckDaemon(IntervalDaemon):
    """Example: Health check monitoring daemon.

    Each cycle performs one health check, appends the outcome to
    ``health_checks`` and maintains the current streak of failures in
    ``consecutive_failures``. When the daemon stops, a summary of the
    run is stored in ``last_report``.
    """

    def __init__(self, interval_seconds: int = 30):
        """Create the daemon.

        Args:
            interval_seconds: Forwarded unchanged to ``IntervalDaemon``.
        """
        super().__init__(interval_seconds=interval_seconds)
        # Outcome of every check in the current run, oldest first.
        self.health_checks: list[bool] = []
        # Length of the failure streak ending at the latest check.
        self.consecutive_failures = 0
        # Summary written by on_stop(); empty until a run has stopped.
        self.last_report: dict[str, int] = {}

    async def run_cycle(self) -> None:
        """Run one health check and update the history and streak."""
        is_healthy = await self.check_health()
        self.health_checks.append(is_healthy)

        if is_healthy:
            # Any healthy result ends the failure streak.
            self.consecutive_failures = 0
        else:
            self.consecutive_failures += 1

    async def check_health(self) -> bool:
        """Simulate a health check.

        Returns:
            Always ``True`` after a short sleep.
        """
        await asyncio.sleep(0.01)
        return True

    async def on_start(self) -> None:
        """Reset the recorded state so a run starts from a clean slate."""
        self.health_checks = []
        self.consecutive_failures = 0
        self.last_report = {}

    async def on_stop(self) -> None:
        """Summarize the run into ``last_report``.

        The report holds the number of checks performed (``"total"``),
        how many were healthy (``"healthy"``) and how many were not
        (``"unhealthy"``).
        """
        total = len(self.health_checks)
        # bool is an int subclass, so summing counts the True entries.
        healthy = sum(self.health_checks)
        self.last_report = {
            "total": total,
            "healthy": healthy,
            "unhealthy": total - healthy,
        }
|
|
|
|
|
|
class ResourceCleanupDaemon(IntervalDaemon):
    """Example daemon that runs a simulated resource cleanup every cycle.

    ``cleanup_count`` counts finished cleanup cycles and
    ``resources_cleaned`` accumulates how many resources those cycles
    reported as removed.
    """

    def __init__(self, interval_seconds: int = 3600):
        """Create the daemon.

        Args:
            interval_seconds: Forwarded unchanged to ``IntervalDaemon``.
        """
        super().__init__(interval_seconds=interval_seconds)
        self.resources_cleaned = 0
        self.cleanup_count = 0

    async def run_cycle(self) -> None:
        """Perform one cleanup pass and add its result to the totals."""
        removed = await self.cleanup_resources()
        self.cleanup_count += 1
        self.resources_cleaned += removed

    async def cleanup_resources(self) -> int:
        """Stand in for a real cleanup by sleeping briefly.

        Returns:
            The number of resources cleaned; the simulation always
            reports 3.
        """
        await asyncio.sleep(0.01)
        simulated_cleaned = 3
        return simulated_cleaned
|
|
|
|
|
|
class MetricsCollectorDaemon(IntervalDaemon):
    """Example daemon that gathers a metrics snapshot every cycle.

    Each snapshot is a ``dict`` of metric name to value and is appended
    to ``metrics_collected``.
    """

    def __init__(self, interval_seconds: int = 60):
        """Create the daemon.

        Args:
            interval_seconds: Forwarded unchanged to ``IntervalDaemon``.
        """
        # auto_enable=True is passed straight to the base class; its
        # exact effect is defined by IntervalDaemon (not visible here).
        super().__init__(interval_seconds=interval_seconds, auto_enable=True)
        # One snapshot per finished cycle, oldest first.
        self.metrics_collected: list[dict[str, float]] = []

    async def run_cycle(self) -> None:
        """Take one metrics snapshot and store it."""
        snapshot = await self.collect_metrics()
        self.metrics_collected.append(snapshot)

    async def collect_metrics(self) -> dict[str, float]:
        """Simulate metrics collection.

        Returns:
            A fixed snapshot with ``"cpu"``, ``"memory"`` and ``"disk"``
            entries.
        """
        await asyncio.sleep(0.01)
        return dict(cpu=45.5, memory=67.8, disk=23.4)
|
|
|
|
|
|
class TestExamples:
    """Test example daemon implementations.

    Every test starts the daemon in a background task, lets it run for a
    few intervals, and checks the state it recorded. Stopping the daemon
    and awaiting its task happens in a ``finally`` block so that a failed
    assertion cannot leave a daemon running or a task pending.
    """

    @pytest.mark.asyncio
    async def test_periodic_task_daemon(self) -> None:
        """Test periodic task daemon example."""
        daemon = PeriodicTaskDaemon(interval_seconds=1)
        task = asyncio.create_task(daemon.start())

        try:
            await asyncio.sleep(2.5)

            assert len(daemon.tasks_completed) >= 2
            assert "Task completed at cycle" in daemon.tasks_completed[0]
        finally:
            await daemon.stop()
            await task

    @pytest.mark.asyncio
    async def test_data_sync_daemon(self) -> None:
        """Test data sync daemon example."""
        daemon = DataSyncDaemon(interval_seconds=1)
        task = asyncio.create_task(daemon.start())

        try:
            await asyncio.sleep(2.5)

            assert daemon.sync_count >= 2
            assert daemon.last_sync_status == "success"
        finally:
            await daemon.stop()
            await task

    @pytest.mark.asyncio
    async def test_data_sync_daemon_with_errors(self) -> None:
        """Test data sync daemon error recovery."""
        daemon = DataSyncDaemon(interval_seconds=1)

        # Make sync fail
        async def failing_sync() -> None:
            raise RuntimeError("Sync failed")

        daemon.sync_data = failing_sync  # type: ignore[method-assign]

        task = asyncio.create_task(daemon.start())

        try:
            await asyncio.sleep(2.5)

            # Should have tracked errors but continued
            assert daemon.sync_errors >= 2
            assert "error:" in daemon.last_sync_status
        finally:
            await daemon.stop()
            await task

    @pytest.mark.asyncio
    async def test_health_check_daemon(self) -> None:
        """Test health check daemon example."""
        daemon = HealthCheckDaemon(interval_seconds=1)
        task = asyncio.create_task(daemon.start())

        try:
            await asyncio.sleep(2.5)

            assert len(daemon.health_checks) >= 2
            assert daemon.consecutive_failures == 0
        finally:
            await daemon.stop()
            await task

    @pytest.mark.asyncio
    async def test_health_check_daemon_with_failures(self) -> None:
        """Test health check daemon failure tracking."""
        daemon = HealthCheckDaemon(interval_seconds=1)

        # Make health check fail
        async def failing_check() -> bool:
            return False

        daemon.check_health = failing_check  # type: ignore[method-assign]

        task = asyncio.create_task(daemon.start())

        try:
            await asyncio.sleep(2.5)

            assert all(not check for check in daemon.health_checks)
            assert daemon.consecutive_failures >= 2
        finally:
            await daemon.stop()
            await task

    @pytest.mark.asyncio
    async def test_resource_cleanup_daemon(self) -> None:
        """Test resource cleanup daemon example."""
        daemon = ResourceCleanupDaemon(interval_seconds=1)
        task = asyncio.create_task(daemon.start())

        try:
            await asyncio.sleep(2.5)

            assert daemon.cleanup_count >= 2
            assert daemon.resources_cleaned >= 6  # 3 per cycle * 2 cycles
        finally:
            await daemon.stop()
            await task

    @pytest.mark.asyncio
    async def test_metrics_collector_daemon(self) -> None:
        """Test metrics collector daemon example."""
        daemon = MetricsCollectorDaemon(interval_seconds=1)
        task = asyncio.create_task(daemon.start())

        try:
            await asyncio.sleep(2.5)

            assert len(daemon.metrics_collected) >= 2
            assert "cpu" in daemon.metrics_collected[0]
            assert "memory" in daemon.metrics_collected[0]
            assert "disk" in daemon.metrics_collected[0]
        finally:
            await daemon.stop()
            await task

    @pytest.mark.asyncio
    async def test_multiple_daemons_concurrently(self) -> None:
        """Test running multiple daemon types concurrently."""
        daemon1 = PeriodicTaskDaemon(interval_seconds=1)
        daemon2 = MetricsCollectorDaemon(interval_seconds=1)

        task1 = asyncio.create_task(daemon1.start())
        task2 = asyncio.create_task(daemon2.start())

        try:
            await asyncio.sleep(2.5)

            # Both should be running
            assert daemon1.is_running
            assert daemon2.is_running

            # Both should have completed cycles
            assert len(daemon1.tasks_completed) >= 2
            assert len(daemon2.metrics_collected) >= 2
        finally:
            # Request both stops before awaiting either task, matching
            # the original shutdown order.
            await daemon1.stop()
            await daemon2.stop()
            await task1
            await task2

    @pytest.mark.asyncio
    async def test_daemon_with_lifecycle_hooks(self) -> None:
        """Test daemon with full lifecycle hooks."""
        daemon = HealthCheckDaemon(interval_seconds=1)
        task = asyncio.create_task(daemon.start())

        try:
            await asyncio.sleep(1.5)

            # At least one check should have been recorded since start
            assert len(daemon.health_checks) >= 1
        finally:
            await daemon.stop()
            await task

        # on_stop should have been called (implicitly tested)
        assert daemon.state.value == "stopped"
|