NATS Communication Protocol

This document is the authoritative L0 definition of the NATS protocol. It defines the Subject format, Headers, and Payloads, together with the reliability and guardrail constraints.

Turn / Step definitions (slightly different from OpenAI)

  • Turn: A complete closed delivery loop (from one Enqueue/user input to completion of the final task.deliverable).
    • Aligned with OpenAI’s user input -> assistant final response layer, but allows non-user triggers (for example, internal orchestration/derivation), so it is broader.
  • Step: A single reasoning/tool iteration within a Turn (a one-step iteration of ReAct), producing cards and events such as agent.thought / tool.call / tool.result / task.deliverable.
    • Aligned with OpenAI’s step/iteration, but here a Step only describes one reasoning iteration and does not equal a full delivery.

1. Subject format (authoritative)

cg.{ver}.{project_id}.{channel_id}.{category}.{component}.{target}.{suffix}
  • ver: protocol version (aligned with core.config.PROTOCOL_VERSION)
  • project_id: tenant/project isolation
  • channel_id: visibility/auth domain (for example, public / u_{user} / t_{thread}); not a CardBox storage isolation dimension (cards/boxes are still accessed by project_id)
  • category: cmd / evt / str
  • component: agent / sys / user / tool
  • target: routing target (capability group or specific agent_id / tool provider)
  • suffix: action/type (for example, wakeup / turn / task / chunk)

Unified placeholder example: cmd.agent.{target}.wakeup.
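
The subject layout above can be sketched as a small builder/parser pair. This is only an illustration, not the project's implementation: `Subject`, `build_subject`, and `parse_subject` are hypothetical names, the version string `v1` and the IDs are placeholder values, and the choice to keep any extra trailing tokens inside `suffix` (as in `cmd.sys.pmo.internal.*`) is an assumption.

```python
from typing import NamedTuple

CATEGORIES = ("cmd", "evt", "str")
COMPONENTS = ("agent", "sys", "user", "tool")


class Subject(NamedTuple):
    """The seven fields that follow the fixed "cg" prefix."""
    ver: str
    project_id: str
    channel_id: str
    category: str
    component: str
    target: str
    suffix: str


def build_subject(s: Subject) -> str:
    """Join the fields into cg.{ver}.{project_id}...{suffix}."""
    if s.category not in CATEGORIES:
        raise ValueError(f"unknown category: {s.category!r}")
    if s.component not in COMPONENTS:
        raise ValueError(f"unknown component: {s.component!r}")
    return ".".join(("cg", *s))


def parse_subject(subject: str) -> Subject:
    """Split a subject; tokens beyond the eighth stay in suffix (assumption)."""
    parts = subject.split(".", 7)
    if len(parts) != 8 or parts[0] != "cg":
        raise ValueError(f"not a cg subject: {subject!r}")
    return Subject(*parts[1:])


# Placeholder values, for illustration only.
example = Subject("v1", "proj_a", "public", "cmd", "agent", "worker_generic", "wakeup")
```

Calling `build_subject(example)` yields a subject whose last four tokens follow the `cmd.agent.{target}.wakeup` placeholder form.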

2. Transport and reliability

2.1 JetStream (cmd/evt)

  • cmd and evt are transported via JetStream (reliable delivery).
  • Default streams:
    • cg_cmd_{PROTOCOL_VERSION}: cg.{PROTOCOL_VERSION}.*.*.cmd.> (retain 24h)
    • cg_evt_{PROTOCOL_VERSION}: cg.{PROTOCOL_VERSION}.*.*.evt.> (retain 7d)
  • The scope generated by subject_pattern() matches the above (target/suffix support wildcards, suffix commonly used for wakeup / task / step, etc.).
  • PG/CardBox remains the source of truth for semantic progression and replay; JetStream only carries edge events.
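
To see which subjects fall into each default stream, the NATS wildcard rules can be reproduced in a few lines: `*` matches exactly one token and `>` matches one or more trailing tokens. The helpers `stream_filter` and `subject_matches` are hypothetical names used for this sketch, and `v1` stands in for the real PROTOCOL_VERSION.

```python
def stream_filter(category: str, ver: str) -> str:
    """Filter subject of the default stream for one category."""
    return f"cg.{ver}.*.*.{category}.>"


def subject_matches(pattern: str, subject: str) -> bool:
    """NATS-style match: '*' is one token, '>' is one or more tail tokens."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' is only valid as the last token and needs a non-empty tail.
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    return len(s_tokens) == len(p_tokens)


cmd_filter = stream_filter("cmd", "v1")
evt_filter = stream_filter("evt", "v1")
wakeup = "cg.v1.proj_a.public.cmd.agent.worker_generic.wakeup"
```

Here `wakeup` matches `cmd_filter` but not `evt_filter`, so it would be captured only by the cmd stream.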

2.2 Core NATS (str)

  • str uses Core NATS (low latency, lossy).
  • Used only for streaming output and observability, not for state advancement.
  • core.nats_client.publish_core/subscribe_core use Core NATS directly, without JetStream, queue_group, or persistence; traceparent is not required.
  • Consumers of str should not use str as a state-change trigger.

3. Header conventions (mandatory for L0)

Aside from str.*, publish_event applies the following behaviors automatically for cmd/evt:

  • traceparent is mandatory; if it is missing from the input, it is generated.
  • Optional passthrough/fill: tracestate.
  • Auto-injected: CG-Timestamp / CG-Version / CG-Msg-Type / CG-Sender (only filled when missing).
  • CG-Recursion-Depth remains the core L0 damping constraint; the call chain/entry normalization performs centralized validation and backfill. When strict checking is required, a missing, non-integer, or negative value raises ProtocolViolationError.

Optional extension: CG-User-ID / CG-Channel-ID / CG-Context-ID (the current implementation does not have unified semantic consumption).

Constraint: avoid placing keys with CG-* prefix in business payloads; current implementation only emits warnings in publish_event and does not hard-reject.
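
The "fill only when missing" behavior described above might look roughly like the following. This is a sketch and not the actual publish_event code: `fill_headers`, `new_traceparent`, and `cg_keys_in_payload` are hypothetical helpers, the ISO-8601 format for CG-Timestamp is an assumption, and only top-level payload keys are checked for the CG-* prefix.

```python
import secrets
from datetime import datetime, timezone


def new_traceparent() -> str:
    """W3C traceparent: version-traceid-parentid-flags, all lowercase hex."""
    return f"00-{secrets.token_hex(16)}-{secrets.token_hex(8)}-01"


def fill_headers(headers: dict, *, version: str, msg_type: str, sender: str) -> dict:
    """Return a copy with mandatory headers filled in only where absent."""
    out = dict(headers)
    out.setdefault("traceparent", new_traceparent())
    # Timestamp format is an assumption; the source does not specify it.
    out.setdefault("CG-Timestamp", datetime.now(timezone.utc).isoformat())
    out.setdefault("CG-Version", version)
    out.setdefault("CG-Msg-Type", msg_type)
    out.setdefault("CG-Sender", sender)
    return out


def cg_keys_in_payload(payload: dict) -> list:
    """Top-level payload keys that would trigger the CG-* warning."""
    return [k for k in payload if isinstance(k, str) and k.startswith("CG-")]
```

Existing values win over defaults, so a caller that already set CG-Sender keeps its own value, and a payload containing a CG-* key is reported rather than rejected.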

Recursion depth and damping rules (mandatory for L0)

  • Scope: cmd.* / evt.* / tool callbacks; str.* does not apply.
  • Invalid handling: when the call chain explicitly requires depth validation, missing, non-integer, or negative CG-Recursion-Depth triggers ProtocolViolationError.
  • Increment rule:
    • User entry tasks start at 0 (for example, the first hop triggered by UI/external entry).
    • enqueue currently supports only mode=call; transfer/notify return ProtocolViolationError.
    • enqueue itself does not perform +1; callers that need drill-down semantics should precompute and pass depth in the chain via next_recursion_depth.
    • enqueue attempts to backfill/validate depth by correlation_id; a mismatch with the previous record for the same correlation_id results in an error.
    • tool_result/timeout report and join_response require strict matching between depth and the value associated with correlation_id.
  • Threshold and rejection:
    • Threshold comes from [worker].max_recursion_depth (default 20).
    • If depth >= threshold, publishers or consumers must reject dispatch and halt the call chain.
  • Failure semantics (recommended to normalize):
    • Recursion exceeded: evt.agent.{agent_id}.task with status=failed + error_code=recursion_depth_exceeded.
    • Protocol violation: evt.agent.{agent_id}.task with status=failed + error_code=protocol_violation.
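
The validation, threshold, and failure-mapping rules above can be condensed into a short sketch. The exception classes here are local stand-ins for the project's own types, and `read_depth`, `check_dispatch`, and `failure_event` are hypothetical names; only the header name, the default threshold of 20, and the two error codes come from this document.

```python
class ProtocolViolationError(Exception):
    """Local stand-in for the project's ProtocolViolationError."""


class RecursionDepthExceeded(Exception):
    """Hypothetical exception for depth >= threshold."""


def read_depth(headers: dict) -> int:
    """Strict read: missing, non-integer, or negative values are violations."""
    raw = headers.get("CG-Recursion-Depth")
    if raw is None:
        raise ProtocolViolationError("CG-Recursion-Depth is missing")
    try:
        depth = int(str(raw))
    except ValueError:
        raise ProtocolViolationError(f"non-integer depth: {raw!r}") from None
    if depth < 0:
        raise ProtocolViolationError(f"negative depth: {depth}")
    return depth


def check_dispatch(headers: dict, max_recursion_depth: int = 20) -> int:
    """Reject dispatch once depth reaches the threshold."""
    depth = read_depth(headers)
    if depth >= max_recursion_depth:
        raise RecursionDepthExceeded(f"depth {depth} >= {max_recursion_depth}")
    return depth


def failure_event(exc: Exception) -> dict:
    """Map a rejection to the recommended evt.agent.{agent_id}.task fields."""
    if isinstance(exc, RecursionDepthExceeded):
        code = "recursion_depth_exceeded"
    else:
        code = "protocol_violation"
    return {"status": "failed", "error_code": code}
```

Note that `check_dispatch` does not increment the depth; as stated above, callers that need drill-down semantics pass the next depth explicitly.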

Trace rules (authoritative)

  • Rules below apply to cmd.* / evt.* / tool callbacks; str.* does not apply.
  • traceparent is the core tracing field; trace_id is derived from traceparent.trace-id and persisted.
  • Publish-side ensures traceparent (generated if missing); changes in traceparent indicate chain handoff.
  • Tool service callbacks should pass through inbound traceparent; ExecutionService (Report primitive) is responsible for generating child spans and writing to Inbox.
  • Do not pass trace id only in payload (payload should be used only for compatibility/debugging).
  • fanout/join scenarios usually inherit the original trace-id to keep traceability across branches.
  • If calling external HTTP, inject traceparent / tracestate when possible.
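
As an illustration of deriving trace_id from traceparent, the sketch below parses the W3C Trace Context version-00 layout and builds a child value that keeps the trace-id while changing the parent-id. `trace_id_from_traceparent` and `child_traceparent` are hypothetical helper names, and the sample value is the example from the W3C specification.

```python
import re
import secrets

# version-00 layout: 2 hex, 32 hex trace-id, 16 hex parent-id, 2 hex flags
_TRACEPARENT = re.compile(r"([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})")


def _parse(traceparent: str):
    m = _TRACEPARENT.fullmatch(traceparent)
    if m is None:
        raise ValueError(f"malformed traceparent: {traceparent!r}")
    version, trace_id, parent_id, flags = m.groups()
    # The spec forbids version ff and all-zero trace-id / parent-id.
    if version == "ff" or set(trace_id) == {"0"} or set(parent_id) == {"0"}:
        raise ValueError(f"invalid traceparent: {traceparent!r}")
    return version, trace_id, parent_id, flags


def trace_id_from_traceparent(traceparent: str) -> str:
    """Return the trace-id field, the value that would be persisted."""
    return _parse(traceparent)[1]


def child_traceparent(traceparent: str) -> str:
    """New span under the same trace: same trace-id, fresh parent-id."""
    version, trace_id, _, flags = _parse(traceparent)
    return f"{version}-{trace_id}-{secrets.token_hex(8)}-{flags}"


sample = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
```

Because the child keeps the trace-id, branches created in fanout/join scenarios remain linked to the original trace.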

4. Idempotency and gating (core L0 invariants)

4.1 AgentTurn gating (CAS)

  • Any state-advancing write must satisfy the turn_epoch + active_agent_turn_id gate.
  • UPDATE ... WHERE turn_epoch=? AND active_agent_turn_id=? with zero rows affected is treated as stale/partitioned and must stop side effects.
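
The gate above is a compare-and-swap expressed in SQL. The following sketch uses an in-memory SQLite database as a stand-in for PG so that it can run anywhere; the table name `agent_state`, its columns other than turn_epoch and active_agent_turn_id, and the function `advance_state` are assumptions made for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE agent_state ("
    "agent_id TEXT PRIMARY KEY, turn_epoch INTEGER, "
    "active_agent_turn_id TEXT, status TEXT)"
)
conn.execute("INSERT INTO agent_state VALUES ('a1', 3, 'turn-3', 'running')")
conn.commit()


def advance_state(conn, agent_id, turn_epoch, agent_turn_id, new_status) -> bool:
    """Apply the write only if the caller still owns the current turn."""
    cur = conn.execute(
        "UPDATE agent_state SET status=? "
        "WHERE agent_id=? AND turn_epoch=? AND active_agent_turn_id=?",
        (new_status, agent_id, turn_epoch, agent_turn_id),
    )
    conn.commit()
    # Zero rows affected means the caller is stale and must stop side effects.
    return cur.rowcount == 1


stale_ok = advance_state(conn, "a1", 2, "turn-2", "done")     # stale epoch
current_ok = advance_state(conn, "a1", 3, "turn-3", "done")   # current owner
```

A caller that sees `False` should neither publish events nor write cards for that turn.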

4.2 ToolResult idempotency

  • Worker de-duplicates tool callbacks by (agent_turn_id, tool_call_id).
  • Execution side permits duplicate delivery of the same tool_result to tolerate message loss.
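
A minimal sketch of the de-duplication key follows. It keeps the seen keys in memory purely for illustration; `ToolResultDeduper` is a hypothetical class, and how the Worker actually stores processed keys is not specified here.

```python
class ToolResultDeduper:
    """Tracks which (agent_turn_id, tool_call_id) pairs were already handled."""

    def __init__(self) -> None:
        self._seen = set()

    def first_delivery(self, agent_turn_id: str, tool_call_id: str) -> bool:
        """Return True once per key; later duplicates return False."""
        key = (agent_turn_id, tool_call_id)
        if key in self._seen:
            return False
        self._seen.add(key)
        return True


dedupe = ToolResultDeduper()
results = [
    dedupe.first_delivery("turn-1", "call-a"),
    dedupe.first_delivery("turn-1", "call-a"),  # redelivered callback
    dedupe.first_delivery("turn-1", "call-b"),
]
```

The second delivery of `call-a` is ignored, which is what allows the execution side to resend a tool_result safely.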

5. Key subject families and payloads

Global command-path rule:

  • In-boundary cmd.* publication is centralized in L0 (enqueue/report/join/command_intent/wakeup); business services should not directly publish_event to cmd.*.
  • cmd.agent.*.wakeup must be emitted through L0 wakeup APIs only (not via command_intent).
  • This release uses a moderate payload reduction: cmd.tool.* / cmd.sys.pmo.internal.* may keep routing fields such as tool_call_id/tool_call_card_id/tool_name/after_execution.

5.1 cmd.agent.{target}.wakeup

  • Direction: PMO/ExecutionService -> Worker
  • Required: agent_id
  • Optional: inbox_id / reason / metadata
  • Description: control-plane bell; no semantic data is carried. Semantic requests and receipts are stored in state.agent_inbox.

5.2 Inbox tool_result (Report)

  • Direction: Tool/PMO -> ExecutionService (writes to Inbox + wakeup)
  • Required: tool_call_id / agent_turn_id / turn_epoch / agent_id
  • Required: after_execution (suspend|terminate)
  • Required: status (success|failed|canceled|timeout|partial)
  • Required: tool_result_card_id (pointer)
  • Prohibited: inline result
  • Description: callback writes to state.agent_inbox (Report) and is processed from Inbox by Worker.
  • Description: if tool.result.content.result.__cg_control.after_execution exists, it overrides a valid after_execution.
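
The required, enumerated, and prohibited fields listed above lend themselves to a simple payload check. In this sketch, `validate_tool_result` is a hypothetical helper, and treating a top-level `result` key as the prohibited inline result is an assumption; the override via `__cg_control` is not modeled.

```python
REQUIRED_FIELDS = (
    "tool_call_id", "agent_turn_id", "turn_epoch", "agent_id",
    "after_execution", "status", "tool_result_card_id",
)
AFTER_EXECUTION = ("suspend", "terminate")
STATUSES = ("success", "failed", "canceled", "timeout", "partial")
INLINE_RESULT_KEY = "result"  # assumed name of the prohibited inline field


def validate_tool_result(payload: dict) -> list:
    """Return a list of problems; an empty list means the payload passes."""
    problems = [f"missing required field: {k}" for k in REQUIRED_FIELDS if k not in payload]
    if INLINE_RESULT_KEY in payload:
        problems.append("inline result is prohibited; use tool_result_card_id")
    if "after_execution" in payload and payload["after_execution"] not in AFTER_EXECUTION:
        problems.append(f"invalid after_execution: {payload['after_execution']!r}")
    if "status" in payload and payload["status"] not in STATUSES:
        problems.append(f"invalid status: {payload['status']!r}")
    return problems


good = {
    "tool_call_id": "call-a", "agent_turn_id": "turn-1", "turn_epoch": 3,
    "agent_id": "a1", "after_execution": "suspend", "status": "success",
    "tool_result_card_id": "card-42",
}
```

The result itself travels only as a card pointer, so the Inbox row stays small and the card remains the single copy of the data.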

5.3 Inbox stop (Cancel)

  • Direction: PMO/BatchManager -> ExecutionService (writes to Inbox + wakeup)
  • Required: agent_id / agent_turn_id / turn_epoch
  • Optional: reason

5.4 cmd.tool.*

  • Direction: Worker -> Tool Service
  • Subject comes from resource.tools.target_subject
  • Required: tool_call_id / agent_turn_id / turn_epoch / agent_id
  • Common optional: tool_name / after_execution / tool_call_card_id / context_box_id / dispatch_requested_at
  • Constraint: the tool entry ToolCommandPayload forbids direct args by default; it usually carries tool_call_card_id and lets the card express the exact parameters.
  • Dispatch path: Worker/ToolCaller/API should all go through L0 command_intent to produce signals, then publish after transaction commit.

5.5 cmd.sys.pmo.internal.*

  • Direction: Worker -> PMO
  • Semantics aligned with cmd.tool.* (still resolved through resource.tools.target_subject to internal.* tools), executed by PMO built-in handlers.

5.6 evt.agent.{agent_id}.step

  • Required: agent_turn_id / step_id / phase
  • Optional: metadata (such as metadata.timing, see 05_operations/observability.md)
  • Purpose: observability and audit

5.7 evt.agent.{agent_id}.task

  • Required: agent_turn_id / status
  • Optional: deliverable_card_id / output_box_id / tool_result_card_id / error / error_code / stats (such as stats.timing, see 05_operations/observability.md)

5.8 str.agent.{agent_id}.chunk

  • Required: agent_turn_id / step_id / chunk_type / content
  • Optional: index / metadata (such as metadata.llm_request_started_at / metadata.llm_timing)
  • Description: str carries no trace headers and does not perform tracing; it relies only on payload fields for correlation/observability.

5.9 evt.agent.{agent_id}.state

  • Required: agent_id / agent_turn_id / status / turn_epoch / updated_at
  • Optional: output_box_id / metadata
  • Meaning: status-edge signal; PG/CardBox remains the source of truth.

5.10 cmd.sys.ui.action

  • Direction: UI -> UI Worker
  • Required: action_id / agent_id / tool_name / args
  • Optional: metadata / message_id / parent_step_id
  • Rules:
    • action_id is usually the idempotency key.
    • If message_id is provided (UUIDv7), UI Worker may use it as an alternative idempotency/correlation key.
    • agent_id must have worker_target=ui_worker; tools must have ui_allowed=true (default deny).
    • UI Worker publishes evt.sys.ui.action_ack as the acknowledgment event.

5.11 evt.sys.ui.action_ack

  • Direction: UI Worker -> UI
  • Required: action_id / agent_id / status
  • Optional: message_id / tool_result_card_id / error_code / error
  • Meaning: entry acknowledgment / rejection / busy / terminal completion
    • Entry-stage acknowledgments are typically accepted|busy|rejected
    • After tool execution, the terminal acknowledgment is typically done; failure cases use rejected

6. Consumer conventions and consumption modes (mandatory for L0)

6.1 System-level multicast

  • evt.* is multicast by default: all events are stored in JetStream. Different business systems (such as PMO and Observer) must use different durable_name values.
  • NATS maintains an independent cursor per unique durable_name. This means adding one observer does not affect event delivery for business logic.

6.2 Competitive consumption within a business group

  • Horizontal scaling support: multiple instances of the same business system should share the same durable_name and queue_group (with a pull consumer, sharing the durable already guarantees this).
  • Effect: under one durable_name, the same event is delivered to only one instance, enabling load balancing.
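
The combination of 6.1 and 6.2 can be illustrated with a toy simulation that needs no NATS server. Each durable name has its own cursor and therefore sees every event, while instances sharing a durable split the events between them. The function `simulate_delivery` is hypothetical, and the round-robin split is a simplification; real JetStream distribution depends on which instance fetches next.

```python
from collections import defaultdict


def simulate_delivery(events: list, consumers: dict) -> dict:
    """consumers maps durable_name -> list of instance names."""
    received = defaultdict(list)
    for durable, instances in consumers.items():
        # Independent cursor per durable: every durable replays all events.
        for i, event in enumerate(events):
            # Within one durable, each event goes to exactly one instance.
            received[instances[i % len(instances)]].append(event)
    return dict(received)


events = ["e1", "e2", "e3", "e4"]
received = simulate_delivery(
    events,
    {"pmo": ["pmo-1", "pmo-2"], "observer": ["observer-1"]},
)
```

In this run the two PMO instances together see each event once, and adding the observer changes nothing about what PMO receives.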

6.3 Consumption model comparison table

| Type | Physical implementation | Semantic pattern | Persisted | Isolation level |
| --- | --- | --- | --- | --- |
| cmd.* | JetStream | Work Queue | Yes | Competitive within group (single execution) |
| evt.* | JetStream | Event Stream | Yes | System-level multicast / competitive within group |
| str.* | Core NATS | Broadcast | No | Full broadcast (fire-and-forget) |

6.4 Detailed conventions

  • cmd.*: stable durable + queue group is recommended.
  • evt.*: replay policy is business-defined; aggregate scenarios may use deliver_policy=all.

6.5 Configuration mapping (current implementation)

  • [nats]:
    • servers, tls_enabled, cert_dir map to NATS client connection config.
  • [nats.pull]:
    • batch_size, max_inflight, fetch_timeout_seconds, warmup_timeout_seconds map to NATSClient pull behavior (core/app_config.py NATSConfig allows additional passthrough fields).

7. Reference list

  • Auto-generated subject emit points: ../REFERENCE/nats_subjects.generated.md (index only, not a substitute for this semantic definition)