Tools
The ToolManager discovers tools, enforces permissions, and executes them within sandbox constraints.
Protocol
class ToolManager(Protocol): async def discover(self, ctx: ExecContext) -> list[ToolSpec]: ...
async def call(self, tool: str, args: dict[str, object], ctx: ExecContext) -> ToolResult: ...interface ToolManager { discover(ctx: ExecContext): Promise<ToolSpec[]>;
call(tool: string, args: Record<string, unknown>, ctx: ExecContext): Promise<ToolResult>;}type ToolManager interface { Discover(ctx *nctx.ExecContext) ([]ToolSpec, error)
Call(ctx *nctx.ExecContext, name string, args map[string]any) (ToolResult, error)}
Value types
@dataclass(frozen=True)class ToolSpec: name: str description: str parameters: dict[str, object] # JSON Schema required_permissions: frozenset[str]
@dataclass(frozen=True)class ToolResult: status: ToolStatus # SUCCESS | ERROR | PERMISSION_DENIED | NOT_FOUND | TIMEOUT output: str error: str | None duration_ms: floatinterface ToolSpec { readonly name: string; readonly description: string; readonly parameters: Readonly<Record<string, unknown>>; // JSON Schema readonly requiredPermissions: ReadonlySet<string>;}
interface ToolResult { readonly status: ToolStatus; // "success" | "error" | "permission_denied" | "not_found" | "timeout" readonly output: string; readonly error: string | null; readonly durationMs: number;}type ToolSpec struct { Name string Description string Parameters map[string]any RequiredPermissions map[string]bool}
type ToolResult struct { Status ToolStatus // ToolSuccess | ToolError | ToolPermissionDenied | ToolNotFound | ToolTimeout Output string Error string DurationMs float64}
Strategies
FunctionToolManager
Register plain functions as tools. Schema is auto-extracted from type hints (Python) or provided explicitly (TypeScript, Go).
from nerva.tools.function import FunctionToolManager
tools = FunctionToolManager()
@tools.tool("search_flights", "Search for available flights")async def search_flights(origin: str, destination: str, date: str) -> str: # Call your flight API return f"Found 3 flights from {origin} to {destination} on {date}"
# Discover available tools (filtered by ctx.permissions)specs = await tools.discover(ctx)
# Call a toolresult = await tools.call("search_flights", { "origin": "TLV", "destination": "BER", "date": "2025-04-15",}, ctx)
print(result.status) # ToolStatus.SUCCESS
print(result.output) # "Found 3 flights from TLV to BER on 2025-04-15"
print(result.duration_ms) # 23.4
Sync functions are automatically offloaded to a thread via asyncio.to_thread.
import { FunctionToolManager } from "nerva/tools/function";
const tools = new FunctionToolManager();
tools.tool( "search_flights", "Search for available flights", { parameters: { type: "object", properties: { origin: { type: "string" }, destination: { type: "string" }, date: { type: "string" }, }, required: ["origin", "destination", "date"], }, }, async (args) => { return `Found 3 flights from ${args.origin} to ${args.destination} on ${args.date}`; },);
// Discover available tools (filtered by ctx.permissions)const specs = await tools.discover(ctx);
// Call a toolconst result = await tools.call("search_flights", { origin: "TLV", destination: "BER", date: "2025-04-15",}, ctx);
console.log(result.status); // "success"
console.log(result.output); // "Found 3 flights from TLV to BER on 2025-04-15"
console.log(result.durationMs); // 23.4
Both sync and async functions are supported — sync return values are awaited as a no-op.
import "github.com/otomus/nerva/go/tools"
mgr := tools.NewFunctionToolManager()
searchFlights := func(origin, destination, date string) (string, error) { return fmt.Sprintf("Found 3 flights from %s to %s on %s", origin, destination, date), nil}
err := mgr.Register("search_flights", "Search for available flights", searchFlights, nil)
// Discover available tools (filtered by ctx.Permissions)specs, err := mgr.Discover(ctx)
// Call a toolresult, err := mgr.Call(ctx, "search_flights", map[string]any{ "arg0": "TLV", "arg1": "BER", "arg2": "2025-04-15",})
fmt.Println(result.Status) // "success"
fmt.Println(result.Output) // "Found 3 flights from TLV to BER on 2025-04-15"
fmt.Println(result.DurationMs) // 23.4
Go uses reflection for parameter extraction. Since Go does not expose parameter names at runtime, arguments are positional (arg0, arg1, …).
MCPToolManager
Connects to MCP (Model Context Protocol) servers. Tools are discovered dynamically and executed in sandboxed server processes.
from nerva.tools.mcp import MCPToolManager, MCPServerConfig
tools = MCPToolManager( servers=[ MCPServerConfig(name="filesystem", command="npx", args=["-y", "@modelcontextprotocol/server-filesystem", "/tmp"]), MCPServerConfig(name="github", command="npx", args=["-y", "@modelcontextprotocol/server-github"]), ], pool_size=5,)
# Tools from all connected serversspecs = await tools.discover(ctx)result = await tools.call("read_file", {"path": "/tmp/data.json"}, ctx)import { MCPToolManager } from "nerva/tools/mcp";
const tools = new MCPToolManager({ servers: [ { name: "filesystem", command: "npx", args: ["-y", "@modelcontextprotocol/server-filesystem", "/tmp"] }, { name: "github", command: "npx", args: ["-y", "@modelcontextprotocol/server-github"] }, ], poolSize: 5,});
// Tools from all connected servers
const specs = await tools.discover(ctx);
const result = await tools.call("read_file", { path: "/tmp/data.json" }, ctx);
// Go does not have a built-in MCPToolManager yet.
// Use FunctionToolManager to wrap MCP calls, or implement
// the ToolManager interface with your own MCP client.
CompositeToolManager
Combines multiple tool sources into a single manager. It deduplicates tools and resolves conflicts by priority: managers listed earlier take precedence over those listed later.
from nerva.tools.composite import CompositeToolManager
tools = CompositeToolManager(managers=[ function_tools, # local functions (highest priority) mcp_tools, # MCP servers (lower priority)])
# Discovers tools from all sourcesspecs = await tools.discover(ctx)import { CompositeToolManager } from "nerva/tools/composite";
const tools = new CompositeToolManager([ functionTools, // local functions (highest priority) mcpTools, // MCP servers (lower priority)]);
// Discovers tools from all sources
const specs = await tools.discover(ctx);
// Go does not have a built-in CompositeToolManager yet.
// Implement the ToolManager interface to aggregate multiple managers.
Permissions
Tools support two layers of access control:
Context permissions — checked via ctx.permissions.can_use_tool(name):
ctx = ExecContext.create( user_id="user_1", permissions=Permissions(allowed_tools={"search_flights", "get_weather"}),)
# Only search_flights and get_weather are returnedspecs = await tools.discover(ctx)const ctx = ExecContext.create({ userId: "user_1", permissions: new Permissions({ allowedTools: new Set(["search_flights", "get_weather"]), }),});
// Only search_flights and get_weather are returnedconst specs = await tools.discover(ctx);ctx := nctx.NewExecContext(nctx.ExecContextOptions{ UserID: "user_1", Permissions: &nctx.Permissions{ AllowedTools: map[string]bool{"search_flights": true, "get_weather": true}, },})
// Only search_flights and get_weather are returned
specs, err := tools.Discover(ctx)
Role-based permissions — set per tool at registration:
@tools.tool( "deploy_service", "Deploy a service to production", required_permissions=frozenset({"admin", "devops"}),)
async def deploy_service(service: str) -> str: ...
tools.tool( "deploy_service", "Deploy a service to production", { requiredPermissions: new Set(["admin", "devops"]) }, async (args) => { // ... },);
err := mgr.Register( "deploy_service", "Deploy a service to production", deployServiceFn, map[string]bool{"admin": true, "devops": true},)
Both checks must pass. A PERMISSION_DENIED result is returned if either fails.
Sandboxing
MCPToolManager runs tools in isolated server processes with configurable constraints:
- Process isolation — each MCP server runs in its own subprocess
- Connection pooling — an LRU pool reuses connections and evicts idle servers
- Timeout enforcement — per-call timeout with clean process termination
- Result size limits — output is truncated if it exceeds the configured maximum