203 lines
6.2 KiB
Python
203 lines
6.2 KiB
Python
"""Tests for daemon cycle execution and timing."""
|
|
|
|
import asyncio
|
|
from datetime import datetime
|
|
|
|
import pytest
|
|
|
|
from .conftest import TestDaemon
|
|
|
|
|
|
class TestCycleExecution:
|
|
"""Test daemon cycle execution, counting, and timing."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_cycles_execute(self, test_daemon: TestDaemon) -> None:
|
|
"""Test that cycles execute when daemon is running."""
|
|
task = asyncio.create_task(test_daemon.start())
|
|
|
|
# Wait for several cycles
|
|
await asyncio.sleep(2.5)
|
|
|
|
assert test_daemon.total_cycles >= 2
|
|
assert len(test_daemon.cycle_calls) >= 2
|
|
|
|
await test_daemon.stop()
|
|
await task
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_cycle_counting(self, test_daemon: TestDaemon) -> None:
|
|
"""Test that cycles are counted correctly."""
|
|
assert test_daemon.total_cycles == 0
|
|
|
|
task = asyncio.create_task(test_daemon.start())
|
|
await asyncio.sleep(2.5)
|
|
|
|
cycles = test_daemon.total_cycles
|
|
assert cycles >= 2
|
|
|
|
# Wait for more cycles
|
|
await asyncio.sleep(1.5)
|
|
assert test_daemon.total_cycles > cycles
|
|
|
|
await test_daemon.stop()
|
|
await task
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_last_cycle_timestamp(self, test_daemon: TestDaemon) -> None:
|
|
"""Test that last_cycle_at is updated."""
|
|
assert test_daemon.last_cycle_at is None
|
|
|
|
task = asyncio.create_task(test_daemon.start())
|
|
await asyncio.sleep(1.5)
|
|
|
|
assert test_daemon.last_cycle_at is not None
|
|
last_cycle = test_daemon.last_cycle_at
|
|
|
|
# Wait for another cycle
|
|
await asyncio.sleep(1.5)
|
|
assert test_daemon.last_cycle_at > last_cycle
|
|
|
|
await test_daemon.stop()
|
|
await task
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_next_cycle_timestamp(self, test_daemon: TestDaemon) -> None:
|
|
"""Test that next_cycle_at is calculated."""
|
|
assert test_daemon.next_cycle_at is None
|
|
|
|
task = asyncio.create_task(test_daemon.start())
|
|
await asyncio.sleep(0.1)
|
|
|
|
assert test_daemon.next_cycle_at is not None
|
|
assert test_daemon.next_cycle_at > datetime.now()
|
|
|
|
await test_daemon.stop()
|
|
await task
|
|
|
|
# After stopping, next_cycle_at should be cleared
|
|
assert test_daemon.next_cycle_at is None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_interval_timing(self) -> None:
|
|
"""Test that cycles execute at the correct interval."""
|
|
daemon = TestDaemon(interval_seconds=1)
|
|
task = asyncio.create_task(daemon.start())
|
|
|
|
# Let it run for a bit
|
|
await asyncio.sleep(3.5)
|
|
|
|
# Should have approximately 3 cycles (allowing for timing variance)
|
|
assert daemon.total_cycles >= 3
|
|
assert daemon.total_cycles <= 4
|
|
|
|
await daemon.stop()
|
|
await task
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_fast_interval(self) -> None:
|
|
"""Test daemon with very fast interval."""
|
|
daemon = TestDaemon(interval_seconds=0.1) # 100ms interval
|
|
task = asyncio.create_task(daemon.start())
|
|
|
|
await asyncio.sleep(1.0)
|
|
|
|
# Should have around 10 cycles (100ms * 10 = 1s)
|
|
# Allow variance: 8-12 cycles
|
|
assert daemon.total_cycles >= 8
|
|
assert daemon.total_cycles <= 12
|
|
|
|
await daemon.stop()
|
|
await task
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_started_at_timestamp(self, test_daemon: TestDaemon) -> None:
|
|
"""Test that started_at is set correctly."""
|
|
assert test_daemon.started_at is None
|
|
|
|
before_start = datetime.now()
|
|
task = asyncio.create_task(test_daemon.start())
|
|
await asyncio.sleep(0.1)
|
|
|
|
assert test_daemon.started_at is not None
|
|
assert test_daemon.started_at >= before_start
|
|
assert test_daemon.started_at <= datetime.now()
|
|
|
|
await test_daemon.stop()
|
|
await task
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_uptime_calculation(self, test_daemon: TestDaemon) -> None:
|
|
"""Test uptime calculation."""
|
|
assert test_daemon.uptime_seconds is None
|
|
|
|
task = asyncio.create_task(test_daemon.start())
|
|
await asyncio.sleep(0.5)
|
|
|
|
uptime_1 = test_daemon.uptime_seconds
|
|
assert uptime_1 is not None
|
|
assert uptime_1 >= 0.4 # At least 0.4 seconds
|
|
|
|
# Wait and check uptime increased
|
|
await asyncio.sleep(0.5)
|
|
uptime_2 = test_daemon.uptime_seconds
|
|
assert uptime_2 is not None
|
|
assert uptime_2 > uptime_1
|
|
|
|
await test_daemon.stop()
|
|
await task
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_cycle_execution_tracking(self, test_daemon: TestDaemon) -> None:
|
|
"""Test that cycle execution times are tracked."""
|
|
task = asyncio.create_task(test_daemon.start())
|
|
await asyncio.sleep(2.5)
|
|
|
|
# Check cycle calls were tracked
|
|
assert len(test_daemon.cycle_calls) >= 2
|
|
|
|
# Check calls are in chronological order
|
|
for i in range(len(test_daemon.cycle_calls) - 1):
|
|
assert test_daemon.cycle_calls[i] < test_daemon.cycle_calls[i + 1]
|
|
|
|
await test_daemon.stop()
|
|
await task
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_no_cycles_when_disabled(self) -> None:
|
|
"""Test that no cycles execute when daemon is disabled."""
|
|
daemon = TestDaemon(auto_enable=False, interval_seconds=1)
|
|
task = asyncio.create_task(daemon.start())
|
|
|
|
await asyncio.sleep(2.5)
|
|
|
|
assert daemon.total_cycles == 0
|
|
assert len(daemon.cycle_calls) == 0
|
|
assert daemon.last_cycle_at is None
|
|
|
|
await daemon.stop()
|
|
await task
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_cycle_count_persists_across_enable_disable(self) -> None:
|
|
"""Test that cycle count accumulates across enable/disable."""
|
|
daemon = TestDaemon(interval_seconds=1)
|
|
task = asyncio.create_task(daemon.start())
|
|
|
|
# Run some cycles
|
|
await asyncio.sleep(1.5)
|
|
cycles_1 = daemon.total_cycles
|
|
assert cycles_1 >= 1
|
|
|
|
# Disable and wait
|
|
daemon.disable()
|
|
await asyncio.sleep(1.5)
|
|
assert daemon.total_cycles == cycles_1
|
|
|
|
# Re-enable and run more cycles
|
|
daemon.enable()
|
|
await asyncio.sleep(1.5)
|
|
assert daemon.total_cycles > cycles_1
|
|
|
|
await daemon.stop()
|
|
await task
|