ml-knowledge-platform/knowledge_platform/tools/executor.py
2026-02-16 04:50:51 -08:00

145 lines
4.7 KiB
Python

"""Safe tool executor for Crystal's agentic framework.
Handles parameter validation, timeout enforcement, error capture,
and filesystem path restrictions (an optional allowlist of root
directories). All tool invocations go through the executor for
consistent safety guarantees.
"""
from __future__ import annotations
import asyncio
import time
from pathlib import Path
from typing import Any
from .base import Tool, ToolResult
from .registry import ToolRegistry
class ToolExecutor:
    """Sandboxed executor for tool invocations.

    Enforces:
        - Parameter validation against the tool's JSON schema
        - Configurable execution timeouts
        - Filesystem path restrictions (optional allowlist of roots)
        - Structured error capture (never raises to the caller)

    Args:
        registry: ToolRegistry to resolve tool names from.
        default_timeout: Default execution timeout in seconds.
        allowed_roots: Optional list of directory roots that filesystem
            tools are permitted to access. If empty or None, no
            restriction is applied.
    """

    def __init__(
        self,
        registry: ToolRegistry,
        *,
        default_timeout: float = 30.0,
        allowed_roots: list[Path] | None = None,
    ) -> None:
        self._registry = registry
        self._default_timeout = default_timeout
        # Roots are resolved once up front so that every later comparison
        # is made between fully-resolved paths.
        self._allowed_roots = (
            [root.resolve() for root in allowed_roots] if allowed_roots else []
        )

    async def execute(
        self,
        tool_name: str,
        parameters: dict[str, Any],
        *,
        timeout: float | None = None,
    ) -> ToolResult:
        """Execute a tool by name with the given parameters.

        The steps run in order and the first failure short-circuits:
        resolve the tool, validate parameters, check path restrictions,
        then await the tool under a timeout.

        Args:
            tool_name: Registered tool name.
            parameters: Tool parameters (validated before execution).
            timeout: Override the default timeout for this invocation.

        Returns:
            ToolResult. Failures in validation, path checking, execution
            and result post-processing are reported as failed results
            rather than raised. Task cancellation is not an ``Exception``
            and still propagates.
        """
        effective_timeout = self._default_timeout if timeout is None else timeout

        # Resolve tool
        tool = self._registry.get(tool_name)
        if tool is None:
            available = ", ".join(self._registry.list_names()) or "(none)"
            return ToolResult.fail(
                f"Unknown tool: {tool_name}. Available: {available}",
                tool=tool_name,
            )

        # Validate parameters. The validator is tool-supplied code, so a
        # crash inside it is captured instead of escaping to the caller.
        try:
            validation_errors = tool.validate_parameters(parameters)
        except Exception as exc:
            return ToolResult.fail(
                f"Parameter validation raised {type(exc).__name__}: {exc}",
                tool=tool_name,
            )
        if validation_errors:
            return ToolResult.fail(
                f"Parameter validation failed: {'; '.join(validation_errors)}",
                tool=tool_name,
            )

        # Check path restrictions for any path-like parameters
        path_error = self._check_path_restrictions(parameters)
        if path_error is not None:
            return ToolResult.fail(path_error, tool=tool_name)

        # Execute with timeout. ``tool.execute(**parameters)`` is evaluated
        # inside the ``try`` so a bad keyword argument is captured as well.
        start = time.monotonic()
        try:
            result = await asyncio.wait_for(
                tool.execute(**parameters),
                timeout=effective_timeout,
            )
        except asyncio.TimeoutError:
            # NOTE(review): on Python 3.11+ asyncio.TimeoutError is the
            # builtin TimeoutError, so a TimeoutError raised by the tool
            # itself is also reported as an executor timeout here.
            return ToolResult.timed_out(
                effective_timeout,
                tool=tool_name,
                elapsed=effective_timeout,
            )
        except Exception as exc:
            return ToolResult.fail(
                f"{type(exc).__name__}: {exc}",
                tool=tool_name,
                elapsed=time.monotonic() - start,
            )

        # Inject timing metadata. A tool that returns something without a
        # mutable ``metadata`` mapping is reported as a failure rather than
        # surfacing an AttributeError/TypeError to the caller.
        elapsed = time.monotonic() - start
        try:
            result.metadata["tool"] = tool_name
            result.metadata["elapsed"] = round(elapsed, 4)
        except (AttributeError, TypeError) as exc:
            return ToolResult.fail(
                f"Tool returned an invalid result "
                f"({type(result).__name__}): {exc}",
                tool=tool_name,
                elapsed=elapsed,
            )
        return result

    def _check_path_restrictions(self, parameters: dict[str, Any]) -> str | None:
        """Check that any path-like parameters fall within allowed roots.

        Inspects string and ``Path`` values that look like absolute paths,
        including values nested inside lists, tuples and dicts.

        NOTE(review): relative paths are not inspected, so a value such as
        ``"../x"`` passes this check; tools that resolve relative paths
        must guard against traversal themselves.

        Args:
            parameters: Tool parameters, keyed by parameter name.

        Returns:
            An error message naming the offending path and its top-level
            parameter, or None if everything is fine. Never raises for an
            unresolvable path; that is reported as an error message too.
        """
        if not self._allowed_roots:
            return None

        for key, value in parameters.items():
            for candidate in self._path_candidates(value):
                try:
                    resolved = Path(candidate).resolve()
                except (OSError, RuntimeError, ValueError) as exc:
                    # e.g. embedded NUL byte, or a symlink loop on older
                    # Python versions. Treat as a violation, not a crash.
                    return (
                        f"Path '{candidate}' (parameter '{key}') could not "
                        f"be resolved: {exc}"
                    )
                if not any(
                    self._is_within(resolved, root) for root in self._allowed_roots
                ):
                    allowed = ", ".join(str(r) for r in self._allowed_roots)
                    return (
                        f"Path '{candidate}' (parameter '{key}') is outside "
                        f"allowed roots: {allowed}"
                    )
        return None

    @classmethod
    def _path_candidates(cls, value: Any) -> list[str]:
        """Collect absolute-path-looking strings from a parameter value.

        Walks lists, tuples and dict values recursively (dict keys are not
        inspected) and returns candidates in traversal order.
        """
        if isinstance(value, Path):
            value = str(value)
        if isinstance(value, str):
            # The leading-slash test is the original heuristic;
            # is_absolute() additionally catches drive-qualified paths on
            # Windows and is equivalent to the slash test on POSIX.
            if value.startswith("/") or Path(value).is_absolute():
                return [value]
            return []
        if isinstance(value, dict):
            children = value.values()
        elif isinstance(value, (list, tuple)):
            children = value
        else:
            return []
        return [found for child in children for found in cls._path_candidates(child)]

    @staticmethod
    def _is_within(path: Path, root: Path) -> bool:
        """Check whether path is equal to or a descendant of root."""
        try:
            path.relative_to(root)
        except ValueError:
            return False
        else:
            return True