145 lines
4.7 KiB
Python
145 lines
4.7 KiB
Python
"""Safe tool executor for Crystal's agentic framework.

Handles parameter validation, timeout enforcement, error capture,
and working directory restrictions. All tool invocations go through
the executor for consistent safety guarantees.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from .base import Tool, ToolResult
|
|
from .registry import ToolRegistry
|
|
|
|
|
|
class ToolExecutor:
    """Sandboxed executor for tool invocations.

    Enforces:
    - Parameter validation (delegated to the tool's ``validate_parameters``)
    - Configurable execution timeouts
    - Working directory restrictions (optional allowlist)
    - Structured error capture (failures are returned as ``ToolResult``
      values rather than raised to the caller)

    Args:
        registry: ToolRegistry to resolve tool names from.
        default_timeout: Default execution timeout in seconds.
        allowed_roots: Optional list of directory roots that filesystem
            tools are permitted to access. If empty, no restriction is applied.
    """

    def __init__(
        self,
        registry: ToolRegistry,
        *,
        default_timeout: float = 30.0,
        allowed_roots: list[Path] | None = None,
    ) -> None:
        """Store the registry, the default timeout and the resolved allowlist.

        Roots are resolved once here so that later containment checks compare
        resolved paths against resolved roots.
        """
        self._registry = registry
        self._default_timeout = default_timeout
        self._allowed_roots = [r.resolve() for r in allowed_roots] if allowed_roots else []

    async def execute(
        self,
        tool_name: str,
        parameters: dict[str, Any],
        *,
        timeout: float | None = None,
    ) -> ToolResult:
        """Execute a tool by name with the given parameters.

        Args:
            tool_name: Registered tool name.
            parameters: Tool parameters (validated before execution).
            timeout: Override the default timeout for this invocation.

        Returns:
            ToolResult — failures in lookup, validation, path checks, the
            tool itself, or a timeout are all reported as a result object
            instead of being raised. Task cancellation is not intercepted:
            ``asyncio.CancelledError`` is not an ``Exception`` subclass on
            Python 3.8+ and therefore propagates.
        """
        effective_timeout = timeout if timeout is not None else self._default_timeout

        # Resolve tool
        tool = self._registry.get(tool_name)
        if tool is None:
            available = ", ".join(self._registry.list_names()) or "(none)"
            return ToolResult.fail(
                f"Unknown tool: {tool_name}. Available: {available}",
                tool=tool_name,
            )

        # Validate parameters. The validator is tool-supplied code, so an
        # exception inside it is reported as a failure instead of escaping.
        try:
            validation_errors = tool.validate_parameters(parameters)
        except Exception as exc:
            return ToolResult.fail(
                f"Parameter validation failed: {type(exc).__name__}: {exc}",
                tool=tool_name,
            )
        if validation_errors:
            return ToolResult.fail(
                f"Parameter validation failed: {'; '.join(validation_errors)}",
                tool=tool_name,
            )

        # Check path restrictions for any path-like parameters
        path_error = self._check_path_restrictions(parameters)
        if path_error is not None:
            return ToolResult.fail(path_error, tool=tool_name)

        # Execute with timeout
        start = time.monotonic()
        try:
            result = await asyncio.wait_for(
                tool.execute(**parameters),
                timeout=effective_timeout,
            )
        except asyncio.TimeoutError:
            return ToolResult.timed_out(
                effective_timeout,
                tool=tool_name,
                elapsed=effective_timeout,
            )
        except Exception as exc:
            return ToolResult.fail(
                f"{type(exc).__name__}: {exc}",
                tool=tool_name,
                elapsed=time.monotonic() - start,
            )

        # Inject timing metadata. A tool that returns an object without a
        # mutable ``metadata`` mapping is reported as a failure rather than
        # letting AttributeError/TypeError escape to the caller.
        try:
            result.metadata["tool"] = tool_name
            result.metadata["elapsed"] = round(time.monotonic() - start, 4)
        except (AttributeError, TypeError) as exc:
            return ToolResult.fail(
                f"Tool returned an invalid result: {type(exc).__name__}: {exc}",
                tool=tool_name,
                elapsed=time.monotonic() - start,
            )

        return result

    def _check_path_restrictions(self, parameters: dict[str, Any]) -> str | None:
        """Check that any path-like parameters fall within allowed roots.

        Inspects top-level string values that start with ``/``. Returns an
        error message if a violation is found or a path cannot be resolved,
        or None if everything is fine. With no allowed roots configured the
        check is skipped entirely.

        NOTE(review): relative paths (including ones containing ``..``),
        non-string values and values nested inside lists/dicts are not
        inspected here — confirm that tools enforce those cases themselves.
        """
        if not self._allowed_roots:
            return None

        for key, value in parameters.items():
            if not isinstance(value, str):
                continue
            if not value.startswith("/"):
                continue

            # resolve() can raise on caller-controlled input (for example
            # ValueError for an embedded NUL byte, or RuntimeError/OSError
            # for a symlink loop, depending on the Python version). Report
            # that as a restriction failure so execute() never raises.
            try:
                resolved = Path(value).resolve()
            except (OSError, RuntimeError, ValueError) as exc:
                return (
                    f"Path '{value}' (parameter '{key}') could not be "
                    f"resolved: {type(exc).__name__}: {exc}"
                )

            if not any(self._is_within(resolved, root) for root in self._allowed_roots):
                allowed = ", ".join(str(r) for r in self._allowed_roots)
                return (
                    f"Path '{value}' (parameter '{key}') is outside "
                    f"allowed roots: {allowed}"
                )

        return None

    @staticmethod
    def _is_within(path: Path, root: Path) -> bool:
        """Check whether path is equal to or a descendant of root."""
        try:
            path.relative_to(root)
            return True
        except ValueError:
            # relative_to() raises ValueError when path is not under root.
            return False
|