Runtime
The Runtime executes agent handlers, manages their lifecycle, and collects structured output.
Protocol
```python
class AgentRuntime(Protocol):
    async def invoke(self, handler: str, input: AgentInput, ctx: ExecContext) -> AgentResult: ...
    async def invoke_chain(self, handlers: list[str], input: AgentInput, ctx: ExecContext) -> AgentResult: ...
    async def delegate(self, handler: str, input: AgentInput, parent_ctx: ExecContext) -> AgentResult: ...
```

```typescript
interface AgentRuntime {
  invoke(handler: string, input: AgentInput, ctx: ExecContext): Promise<AgentResult>;
  invokeChain(handlers: string[], input: AgentInput, ctx: ExecContext): Promise<AgentResult>;
  delegate(handler: string, input: AgentInput, parentCtx: ExecContext): Promise<AgentResult>;
}
```

```go
type AgentRuntime interface {
    Invoke(ctx *nctx.ExecContext, handler string, input AgentInput) (AgentResult, error)
    InvokeChain(ctx *nctx.ExecContext, handlers []string, input AgentInput) (AgentResult, error)
    Delegate(ctx *nctx.ExecContext, handler string, input AgentInput) (AgentResult, error)
}
```

Value types
```python
@dataclass(frozen=True)
class AgentInput:
    message: str
    args: dict[str, str]
    tools: list[dict[str, str]]
    history: list[dict[str, str]]

@dataclass
class AgentResult:
    status: AgentStatus  # SUCCESS | ERROR | TIMEOUT | WRONG_HANDLER | NEEDS_DATA | NEEDS_CREDENTIALS
    output: str
    data: dict[str, str]
    error: str | None
    handler: str
```

```typescript
interface AgentInput {
  readonly message: string;
  readonly args: Readonly<Record<string, string>>;
  readonly tools: ReadonlyArray<Readonly<Record<string, string>>>;
  readonly history: ReadonlyArray<Readonly<Record<string, string>>>;
}

interface AgentResult {
  readonly status: AgentStatus; // "success" | "error" | "timeout" | "wrong_handler" | "needs_data" | "needs_credentials"
  readonly output: string;
  readonly data: Readonly<Record<string, string>>;
  readonly error: string | null;
  readonly handler: string;
}
```

```go
type AgentInput struct {
    Message string
    Args    map[string]string
    Tools   []map[string]string
    History []map[string]string
}

type AgentResult struct {
    Status  AgentStatus // StatusSuccess | StatusError | StatusTimeout | StatusWrongHandler | ...
    Output  string
    Data    map[string]string
    Error   string
    Handler string
}
```

Strategies
InProcessRuntime
Runs handlers as async functions in the main process. Fastest, no isolation.
```python
from nerva.runtime.inprocess import InProcessRuntime, InProcessConfig
from nerva.runtime.circuit_breaker import CircuitBreakerConfig

runtime = InProcessRuntime(config=InProcessConfig(
    timeout_seconds=30.0,
    circuit_breaker=CircuitBreakerConfig(
        failure_threshold=3,
        recovery_seconds=60.0,
    ),
))

async def my_handler(input: AgentInput, ctx: ExecContext) -> str:
    return f"Processed: {input.message}"

runtime.register("my_agent", my_handler)
result = await runtime.invoke("my_agent", AgentInput(message="hello"), ctx)
```

```typescript
import { InProcessRuntime } from "nerva/runtime/inprocess";
import { createAgentInput } from "nerva/runtime";

const runtime = new InProcessRuntime({
  timeoutMs: 30_000,
  circuitBreaker: {
    failureThreshold: 3,
    recoveryMs: 60_000,
  },
});

runtime.register("my_agent", async (input, ctx) => {
  return `Processed: ${input.message}`;
});

const result = await runtime.invoke(
  "my_agent",
  createAgentInput("hello"),
  ctx,
);
```

```go
// Go does not have an InProcessRuntime — use SubprocessRuntime instead.
// Handler functions are compiled as separate binaries.
// See the SubprocessRuntime section below.
```

Streaming handlers use async generators:
```python
async def streaming_handler(input: AgentInput, ctx: ExecContext):
    for word in input.message.split():
        yield word + " "

runtime.register("streaming_agent", streaming_handler)
# Chunks are pushed to ctx.stream as they are produced
```

```typescript
async function* streamingHandler(
  input: AgentInput,
  ctx: ExecContext,
): AsyncGenerator<string> {
  for (const word of input.message.split(" ")) {
    yield word + " ";
  }
}

runtime.registerStreaming("streaming_agent", streamingHandler);
// Chunks are pushed to ctx.stream as they are produced
```

```go
// Go's SubprocessRuntime supports streaming via stdout chunks.
// The handler binary writes output incrementally to stdout,
// and the runtime pushes chunks to ctx as they arrive.
```

SubprocessRuntime
Runs each handler in a separate process. Process-level isolation, crash protection.
```python
from nerva.runtime.subprocess import SubprocessRuntime, SubprocessConfig

runtime = SubprocessRuntime(SubprocessConfig(
    handler_dir="./agents",
    timeout_seconds=30.0,
))

result = await runtime.invoke("my_agent", agent_input, ctx)
```

