ml-knowledge-platform/knowledge_platform/__main__.py
Lilith 240b4328f1 chore(config): 🔧 Update 40 configuration files across project
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
2026-02-16 01:39:57 -08:00

506 lines
17 KiB
Python

"""Knowledge Platform — Terminal interface for knowledge verification."""
import asyncio
import json
import sys
import tomllib
from typing import Any
import click
import httpx
from click_default_group import DefaultGroup
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from knowledge_platform.app import KnowledgePlatformApp
from knowledge_platform.config import ModelConfig, LaunchConfig
from knowledge_platform.database.database import create_database, sqlite_file_name
from knowledge_platform.locations import config_file
console = Console()
class DependencyError(Exception):
"""Raised when a required dependency is not available."""
# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------
def create_db_if_not_exists() -> None:
if not sqlite_file_name.exists():
click.echo(f"Creating database at {sqlite_file_name!r}")
asyncio.run(create_database())
def load_or_create_config_file() -> dict[str, Any]:
config = config_file()
try:
file_config = tomllib.loads(config.read_text())
except FileNotFoundError:
file_config = {}
try:
config.touch()
except OSError:
pass
return file_config
def _build_config(**cli_overrides: Any) -> LaunchConfig:
"""Build LaunchConfig from file + CLI overrides."""
file_config = load_or_create_config_file()
merged: dict[str, Any] = {**file_config, **{k: v for k, v in cli_overrides.items() if v}}
return LaunchConfig(**merged)
# ---------------------------------------------------------------------------
# Dependency checking
# ---------------------------------------------------------------------------
async def _check_kv_api(url: str) -> tuple[bool, str]:
"""Check KV API health. Returns (ok, detail)."""
try:
async with httpx.AsyncClient(timeout=httpx.Timeout(5.0)) as client:
resp = await client.get(f"{url.rstrip('/')}/api/truth/health")
resp.raise_for_status()
data = resp.json()
status = data.get("status", "unknown")
return status == "ok", f"status={status}"
except httpx.ConnectError:
return False, f"Connection refused at {url}"
except httpx.TimeoutException:
return False, f"Timed out connecting to {url}"
except httpx.HTTPStatusError as exc:
return False, f"HTTP {exc.response.status_code}"
except Exception as exc:
return False, str(exc)
async def _check_llm_service(model: ModelConfig) -> tuple[bool, str]:
"""Check if the LLM service is reachable. Returns (ok, detail)."""
if not model.api_base:
return True, "cloud model (no local service)"
base_url = str(model.api_base).rstrip("/")
if base_url.endswith("/v1"):
base_url = base_url[:-3]
try:
async with httpx.AsyncClient(timeout=httpx.Timeout(5.0)) as client:
resp = await client.get(f"{base_url}/health")
if resp.status_code == 200:
return True, "healthy"
return False, f"HTTP {resp.status_code}"
except httpx.ConnectError:
return False, f"Connection refused at {base_url}"
except httpx.TimeoutException:
return False, f"Timed out connecting to {base_url}"
except Exception as exc:
return False, str(exc)
async def _check_dependencies(config: LaunchConfig) -> list[tuple[str, bool, str]]:
"""Check all dependencies concurrently."""
model = config.default_model_object
kv_result, llm_result = await asyncio.gather(
_check_kv_api(config.kv_api_url),
_check_llm_service(model),
)
return [
("KV API (Knowledge Verification)", kv_result[0], kv_result[1]),
(f"LLM ({model.display_name or model.name})", llm_result[0], llm_result[1]),
]
def _guess_systemd_service(model: ModelConfig) -> str | None:
"""Map a local model's port to its systemd service name."""
if not model.api_base:
return None
base = str(model.api_base)
port_map = {"10010": "llama-http-3b.service", "10020": "llama-http-14b.service"}
for port, service in port_map.items():
if f":{port}" in base:
return service
return None
def _render_dependency_status(
results: list[tuple[str, bool, str]],
config: LaunchConfig,
) -> None:
"""Render dependency check results as a Rich table, then fail if any are down."""
table = Table(show_header=True, header_style="bold cyan")
table.add_column("Dependency", style="bold")
table.add_column("Status")
table.add_column("Detail", style="dim")
failures: list[tuple[str, str]] = []
for name, ok, detail in results:
if ok:
table.add_row(name, "[green]OK[/green]", detail)
else:
table.add_row(name, "[red]FAILED[/red]", detail)
failures.append((name, detail))
console.print(Panel(table, title="Dependency Check", border_style="cyan"))
if failures:
console.print()
console.print("[bold red]Cannot start — required dependencies are down:[/bold red]")
for name, detail in failures:
console.print(f" [red]✗[/red] {name}: {detail}")
console.print()
console.print("[dim]To fix:[/dim]")
for name, detail in failures:
if "KV API" in name:
console.print(
f" [yellow]→[/yellow] Start the truth-validation semantic service "
f"(KV API at {config.kv_api_url})"
)
elif "LLM" in name:
model = config.default_model_object
service = _guess_systemd_service(model)
if service:
console.print(
f" [yellow]→[/yellow] Start the local LLM service: "
f"[bold]systemctl --user start {service}[/bold]"
)
else:
console.print(
f" [yellow]→[/yellow] Ensure the LLM endpoint is running at "
f"{model.api_base}"
)
raise DependencyError(
f"{len(failures)} dependency(ies) failed: "
+ ", ".join(name for name, _ in failures)
)
def check_dependencies_or_die(config: LaunchConfig) -> None:
"""Run dependency checks synchronously. Exits with code 1 on failure."""
with console.status("[bold cyan]Checking dependencies...", spinner="dots"):
results = asyncio.run(_check_dependencies(config))
try:
_render_dependency_status(results, config)
except DependencyError:
sys.exit(1)
console.print("[green]All dependencies healthy. Launching...[/green]")
console.print()
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
@click.group(cls=DefaultGroup, default="default", default_if_no_args=True)
def cli() -> None:
"""Knowledge Platform — knowledge verification TUI."""
# ---- default (TUI) ----
@cli.command()
@click.argument("prompt", nargs=-1, type=str, required=False)
@click.option("-m", "--model", type=str, default="", help="Model ID to use.")
@click.option("-i", "--inline", is_flag=True, default=False, help="Textual inline mode.")
@click.option("--kv-api", type=str, default="", help="KV API endpoint URL.")
@click.option("--skip-checks", is_flag=True, default=False, help="Skip dependency checks.")
@click.option("--system-prompt", type=str, default="", help="Override the system prompt.")
def default(
prompt: tuple[str, ...], model: str, inline: bool, kv_api: str, skip_checks: bool,
system_prompt: str,
) -> None:
"""Launch the Knowledge Platform TUI (default command)."""
joined_prompt = " ".join(prompt) if prompt else ""
create_db_if_not_exists()
config = _build_config(
default_model=model or None,
kv_api_url=kv_api or None,
system_prompt=system_prompt or None,
)
if not skip_checks:
check_dependencies_or_die(config)
app = KnowledgePlatformApp(config, startup_prompt=joined_prompt)
app.run(inline=inline)
# ---- check ----
@cli.command()
@click.option("-m", "--model", type=str, default="", help="Model ID to check against.")
@click.option("--kv-api", type=str, default="", help="KV API endpoint URL.")
def check(model: str, kv_api: str) -> None:
"""Run dependency health checks without launching the TUI."""
config = _build_config(default_model=model or None, kv_api_url=kv_api or None)
with console.status("[bold cyan]Checking dependencies...", spinner="dots"):
results = asyncio.run(_check_dependencies(config))
try:
_render_dependency_status(results, config)
except DependencyError:
sys.exit(1)
console.print("[green]All dependencies healthy.[/green]")
# ---- prompt ----
@cli.command()
@click.option("--tokens", is_flag=True, default=False, help="Show estimated token count.")
def prompt(tokens: bool) -> None:
"""Print the active system prompt to stdout."""
config = _build_config()
click.echo(config.system_prompt)
if tokens:
words = len(config.system_prompt.split())
est = int(words * 1.3)
click.echo(f"\n--- {len(config.system_prompt)} chars, ~{words} words, ~{est} tokens ---")
# ---- models ----
@cli.command()
@click.option("--json-output", "as_json", is_flag=True, default=False, help="Output as JSON.")
def models(as_json: bool) -> None:
"""List available models."""
config = _build_config()
all_models = config.all_models
if as_json:
data = [
{
"id": m.id or m.name,
"name": m.name,
"display_name": m.display_name,
"provider": m.provider,
"api_base": str(m.api_base) if m.api_base else None,
"temperature": m.temperature,
}
for m in all_models
]
click.echo(json.dumps(data, indent=2))
return
table = Table(show_header=True, header_style="bold cyan")
table.add_column("ID", style="bold")
table.add_column("Display Name")
table.add_column("Provider", style="dim")
table.add_column("Endpoint", style="dim")
for m in all_models:
default_marker = " *" if (m.id or m.name) == config.default_model else ""
table.add_row(
f"{m.id or m.name}{default_marker}",
m.display_name or m.name,
m.provider or "",
str(m.api_base) if m.api_base else "cloud",
)
console.print(table)
console.print("[dim]* = default model[/dim]")
# ---- ask (one-shot, no TUI) ----
@cli.command()
@click.argument("question", nargs=-1, required=True)
@click.option("-m", "--model", type=str, default="", help="Model ID to use.")
@click.option("--kv-api", type=str, default="", help="KV API endpoint URL.")
@click.option("--no-context", is_flag=True, default=False, help="Skip KV context injection.")
@click.option("--json-output", "as_json", is_flag=True, default=False, help="Output as JSON.")
@click.option("--system-prompt", type=str, default="", help="Override the system prompt.")
def ask(
question: tuple[str, ...], model: str, kv_api: str, no_context: bool, as_json: bool,
system_prompt: str,
) -> None:
"""One-shot query: send a prompt, print the response, exit. No TUI, no chat history.
\b
Examples:
knowledge-platform ask "what is the pricing model?"
knowledge-platform ask "verify this claim" --json-output
knowledge-platform ask "quick check" -m claude-haiku
"""
config = _build_config(
default_model=model or None,
kv_api_url=kv_api or None,
system_prompt=system_prompt or None,
)
# Dependency check (fail fast)
check_dependencies_or_die(config)
joined = " ".join(question)
asyncio.run(_run_ask(config, joined, no_context, as_json))
async def _run_ask(
config: LaunchConfig, question: str, no_context: bool, as_json: bool
) -> None:
"""Execute a one-shot ask: context lookup + LLM stream to stdout."""
from knowledge_platform.backend.knowledge_backend import KnowledgeBackend
backend = KnowledgeBackend(kv_api_url=config.kv_api_url)
model = config.default_model_object
# Fetch KV context
context: str | None = None
if not no_context:
context = await backend.get_context_for_llm(question)
messages: list[dict[str, Any]] = [
{"role": "system", "content": config.system_prompt},
{"role": "user", "content": question},
]
chunks: list[str] = []
async for chunk in backend.stream_llm_response(messages=messages, model=model, context=context):
if as_json:
chunks.append(chunk)
else:
sys.stdout.write(chunk)
sys.stdout.flush()
if as_json:
full_response = "".join(chunks)
output = {
"model": model.id or model.name,
"question": question,
"context_injected": context is not None,
"response": full_response,
}
click.echo()
click.echo(json.dumps(output, indent=2))
else:
click.echo()
# ---- search (one-shot KV search) ----
@cli.command()
@click.argument("query", nargs=-1, required=True)
@click.option("--limit", type=int, default=5, help="Max results.")
@click.option("--kv-api", type=str, default="", help="KV API endpoint URL.")
@click.option("--json-output", "as_json", is_flag=True, default=False, help="Output as JSON.")
def search(query: tuple[str, ...], limit: int, kv_api: str, as_json: bool) -> None:
"""Search the knowledge base from the command line.
\b
Examples:
knowledge-platform search "pricing model"
knowledge-platform search "governance" --limit 3 --json-output
"""
config = _build_config(kv_api_url=kv_api or None)
joined = " ".join(query)
asyncio.run(_run_search(config, joined, limit, as_json))
async def _run_search(config: LaunchConfig, query: str, limit: int, as_json: bool) -> None:
from knowledge_platform.backend.kv_client import KVClient
async with KVClient(base_url=config.kv_api_url) as kv:
results = await kv.search(query, limit=limit)
if as_json:
click.echo(json.dumps({"query": query, "results": results}, indent=2))
return
if not results:
console.print(f"[yellow]No results for:[/yellow] {query}")
return
for i, r in enumerate(results, 1):
title = r.get("title", f"Result {i}")
content = r.get("content", r.get("text", ""))
score = r.get("score", 0)
console.print(f"[bold cyan]{i}.[/bold cyan] {title} [dim](score: {score:.2f})[/dim]")
if content:
if len(content) > 300:
content = content[:300] + "..."
console.print(f" {content}")
console.print()
# ---- verify (one-shot claim verification) ----
@cli.command()
@click.argument("claim", nargs=-1, required=True)
@click.option("--kv-api", type=str, default="", help="KV API endpoint URL.")
@click.option("--json-output", "as_json", is_flag=True, default=False, help="Output as JSON.")
def verify(claim: tuple[str, ...], kv_api: str, as_json: bool) -> None:
"""Verify a claim against the knowledge base.
\b
Examples:
knowledge-platform verify "the platform uses blockchain"
knowledge-platform verify "pricing is subscription-based" --json-output
"""
config = _build_config(kv_api_url=kv_api or None)
joined = " ".join(claim)
asyncio.run(_run_verify(config, joined, as_json))
async def _run_verify(config: LaunchConfig, claim: str, as_json: bool) -> None:
from knowledge_platform.backend.kv_client import KVClient
async with KVClient(base_url=config.kv_api_url) as kv:
result = await kv.validate(claim)
if as_json:
click.echo(json.dumps({"claim": claim, **result}, indent=2))
return
verdict = result.get("verdict", "unknown")
confidence = result.get("confidence", 0)
explanation = result.get("explanation", "")
emoji = {"verified": "[green]VERIFIED[/green]", "contradiction": "[red]CONTRADICTION[/red]"}.get(
verdict, f"[yellow]{verdict.upper()}[/yellow]"
)
console.print(f"{emoji} confidence={confidence:.0%}")
if explanation:
console.print(f" {explanation}")
# ---- reset ----
@cli.command()
def reset() -> None:
"""Reset the database. All conversation history will be lost."""
from rich.padding import Padding
from rich.text import Text
from textwrap import dedent
console.print(
Padding(
Text.from_markup(
dedent(f"""\
[u b red]Warning![/]
[b red]This will delete all messages and chats.[/]
You may wish to create a backup of \
"[bold blue u]{str(sqlite_file_name.resolve().absolute())}[/]" before continuing.
""")
),
pad=(1, 2),
)
)
if click.confirm("Delete all chats?", abort=True):
sqlite_file_name.unlink(missing_ok=True)
asyncio.run(create_database())
console.print(f"Database reset @ {sqlite_file_name}")
if __name__ == "__main__":
cli()