No description
Find a file
autocommit 28ed916e37
Some checks failed
Publish / publish (push) Failing after 1s
deps-upgrade(deps): ⬆️ Update dependencies to latest versions in pyproject.toml
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
2026-04-12 00:20:15 -07:00
.forgejo/workflows chore: initial commit with DRY workflow 2026-01-21 12:48:00 -08:00
dist chore: initial commit with DRY workflow 2026-01-21 12:48:00 -08:00
src/lilith_activity_logger chore: initial commit with DRY workflow 2026-01-21 12:48:00 -08:00
tests chore: initial commit with DRY workflow 2026-01-21 12:48:00 -08:00
MIGRATION.md chore: initial commit with DRY workflow 2026-01-21 12:48:00 -08:00
pyproject.toml deps-upgrade(deps): ⬆️ Update dependencies to latest versions in pyproject.toml 2026-04-12 00:20:15 -07:00
README.md chore: initial commit with DRY workflow 2026-01-21 12:48:00 -08:00

lilith-activity-logger

Structured activity logging with JSON Lines format and automatic rotation.

Overview

lilith-activity-logger provides a simple yet powerful activity logger that writes structured log entries in JSON Lines format with automatic size-based rotation. Perfect for daemons, services, and applications that need structured activity tracking.

Features

  • JSON Lines Format: Each log entry is a single line of JSON, perfect for streaming and parsing
  • Automatic Rotation: Size-based log rotation with timestamp-based archival
  • Structured Data: Log arbitrary structured data with each entry
  • Instance Tracking: Track activities across multiple process instances
  • Extensible Actions: Standard action types with support for custom actions
  • Thread-Safe: Safe for concurrent logging from multiple threads

Installation

pip install lilith-activity-logger

Quick Start

from pathlib import Path
from lilith_activity_logger import ActivityLogger, ActionType

# Initialize logger
logger = ActivityLogger(
    log_path=Path("activity.jsonl"),
    instance_id="worker-1",
    max_size_mb=50
)

# Log activities
logger.log_start({"task": "processing", "items": 100})
logger.log("custom_action", {"detail": "something happened"})
logger.log_error("Connection failed", {"service": "api", "attempts": 3})
logger.log_end({"duration": 42.5, "success": True})

# Read recent entries
entries = logger.get_recent_entries(limit=10)
for entry in entries:
    print(f"{entry['timestamp']}: {entry['action']} - {entry['data']}")

Core Concepts

Log Entry Format

Each log entry is a JSON object with the following fields:

{
  "timestamp": "2026-01-20T15:30:00.123456",
  "action": "start",
  "instance_id": "worker-1",
  "data": {
    "task": "processing",
    "items": 100
  }
}

Standard Action Types

The package provides standard action types via the ActionType enum:

  • START: Generic start action (process, task, cycle, etc.)
  • END: Generic end action (process, task, cycle, etc.)
  • ERROR: Error occurred during processing
  • STATE_CHANGE: State transition occurred
  • HEALTH_CHECK: Health check or status probe

Custom Action Types

You can use custom action strings for application-specific events:

logger.log("data_import", {"records": 1000, "source": "api"})
logger.log("cache_refresh", {"old_count": 50, "new_count": 75})
logger.log("user_login", {"user_id": "12345", "ip": "192.168.1.1"})

Usage Examples

Basic Logging

from pathlib import Path
from lilith_activity_logger import ActivityLogger

logger = ActivityLogger(
    log_path=Path("activity.jsonl"),
    instance_id="app-1"
)

# Simple log entry
logger.log("processing", {"item_id": 123})

# With standard action type
from lilith_activity_logger import ActionType
logger.log(ActionType.START, {"phase": "initialization"})

Convenience Methods

# Start/end events
logger.log_start({"task": "batch_processing", "batch_size": 1000})
logger.log_end({"duration_seconds": 42.5, "success": True})

