"""ValidateBatchTool -- batch validation of multiple content items.

Wraps the KV API ``/api/truth/validate/batch`` endpoint, enabling efficient
validation of multiple content items in a single request with skip-if-valid
optimization.
"""

from __future__ import annotations

import os
from typing import Any, ClassVar

import httpx

from ..base import Tool, ToolParameter, ToolResult

# Read from KV_API_URL environment variable, fallback to localhost
_DEFAULT_KV_API_URL = os.environ.get("KV_API_URL", "http://localhost:41233")
_REQUEST_TIMEOUT = 60.0  # Batch operations may take longer
_MAX_BATCH_SIZE = 100  # Batches larger than this are rejected before any request
_BATCH_ENDPOINT = "/api/truth/validate/batch"


def _extract_error_message(resp: httpx.Response) -> str:
    """Return the ``error`` field of a JSON error body, or a generic message.

    The body is only parsed when the response declares a JSON content type.
    A malformed body or a JSON value that is not an object yields the generic
    ``"Invalid request"`` message instead of raising.
    """
    fallback = "Invalid request"
    if not resp.headers.get("content-type", "").startswith("application/json"):
        return fallback
    try:
        error_data = resp.json()
    except ValueError:
        # The header promised JSON but the body could not be decoded.
        return fallback
    if not isinstance(error_data, dict):
        return fallback
    return error_data.get("error", fallback)


def _format_results(results: list[Any]) -> list[dict[str, Any]]:
    """Reduce raw per-item results to ``index``/``valid``/``confidence``/``cached``.

    Entries that are not JSON objects are reported with the same defaults used
    for missing fields (``valid=False``, ``confidence=0.0``, ``cached=False``),
    so a malformed entry is never reported as valid.
    """
    formatted: list[dict[str, Any]] = []
    for idx, result in enumerate(results):
        entry = result if isinstance(result, dict) else {}
        formatted.append({
            "index": idx,
            "valid": entry.get("valid", False),
            "confidence": entry.get("confidence", 0.0),
            "cached": entry.get("cached", False),
        })
    return formatted


class ValidateBatchTool(Tool):
    """Validate multiple content items against the knowledge base in one call.

    Submits an array of content items to the KV API batch validation endpoint,
    which returns validation results for each item with skip-if-valid
    optimization for cached results.
    """

    name: ClassVar[str] = "validate_batch"
    description: ClassVar[str] = (
        "Validate multiple content items against the Lilith platform knowledge base "
        "in a single request. Returns validation results (valid flag, confidence score, "
        "cached status) for each item. Use this to efficiently check multiple claims "
        "or locale strings at once."
    )
    parameters: ClassVar[list[ToolParameter]] = [
        ToolParameter(
            name="items",
            type="array",
            description=(
                "Array of content items to validate. Each item should be an object "
                "with 'content' (string, required), 'subjects' (array of strings, optional), "
                "and 'source' (string, optional)."
            ),
            items={
                "type": "object",
                "properties": {
                    "content": {"type": "string"},
                    "subjects": {"type": "array", "items": {"type": "string"}},
                    "source": {"type": "string"},
                },
                "required": ["content"],
            },
        ),
        ToolParameter(
            name="kv_api_url",
            type="string",
            description="KV API base URL",
            required=False,
            default=_DEFAULT_KV_API_URL,
        ),
    ]

    async def execute(self, **kwargs: Any) -> ToolResult:
        """POST the batch to the KV API and summarize the per-item results.

        Keyword Args:
            items: Content items to validate; must contain between 1 and
                ``_MAX_BATCH_SIZE`` entries.
            kv_api_url: Optional KV API base URL. A missing, ``None`` or empty
                value falls back to ``_DEFAULT_KV_API_URL``.

        Returns:
            A successful ``ToolResult`` carrying ``results``, ``total_items``,
            ``all_valid`` and ``total_time_ms``, or a failed ``ToolResult``
            describing the input, transport, HTTP-status or payload problem.

        Raises:
            KeyError: If ``items`` is not supplied at all.
        """
        items: list[dict[str, Any]] = kwargs["items"]
        # An explicit None/"" would otherwise be handed to httpx as the base URL.
        kv_api_url: str = kwargs.get("kv_api_url") or _DEFAULT_KV_API_URL

        if not items:
            return ToolResult.fail("At least one item is required for batch validation")
        if len(items) > _MAX_BATCH_SIZE:
            return ToolResult.fail(
                f"Batch size too large ({len(items)} items). Maximum is {_MAX_BATCH_SIZE}."
            )

        try:
            async with httpx.AsyncClient(
                base_url=kv_api_url, timeout=_REQUEST_TIMEOUT
            ) as client:
                resp = await client.post(_BATCH_ENDPOINT, json={"items": items})
        except httpx.ConnectError:
            return ToolResult.fail(
                f"Cannot connect to KV API at {kv_api_url}. "
                "Start it with: cd codebase/tools/platform-knowledge-ai && ./run start"
            )
        except httpx.TimeoutException:
            return ToolResult.fail(
                f"KV API batch validation timed out after {_REQUEST_TIMEOUT}s. "
                "Try reducing the batch size."
            )
        except (httpx.HTTPError, httpx.InvalidURL) as exc:
            # Remaining transport/protocol failures and malformed base URLs:
            # report them as a tool failure instead of leaking the exception.
            return ToolResult.fail(f"KV API request to {kv_api_url} failed: {exc}")

        # The response body was fully read by ``post`` (non-streaming request),
        # so it can be inspected after the client has been closed.
        status = resp.status_code
        if status == 503:
            return ToolResult.fail(
                "Validator service not ready. "
                "The embedding model may still be loading -- try again shortly."
            )
        if status == 400:
            return ToolResult.fail(f"Bad request: {_extract_error_message(resp)}")
        if status >= 500:
            return ToolResult.fail(
                f"KV API internal error (HTTP {status}). "
                "Check service logs for details."
            )
        if status != 200:
            return ToolResult.fail(f"KV API returned unexpected status {status}")

        try:
            data = resp.json()
        except ValueError:
            return ToolResult.fail("KV API returned a response that is not valid JSON")
        if not isinstance(data, dict):
            return ToolResult.fail(
                "KV API returned an unexpected response format (expected a JSON object)"
            )

        results = data.get("results") or []  # tolerate an explicit null
        if not isinstance(results, list):
            return ToolResult.fail(
                "KV API returned an unexpected response format ('results' is not an array)"
            )

        # Format results for consistent output
        formatted_results = _format_results(results)

        all_valid = data.get("all_valid")
        if all_valid is None:
            # Do not assume success when the server omits the flag; derive it
            # from the per-item outcomes instead.
            all_valid = all(entry["valid"] for entry in formatted_results)
        total_time_ms = data.get("total_time_ms", 0)

        return ToolResult.success(
            {
                "results": formatted_results,
                "total_items": len(results),
                "all_valid": all_valid,
                "total_time_ms": total_time_ms,
            },
            total_items=len(results),
            all_valid=all_valid,
        )