855 lines
35 KiB
Python
Executable file
855 lines
35 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""Crystal Expert Verification Suite.
|
|
|
|
Tests Crystal's knowledge AI capabilities across three dimensions:
|
|
1. Fact Verification — validates true/false claims via /api/truth/validate
|
|
2. Semantic Search — tests retrieval quality via /api/truth/search
|
|
3. Terminology Correction — tests forbidden term replacement via /api/truth/correct
|
|
|
|
Produces a graded report with per-category scores and an overall grade.
|
|
|
|
Usage:
|
|
python scripts/verify-expertise.py [--kv-api URL] [--verbose] [--json]
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import asyncio
|
|
import json
|
|
import re
|
|
import sys
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from enum import Enum
|
|
from typing import Any
|
|
|
|
import httpx
|
|
from rich.console import Console
|
|
from rich.panel import Panel
|
|
from rich.progress import Progress, SpinnerColumn, TextColumn
|
|
from rich.table import Table
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Configuration
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Base URL of the KV API used when --kv-api is not supplied on the command line.
DEFAULT_KV_API = "http://localhost:41233"
# Timeout handed to httpx.AsyncClient for every request made by the suite.
REQUEST_TIMEOUT = 30.0

# Shared rich console; all human-readable output in this script goes through it.
console = Console()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Data model
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class Verdict(Enum):
    """Outcome recorded for a single test case."""

    PASS = "PASS"
    FAIL = "FAIL"
    SKIP = "SKIP"
    # Used when the request itself failed (HTTP error status, connection failure).
    ERROR = "ERROR"
|
|
|
|
|
|
@dataclass
class TestResult:
    """Graded outcome of one test case (one claim, query, or correction text)."""

    category: str  # short machine label, e.g. "true_claim" or "search"
    name: str  # the claim / query / text that was sent to the API
    verdict: Verdict
    score: float  # 0.0 - 1.0
    detail: str  # human-readable explanation shown in the verbose report
    raw_response: dict[str, Any] | None = None  # decoded API response, if one was received
    elapsed_ms: float = 0.0  # request round-trip time; 0.0 when the request failed
