Architecture
Design principles
- Library, not server — Nerva runs inside your web framework. It does not own HTTP, auth, or API docs.
- Protocol-based — every primitive is a Python `Protocol` / TypeScript `interface`. No abstract base classes, no inheritance hierarchies.
- Composable — use one primitive or all eight. Replace any piece with your own implementation.
- ExecContext everywhere — a single, explicit context object flows through every method. No hidden globals, no thread-locals.
- Side effects at the edges — pure logic in the core, I/O only at the boundary.
Request flow
Policy appears three times — before routing, before invocation, and during execution. Each checkpoint evaluates different rules (rate limit vs permission vs budget).
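The three checkpoints can be sketched as follows. This is a minimal illustration only — `PolicyEngine`, its method names, and the `handle` pipeline are hypothetical stand-ins, not Nerva's actual API:

```python
from dataclasses import dataclass, field

@dataclass
class PolicyEngine:
    """Toy policy engine: each checkpoint consults a different rule set."""
    rate_limit_ok: bool = True
    permitted_agents: set = field(default_factory=set)
    budget_usd: float = 1.0

    def check_pre_route(self, user_id: str) -> bool:
        # Checkpoint 1: before routing -- rate limits, abuse filters.
        return self.rate_limit_ok

    def check_pre_invoke(self, agent: str) -> bool:
        # Checkpoint 2: before invocation -- is this agent permitted?
        return agent in self.permitted_agents

    def check_in_flight(self, spent_usd: float) -> bool:
        # Checkpoint 3: during execution -- enforce the spend budget.
        return spent_usd <= self.budget_usd

def handle(policy: PolicyEngine, user_id: str, agent: str, spent_usd: float) -> str:
    if not policy.check_pre_route(user_id):
        return "rejected: rate limit"
    if not policy.check_pre_invoke(agent):
        return "rejected: permission"
    if not policy.check_in_flight(spent_usd):
        return "aborted: budget"
    return "ok"
```

The point is that the three calls guard different failure modes at different stages, so a request can pass routing yet still be stopped mid-flight.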
ExecContext as connective tissue
ExecContext is the single object that threads through every primitive call. It carries:
- Identity — request ID, trace ID, user ID, session ID
- Permissions — what this user/agent can access
- Memory scope — user, session, agent, or global
- Observability — trace spans, structured events, token usage
- Lifecycle — creation time, timeout, cooperative cancellation
- Streaming — optional stream sink for real-time token delivery
Without it, you end up passing 6+ separate arguments to every function, or building a god-object in v2.
```python
ctx = ExecContext.create(
    user_id="user_123",
    session_id="session_abc",
    permissions=my_permissions,
    memory_scope="session",
    timeout_seconds=30,
)
```

Child contexts inherit the parent’s trace and permissions:
```python
child_ctx = parent_ctx.child("calendar_agent")
# child_ctx.trace_id == parent_ctx.trace_id
# child_ctx.permissions == parent_ctx.permissions
# child_ctx has its own request_id and timeout
```

Delegation model
Multi-agent is not a separate system. It is runtime recursion with shared context.
When an agent needs another agent, it calls runtime.delegate():
```python
async def handle(input: AgentInput, ctx: ExecContext, runtime: AgentRuntime) -> AgentResult:
    calendar = await runtime.delegate("calendar_agent", AgentInput(message="next Tuesday"), ctx)
    flights = await tools.call("search_flights", {"to": "BER"}, ctx)
    return AgentResult(text=f"Found flights. {calendar.output}")
```

What `delegate` does:
- Creates a child ExecContext — inherits trace, permissions, memory scope
- Checks permissions via Policy — can this agent invoke that agent?
- Invokes via the same Runtime (subprocess / in-process / container)
- Returns structured result to the calling agent
- Child spans appear nested under parent in traces
Depth limits prevent infinite recursion. Each agent has its own timeout and circuit breaker.
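One way to enforce the depth limit is to count hops when deriving a child context, as in this sketch (the field names, `max_depth` value, and exception type are illustrative, not Nerva's actual implementation):

```python
class DelegationDepthError(RuntimeError):
    """Raised when a delegation chain exceeds the configured depth."""

class Ctx:
    """Minimal context carrying only what the depth check needs."""
    def __init__(self, trace_id: str, depth: int = 0, max_depth: int = 4):
        self.trace_id = trace_id
        self.depth = depth
        self.max_depth = max_depth

    def child(self, agent_name: str) -> "Ctx":
        # Each delegation hop increments depth; the trace ID is inherited,
        # so all hops appear under one trace.
        if self.depth + 1 > self.max_depth:
            raise DelegationDepthError(
                f"max delegation depth {self.max_depth} exceeded at {agent_name}"
            )
        return Ctx(self.trace_id, self.depth + 1, self.max_depth)
```

Because every `delegate` call must derive a child context, the check cannot be bypassed by an agent that loops back to itself.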
Middleware pipeline
The Orchestrator supports middleware hooks at each stage of the pipeline:
```python
from nerva.middleware import Middleware

class LoggingMiddleware(Middleware):
    async def before_route(self, message: str, ctx: ExecContext) -> str:
        log.info("Routing", message=message, user=ctx.user_id)
        return message

    async def after_invoke(self, result: AgentResult, ctx: ExecContext) -> AgentResult:
        log.info("Invoked", handler=result.handler, status=result.status)
        return result
```
```python
orchestrator = Orchestrator(
    ...,
    middleware=[LoggingMiddleware(), AuthMiddleware(), MetricsMiddleware()],
)
```

Streaming architecture
Streaming is an orchestrator-level concern, not a per-primitive feature. Tokens flow through ctx.stream:
```python
# Server-side: orchestrator yields chunks
async for chunk in orchestrator.stream("Book me a flight", ctx):
    await websocket.send(chunk)
```

The data flow:
```
LLM Provider --> Runtime --> ctx.stream --> Responder.format_chunk() --> Client
```

- Runtime pushes LLM tokens as they arrive
- Tools push progress events (optional)
- Responder formats each chunk for the target channel
- Client receives formatted chunks in real time
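The four steps above can be wired together in a toy end-to-end pipeline. All names here (`fake_llm_stream`, `SSEResponder`, the queue standing in for `ctx.stream`) are illustrative assumptions, not Nerva's real interfaces:

```python
import asyncio

async def fake_llm_stream():
    # Stands in for the LLM provider: yields tokens as they "arrive".
    for token in ["Book", "ed", "!"]:
        yield token

class SSEResponder:
    """Formats each chunk for one target channel (here: server-sent events)."""
    def format_chunk(self, token: str) -> str:
        return f"data: {token}\n\n"

async def run_pipeline(responder) -> list:
    received = []
    queue: asyncio.Queue = asyncio.Queue()   # plays the role of ctx.stream

    async def runtime():
        async for token in fake_llm_stream():
            await queue.put(token)           # Runtime pushes tokens
        await queue.put(None)                # end-of-stream sentinel

    async def client():
        while (token := await queue.get()) is not None:
            received.append(responder.format_chunk(token))  # Responder formats

    await asyncio.gather(runtime(), client())
    return received
```

`asyncio.run(run_pipeline(SSEResponder()))` drives the toy pipeline; producer and consumer run concurrently, which is what makes the delivery real-time rather than buffered.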
Async generator handlers enable streaming from agent code:
```python
async def streaming_handler(input: AgentInput, ctx: ExecContext):
    async for token in llm.stream(input.message):
        yield token
```

Observability
Every primitive records spans and events via ExecContext:
```
[t_abc] handle "Book me a flight to Berlin" (1,247ms, $0.003)
+-- [t_abc.1] router.classify (45ms)
|   +-- scores: flight_agent=0.92, calendar=0.71
+-- [t_abc.2] runtime.invoke flight_agent (1,102ms, $0.002)
|   +-- [t_abc.2.1] memory.recall (23ms)
|   +-- [t_abc.2.2] tools.call search_flights (890ms)
|   +-- [t_abc.2.3] delegate calendar_agent (134ms, $0.001)
|       +-- [t_abc.2.3.1] tools.call calendar (98ms)
+-- [t_abc.3] memory.store (12ms)
+-- [t_abc.4] responder.format (88ms)
```

Export to OpenTelemetry, structured JSON logs, or any custom tracer.
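Hierarchical span IDs of this shape fall out of a small recorder that numbers children per parent span. This is a sketch of the idea only — Nerva's actual tracer interface may differ:

```python
import time
from contextlib import contextmanager

class Tracer:
    """Records nested spans; each span gets a hierarchical ID and a duration."""
    def __init__(self, trace_id: str):
        self.trace_id = trace_id
        self.spans = []              # (span_id, name, duration_ms), in finish order
        self._stack = [trace_id]     # current ancestry, innermost last
        self._counters = {trace_id: 0}

    @contextmanager
    def span(self, name: str):
        parent = self._stack[-1]
        self._counters[parent] += 1
        span_id = f"{parent}.{self._counters[parent]}"  # e.g. t_abc.2.1
        self._counters[span_id] = 0
        self._stack.append(span_id)
        start = time.perf_counter()
        try:
            yield span_id
        finally:
            self._stack.pop()
            self.spans.append((span_id, name, (time.perf_counter() - start) * 1000))
```

Because the ID encodes the ancestry, an exporter can rebuild the tree (or map spans onto OpenTelemetry parent/child relationships) without any extra bookkeeping.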
Repo structure
```
nerva/
  spec/                     # TypeSpec definitions (source of truth)
    exec_context.tsp
    router.tsp / runtime.tsp / tools.tsp / memory.tsp
    responder.tsp / registry.tsp / policy.tsp
  generated/                # auto-generated JSON Schema
  packages/
    nerva-py/               # Python implementation
      nerva/
        context.py          # ExecContext
        orchestrator.py     # Orchestrator
        router/             # RuleRouter, EmbeddingRouter, LLMRouter, HybridRouter
        runtime/            # InProcessRuntime, SubprocessRuntime, ContainerRuntime
        tools/              # FunctionToolManager, MCPToolManager, CompositeToolManager
        memory/             # TieredMemory (hot/warm/cold)
        responder/          # PassthroughResponder, ToneResponder, MultimodalResponder
        registry/           # InMemoryRegistry, SqliteRegistry
        policy/             # NoopPolicyEngine, YamlPolicyEngine, AdaptivePolicyEngine
        middleware/
        tracing/
    nerva-js/               # TypeScript implementation (mirrors Python)
    nerva-cli/              # CLI (new, generate, list, trace-ui)
```

Framework integration
```
FastAPI / NestJS / Express (HTTP, auth, sessions, swagger, CORS)
  +-- Nerva (routing, runtime, tools, memory, policy)
        +-- LLM providers, MCP servers, subprocess agents
```

Nerva ships optional contrib bridges:
- `nerva.contrib.fastapi` — `NervaMiddleware`, `get_nerva_ctx()` dependency
- `nerva/contrib/express` — `nervaMiddleware()`, `req.nervaCtx`
- `nerva/contrib/nestjs` — `NervaModule`, `@NervaCtx()` decorator
These are convenience helpers — you can always construct ExecContext manually.
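Manual construction is just mapping request data onto a context before handing it to the orchestrator. A framework-agnostic sketch — the stripped-down `ExecContext`, the header names, and `lookup_permissions` are all hypothetical stand-ins for what a contrib bridge automates:

```python
from dataclasses import dataclass, field

@dataclass
class ExecContext:
    # Stand-in with only the fields this sketch uses; the real class carries more.
    user_id: str
    session_id: str
    permissions: set = field(default_factory=set)
    memory_scope: str = "session"

def lookup_permissions(user_id: str) -> set:
    # Hypothetical: in practice this queries your framework's auth layer.
    return {"flight_agent"} if user_id else set()

def ctx_from_request(headers: dict) -> ExecContext:
    """What a contrib bridge does for you: build one context per request."""
    user_id = headers["x-user-id"]
    return ExecContext(
        user_id=user_id,
        session_id=headers.get("x-session-id", "anonymous"),
        permissions=lookup_permissions(user_id),
    )
```

The bridge versions do the same mapping but hook it into the framework's native middleware or dependency-injection mechanism.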