"""Safe tool executor for Crystal's agentic framework. Handles parameter validation, timeout enforcement, error capture, and working directory restrictions. All tool invocations go through the executor for consistent safety guarantees. """ from __future__ import annotations import asyncio import time from pathlib import Path from typing import Any from .base import Tool, ToolResult from .registry import ToolRegistry class ToolExecutor: """Sandboxed executor for tool invocations. Enforces: - Parameter validation against the tool's JSON schema - Configurable execution timeouts - Working directory restrictions (optional allowlist) - Structured error capture (never raises to the caller) Args: registry: ToolRegistry to resolve tool names from. default_timeout: Default execution timeout in seconds. allowed_roots: Optional list of directory roots that filesystem tools are permitted to access. If empty, no restriction is applied. """ def __init__( self, registry: ToolRegistry, *, default_timeout: float = 30.0, allowed_roots: list[Path] | None = None, ) -> None: self._registry = registry self._default_timeout = default_timeout self._allowed_roots = [r.resolve() for r in allowed_roots] if allowed_roots else [] async def execute( self, tool_name: str, parameters: dict[str, Any], *, timeout: float | None = None, ) -> ToolResult: """Execute a tool by name with the given parameters. Args: tool_name: Registered tool name. parameters: Tool parameters (validated before execution). timeout: Override the default timeout for this invocation. Returns: ToolResult — always returns, never raises. """ effective_timeout = timeout if timeout is not None else self._default_timeout # Resolve tool tool = self._registry.get(tool_name) if tool is None: available = ", ".join(self._registry.list_names()) or "(none)" return ToolResult.fail( f"Unknown tool: {tool_name}. Available: {available}", tool=tool_name, ) # Validate parameters validation_errors = tool.validate_parameters(parameters) if validation_errors: return ToolResult.fail( f"Parameter validation failed: {'; '.join(validation_errors)}", tool=tool_name, ) # Check path restrictions for any path-like parameters path_error = self._check_path_restrictions(parameters) if path_error is not None: return ToolResult.fail(path_error, tool=tool_name) # Execute with timeout start = time.monotonic() try: result = await asyncio.wait_for( tool.execute(**parameters), timeout=effective_timeout, ) except asyncio.TimeoutError: return ToolResult.timed_out( effective_timeout, tool=tool_name, elapsed=effective_timeout, ) except Exception as exc: return ToolResult.fail( f"{type(exc).__name__}: {exc}", tool=tool_name, elapsed=time.monotonic() - start, ) # Inject timing metadata result.metadata["tool"] = tool_name result.metadata["elapsed"] = round(time.monotonic() - start, 4) return result def _check_path_restrictions(self, parameters: dict[str, Any]) -> str | None: """Check that any path-like parameters fall within allowed roots. Inspects string values that look like absolute paths. Returns an error message if a violation is found, or None if everything is fine. """ if not self._allowed_roots: return None for key, value in parameters.items(): if not isinstance(value, str): continue if not value.startswith("/"): continue resolved = Path(value).resolve() if not any(self._is_within(resolved, root) for root in self._allowed_roots): allowed = ", ".join(str(r) for r in self._allowed_roots) return ( f"Path '{value}' (parameter '{key}') is outside " f"allowed roots: {allowed}" ) return None @staticmethod def _is_within(path: Path, root: Path) -> bool: """Check whether path is equal to or a descendant of root.""" try: path.relative_to(root) return True except ValueError: return False