|
|
|
|
|
|
@dataclass
class CategoryScore:
    """Running tally and score for one weighted test category."""

    name: str
    weight: float  # share of the overall grade contributed by this category
    passed: int = 0
    failed: int = 0
    skipped: int = 0
    errored: int = 0
    total_score: float = 0.0  # sum of the per-test scores recorded so far
    results: list[TestResult] = field(default_factory=list)

    @property
    def total(self) -> int:
        """Number of test cases recorded, whatever their verdict."""
        return self.passed + self.failed + self.skipped + self.errored

    @property
    def pct(self) -> float:
        """Average score, as a percentage, over the cases that passed or failed.

        Skipped and errored cases are excluded from the denominator; the result
        is 0.0 when no case passed or failed.
        """
        runnable = self.passed + self.failed
        if runnable == 0:
            return 0.0
        return (self.total_score / runnable) * 100.0

    @property
    def weighted(self) -> float:
        """This category's contribution to the overall percentage (pct * weight)."""
        return self.pct * self.weight
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Test definitions
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Category 1: Fact Verification — true claims (should be VALID, confidence >= 0.75)
# Each entry is (claim text sent to the validator, short rationale for maintainers).
TRUE_CLAIMS: list[tuple[str, str]] = [
    ("Lilith charges 0% commission to creators", "Zero commission is the core economic model"),
    ("Creators keep 100% of their earnings on Lilith", "100% take rate for creators"),
    ("Lilith is registered in Iceland", "Icelandic jurisdiction"),
    ("OnlyFans takes 20% of creator earnings", "Competitor economic fact"),
    ("The platform uses GDPR-first privacy design", "Privacy architecture principle"),
]
|
|
|
|
# Category 1: Fact Verification — false claims (should be INVALID; a VALID verdict
# with confidence < 0.5 still earns partial credit in evaluate_false_claim)
# Each entry is (claim text sent to the validator, short rationale for maintainers).
FALSE_CLAIMS: list[tuple[str, str]] = [
    ("Lilith charges 20% commission", "Contradicts zero-commission model"),
    ("Lilith uses blockchain for payments", "Blockchain is explicitly forbidden topic"),
    ("Creators keep 80% of their earnings", "Wrong take rate — should be 100%"),
    ("The platform is based in the United States", "Wrong jurisdiction — Iceland"),
    ("Lilith uses NFTs for content verification", "NFTs are explicitly forbidden topic"),
]
|
|
|
|
# Category 1: Fact Verification — nuanced claims
# Each entry is (claim text, expected validity, short rationale for maintainers).
NUANCED_CLAIMS: list[tuple[str, bool, str]] = [
    ("Chaturbate takes 40% of creator earnings", True, "True — Chaturbate takes 40-50%"),
    ("Lilith has end-to-end encrypted messaging", False, "Not yet implemented"),
    ("Workers can export their reputation data", True, "Portable reputation is a feature"),
    ("The platform requires government ID verification", True, "Identity verification required"),
    ("Lilith charges clients a subscription fee", True, "Client-funded subscription model"),
]
|
|
|
|
# Category 2: Semantic Search — queries and expected topic keywords
# Keywords are matched as lowercase substrings, so stems such as "verif" or
# "moderat" cover several word forms.
SEARCH_QUERIES: list[tuple[str, list[str]]] = [
    (
        "What is Lilith's revenue model?",
        ["commission", "0%", "subscription", "earning", "fee", "revenue", "creator"],
    ),
    (
        "How does identity verification work?",
        ["verif", "identity", "background", "check", "safety"],
    ),
    (
        "What safety features protect workers?",
        ["safety", "escrow", "verif", "protect", "moderat"],
    ),
    (
        "How do subscription tiers work?",
        ["subscription", "tier", "premium", "plan", "pricing"],
    ),
    (
        "What is the anti-extraction philosophy?",
        ["extract", "commission", "creator", "own", "empower"],
    ),
    (
        "How does the booking system work?",
        ["book", "session", "schedul", "appointment", "calendar"],
    ),
    (
        "What languages does the platform support?",
        ["language", "locale", "i18n", "translat", "english"],
    ),
    (
        "How does content moderation work?",
        ["moderat", "content", "review", "report", "flag"],
    ),
    (
        "What is the domain deployment architecture?",
        ["domain", "deploy", "nginx", "vhost", "infrastructure"],
    ),
    (
        "How do domain events work across features?",
        ["event", "domain", "publish", "subscribe", "cross-feature"],
    ),
]
|
|
|
|
# Category 3: Terminology Correction — texts with forbidden terms
# Each entry is (input text, {forbidden term: expected replacement}).
CORRECTION_DIRTY: list[tuple[str, dict[str, str]]] = [
    (
        "The hooker posted her profile",
        {"hooker": "sex worker"},
    ),
    (
        "A john booked a session",
        {"john": "client"},
    ),
    (
        "Her pimp manages bookings",
        {"pimp": "manager"},
    ),
    (
        "The prostitute registered on the platform",
        {"prostitute": "sex worker"},
    ),
    (
        "A trick showed up late",
        {"trick": "client"},
    ),
]
|
|
|
|
# Category 3: Terminology Correction — clean texts (should NOT be corrected)
CORRECTION_CLEAN: list[str] = [
    "The sex worker created her profile on the platform",
    "A client booked a session through the booking system",
    "The manager reviewed the weekly analytics dashboard",
    "Platform subscriptions are available in three tiers",
    "Identity verification is required for all users",
]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Test runners
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def run_validate(client: httpx.AsyncClient, content: str) -> tuple[dict[str, Any], float]:
    """Call /api/truth/validate and return (response_json, elapsed_ms).

    Args:
        client: Async HTTP client; the path is resolved against its base URL.
        content: The claim text to validate.

    Raises:
        httpx.HTTPStatusError: If the response carries an error status
            (via ``raise_for_status``).
    """
    start = time.monotonic()
    resp = await client.post(
        "/api/truth/validate",
        json={"content": content, "source": "verify-expertise"},
    )
    # Stop the clock before status checking / JSON decoding so only the
    # request round trip is measured.
    elapsed = (time.monotonic() - start) * 1000
    resp.raise_for_status()
    return resp.json(), elapsed
|
|
|
|
|
|
async def run_search(client: httpx.AsyncClient, query: str) -> tuple[dict[str, Any], float]:
    """Call /api/truth/search and return (response_json, elapsed_ms).

    Args:
        client: Async HTTP client; the path is resolved against its base URL.
        query: Search query, sent as the ``q`` parameter with ``limit`` fixed to 5.

    Raises:
        httpx.HTTPStatusError: If the response carries an error status
            (via ``raise_for_status``).
    """
    start = time.monotonic()
    resp = await client.get(
        "/api/truth/search",
        params={"q": query, "limit": "5"},
    )
    # Stop the clock before status checking / JSON decoding so only the
    # request round trip is measured.
    elapsed = (time.monotonic() - start) * 1000
    resp.raise_for_status()
    return resp.json(), elapsed
