#!/usr/bin/env python3 """Example usage of FileWatcherDaemon.""" import asyncio import tempfile from pathlib import Path from lilith_file_watcher_daemon import FileChange, FileWatcherDaemon async def on_changes(changes: list[FileChange]): """Callback for file changes.""" print(f"\nšŸ“ Detected {len(changes)} change(s):") for change in changes: print(f" {change.change_type.value:8s} {change.path}") async def main(): # Create a temporary directory to watch with tempfile.TemporaryDirectory() as tmpdir: watch_path = Path(tmpdir) print(f"Watching directory: {watch_path}") # Create daemon daemon = FileWatcherDaemon( watch_paths=[watch_path], patterns=["**/*.txt"], callback=on_changes, debounce_ms=500, ) # Start daemon in background daemon_task = asyncio.create_task(daemon.start()) # Wait for daemon to start await asyncio.sleep(0.5) print("āœ“ Daemon started") # Create some test files print("\nšŸ”Ø Creating test files...") (watch_path / "file1.txt").write_text("Content 1") await asyncio.sleep(0.1) (watch_path / "file2.txt").write_text("Content 2") # Wait for changes to be detected and debounced await asyncio.sleep(1) # Modify a file print("\nšŸ”Ø Modifying file...") (watch_path / "file1.txt").write_text("Updated content") # Wait for changes await asyncio.sleep(1) # Delete a file print("\nšŸ”Ø Deleting file...") (watch_path / "file2.txt").unlink() # Wait for changes await asyncio.sleep(1) # Stop daemon print("\nšŸ›‘ Stopping daemon...") await daemon.stop() daemon_task.cancel() try: await daemon_task except asyncio.CancelledError: pass print("āœ“ Daemon stopped") print(f"Total cycles: {daemon.total_cycles}") print(f"Uptime: {daemon.uptime_seconds:.2f}s") if __name__ == "__main__": asyncio.run(main())