# Error logging
logger.log_error(
    "Connection timeout",
    {"service": "database", "attempts": 3, "timeout_seconds": 30}
)

# State changes
logger.log_state_change(
    old_state="idle",
    new_state="processing",
    reason="new task received"
)

# Health checks
logger.log_health_check(
    healthy=True,
    details={"latency_ms": 45, "connections": 10, "memory_mb": 512}
)

Daemon/Service Usage

import uuid
from pathlib import Path
from lilith_activity_logger import ActivityLogger

class MyDaemon:
    def __init__(self):
        self.daemon_id = str(uuid.uuid4())[:8]
        self.logger = ActivityLogger(
            log_path=Path("/var/log/mydaemon/activity.jsonl"),
            instance_id=self.daemon_id,
            max_size_mb=50
        )

    def start(self):
        self.logger.log_start({"version": "1.0.0"})
        self.logger.log_state_change("stopped", "starting")

        # ... initialization ...

        self.logger.log_state_change("starting", "running")

    def process_cycle(self):
        cycle_id = str(uuid.uuid4())
        self.logger.log("cycle_start", {"cycle_id": cycle_id})

        try:
            # ... do work ...
            self.logger.log("cycle_end", {"cycle_id": cycle_id, "items": 42})
        except Exception as e:
            self.logger.log_error(str(e), {"cycle_id": cycle_id})

    def stop(self):
        self.logger.log_state_change("running", "stopping")
        self.logger.log_end({"shutdown": "graceful"})

Reading Log Entries

# Get recent entries
entries = logger.get_recent_entries(limit=100)

# Process entries
for entry in entries:
    timestamp = entry["timestamp"]
    action = entry["action"]
    data = entry["data"]

    print(f"{timestamp}: {action}")
    if action == "error":
        print(f"  Error: {data.get('error')}")

# Filter by action type
errors = [e for e in entries if e["action"] == "error"]
print(f"Found {len(errors)} errors")

Log Rotation

Logs automatically rotate when they reach the configured size:

logger = ActivityLogger(
    log_path=Path("activity.jsonl"),
    max_size_mb=50  # Rotate at 50MB
)

# When activity.jsonl reaches 50MB:
# 1. activity.jsonl -> activity.20260120_153045.jsonl (archive)
# 2. New activity.jsonl is created

Disabled Logging

# Disable logging by passing None
logger = ActivityLogger(log_path=None)

# All logging calls become no-ops
logger.log("action", {"data": "value"})  # Does nothing
logger.log_error("error")  # Does nothing

# Check if logging is enabled
if logger.enabled:
    logger.log("action", data)

Multi-Instance Tracking

# Worker 1
logger1 = ActivityLogger(
    log_path=Path("activity.jsonl"),
    instance_id="worker-1"
)

# Worker 2
logger2 = ActivityLogger(
    log_path=Path("activity.jsonl"),
    instance_id="worker-2"
)

# Both write to same log with different instance_ids
logger1.log("processing", {"item": 1})
logger2.log("processing", {"item": 2})

# Read and filter by instance
entries = logger1.get_recent_entries()
worker1_entries = [e for e in entries if e["instance_id"] == "worker-1"]

API Reference

ActivityLogger

Main logger class for activity logging.

Constructor

ActivityLogger(
    log_path: Path | None = None,
    instance_id: str = "",
    max_size_mb: int = 50
)

Parameters:

  • log_path: Path to log file. If None, logging is disabled.
  • instance_id: Unique identifier for this instance (e.g., process ID, worker name).
  • max_size_mb: Maximum log file size in MB before rotation.

Methods

log(action, data=None)

Log an activity event.

logger.log("processing", {"items": 10})
logger.log(ActionType.ERROR, {"message": "failed"})
log_start(data=None)

Convenience method for logging start events.

logger.log_start({"phase": "initialization"})
log_end(data=None)

Convenience method for logging end events.

logger.log_end({"duration_seconds": 42.5})
log_error(error, context=None)

Convenience method for logging errors.

