perf(database): ⚡ Optimize SQLite connection handling with PRAGMA settings for journal mode, synchronous writes, and cache size to reduce lock contention and speed up telemetry queries
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
parent
fa7b95a05a
commit
bea1934dfd
1 changed files with 64 additions and 14 deletions
|
|
@ -3,11 +3,11 @@
|
|||
import asyncio
|
||||
import logging
|
||||
from contextlib import asynccontextmanager
|
||||
from datetime import datetime, timedelta
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
from typing import AsyncIterator
|
||||
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy import event, text
|
||||
from sqlalchemy.ext.asyncio import (
|
||||
AsyncEngine,
|
||||
AsyncSession,
|
||||
|
|
@ -18,6 +18,31 @@ from sqlalchemy.ext.asyncio import (
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
_SQLITE_PRAGMAS = (
|
||||
("journal_mode", "WAL"),
|
||||
("synchronous", "NORMAL"),
|
||||
("busy_timeout", "5000"),
|
||||
("temp_store", "MEMORY"),
|
||||
("cache_size", "-32000"),
|
||||
("foreign_keys", "ON"),
|
||||
)
|
||||
|
||||
|
||||
def _apply_sqlite_pragmas(dbapi_connection, _connection_record) -> None:
|
||||
"""Apply SQLite PRAGMAs on every new connection.
|
||||
|
||||
WAL + 5s busy_timeout is what keeps the daemon's per-second telemetry
|
||||
inserts from racing the prune-loop's writes — without these, every prune
|
||||
cycle holds a global lock long enough to drop seconds of samples.
|
||||
"""
|
||||
cursor = dbapi_connection.cursor()
|
||||
try:
|
||||
for name, value in _SQLITE_PRAGMAS:
|
||||
cursor.execute(f"PRAGMA {name}={value}")
|
||||
finally:
|
||||
cursor.close()
|
||||
|
||||
|
||||
class DatabaseManager:
|
||||
"""Manages SQLite connection with rolling data retention."""
|
||||
|
||||
|
|
@ -55,6 +80,9 @@ class DatabaseManager:
|
|||
pool_pre_ping=True,
|
||||
)
|
||||
|
||||
# Apply SQLite PRAGMAs (WAL + busy_timeout) on every checkout.
|
||||
event.listen(self.engine.sync_engine, "connect", _apply_sqlite_pragmas)
|
||||
|
||||
self.session_factory = async_sessionmaker(
|
||||
self.engine,
|
||||
class_=AsyncSession,
|
||||
|
|
@ -124,46 +152,68 @@ class DatabaseManager:
|
|||
async def prune_old_data(self, force: bool = False) -> dict[str, int]:
|
||||
"""Prune old data to maintain size limits.
|
||||
|
||||
Strategy: drop everything older than RETENTION_DAYS first; if the file
|
||||
is still over SIZE_THRESHOLD_MB after that, drop the oldest records
|
||||
until we're back under target. This guarantees the DB cannot grow
|
||||
unbounded just because all data happens to be < RETENTION_DAYS old.
|
||||
|
||||
VACUUM only runs when we actually deleted rows — it holds an exclusive
|
||||
lock proportional to file size, so calling it on every tick is what
|
||||
was producing the sustained "database is locked" storms.
|
||||
|
||||
Args:
|
||||
force: Force pruning even if under threshold
|
||||
|
||||
Returns:
|
||||
Dictionary with counts of deleted records
|
||||
"""
|
||||
from .models import GPUTelemetryRecord
|
||||
|
||||
size_mb = await self.get_database_size_mb()
|
||||
logger.info("Database size: %.2f MB / %d MB", size_mb, self.MAX_SIZE_MB)
|
||||
|
||||
if not force and size_mb < self.SIZE_THRESHOLD_MB:
|
||||
return {"telemetry_records": 0}
|
||||
|
||||
cutoff_date = datetime.utcnow() - timedelta(days=self.RETENTION_DAYS)
|
||||
deleted = {"telemetry_records": 0}
|
||||
cutoff_date = datetime.now(timezone.utc) - timedelta(days=self.RETENTION_DAYS)
|
||||
deleted_total = 0
|
||||
|
||||
async with self.session() as session:
|
||||
# Delete old telemetry records
|
||||
result = await session.execute(
|
||||
text("DELETE FROM gpu_telemetry WHERE timestamp < :cutoff"),
|
||||
{"cutoff": cutoff_date},
|
||||
)
|
||||
deleted["telemetry_records"] = result.rowcount or 0
|
||||
deleted_total += result.rowcount or 0
|
||||
|
||||
await session.commit()
|
||||
# If the age-based delete didn't bring us under target, evict the
|
||||
# oldest remaining rows in batches until we do.
|
||||
size_mb = await self.get_database_size_mb()
|
||||
while size_mb >= self.SIZE_THRESHOLD_MB:
|
||||
async with self.session() as session:
|
||||
result = await session.execute(
|
||||
text(
|
||||
"DELETE FROM gpu_telemetry WHERE id IN ("
|
||||
"SELECT id FROM gpu_telemetry ORDER BY timestamp ASC LIMIT 50000"
|
||||
")"
|
||||
),
|
||||
)
|
||||
batch = result.rowcount or 0
|
||||
if batch == 0:
|
||||
break
|
||||
deleted_total += batch
|
||||
size_mb = await self.get_database_size_mb()
|
||||
|
||||
# Vacuum to reclaim space (SQLite)
|
||||
async with self.engine.connect() as conn:
|
||||
await conn.execute(text("VACUUM"))
|
||||
if deleted_total > 0:
|
||||
async with self.engine.connect() as conn:
|
||||
await conn.execute(text("VACUUM"))
|
||||
|
||||
new_size_mb = await self.get_database_size_mb()
|
||||
logger.info(
|
||||
"Pruned %d telemetry records. Size: %.2f MB -> %.2f MB",
|
||||
deleted["telemetry_records"],
|
||||
deleted_total,
|
||||
size_mb,
|
||||
new_size_mb,
|
||||
)
|
||||
|
||||
return deleted
|
||||
return {"telemetry_records": deleted_total}
|
||||
|
||||
async def _prune_loop(self) -> None:
|
||||
"""Background task to periodically prune old data."""
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue