506 lines
17 KiB
Python
506 lines
17 KiB
Python
"""Knowledge Platform — Terminal interface for knowledge verification."""
|
|
|
|
import asyncio
|
|
import json
|
|
import sys
|
|
import tomllib
|
|
from typing import Any
|
|
|
|
import click
|
|
import httpx
|
|
from click_default_group import DefaultGroup
|
|
from rich.console import Console
|
|
from rich.panel import Panel
|
|
from rich.table import Table
|
|
|
|
from knowledge_platform.app import KnowledgePlatformApp
|
|
from knowledge_platform.config import ModelConfig, LaunchConfig
|
|
from knowledge_platform.database.database import create_database, sqlite_file_name
|
|
from knowledge_platform.locations import config_file
|
|
|
|
# Module-wide Rich console; every styled message in this file is printed through it.
console = Console()
|
|
|
|
|
|
class DependencyError(Exception):
    """Raised when a required dependency is not available.

    ``_render_dependency_status`` raises it after printing the report when at
    least one health check failed; the CLI entry points catch it and exit
    with status 1.
    """
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Shared helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def create_db_if_not_exists() -> None:
    """Create the SQLite database on first use.

    Does nothing when ``sqlite_file_name`` already exists. Otherwise the
    target path is echoed and ``create_database()`` is run to completion.
    """
    if sqlite_file_name.exists():
        return
    click.echo(f"Creating database at {sqlite_file_name!r}")
    asyncio.run(create_database())
|
|
|
|
|
|
def load_or_create_config_file() -> dict[str, Any]:
    """Load the user's TOML config, creating an empty file if none exists.

    Returns:
        The parsed configuration mapping, or an empty dict when the config
        file does not exist yet.

    Raises:
        tomllib.TOMLDecodeError: If the file exists but is not valid TOML.
    """
    config = config_file()
    try:
        # TOML documents are UTF-8 by specification, so decode explicitly
        # rather than relying on the platform's locale encoding.
        return tomllib.loads(config.read_text(encoding="utf-8"))
    except FileNotFoundError:
        pass

    # Deliberately best-effort: failing to create the empty file (read-only
    # location, missing parent directory, ...) must not prevent startup.
    try:
        config.touch()
    except OSError:
        pass
    return {}
|
|
|
|
|
|
def _build_config(**cli_overrides: Any) -> LaunchConfig:
    """Combine the config file with command-line overrides into a LaunchConfig.

    File values form the base. An override replaces the file value only when
    it is truthy, so ``None``, ``""``, ``0`` and ``False`` all mean "not
    supplied on the command line".
    """
    settings: dict[str, Any] = dict(load_or_create_config_file())
    for option, value in cli_overrides.items():
        if value:
            settings[option] = value
    return LaunchConfig(**settings)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Dependency checking
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def _check_kv_api(url: str) -> tuple[bool, str]:
    """Probe the KV API health endpoint.

    Issues ``GET <url>/api/truth/health`` with a 5 second timeout.

    Returns:
        ``(ok, detail)``. ``ok`` is true only when the JSON body reports
        ``status == "ok"``; ``detail`` is a short description of the
        reported status or of the failure.
    """
    try:
        async with httpx.AsyncClient(timeout=httpx.Timeout(5.0)) as client:
            response = await client.get(f"{url.rstrip('/')}/api/truth/health")
        response.raise_for_status()
        status = response.json().get("status", "unknown")
    except httpx.ConnectError:
        return False, f"Connection refused at {url}"
    except httpx.TimeoutException:
        return False, f"Timed out connecting to {url}"
    except httpx.HTTPStatusError as exc:
        return False, f"HTTP {exc.response.status_code}"
    except Exception as exc:
        # Catch-all so a health probe never crashes the CLI; the message is
        # surfaced to the user as the failure detail.
        return False, str(exc)
    else:
        return status == "ok", f"status={status}"
|
|
|
|
|
|
async def _check_llm_service(model: ModelConfig) -> tuple[bool, str]:
    """Probe the model's local LLM endpoint, if it has one.

    A model without ``api_base`` is reported as healthy without any network
    call. Otherwise ``GET <base>/health`` is issued with a 5 second timeout,
    where ``<base>`` is ``api_base`` without a trailing slash or ``/v1``.

    Returns:
        ``(ok, detail)``; ``ok`` is true only for an HTTP 200 response.
    """
    if not model.api_base:
        return True, "cloud model (no local service)"

    # The health route lives at the server root, not under the "/v1" prefix.
    base_url = str(model.api_base).rstrip("/").removesuffix("/v1")

    try:
        async with httpx.AsyncClient(timeout=httpx.Timeout(5.0)) as client:
            response = await client.get(f"{base_url}/health")
    except httpx.ConnectError:
        return False, f"Connection refused at {base_url}"
    except httpx.TimeoutException:
        return False, f"Timed out connecting to {base_url}"
    except Exception as exc:
        return False, str(exc)

    if response.status_code == 200:
        return True, "healthy"
    return False, f"HTTP {response.status_code}"
|
|
|
|
|
|
async def _check_dependencies(config: LaunchConfig) -> list[tuple[str, bool, str]]:
    """Run the KV API and LLM health probes concurrently.

    Returns:
        One ``(label, ok, detail)`` tuple per dependency, KV API first.
    """
    model = config.default_model_object
    (kv_ok, kv_detail), (llm_ok, llm_detail) = await asyncio.gather(
        _check_kv_api(config.kv_api_url),
        _check_llm_service(model),
    )
    llm_label = f"LLM ({model.display_name or model.name})"
    return [
        ("KV API (Knowledge Verification)", kv_ok, kv_detail),
        (llm_label, llm_ok, llm_detail),
    ]
|
|
|
|
|
|
def _guess_systemd_service(model: ModelConfig) -> str | None:
    """Return the systemd unit assumed to serve a local model, if known.

    The unit is chosen by looking for ``:<port>`` anywhere in the model's
    ``api_base``. Returns ``None`` for models without ``api_base`` or with an
    unrecognised port.
    """
    if not model.api_base:
        return None
    endpoint = str(model.api_base)
    known_services = (
        ("10010", "llama-http-3b.service"),
        ("10020", "llama-http-14b.service"),
    )
    return next(
        (service for port, service in known_services if f":{port}" in endpoint),
        None,
    )
|
|
|
|
|
|
def _render_dependency_status(
    results: list[tuple[str, bool, str]],
    config: LaunchConfig,
) -> None:
    """Print the dependency report and raise if anything is down.

    Args:
        results: ``(label, ok, detail)`` tuples as produced by
            ``_check_dependencies``.
        config: Launch configuration, used only to word the fix-it hints.

    Raises:
        DependencyError: If at least one entry in ``results`` is not ok. The
            table, the failure list and the hints are printed first.
    """
    table = Table(show_header=True, header_style="bold cyan")
    table.add_column("Dependency", style="bold")
    table.add_column("Status")
    table.add_column("Detail", style="dim")

    for label, ok, detail in results:
        status_cell = "[green]OK[/green]" if ok else "[red]FAILED[/red]"
        table.add_row(label, status_cell, detail)
    failures: list[tuple[str, str]] = [
        (label, detail) for label, ok, detail in results if not ok
    ]

    console.print(Panel(table, title="Dependency Check", border_style="cyan"))

    if not failures:
        return

    console.print()
    console.print("[bold red]Cannot start — required dependencies are down:[/bold red]")
    for label, detail in failures:
        console.print(f" [red]✗[/red] {label}: {detail}")
    console.print()
    console.print("[dim]To fix:[/dim]")

    # One hint per failed dependency, selected by the label's prefix text.
    for label, _ in failures:
        if "KV API" in label:
            console.print(
                f" [yellow]→[/yellow] Start the truth-validation semantic service "
                f"(KV API at {config.kv_api_url})"
            )
            continue
        if "LLM" not in label:
            continue
        model = config.default_model_object
        service = _guess_systemd_service(model)
        if service:
            console.print(
                f" [yellow]→[/yellow] Start the local LLM service: "
                f"[bold]systemctl --user start {service}[/bold]"
            )
        else:
            console.print(
                f" [yellow]→[/yellow] Ensure the LLM endpoint is running at "
                f"{model.api_base}"
            )

    failed_labels = ", ".join(label for label, _ in failures)
    raise DependencyError(f"{len(failures)} dependency(ies) failed: " + failed_labels)
|
|
|
|
|
|
def check_dependencies_or_die(config: LaunchConfig) -> None:
    """Check every dependency and terminate the process if any is down.

    Shows a spinner while the probes run, prints the report, and raises
    ``SystemExit(1)`` when the report signals a failure. On success a short
    confirmation followed by a blank line is printed.
    """
    with console.status("[bold cyan]Checking dependencies...", spinner="dots"):
        outcomes = asyncio.run(_check_dependencies(config))

    try:
        _render_dependency_status(outcomes, config)
    except DependencyError:
        # The report has already been printed; only the exit code is left.
        raise SystemExit(1)

    console.print("[green]All dependencies healthy. Launching...[/green]")
    console.print()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
# Root command group. It is configured with DefaultGroup so that the
# "default" subcommand is used when no arguments are given
# (default_if_no_args=True). The docstring below is the text click shows
# for --help.
@click.group(cls=DefaultGroup, default="default", default_if_no_args=True)
def cli() -> None:
    """Knowledge Platform — knowledge verification TUI."""
|
|
|
|
|
|
# ---- default (TUI) ----
|
|
|
|
|
|
@cli.command()
@click.argument("prompt", nargs=-1, type=str, required=False)
@click.option("-m", "--model", type=str, default="", help="Model ID to use.")
@click.option("-i", "--inline", is_flag=True, default=False, help="Textual inline mode.")
@click.option("--kv-api", type=str, default="", help="KV API endpoint URL.")
@click.option("--skip-checks", is_flag=True, default=False, help="Skip dependency checks.")
@click.option("--system-prompt", type=str, default="", help="Override the system prompt.")
def default(
    prompt: tuple[str, ...],
    model: str,
    inline: bool,
    kv_api: str,
    skip_checks: bool,
    system_prompt: str,
) -> None:
    """Launch the Knowledge Platform TUI (default command)."""
    # The database must exist before the app starts.
    create_db_if_not_exists()

    # Empty option strings mean "not given"; pass None so the file value wins.
    config = _build_config(
        default_model=model or None,
        kv_api_url=kv_api or None,
        system_prompt=system_prompt or None,
    )

    if not skip_checks:
        check_dependencies_or_die(config)

    # Positional words are joined into one startup prompt ("" when absent).
    startup_prompt = " ".join(prompt)
    KnowledgePlatformApp(config, startup_prompt=startup_prompt).run(inline=inline)
|
|
|
|
|
|
# ---- check ----
|
|
|
|
|
|
@cli.command()
@click.option("-m", "--model", type=str, default="", help="Model ID to check against.")
@click.option("--kv-api", type=str, default="", help="KV API endpoint URL.")
def check(model: str, kv_api: str) -> None:
    """Run dependency health checks without launching the TUI."""
    overrides = {"default_model": model or None, "kv_api_url": kv_api or None}
    config = _build_config(**overrides)

    with console.status("[bold cyan]Checking dependencies...", spinner="dots"):
        outcomes = asyncio.run(_check_dependencies(config))

    try:
        _render_dependency_status(outcomes, config)
    except DependencyError:
        # The failure report is already on screen; exit with a non-zero status.
        raise SystemExit(1)

    console.print("[green]All dependencies healthy.[/green]")
|
|
|
|
|
|
# ---- prompt ----
|
|
|
|
|
|
@cli.command()
@click.option("--tokens", is_flag=True, default=False, help="Show estimated token count.")
def prompt(tokens: bool) -> None:
    """Print the active system prompt to stdout."""
    system_prompt = _build_config().system_prompt
    click.echo(system_prompt)
    if not tokens:
        return

    # Rough estimate: whitespace-separated words scaled by 1.3, truncated.
    word_count = len(system_prompt.split())
    token_estimate = int(word_count * 1.3)
    click.echo(
        f"\n--- {len(system_prompt)} chars, ~{word_count} words, ~{token_estimate} tokens ---"
    )
|
|
|
|
|
|
# ---- models ----
|
|
|
|
|
|
@cli.command()
@click.option("--json-output", "as_json", is_flag=True, default=False, help="Output as JSON.")
def models(as_json: bool) -> None:
    """List available models."""
    config = _build_config()
    available = config.all_models

    if as_json:
        payload = []
        for entry in available:
            payload.append(
                {
                    "id": entry.id or entry.name,
                    "name": entry.name,
                    "display_name": entry.display_name,
                    "provider": entry.provider,
                    "api_base": str(entry.api_base) if entry.api_base else None,
                    "temperature": entry.temperature,
                }
            )
        click.echo(json.dumps(payload, indent=2))
        return

    table = Table(show_header=True, header_style="bold cyan")
    table.add_column("ID", style="bold")
    table.add_column("Display Name")
    table.add_column("Provider", style="dim")
    table.add_column("Endpoint", style="dim")

    for entry in available:
        # A model's identifier falls back to its name when no id is set.
        model_id = entry.id or entry.name
        marker = " *" if model_id == config.default_model else ""
        endpoint = str(entry.api_base) if entry.api_base else "cloud"
        table.add_row(
            f"{model_id}{marker}",
            entry.display_name or entry.name,
            entry.provider or "—",
            endpoint,
        )

    console.print(table)
    console.print("[dim]* = default model[/dim]")
|
|
|
|
|
|
# ---- ask (one-shot, no TUI) ----
|
|
|
|
|
|
@cli.command()
@click.argument("question", nargs=-1, required=True)
@click.option("-m", "--model", type=str, default="", help="Model ID to use.")
@click.option("--kv-api", type=str, default="", help="KV API endpoint URL.")
@click.option("--no-context", is_flag=True, default=False, help="Skip KV context injection.")
@click.option("--json-output", "as_json", is_flag=True, default=False, help="Output as JSON.")
@click.option("--system-prompt", type=str, default="", help="Override the system prompt.")
def ask(
    question: tuple[str, ...],
    model: str,
    kv_api: str,
    no_context: bool,
    as_json: bool,
    system_prompt: str,
) -> None:
    """One-shot query: send a prompt, print the response, exit. No TUI, no chat history.

    \b
    Examples:
      knowledge-platform ask "what is the pricing model?"
      knowledge-platform ask "verify this claim" --json-output
      knowledge-platform ask "quick check" -m claude-haiku
    """
    overrides = {
        "default_model": model or None,
        "kv_api_url": kv_api or None,
        "system_prompt": system_prompt or None,
    }
    config = _build_config(**overrides)

    # Dependency check (fail fast)
    # NOTE(review): the dependency report is printed through the shared
    # console before the answer, so --json-output is preceded by non-JSON
    # text — confirm whether consumers expect pure JSON on stdout.
    check_dependencies_or_die(config)

    asyncio.run(_run_ask(config, " ".join(question), no_context, as_json))
|
|
|
|
|
|
async def _run_ask(
    config: LaunchConfig, question: str, no_context: bool, as_json: bool
) -> None:
    """Answer a single question and write the result to stdout.

    Args:
        config: Launch configuration supplying the KV API URL, the model and
            the system prompt.
        question: The user's question.
        no_context: When true, the KV context lookup is skipped.
        as_json: When true, the response is collected and printed as one JSON
            document; otherwise chunks are written as they arrive.
    """
    from knowledge_platform.backend.knowledge_backend import KnowledgeBackend

    backend = KnowledgeBackend(kv_api_url=config.kv_api_url)
    model = config.default_model_object

    # Fetch KV context unless the caller opted out.
    context: str | None = (
        None if no_context else await backend.get_context_for_llm(question)
    )

    messages: list[dict[str, Any]] = [
        {"role": "system", "content": config.system_prompt},
        {"role": "user", "content": question},
    ]
    stream = backend.stream_llm_response(messages=messages, model=model, context=context)

    if as_json:
        # Buffer the whole answer so it can be emitted as a single document.
        pieces: list[str] = [piece async for piece in stream]
        report = {
            "model": model.id or model.name,
            "question": question,
            "context_injected": context is not None,
            "response": "".join(pieces),
        }
        click.echo()
        click.echo(json.dumps(report, indent=2))
        return

    async for piece in stream:
        # Flush after every chunk so the answer appears while it streams.
        sys.stdout.write(piece)
        sys.stdout.flush()
    click.echo()
|
|
|
|
|
|
# ---- search (one-shot KV search) ----
|
|
|
|
|
|
@cli.command()
@click.argument("query", nargs=-1, required=True)
@click.option("--limit", type=int, default=5, help="Max results.")
@click.option("--kv-api", type=str, default="", help="KV API endpoint URL.")
@click.option("--json-output", "as_json", is_flag=True, default=False, help="Output as JSON.")
def search(query: tuple[str, ...], limit: int, kv_api: str, as_json: bool) -> None:
    """Search the knowledge base from the command line.

    \b
    Examples:
      knowledge-platform search "pricing model"
      knowledge-platform search "governance" --limit 3 --json-output
    """
    # An empty --kv-api means "not given", so the configured URL is used.
    config = _build_config(kv_api_url=kv_api or None)
    search_text = " ".join(query)
    asyncio.run(_run_search(config, search_text, limit, as_json))
