lilith-activity-logger
Structured activity logging with JSON Lines format and automatic rotation.
Overview
lilith-activity-logger writes structured log entries in JSON Lines format and rotates the log automatically once it reaches a configured size. It is intended for daemons, services, and other applications that need structured activity tracking.
Features
- JSON Lines Format: Each log entry is a single line of JSON, perfect for streaming and parsing
- Automatic Rotation: Size-based log rotation with timestamp-based archival
- Structured Data: Log arbitrary structured data with each entry
- Instance Tracking: Track activities across multiple process instances
- Extensible Actions: Standard action types with support for custom actions
- Thread-Safe: Safe for concurrent logging from multiple threads
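To illustrate how one-entry-per-line output and thread safety fit together, here is a minimal standard-library sketch. It is not the package's implementation: `append_entry`, `demo`, and the module-level lock are hypothetical names used only for this example, and the approach (one lock around each append) is an assumption about how such a logger could work.

```python
import json
import tempfile
import threading
from datetime import datetime
from pathlib import Path

_lock = threading.Lock()  # hypothetical: a single lock guarding every write


def append_entry(path: Path, action: str, data: dict) -> None:
    """Append one entry as a single JSON line while holding the lock."""
    entry = {
        "timestamp": datetime.now().isoformat(),
        "action": action,
        "data": data,
    }
    line = json.dumps(entry)
    with _lock:
        with path.open("a", encoding="utf-8") as f:
            f.write(line + "\n")


def demo(thread_count: int = 4, per_thread: int = 25) -> list:
    """Write from several threads, then parse every line back."""
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "activity.jsonl"

        def worker(n: int) -> None:
            for i in range(per_thread):
                append_entry(path, "processing", {"worker": n, "item": i})

        threads = [
            threading.Thread(target=worker, args=(n,)) for n in range(thread_count)
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        text = path.read_text(encoding="utf-8")
        return [json.loads(line) for line in text.splitlines()]
```

Because each write happens under the lock, every line in the file parses as a complete JSON object even when several threads log at once.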
Installation
pip install lilith-activity-logger
Quick Start
from pathlib import Path
from lilith_activity_logger import ActivityLogger, ActionType
# Initialize logger
logger = ActivityLogger(
    log_path=Path("activity.jsonl"),
    instance_id="worker-1",
    max_size_mb=50
)
# Log activities
logger.log_start({"task": "processing", "items": 100})
logger.log("custom_action", {"detail": "something happened"})
logger.log_error("Connection failed", {"service": "api", "attempts": 3})
logger.log_end({"duration": 42.5, "success": True})
# Read recent entries
entries = logger.get_recent_entries(limit=10)
for entry in entries:
    print(f"{entry['timestamp']}: {entry['action']} - {entry['data']}")
Core Concepts
Log Entry Format
Each log entry is a JSON object with the following fields:
{
  "timestamp": "2026-01-20T15:30:00.123456",
  "action": "start",
  "instance_id": "worker-1",
  "data": {
    "task": "processing",
    "items": 100
  }
}
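Because each entry is plain JSON, it can be produced and consumed with the standard library alone. The sketch below builds a dict with the four documented fields, serializes it to one line, and parses it back. `make_entry` is a hypothetical helper written for this example, not part of the package.

```python
import json
from datetime import datetime


def make_entry(action: str, instance_id: str, data: dict) -> dict:
    """Build a dict with the four documented fields (hypothetical helper)."""
    return {
        "timestamp": datetime.now().isoformat(),
        "action": action,
        "instance_id": instance_id,
        "data": data,
    }


entry = make_entry("start", "worker-1", {"task": "processing", "items": 100})
line = json.dumps(entry)   # one entry becomes one line of text
parsed = json.loads(line)  # any JSON parser can read it back

# The timestamp is an ISO 8601 string, so it converts back to a datetime.
when = datetime.fromisoformat(parsed["timestamp"])
```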
Standard Action Types
The package provides standard action types via the ActionType enum:
- START: Generic start action (process, task, cycle, etc.)
- END: Generic end action (process, task, cycle, etc.)
- ERROR: Error occurred during processing
- STATE_CHANGE: State transition occurred
- HEALTH_CHECK: Health check or status probe
Custom Action Types
You can use custom action strings for application-specific events:
logger.log("data_import", {"records": 1000, "source": "api"})
logger.log("cache_refresh", {"old_count": 50, "new_count": 75})
logger.log("user_login", {"user_id": "12345", "ip": "192.168.1.1"})
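Both enum members and plain strings end up as a string in the entry's "action" field. One way such normalization could be done is sketched below. `DemoAction` is a stand-in that mirrors the documented values of `ActionType`, and `action_name` is a hypothetical helper; neither is taken from the package's source.

```python
from enum import Enum


class DemoAction(str, Enum):
    """Stand-in for ActionType, using the documented string values."""

    START = "start"
    END = "end"
    ERROR = "error"
    STATE_CHANGE = "state_change"
    HEALTH_CHECK = "health_check"


def action_name(action) -> str:
    """Return the string to store in the entry's "action" field."""
    return action.value if isinstance(action, Enum) else str(action)


standard = action_name(DemoAction.STATE_CHANGE)  # enum member
custom = action_name("data_import")              # custom string
```

With this approach, readers of the log only ever compare against strings, regardless of how the entry was written.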
Usage Examples
Basic Logging
from pathlib import Path
from lilith_activity_logger import ActivityLogger
logger = ActivityLogger(
    log_path=Path("activity.jsonl"),
    instance_id="app-1"
)
# Simple log entry
logger.log("processing", {"item_id": 123})
# With standard action type
from lilith_activity_logger import ActionType
logger.log(ActionType.START, {"phase": "initialization"})
Convenience Methods
# Start/end events
logger.log_start({"task": "batch_processing", "batch_size": 1000})
logger.log_end({"duration_seconds": 42.5, "success": True})
# Error logging
logger.log_error(
    "Connection timeout",
    {"service": "database", "attempts": 3, "timeout_seconds": 30}
)
# State changes
logger.log_state_change(
    old_state="idle",
    new_state="processing",
    reason="new task received"
)
# Health checks
logger.log_health_check(
    healthy=True,
    details={"latency_ms": 45, "connections": 10, "memory_mb": 512}
)
Daemon/Service Usage
import uuid
from pathlib import Path
from lilith_activity_logger import ActivityLogger
class MyDaemon:
    def __init__(self):
        self.daemon_id = str(uuid.uuid4())[:8]
        self.logger = ActivityLogger(
            log_path=Path("/var/log/mydaemon/activity.jsonl"),
            instance_id=self.daemon_id,
            max_size_mb=50
        )

    def start(self):
        self.logger.log_start({"version": "1.0.0"})
        self.logger.log_state_change("stopped", "starting")
        # ... initialization ...
        self.logger.log_state_change("starting", "running")

    def process_cycle(self):
        cycle_id = str(uuid.uuid4())
        self.logger.log("cycle_start", {"cycle_id": cycle_id})
        try:
            # ... do work ...
            self.logger.log("cycle_end", {"cycle_id": cycle_id, "items": 42})
        except Exception as e:
            self.logger.log_error(str(e), {"cycle_id": cycle_id})

    def stop(self):
        self.logger.log_state_change("running", "stopping")
        self.logger.log_end({"shutdown": "graceful"})
Reading Log Entries
# Get recent entries
entries = logger.get_recent_entries(limit=100)
# Process entries
for entry in entries:
    timestamp = entry["timestamp"]
    action = entry["action"]
    data = entry["data"]
    print(f"{timestamp}: {action}")
    if action == "error":
        print(f"  Error: {data.get('error')}")
# Filter by action type
errors = [e for e in entries if e["action"] == "error"]
print(f"Found {len(errors)} errors")
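Since entries are plain dicts, standard-library tools are enough to summarize them. The sketch below counts entries per action and collects error messages. The sample entries are made-up data in the documented format, and the "error" key inside `data` follows the lookup used in the snippet above rather than a confirmed schema.

```python
from collections import Counter

# Hypothetical sample entries in the documented format, newest first.
entries = [
    {"timestamp": "2026-01-20T15:30:04", "action": "end",
     "instance_id": "worker-1", "data": {"success": False}},
    {"timestamp": "2026-01-20T15:30:03", "action": "error",
     "instance_id": "worker-1", "data": {"error": "Connection failed"}},
    {"timestamp": "2026-01-20T15:30:02", "action": "error",
     "instance_id": "worker-1", "data": {"error": "Timeout"}},
    {"timestamp": "2026-01-20T15:30:01", "action": "start",
     "instance_id": "worker-1", "data": {"task": "processing"}},
]

# Count how often each action occurs.
counts = Counter(e["action"] for e in entries)

# Collect the messages of all error entries, in the order they appear.
messages = [e["data"].get("error") for e in entries if e["action"] == "error"]
```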
Log Rotation
Logs automatically rotate when they reach the configured size:
logger = ActivityLogger(
    log_path=Path("activity.jsonl"),
    max_size_mb=50  # Rotate at 50MB
)
# When activity.jsonl reaches 50MB:
# 1. activity.jsonl -> activity.20260120_153045.jsonl (archive)
# 2. New activity.jsonl is created
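The two rotation steps above can be sketched with the standard library. This is an illustration of the technique under stated assumptions, not the package's code: `rotate_if_needed` and `demo_rotation` are hypothetical names, the threshold is passed in bytes to avoid guessing how the package converts `max_size_mb`, and the archive name follows the `activity.20260120_153045.jsonl` pattern shown above.

```python
from __future__ import annotations

import tempfile
from datetime import datetime
from pathlib import Path


def rotate_if_needed(log_path: Path, max_size_bytes: int) -> Path | None:
    """Archive log_path under a timestamped name once it reaches the limit.

    Returns the archive path, or None if no rotation happened.
    """
    if not log_path.exists() or log_path.stat().st_size < max_size_bytes:
        return None
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    # activity.jsonl -> activity.<stamp>.jsonl in the same directory
    archive = log_path.with_name(f"{log_path.stem}.{stamp}{log_path.suffix}")
    log_path.rename(archive)
    log_path.touch()  # start a fresh, empty log
    return archive


def demo_rotation() -> tuple:
    """Rotate a 2 KiB file against a 1 KiB limit in a temporary directory."""
    with tempfile.TemporaryDirectory() as tmp:
        log = Path(tmp) / "activity.jsonl"
        log.write_text("x" * 2048, encoding="utf-8")
        archive = rotate_if_needed(log, max_size_bytes=1024)
        return archive.name, archive.stat().st_size, log.stat().st_size
```

A production version would also need to hold the write lock during the rename and handle two rotations within the same second, which this sketch does not attempt.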
Disabled Logging
# Disable logging by passing None
logger = ActivityLogger(log_path=None)
# All logging calls become no-ops
logger.log("action", {"data": "value"}) # Does nothing
logger.log_error("error") # Does nothing
# Check if logging is enabled
if logger.enabled:
    logger.log("action", {"data": "value"})
Multi-Instance Tracking
# Worker 1
logger1 = ActivityLogger(
    log_path=Path("activity.jsonl"),
    instance_id="worker-1"
)
# Worker 2
logger2 = ActivityLogger(
    log_path=Path("activity.jsonl"),
    instance_id="worker-2"
)
# Both write to same log with different instance_ids
logger1.log("processing", {"item": 1})
logger2.log("processing", {"item": 2})
# Read and filter by instance
entries = logger1.get_recent_entries()
worker1_entries = [e for e in entries if e["instance_id"] == "worker-1"]
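When several instances share one log, it can be convenient to split the entries by `instance_id` in a single pass instead of filtering once per worker. A small sketch, using made-up sample entries in the documented format:

```python
from collections import defaultdict

# Hypothetical sample entries as they might be read from a shared log.
entries = [
    {"timestamp": "2026-01-20T15:30:03", "action": "processing",
     "instance_id": "worker-2", "data": {"item": 3}},
    {"timestamp": "2026-01-20T15:30:02", "action": "processing",
     "instance_id": "worker-1", "data": {"item": 2}},
    {"timestamp": "2026-01-20T15:30:01", "action": "processing",
     "instance_id": "worker-2", "data": {"item": 1}},
]

# Group entries by the instance that wrote them, preserving their order.
by_instance = defaultdict(list)
for entry in entries:
    by_instance[entry["instance_id"]].append(entry)

items_per_worker = {
    instance: [e["data"]["item"] for e in group]
    for instance, group in by_instance.items()
}
```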
API Reference
ActivityLogger
Main logger class for activity logging.
Constructor
ActivityLogger(
    log_path: Path | None = None,
    instance_id: str = "",
    max_size_mb: int = 50
)
Parameters:
- log_path: Path to log file. If None, logging is disabled.
- instance_id: Unique identifier for this instance (e.g., process ID, worker name).
- max_size_mb: Maximum log file size in MB before rotation.
Methods
log(action, data=None)
Log an activity event.
logger.log("processing", {"items": 10})
logger.log(ActionType.ERROR, {"message": "failed"})
log_start(data=None)
Convenience method for logging start events.
logger.log_start({"phase": "initialization"})
log_end(data=None)
Convenience method for logging end events.
logger.log_end({"duration_seconds": 42.5})
log_error(error, context=None)
Convenience method for logging errors.
logger.log_error("Connection failed", {"service": "api"})
log_state_change(old_state, new_state, reason="")
Convenience method for logging state transitions.
logger.log_state_change("idle", "active", "task received")
log_health_check(healthy, details=None)
Convenience method for logging health checks.
logger.log_health_check(True, {"latency_ms": 10})
get_recent_entries(limit=100)
Read recent log entries.
entries = logger.get_recent_entries(limit=50)
Returns: List of log entries (dicts), most recent first.
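For readers who want to inspect a log file without the package, the "most recent first" behavior can be approximated with the standard library. This is a sketch under assumptions, not the package's implementation: `read_recent` and `demo_read` are hypothetical names, the whole file is read into memory, and unparseable lines are skipped, which may differ from what `get_recent_entries` actually does.

```python
import json
import tempfile
from pathlib import Path


def read_recent(path: Path, limit: int = 100) -> list:
    """Return up to `limit` entries from a JSON Lines file, newest first."""
    if limit <= 0 or not path.exists():
        return []
    entries = []
    for line in path.read_text(encoding="utf-8").splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            entries.append(json.loads(line))
        except json.JSONDecodeError:
            continue  # skip a partially written or corrupt line
    # The file is append-only, so the last lines are the newest entries.
    return entries[-limit:][::-1]


def demo_read() -> list:
    """Write five entries plus one broken line, then read the newest three."""
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "activity.jsonl"
        lines = [json.dumps({"action": "processing", "data": {"item": i}})
                 for i in range(5)]
        lines.insert(2, "{not valid json")
        path.write_text("\n".join(lines) + "\n", encoding="utf-8")
        return [e["data"]["item"] for e in read_recent(path, limit=3)]
```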
clear_log()
Clear the current log file. Archives are not affected.
logger.clear_log()
Properties
- enabled: bool - Whether logging is enabled
- log_path: Path | None - Path to log file
- instance_id: str - Instance identifier
ActionType
Enum of standard action types.
from lilith_activity_logger import ActionType
ActionType.START # "start"
ActionType.END # "end"
ActionType.ERROR # "error"
ActionType.STATE_CHANGE # "state_change"
ActionType.HEALTH_CHECK # "health_check"
Best Practices
1. Use Descriptive Instance IDs
import os
import socket
instance_id = f"{socket.gethostname()}-{os.getpid()}"
logger = ActivityLogger(log_path=log_path, instance_id=instance_id)
2. Structure Your Data Consistently
# Good: Consistent structure
logger.log("request", {
    "method": "POST",
    "path": "/api/users",
    "status": 200,
    "duration_ms": 45
})
# Avoid: Inconsistent structure
logger.log("request", {"info": "POST /api/users returned 200 in 45ms"})
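The payoff of consistent fields shows up when the log is analyzed: values can be aggregated directly instead of being parsed out of free text. A short sketch with made-up request entries that use the field names from the "Good" example above:

```python
from statistics import mean

# Hypothetical request entries using consistent field names.
entries = [
    {"action": "request", "data": {"method": "POST", "path": "/api/users",
                                   "status": 200, "duration_ms": 45}},
    {"action": "request", "data": {"method": "GET", "path": "/api/users",
                                   "status": 200, "duration_ms": 15}},
    {"action": "request", "data": {"method": "GET", "path": "/api/orders",
                                   "status": 500, "duration_ms": 120}},
]

requests = [e["data"] for e in entries if e["action"] == "request"]

# Average latency across all requests.
avg_ms = mean(r["duration_ms"] for r in requests)

# Paths that returned a server error.
failing_paths = [r["path"] for r in requests if r["status"] >= 500]
```

The same questions would require string parsing against the single "info" field in the inconsistent form.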
3. Use Standard Action Types When Possible
# Good: Use standard types
logger.log(ActionType.ERROR, {"error": "Connection failed"})
# Also good: Custom types for domain events
logger.log("order_placed", {"order_id": "12345"})
4. Include Context in Errors
logger.log_error(
    "Database connection failed",
    {
        "host": "db.example.com",
        "port": 5432,
        "attempts": 3,
        "last_error": "connection timeout"
    }
)
5. Log State Transitions
class StateMachine:
    def transition(self, new_state, reason=""):
        old_state = self.state
        self.state = new_state
        self.logger.log_state_change(old_state, new_state, reason)
Integration Examples
With Auto-Commit Service
import uuid
from pathlib import Path

from lilith_activity_logger import ActivityLogger


class AutoCommitService:
    def __init__(self, repos):
        # repos is a hypothetical constructor argument added so that
        # self.repos and self.daemon_id are defined in this example
        self.daemon_id = str(uuid.uuid4())[:8]
        self.repos = repos
        self.logger = ActivityLogger(
            log_path=Path("activity.jsonl"),
            instance_id=self.daemon_id
        )

    def commit_cycle(self):
        cycle_id = str(uuid.uuid4())
        self.logger.log("cycle_start", {"cycle_id": cycle_id})
        for repo in self.repos:
            try:
                # commit_repo is the service's own method (not shown)
                result = self.commit_repo(repo)
                self.logger.log("commit", {
                    "repo": repo.name,
                    "hash": result.hash,
                    "files_changed": result.files_changed
                })
            except Exception as e:
                self.logger.log_error(str(e), {"repo": repo.name})
        self.logger.log("cycle_end", {"cycle_id": cycle_id})
With FastAPI
import time
from pathlib import Path

from fastapi import FastAPI, Request
from lilith_activity_logger import ActivityLogger

app = FastAPI()
logger = ActivityLogger(Path("api_activity.jsonl"), instance_id="api")


@app.middleware("http")
async def log_requests(request: Request, call_next):
    # Time the request and log one structured entry per response
    start_time = time.time()
    response = await call_next(request)
    duration_ms = (time.time() - start_time) * 1000
    logger.log("request", {
        "method": request.method,
        "path": request.url.path,
        "status": response.status_code,
        "duration_ms": round(duration_ms, 2)
    })
    return response
License
MIT License - See LICENSE file for details.
Contributing
Contributions are welcome! Please ensure tests pass before submitting pull requests.
# Run tests
pytest
# Run with coverage
pytest --cov=lilith_activity_logger
Changelog
1.0.0 (2026-01-20)
- Initial release
- JSON Lines format logging
- Automatic log rotation
- Standard action types
- Custom action support
- Thread-safe operations
- Comprehensive test coverage