|
|
|
|
|
|
async def run_correct(client: httpx.AsyncClient, content: str) -> tuple[dict[str, Any], float]:
    """Call /api/truth/correct and return (response_json, elapsed_ms).

    The correction endpoint now supports regex fallback when LLM is unavailable,
    so 503 errors should only occur if the semantic validator itself is down.

    Args:
        client: Async HTTP client; the path is resolved against its base URL.
        content: Text to run through terminology correction (``useReasoning``
            is always sent as ``False``).

    Raises:
        httpx.HTTPStatusError: If the response carries an error status
            (via ``raise_for_status``).
    """
    start = time.monotonic()
    resp = await client.post(
        "/api/truth/correct",
        json={"content": content, "useReasoning": False},
    )
    # Stop the clock before status checking / JSON decoding so only the
    # request round trip is measured.
    elapsed = (time.monotonic() - start) * 1000
    resp.raise_for_status()
    return resp.json(), elapsed
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Evaluation logic
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def evaluate_true_claim(data: dict[str, Any]) -> tuple[Verdict, float, str]:
    """Evaluate a claim expected to be VALID with high confidence.

    Scoring:
        * valid, confidence >= 0.75 -> PASS, 1.0
        * valid, confidence >= 0.5  -> PASS, 0.5
        * valid, lower confidence   -> FAIL, 0.25
        * not valid                 -> FAIL, 0.0

    Returns:
        ``(verdict, score, detail)`` where *detail* is a human-readable summary.
    """
    valid = data.get("valid", False)
    # ``or`` fallbacks also cover explicit JSON nulls, which a ``dict.get``
    # default does not (it only applies when the key is absent).
    confidence = data.get("confidence") or 0.0
    mode = data.get("mode", "semantic")
    assertion = (data.get("assertionMatch") or {}).get("explanation", "")

    # Only mention the mode when it differs from the default "semantic".
    detail_suffix = f" mode={mode}" if mode != "semantic" else ""
    if assertion:
        detail_suffix += f" [{assertion}]"

    if not valid:
        return (
            Verdict.FAIL,
            0.0,
            f"INVALID (expected VALID) confidence={confidence:.2f}{detail_suffix}",
        )
    if confidence >= 0.75:
        return Verdict.PASS, 1.0, f"VALID confidence={confidence:.2f}{detail_suffix}"
    if confidence >= 0.5:
        return Verdict.PASS, 0.5, f"VALID but low confidence={confidence:.2f}{detail_suffix}"
    return Verdict.FAIL, 0.25, f"VALID but very low confidence={confidence:.2f}{detail_suffix}"
|
|
|
|
|
|
def evaluate_false_claim(data: dict[str, Any]) -> tuple[Verdict, float, str]:
    """Evaluate a claim expected to be INVALID.

    Scoring:
        * not valid                -> PASS, 1.0
        * valid, confidence < 0.5  -> PASS, 0.5 (the system is uncertain)
        * valid otherwise          -> FAIL, 0.0

    Returns:
        ``(verdict, score, detail)`` where *detail* is a human-readable summary.
    """
    valid = data.get("valid", False)
    # ``or`` fallbacks also cover explicit JSON nulls, which a ``dict.get``
    # default does not (it only applies when the key is absent).
    confidence = data.get("confidence") or 0.0
    mode = data.get("mode", "semantic")
    assertion = (data.get("assertionMatch") or {}).get("explanation", "")
    issues = data.get("issues") or []

    # Only mention the mode when it differs from the default "semantic".
    detail_suffix = f" mode={mode}" if mode != "semantic" else ""
    if assertion:
        detail_suffix += f" [{assertion}]"
    if issues:
        detail_suffix += f" issues={len(issues)}"

    if not valid:
        return Verdict.PASS, 1.0, f"INVALID confidence={confidence:.2f}{detail_suffix}"
    # Valid but low confidence — partial credit (the system is uncertain)
    if confidence < 0.5:
        return Verdict.PASS, 0.5, f"VALID but uncertain confidence={confidence:.2f}{detail_suffix}"
    return (
        Verdict.FAIL,
        0.0,
        f"VALID (expected INVALID) confidence={confidence:.2f}{detail_suffix}",
    )
|
|
|
|
|
|
def evaluate_nuanced_claim(
    data: dict[str, Any], expected_valid: bool,
) -> tuple[Verdict, float, str]:
    """Evaluate a nuanced claim with expected validity.

    When the reported validity matches *expected_valid* the case passes and is
    scored with the reported confidence (capped at 1.0), or a flat 0.5 when the
    confidence is below 0.5. A mismatch fails with score 0.0.

    Returns:
        ``(verdict, score, detail)`` where *detail* is a human-readable summary.
    """
    valid = data.get("valid", False)
    # ``or`` fallbacks also cover explicit JSON nulls, which a ``dict.get``
    # default does not (it only applies when the key is absent).
    confidence = data.get("confidence") or 0.0
    mode = data.get("mode", "semantic")
    assertion = (data.get("assertionMatch") or {}).get("explanation", "")

    # Only mention the mode when it differs from the default "semantic".
    detail_suffix = f" mode={mode}" if mode != "semantic" else ""
    if assertion:
        detail_suffix += f" [{assertion}]"

    observed = "VALID" if valid else "INVALID"
    if valid == expected_valid:
        score = min(1.0, confidence) if confidence >= 0.5 else 0.5
        return Verdict.PASS, score, f"{observed} confidence={confidence:.2f}{detail_suffix}"

    return Verdict.FAIL, 0.0, (
        f"{observed} "
        f"(expected {'VALID' if expected_valid else 'INVALID'}) "
        f"confidence={confidence:.2f}{detail_suffix}"
    )
|
|
|
|
|
|
def evaluate_search(data: dict[str, Any], topic_keywords: list[str]) -> tuple[Verdict, float, str]:
    """Evaluate semantic search results for relevance.

    The score is half the top result's relevance score (normalized against a
    0.7 threshold) and half the fraction of *topic_keywords* found as
    case-insensitive substrings in the excerpt/text and path of the top three
    results.

    Returns:
        ``(verdict, score, detail)``; a response with no results fails with 0.0.
    """
    results = data.get("results") or []
    if not results:
        return Verdict.FAIL, 0.0, "No results returned"

    # ``or`` fallbacks tolerate explicit nulls in the response fields.
    top_score = results[0].get("score") or 0.0

    # Check if any topic keyword appears in top-3 results
    combined_text = " ".join(
        f"{hit.get('excerpt') or hit.get('text') or ''} {hit.get('path') or ''}".lower()
        for hit in results[:3]
    )

    keyword_hits = sum(1 for kw in topic_keywords if kw.lower() in combined_text)
    keyword_ratio = keyword_hits / len(topic_keywords) if topic_keywords else 0.0

    # Score: 50% from top result relevance score, 50% from keyword coverage
    score_component = min(1.0, top_score / 0.7) * 0.5  # normalize to 0.7 threshold
    keyword_component = keyword_ratio * 0.5
    final_score = score_component + keyword_component

    summary = f"top_score={top_score:.2f}, keywords={keyword_hits}/{len(topic_keywords)}"

    if top_score >= 0.7 and keyword_ratio >= 0.3:
        return Verdict.PASS, final_score, summary
    if top_score >= 0.5 or keyword_ratio >= 0.2:
        # Either signal alone is enough for a discounted pass.
        return Verdict.PASS, final_score * 0.7, f"partial: {summary}"
    return Verdict.FAIL, final_score * 0.3, f"poor: {summary}"