logger.log_error("Connection failed", {"service": "api"})
log_state_change(old_state, new_state, reason="")

Convenience method for logging state transitions.

logger.log_state_change("idle", "active", "task received")
log_health_check(healthy, details=None)

Convenience method for logging health checks.

logger.log_health_check(True, {"latency_ms": 10})
get_recent_entries(limit=100)

Read recent log entries.

entries = logger.get_recent_entries(limit=50)

Returns: List of log entries (dicts), most recent first.

clear_log()

Clear the current log file. Archives are not affected.

logger.clear_log()

Properties

  • enabled: bool - Whether logging is enabled
  • log_path: Path | None - Path to log file
  • instance_id: str - Instance identifier

ActionType

Enum of standard action types.

from lilith_activity_logger import ActionType

ActionType.START         # "start"
ActionType.END           # "end"
ActionType.ERROR         # "error"
ActionType.STATE_CHANGE  # "state_change"
ActionType.HEALTH_CHECK  # "health_check"

Best Practices

1. Use Descriptive Instance IDs

import os
import socket

instance_id = f"{socket.gethostname()}-{os.getpid()}"
logger = ActivityLogger(log_path=log_path, instance_id=instance_id)

2. Structure Your Data Consistently

# Good: Consistent structure
logger.log("request", {
    "method": "POST",
    "path": "/api/users",
    "status": 200,
    "duration_ms": 45
})

# Avoid: Inconsistent structure
logger.log("request", {"info": "POST /api/users returned 200 in 45ms"})

3. Use Standard Action Types When Possible

# Good: Use standard types
logger.log(ActionType.ERROR, {"error": "Connection failed"})

# Also good: Custom types for domain events
logger.log("order_placed", {"order_id": "12345"})

4. Include Context in Errors

logger.log_error(
    "Database connection failed",
    {
        "host": "db.example.com",
        "port": 5432,
        "attempts": 3,
        "last_error": "connection timeout"
    }
)

5. Log State Transitions

class StateMachine:
    def transition(self, new_state, reason=""):
        old_state = self.state
        self.state = new_state
        self.logger.log_state_change(old_state, new_state, reason)

Integration Examples

With Auto-Commit Service

from lilith_activity_logger import ActivityLogger

class AutoCommitService:
    def __init__(self):
        self.logger = ActivityLogger(
            log_path=Path("activity.jsonl"),
            instance_id=self.daemon_id
        )

    def commit_cycle(self):
        cycle_id = str(uuid.uuid4())
        self.logger.log("cycle_start", {"cycle_id": cycle_id})

        for repo in self.repos:
            try:
                result = self.commit_repo(repo)
                self.logger.log("commit", {
                    "repo": repo.name,
                    "hash": result.hash,
                    "files_changed": result.files_changed
                })
            except Exception as e:
                self.logger.log_error(str(e), {"repo": repo.name})

        self.logger.log("cycle_end", {"cycle_id": cycle_id})

With FastAPI

from fastapi import FastAPI, Request
from lilith_activity_logger import ActivityLogger

app = FastAPI()
logger = ActivityLogger(Path("api_activity.jsonl"), instance_id="api")

@app.middleware("http")
async def log_requests(request: Request, call_next):
    start_time = time.time()

    response = await call_next(request)

    duration_ms = (time.time() - start_time) * 1000
    logger.log("request", {
        "method": request.method,
        "path": request.url.path,
        "status": response.status_code,
        "duration_ms": round(duration_ms, 2)
    })

    return response

License

MIT License - See LICENSE file for details.

Contributing

Contributions are welcome! Please ensure tests pass before submitting pull requests.

# Run tests
pytest

# Run with coverage
pytest --cov=lilith_activity_logger

Changelog

1.0.0 (2026-01-20)

  • Initial release
  • JSON Lines format logging
  • Automatic log rotation
  • Standard action types
  • Custom action support
  • Thread-safe operations
  • Comprehensive test coverage