```typescript
import { SubprocessRuntime } from "nerva/runtime/subprocess";

const runtime = new SubprocessRuntime({
  handlerDir: "./agents",
  timeoutMs: 30_000,
});

const result = await runtime.invoke("my_agent", agentInput, ctx);
```

```go
import "github.com/otomus/nerva/go/runtime"

cfg := &runtime.SubprocessConfig{
    HandlerDir:     "./agents",
    TimeoutSeconds: 30.0,
}
rt := runtime.NewSubprocessRuntime(cfg)

result, err := rt.Invoke(ctx, "my_agent", agentInput)
```

When to use: Production, untrusted agent code, agents with conflicting dependencies.
ContainerRuntime
Runs handlers in Docker containers or Firecracker microVMs. Maximum isolation.
```python
from nerva.runtime.container import ContainerRuntime, ContainerHandlerConfig

runtime = ContainerRuntime()
runtime.register("my_agent", ContainerHandlerConfig(
    image="my-agent:latest",
    memory_limit="256m",
    cpu_limit="1.0",
    network_mode="none",
))

result = await runtime.invoke("my_agent", agent_input, ctx)
```

```typescript
import { ContainerRuntime } from "nerva/runtime/container";

const runtime = new ContainerRuntime({ timeoutMs: 60_000 });
runtime.register("my_agent", {
  image: "my-agent:latest",
  memoryLimit: "256m",
  cpuLimit: "1.0",
  networkMode: "none",
});

const result = await runtime.invoke("my_agent", agentInput, ctx);
```

```go
// Go does not have a dedicated ContainerRuntime yet.
// Use SubprocessRuntime with "docker" as the handler command,
// or wrap docker invocations in your handler binary.
```

When to use: Multi-tenant systems, agents running user-supplied code.
Circuit breaker
Every runtime strategy includes per-handler circuit breakers:
```
CLOSED (normal) --[3 consecutive failures]--> OPEN (rejecting)
                                                |
                                          [60s recovery]
                                                |
                                          HALF_OPEN (probe)
                                                | success -> CLOSED
                                                | failure -> OPEN
```

Configure thresholds per handler:
```python
from nerva.runtime.circuit_breaker import CircuitBreakerConfig

config = CircuitBreakerConfig(
    failure_threshold=3,      # failures before opening
    recovery_seconds=60.0,    # wait before probe
    half_open_max_calls=1,    # probe calls in half-open
)
```

```typescript
import { CircuitBreaker } from "nerva/runtime";

const breaker = new CircuitBreaker({
  failureThreshold: 3,    // failures before opening
  recoveryMs: 60_000,     // wait before probe (milliseconds)
  halfOpenMaxCalls: 1,    // probe calls in half-open
});
```

```go
import "github.com/otomus/nerva/go/runtime"

config := &runtime.CircuitBreakerConfig{
    FailureThreshold: 3,    // failures before opening
    RecoverySeconds:  60.0, // wait before probe
    HalfOpenMaxCalls: 1,    // probe calls in half-open
}
breaker := runtime.NewCircuitBreaker(config)
```

Chaining and delegation
Chain runs handlers in sequence, piping each output as the next input:
```python
result = await runtime.invoke_chain(
    ["extract_intent", "lookup_data", "format_response"],
    AgentInput(message="Find my order status"),
    ctx,
)
# Stops early if any handler returns non-SUCCESS
```

```typescript
const result = await runtime.invokeChain(
  ["extract_intent", "lookup_data", "format_response"],
  createAgentInput("Find my order status"),
  ctx,
);
// Stops early if any handler returns non-SUCCESS
```

```go
result, err := rt.InvokeChain(
    ctx,
    []string{"extract_intent", "lookup_data", "format_response"},
    runtime.AgentInput{Message: "Find my order status"},
)
// Stops early if any handler returns non-SUCCESS
```

Delegate enables agent-to-agent calls with inherited context:
```python
async def travel_handler(input: AgentInput, ctx: ExecContext) -> str:
    # Delegate to calendar agent -- creates child ExecContext automatically
    calendar = await runtime.delegate(
        "calendar_agent",
        AgentInput(message="next Tuesday"),
        ctx,
    )
    return f"Your next free slot: {calendar.output}"
```

```typescript
runtime.register("travel", async (input, ctx) => {
  // Delegate to calendar agent -- creates child ExecContext automatically
  const calendar = await runtime.delegate(
    "calendar_agent",
    createAgentInput("next Tuesday"),
    ctx,
  );
  return `Your next free slot: ${calendar.output}`;
});
```

```go
// Inside a handler binary, delegation is done by the orchestrator.
// The SubprocessRuntime.Delegate method creates a child context:
result, err := rt.Delegate(
    ctx,
    "calendar_agent",
    runtime.AgentInput{Message: "next Tuesday"},
)
fmt.Printf("Your next free slot: %s\n", result.Output)
```

Child contexts inherit trace ID, permissions, and memory scope. Depth limits prevent infinite recursion.
Choosing a strategy
| Criteria | InProcess | Subprocess | Container |
|---|---|---|---|
| Latency overhead | ~0ms | ~50ms | ~200ms |
| Isolation | None | Process boundary | Full OS |
| Crash recovery | Crashes host | Isolated | Isolated |
| Dependency conflicts | Shared env | Separate env | Separate image |
| Use case | Dev, trusted agents | Production | Multi-tenant |