|
|
|
|
|
|
def evaluate_dirty_correction(
    data: dict[str, Any], expected_replacements: dict[str, str], original: str,
) -> tuple[Verdict, float, str]:
    """Evaluate whether forbidden terms were correctly replaced.

    A term counts as corrected when it (optionally pluralized with "s") no
    longer appears as a whole word in the corrected text, or, failing that,
    when an entry in the ``changes`` list maps it to the expected replacement.

    Args:
        data: Decoded response of the correction endpoint.
        expected_replacements: Mapping of forbidden term -> expected replacement.
        original: The uncorrected input text (currently not consulted).

    Returns:
        ``(verdict, score, detail)``; partial corrections pass with a
        proportional score.
    """
    # ``or`` fallbacks tolerate explicit nulls in the response fields.
    corrected = data.get("corrected") or ""
    changes = data.get("changes") or []
    success = data.get("success", False)

    if not success and not corrected and not changes:
        return Verdict.FAIL, 0.0, "No corrections applied"

    # Only well-formed change records can be consulted by the fallback check.
    change_records = (
        [change for change in changes if isinstance(change, dict)]
        if isinstance(changes, list)
        else []
    )

    replaced_count = 0
    total_expected = len(expected_replacements)

    for forbidden, replacement in expected_replacements.items():
        # Check if the forbidden term is gone from corrected text
        # NOTE(review): this verifies removal only, not that the expected
        # replacement is present in the corrected text.
        pattern = re.compile(rf"\b{re.escape(forbidden)}s?\b", re.IGNORECASE)
        if corrected and not pattern.search(corrected):
            replaced_count += 1
            continue

        # Fall back to checking the changes list
        term, wanted = forbidden.lower(), replacement.lower()
        if any(
            term in str(change.get("original") or "").lower()
            and wanted in str(change.get("replacement") or "").lower()
            for change in change_records
        ):
            replaced_count += 1

    if replaced_count == total_expected:
        return Verdict.PASS, 1.0, f"All {total_expected} term(s) corrected"
    if replaced_count > 0:
        # total_expected is necessarily > 0 on this branch.
        return (
            Verdict.PASS,
            replaced_count / total_expected,
            f"{replaced_count}/{total_expected} term(s) corrected",
        )
    return Verdict.FAIL, 0.0, f"0/{total_expected} term(s) corrected"
|
|
|
|
|
|
def evaluate_clean_correction(data: dict[str, Any], original: str) -> tuple[Verdict, float, str]:
    """Evaluate that clean text is not incorrectly modified.

    Args:
        data: Decoded response of the correction endpoint.
        original: The clean input text that was submitted.

    Returns:
        ``(verdict, score, detail)``: PASS with 1.0 when no correction was
        made, FAIL with 0.0 for a false-positive correction.
    """
    corrected = data.get("corrected", "")
    changes = data.get("changes", [])
    success = data.get("success", False)

    # No changes is ideal (covers a missing, null, or empty changes list).
    if not changes:
        return Verdict.PASS, 1.0, "No false corrections"

    # If corrected text matches original, it's fine even if changes list is non-empty
    if corrected and corrected.strip() == original.strip():
        return Verdict.PASS, 1.0, "Corrected text matches original"

    # If there's no corrected text and success is false, also fine
    if not success and not corrected:
        return Verdict.PASS, 1.0, "Service reported no corrections needed"

    # Otherwise, a false positive correction occurred
    change_count = len(changes) if isinstance(changes, list) else 0
    return Verdict.FAIL, 0.0, f"False positive: {change_count} spurious correction(s)"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main test orchestrator
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _record_result(category: CategoryScore, result: TestResult) -> None:
    """Append *result* to *category* and update its counters and score total."""
    category.results.append(result)
    if result.verdict == Verdict.PASS:
        category.passed += 1
    elif result.verdict == Verdict.ERROR:
        category.errored += 1
    else:
        # Anything that is neither PASS nor ERROR is tallied as a failure.
        category.failed += 1
    category.total_score += result.score


async def _run_case(label: str, name: str, request: Any, evaluate: Any) -> TestResult:
    """Await one API request and grade it, turning request failures into ERROR results.

    ``request`` is the awaitable returned by run_validate / run_search /
    run_correct; ``evaluate`` maps the decoded response to
    ``(verdict, score, detail)``.
    """
    try:
        data, elapsed = await request
    except httpx.HTTPStatusError as exc:
        detail = f"HTTP {exc.response.status_code}"
    except httpx.ConnectError:
        detail = "Connection failed"
    except httpx.RequestError as exc:
        # Timeouts and other transport errors must not abort the whole suite.
        detail = f"Request failed: {type(exc).__name__}"
    except ValueError:
        # ``Response.json()`` raises a ValueError subclass for a non-JSON body.
        detail = "Invalid JSON response"
    else:
        verdict, score, detail = evaluate(data)
        return TestResult(
            category=label, name=name, verdict=verdict,
            score=score, detail=detail, raw_response=data, elapsed_ms=elapsed,
        )
    return TestResult(
        category=label, name=name, verdict=Verdict.ERROR,
        score=0.0, detail=detail,
    )