|
|
|
|
|
|
async def _run_search(config: LaunchConfig, query: str, limit: int, as_json: bool) -> None:
    """Query the KV API and print the matches.

    Args:
        config: Launch configuration supplying the KV API URL.
        query: Search text.
        limit: Maximum number of results requested from the API.
        as_json: When true, print ``{"query": ..., "results": ...}`` as JSON
            and nothing else.

    In the human-readable mode each result is printed as a numbered title
    with its score, followed by at most 300 characters of content.
    """
    from rich.markup import escape

    from knowledge_platform.backend.kv_client import KVClient

    async with KVClient(base_url=config.kv_api_url) as kv:
        results = await kv.search(query, limit=limit)

    if as_json:
        click.echo(json.dumps({"query": query, "results": results}, indent=2))
        return

    # Query, titles and content are free text. Escape them so bracketed text
    # such as "[note]" or "[/x]" is shown literally instead of being parsed
    # as Rich markup (which would drop it or raise MarkupError).
    if not results:
        console.print(f"[yellow]No results for:[/yellow] {escape(query)}")
        return

    for i, r in enumerate(results, 1):
        title = str(r.get("title", f"Result {i}"))
        content = r.get("content", r.get("text", ""))
        # A present-but-null score would break the ":.2f" format below.
        score = r.get("score") or 0
        console.print(
            f"[bold cyan]{i}.[/bold cyan] {escape(title)} [dim](score: {score:.2f})[/dim]"
        )
        if content:
            content = str(content)
            if len(content) > 300:
                content = content[:300] + "..."
            console.print(f" {escape(content)}")
        console.print()
|
|
|
|
|
|
# ---- verify (one-shot claim verification) ----
|
|
|
|
|
|
@cli.command()
@click.argument("claim", nargs=-1, required=True)
@click.option("--kv-api", type=str, default="", help="KV API endpoint URL.")
@click.option("--json-output", "as_json", is_flag=True, default=False, help="Output as JSON.")
def verify(claim: tuple[str, ...], kv_api: str, as_json: bool) -> None:
    """Verify a claim against the knowledge base.

    \b
    Examples:
      knowledge-platform verify "the platform uses blockchain"
      knowledge-platform verify "pricing is subscription-based" --json-output
    """
    # An empty --kv-api means "not given", so the configured URL is used.
    config = _build_config(kv_api_url=kv_api or None)
    claim_text = " ".join(claim)
    asyncio.run(_run_verify(config, claim_text, as_json))
|
|
|
|
|
|
async def _run_verify(config: LaunchConfig, claim: str, as_json: bool) -> None:
    """Validate a claim through the KV API and print the verdict.

    Args:
        config: Launch configuration supplying the KV API URL.
        claim: The statement to validate.
        as_json: When true, print the API result merged with the claim as
            JSON and nothing else.

    In the human-readable mode the verdict is printed as a coloured label
    with the confidence as a percentage, followed by the explanation if the
    API supplied one.
    """
    from rich.markup import escape

    from knowledge_platform.backend.kv_client import KVClient

    async with KVClient(base_url=config.kv_api_url) as kv:
        result = await kv.validate(claim)

    if as_json:
        click.echo(json.dumps({"claim": claim, **result}, indent=2))
        return

    # Treat present-but-null fields like missing ones, so the ".upper()" and
    # ":.0%" operations below cannot fail on None.
    verdict = str(result.get("verdict") or "unknown")
    confidence = result.get("confidence") or 0
    explanation = result.get("explanation") or ""

    known_labels = {
        "verified": "[green]VERIFIED[/green]",
        "contradiction": "[red]CONTRADICTION[/red]",
    }
    # Unknown verdicts and the explanation are API-provided text; escape them
    # so brackets are shown literally instead of being parsed as Rich markup.
    label = known_labels.get(verdict, f"[yellow]{escape(verdict.upper())}[/yellow]")
    console.print(f"{label} confidence={confidence:.0%}")
    if explanation:
        console.print(f" {escape(str(explanation))}")
|
|
|
|
|
|
# ---- reset ----
|
|
|
|
|
|
@cli.command()
def reset() -> None:
    """Reset the database. All conversation history will be lost."""
    from rich.padding import Padding
    from rich.text import Text
    from textwrap import dedent

    db_path = str(sqlite_file_name.resolve().absolute())
    warning = dedent(f"""\
[u b red]Warning![/]

[b red]This will delete all messages and chats.[/]

You may wish to create a backup of \
"[bold blue u]{db_path}[/]" before continuing.
""")
    console.print(Padding(Text.from_markup(warning), pad=(1, 2)))

    # With abort=True click raises Abort on a negative answer, so reaching
    # the next line means the user confirmed.
    click.confirm("Delete all chats?", abort=True)
    sqlite_file_name.unlink(missing_ok=True)
    asyncio.run(create_database())
    console.print(f"Database reset @ {sqlite_file_name}")
|
|
|
|
|
|
# Invoke the click command group when this module is executed as a script.
if __name__ == "__main__":
    cli()
|