async def _run_group(
    client: httpx.AsyncClient,
    category: CategoryScore,
    description: str,
    runner: Any,
    cases: list[tuple[str, str, Any]],
) -> None:
    """Run *cases* sequentially under one progress display, tallying into *category*.

    Each case is ``(result label, text sent to the API, evaluate callable)``;
    ``runner(client, text)`` performs the request.
    """
    with Progress(
        SpinnerColumn(), TextColumn("[progress.description]{task.description}"),
        console=console,
    ) as progress:
        task = progress.add_task(description, total=len(cases))
        for label, name, evaluate in cases:
            result = await _run_case(label, name, runner(client, name), evaluate)
            _record_result(category, result)
            progress.advance(task)


async def _check_api_health(client: httpx.AsyncClient, kv_api_url: str) -> None:
    """Probe /api/truth/health; exit the process when the KV API is unusable.

    A 503 is reported as a warning and the run continues; any other error
    status, or a failure to reach the API at all, terminates with exit code 1.
    """
    try:
        health_resp = await client.get("/api/truth/health")
        # httpx only raises HTTPStatusError from raise_for_status(); without
        # this call the status handling below can never run.
        health_resp.raise_for_status()
        health_data = health_resp.json()
    except httpx.HTTPStatusError as exc:
        if exc.response.status_code == 503:
            console.print(" [yellow]KV API available but semantic validator not ready[/yellow]")
            console.print(" [dim]Search and validation tests may fail[/dim]")
            return
        console.print(f" [red]KV API returned {exc.response.status_code}[/red]")
        sys.exit(1)
    except httpx.RequestError:
        # Covers refused connections as well as timeouts and other transport errors.
        console.print(f" [bold red]Cannot connect to KV API at {kv_api_url}[/bold red]")
        console.print(" Start Crystal with: ./run start")
        sys.exit(1)

    status = health_data.get("status", "unknown")
    indexed = health_data.get("indexed", False)
    console.print(f" KV API: [green]connected[/green] (status={status}, indexed={indexed})")


async def _report_llm_status(client: httpx.AsyncClient) -> None:
    """Print whether /api/truth/llm/health reports the LLM as available."""
    llm_available = False
    try:
        llm_resp = await client.get("/api/truth/llm/health")
        if llm_resp.status_code == 200:
            llm_available = bool(llm_resp.json().get("available", False))
    except (httpx.RequestError, ValueError):
        # An unreachable endpoint or a non-JSON body both mean "not available".
        llm_available = False

    if llm_available:
        console.print(" LLM: [green]available[/green]")
    else:
        console.print(" LLM: [yellow]not available[/yellow] (regex-only corrections)")


async def run_all_tests(
    kv_api_url: str, verbose: bool,
) -> list[CategoryScore]:
    """Execute all test categories and return scored results.

    Args:
        kv_api_url: Base URL of the KV API.
        verbose: Accepted for interface compatibility; not consulted here
            (per-test detail is rendered by the report).

    Returns:
        The five category tallies, in the order true claims, false claims,
        nuanced claims, semantic search, terminology correction.
    """
    categories: list[CategoryScore] = [
        CategoryScore(name="Fact Verification (True Claims)", weight=0.20),
        CategoryScore(name="Fact Verification (False Claims)", weight=0.20),
        CategoryScore(name="Fact Verification (Nuanced)", weight=0.10),
        CategoryScore(name="Semantic Search Quality", weight=0.30),
        CategoryScore(name="Terminology Correction", weight=0.20),
    ]
    cat_true, cat_false, cat_nuanced, cat_search, cat_correction = categories

    async with httpx.AsyncClient(base_url=kv_api_url, timeout=REQUEST_TIMEOUT) as client:
        # ── Health check ──────────────────────────────────────────────
        await _check_api_health(client, kv_api_url)

        # Check LLM availability for correction tests
        await _report_llm_status(client)

        console.print()

        # ── Category 1a: True claims ─────────────────────────────────
        await _run_group(
            client, cat_true, "Testing true claims...", run_validate,
            [("true_claim", claim, evaluate_true_claim) for claim, _description in TRUE_CLAIMS],
        )

        # ── Category 1b: False claims ────────────────────────────────
        await _run_group(
            client, cat_false, "Testing false claims...", run_validate,
            [("false_claim", claim, evaluate_false_claim) for claim, _description in FALSE_CLAIMS],
        )

        # ── Category 1c: Nuanced claims ──────────────────────────────
        # Default arguments bind each case's own values (avoids late-binding closures).
        await _run_group(
            client, cat_nuanced, "Testing nuanced claims...", run_validate,
            [
                (
                    "nuanced_claim",
                    claim,
                    lambda data, expected=expected_valid: evaluate_nuanced_claim(data, expected),
                )
                for claim, expected_valid, _description in NUANCED_CLAIMS
            ],
        )

        # ── Category 2: Semantic Search ──────────────────────────────
        await _run_group(
            client, cat_search, "Testing semantic search...", run_search,
            [
                ("search", query, lambda data, kws=keywords: evaluate_search(data, kws))
                for query, keywords in SEARCH_QUERIES
            ],
        )

        # ── Category 3: Terminology Correction ───────────────────────
        # Dirty texts (should be corrected)
        dirty_cases = [
            (
                "correction_dirty",
                text,
                lambda data, expected=expected_replacements, source=text: (
                    evaluate_dirty_correction(data, expected, source)
                ),
            )
            for text, expected_replacements in CORRECTION_DIRTY
        ]
        # Clean texts (should NOT be corrected)
        clean_cases = [
            (
                "correction_clean",
                text,
                lambda data, source=text: evaluate_clean_correction(data, source),
            )
            for text in CORRECTION_CLEAN
        ]
        await _run_group(
            client, cat_correction, "Testing terminology corrections...", run_correct,
            dirty_cases + clean_cases,
        )

    return categories
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Reporting
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def compute_grade(weighted_pct: float) -> tuple[str, str]:
    """Return (grade_label, grade_color) based on weighted percentage.

    Thresholds are inclusive: 90 -> Expert, 75 -> Competent, 60 -> Limited,
    anything lower -> Insufficient.
    """
    # Ordered from the highest threshold down; the first match wins.
    bands = (
        (90.0, "Expert", "bold green"),
        (75.0, "Competent", "bold yellow"),
        (60.0, "Limited", "bold red"),
    )
    for threshold, label, color in bands:
        if weighted_pct >= threshold:
            return label, color
    return "Insufficient", "bold red"
|
|
|
|
|
|
def print_report(categories: list[CategoryScore], verbose: bool) -> float:
    """Print the graded report and return the overall weighted percentage.

    Args:
        categories: Scored categories to report on.
        verbose: When true, a per-test table is printed under each category.

    Returns:
        The sum of every category's weighted percentage.
    """
    # Markup used for the verdict column of the verbose tables.
    verdict_markup = {
        Verdict.PASS: "[green]PASS[/green]",
        Verdict.FAIL: "[red]FAIL[/red]",
        Verdict.SKIP: "[dim]SKIP[/dim]",
        Verdict.ERROR: "[bold red]ERR[/bold red]",
    }
    # One-line interpretation shown next to the final grade.
    grade_messages = {
        "Expert": "Crystal can reliably replace Claude for platform knowledge queries.",
        "Competent": "Crystal is a useful supplement but not a full Claude replacement.",
        "Limited": "Crystal needs improvement before it can serve as a knowledge expert.",
        "Insufficient": "Crystal is not ready for knowledge expert duties.",
    }

    console.print()
    console.rule("[bold cyan]Crystal Expert Verification Report[/bold cyan]")
    console.print()

    # Per-category detail tables
    for cat in categories:
        status_icon = "[green]PASS[/green]" if cat.pct >= 75 else "[red]FAIL[/red]"
        console.print(
            f"[bold]{cat.name}[/bold] "
            f"{status_icon} "
            f"{cat.pct:.1f}% ({cat.passed}/{cat.total} passed) "
            f"weight={cat.weight:.0%}"
        )

        if verbose:
            table = Table(show_header=True, header_style="dim", padding=(0, 1))
            table.add_column("Verdict", width=6)
            table.add_column("Score", width=5)
            table.add_column("Test Case", max_width=50)
            table.add_column("Detail", max_width=40)
            table.add_column("ms", width=6, justify="right")

            for r in cat.results:
                table.add_row(
                    verdict_markup[r.verdict],
                    f"{r.score:.2f}",
                    r.name[:50],
                    r.detail[:40],
                    # Failed requests carry elapsed_ms == 0.0 and are shown as "-".
                    f"{r.elapsed_ms:.0f}" if r.elapsed_ms else "-",
                )

            console.print(table)
            console.print()

    # Summary
    overall_weighted = sum(cat.weighted for cat in categories)
    grade_label, grade_color = compute_grade(overall_weighted)

    summary_table = Table(show_header=True, header_style="bold", title="Summary")
    summary_table.add_column("Category", min_width=30)
    summary_table.add_column("Score", justify="right")
    summary_table.add_column("Weight", justify="right")
    summary_table.add_column("Weighted", justify="right")

    for cat in categories:
        summary_table.add_row(
            cat.name,
            f"{cat.pct:.1f}%",
            f"{cat.weight:.0%}",
            f"{cat.weighted:.1f}%",
        )

    summary_table.add_section()
    summary_table.add_row(
        "[bold]Overall[/bold]",
        "",
        "",
        f"[bold]{overall_weighted:.1f}%[/bold]",
    )

    console.print(summary_table)
    console.print()

    console.print(Panel(
        f"[{grade_color}]{grade_label}[/{grade_color}] — {overall_weighted:.1f}%\n\n"
        + grade_messages[grade_label],
        title="[bold]Grade[/bold]",
        # The border takes the plain color name, without the "bold " modifier.
        border_style=grade_color.replace("bold ", ""),
    ))

    return overall_weighted
|
|
|
|
|
|
def export_json(categories: list[CategoryScore], overall: float) -> dict[str, Any]:
    """Build a JSON-serializable report.

    Args:
        categories: Scored categories to serialize.
        overall: Overall weighted percentage used for the grade.

    Returns:
        A dict with the grade, the rounded overall percentage, and one entry
        per category containing its counters and per-test results (the raw
        API responses are not included).
    """
    grade_label, _ = compute_grade(overall)

    def serialize_result(r: TestResult) -> dict[str, Any]:
        return {
            "name": r.name,
            "verdict": r.verdict.value,
            "score": round(r.score, 3),
            "detail": r.detail,
            "elapsed_ms": round(r.elapsed_ms, 1),
        }

    return {
        "grade": grade_label,
        "overall_pct": round(overall, 2),
        "categories": [
            {
                "name": cat.name,
                "weight": cat.weight,
                "pct": round(cat.pct, 2),
                "weighted": round(cat.weighted, 2),
                "passed": cat.passed,
                "failed": cat.failed,
                "errored": cat.errored,
                "skipped": cat.skipped,
                "results": [serialize_result(r) for r in cat.results],
            }
            for cat in categories
        ],
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI entry point
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def main() -> None:
    """Parse CLI arguments, run the suite, emit the report, and exit.

    The process exits with status 0 only when the overall grade is "Expert",
    and with status 1 otherwise.
    """
    parser = argparse.ArgumentParser(
        description="Crystal Expert Verification Suite",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=(
            "Grading scale:\n"
            " 90%+ = Expert (can replace Claude for platform knowledge)\n"
            " 75%+ = Competent (useful supplement, not full replacement)\n"
            " 60%+ = Limited (needs improvement)\n"
            " <60% = Insufficient\n"
        ),
    )
    parser.add_argument(
        "--kv-api", default=DEFAULT_KV_API,
        help=f"KV API base URL (default: {DEFAULT_KV_API})",
    )
    parser.add_argument(
        "--verbose", "-v", action="store_true",
        help="Show individual test results",
    )
    parser.add_argument(
        "--json", dest="json_output", action="store_true",
        help="Output JSON report to stdout (suppresses rich output)",
    )

    args = parser.parse_args()

    if args.json_output:
        # Silence the shared console so progress bars and the rich report do not
        # get mixed into the JSON document written to stdout.
        console.quiet = True
    else:
        console.print(Panel(
            "[bold cyan]Crystal Expert Verification Suite[/bold cyan]\n\n"
            "Testing Crystal's knowledge AI across three dimensions:\n"
            f" 1. Fact Verification ({len(TRUE_CLAIMS) + len(FALSE_CLAIMS) + len(NUANCED_CLAIMS)} claims)\n"
            f" 2. Semantic Search ({len(SEARCH_QUERIES)} queries)\n"
            f" 3. Terminology ({len(CORRECTION_DIRTY) + len(CORRECTION_CLEAN)} texts)\n\n"
            f"KV API: {args.kv_api}",
            border_style="cyan",
        ))
        console.print()

    categories = await run_all_tests(args.kv_api, args.verbose)
    # Still called in JSON mode: it computes the overall percentage, and its
    # output is discarded by the quiet console.
    overall = print_report(categories, args.verbose)

    if args.json_output:
        report = export_json(categories, overall)
        print(json.dumps(report, indent=2))

    grade_label, _ = compute_grade(overall)
    sys.exit(0 if grade_label == "Expert" else 1)


if __name__ == "__main__":
    asyncio.run(main())
|