
feat(bus): sans-IO MeshStore seam for durable replay state + offline outbox #42


@hartsock

Motivation

agent-mesh keeps all of its bus coordination state in process memory. A
restart wipes the anti-replay nonce cache, the per-peer sequence high-water
marks, the outbound sequence counter, and any in-flight replies. Two concrete
consequences fall out of this today:

  1. Restart breaks the peer's own replay defense. The outbound sequence
    counter is Arc::new(AtomicU64::new(1)) (agent-mesh-bus/src/bus.rs:124),
    re-initialized on every Bus::bind. After a restart the bus re-sends
    envelopes starting at sequence = 1, but a peer's SequenceTracker
    (replay.rs) still holds a high-water mark from the previous run and will
    reject every one of them as BadSequence until it climbs back past the old
    mark. The honest, signed monotonic-sequence guarantee silently degrades
    across a restart with no durable counter to resume from.

  2. A send to an offline peer is dropped on the floor. Bus::send_to, via
    dial_peer (bus.rs:286, bus.rs:301), resolves the peer over mDNS and
    returns BusError::Unreachable if it isn't announced within RESOLVE_TIMEOUT
    (5s). There is no queue, no retry — the message is lost. For a LAN-first
    mesh where a worker laptop sleeps or a peer reboots, "offline at send time"
    is the common case, not the exception.
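To make consequence 1 concrete, here is a minimal sketch of the peer-side check described for SequenceTracker (accept iff seq > last_seen, then store the new mark). It assumes Fingerprint can be simplified to a String and that an unknown peer starts from a high-water mark of 0; the real replay.rs may differ on both points.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for the peer-side SequenceTracker in replay.rs:
// accept iff seq > last_seen, then record seq as the new high-water mark.
// Fingerprint is simplified to a String for this sketch.
#[derive(Default)]
struct SequenceTracker {
    high_water: HashMap<String, u64>,
}

impl SequenceTracker {
    fn check_and_advance(&mut self, peer: &str, seq: u64) -> Result<(), String> {
        // Assumption: a never-seen peer behaves as if last_seen were 0.
        let last_seen = self.high_water.get(peer).copied().unwrap_or(0);
        if seq > last_seen {
            self.high_water.insert(peer.to_string(), seq);
            Ok(())
        } else {
            Err(format!("BadSequence: got {seq}, last_seen {last_seen}"))
        }
    }
}

fn main() {
    let mut peer_view = SequenceTracker::default();

    // First run: our bus sends sequences 1..=5 and the peer accepts them all.
    for seq in 1..=5 {
        assert!(peer_view.check_and_advance("alice", seq).is_ok());
    }

    // Restart: the outbound counter is re-seeded to 1, so the next five
    // envelopes (1..=5) are all rejected by the peer's surviving mark.
    let rejected = (1..=5)
        .filter(|&seq| peer_view.check_and_advance("alice", seq).is_err())
        .count();
    println!("rejected after restart: {rejected}");

    // Only once the counter climbs past the old mark is traffic accepted again.
    assert!(peer_view.check_and_advance("alice", 6).is_ok());
}
```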

This issue proposes a sans-IO, async MeshStore seam that the bus depends
on, with an in-memory default that preserves today's exact behavior bit-for-bit,
plus an optional durable backend behind a cargo feature. It stays
broker-less, LAN-first, capability-scoped, and default-safe: nothing changes
unless an operator opts in to a durable backend.

Current state

Everything below is in-memory only and lost on restart. Verified against the
code:

  • Nonce cache: NonceCache (agent-mesh-bus/src/replay.rs:30) is a
    Mutex<{ HashSet<[u8;24]>, VecDeque<[u8;24]> }> with FIFO/LRU eviction at a
    hard capacity. The bus constructs it at 4096 entries in Inbox::new()
    (inbox.rs:115). check_and_insert is the anti-replay gate (inbox.rs:189).
    Module doc is explicit: "a fresh bus starts with an empty cache" — so a
    replayed envelope captured before a restart sails through afterward.
  • Per-peer sequence high-water marks: SequenceTracker
    (replay.rs:93) is Mutex<HashMap<Fingerprint, u64>>. check_and_advance
    accepts iff seq > last_seen and stores the new high-water mark
    (replay.rs:111). Lost on restart → the strict-monotonic replay check
    resets per peer.
  • Outbound sequence counter: Arc<AtomicU64> seeded to 1 per bind
    (bus.rs:124, bus.rs:159), incremented in send_one via
    fetch_add(1, SeqCst) (bus.rs:388). Not persisted; see Motivation item 1.
  • Pending replies: ReplyWaiter (agent-mesh-bus/src/reply.rs:47) is a
    Mutex<HashMap<CorrelationId, oneshot::Sender<Vec<u8>>>>. These are
    live in-process tokio::sync::oneshot channels tied to an awaiting
    Bus::request call (bus.rs:199), so the waiter itself cannot survive a
    process restart — but the fact that a request is outstanding (peer,
    topic, correlation id, body) is durable state worth persisting if we want
    at-least-once request delivery across restarts.
  • Outbox: does not exist. send_to dials synchronously and surfaces
    Unreachable; there is no spooled, retried queue (see Motivation item 2).
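A minimal sketch of the bounded nonce cache shape described above: a HashSet for membership, a VecDeque for insertion order, and oldest-first eviction at a fixed capacity. Field and method names are assumptions modeled on the description, not copied from replay.rs. It illustrates that the replay window is purely in-memory, so both eviction and a restart make an old nonce acceptable again.

```rust
use std::collections::{HashSet, VecDeque};
use std::sync::Mutex;

/// Hypothetical bounded anti-replay cache in the shape described for NonceCache.
struct NonceCache {
    inner: Mutex<Inner>,
    capacity: usize,
}

struct Inner {
    seen: HashSet<[u8; 24]>,   // O(1) membership test
    order: VecDeque<[u8; 24]>, // insertion order, oldest at the front
}

impl NonceCache {
    fn new(capacity: usize) -> Self {
        Self {
            inner: Mutex::new(Inner { seen: HashSet::new(), order: VecDeque::new() }),
            capacity,
        }
    }

    /// Returns true if the nonce is fresh (and records it), false on replay.
    /// Check and insert happen under one lock, so two concurrent deliveries
    /// of the same nonce cannot both be accepted.
    fn check_and_insert(&self, nonce: [u8; 24]) -> bool {
        let mut guard = self.inner.lock().unwrap();
        let inner = &mut *guard;
        if !inner.seen.insert(nonce) {
            return false; // already present: replay
        }
        inner.order.push_back(nonce);
        if inner.order.len() > self.capacity {
            // Evict the oldest nonce; it becomes acceptable again.
            if let Some(oldest) = inner.order.pop_front() {
                inner.seen.remove(&oldest);
            }
        }
        true
    }
}

fn main() {
    let cache = NonceCache::new(2); // the bus uses 4096; 2 keeps the demo short
    let (a, b, c) = ([1u8; 24], [2u8; 24], [3u8; 24]);
    assert!(cache.check_and_insert(a));
    assert!(!cache.check_and_insert(a)); // replay rejected
    assert!(cache.check_and_insert(b));
    assert!(cache.check_and_insert(c)); // evicts `a`
    // `a` fell out of the window, so a replay of it is accepted again,
    // which is what happens to every nonce after a restart.
    assert!(cache.check_and_insert(a));
}
```

Keeping the check and the insert under one lock is what makes this gate race-free; any seam that splits them into separate calls has to preserve that atomicity somewhere.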

There is zero persistence in the crate today: a workspace grep for
sqlite/sled/persist/fsync/outbox/MeshStore finds nothing in the bus.
async-trait is already a workspace dependency (agent-mesh-bus/Cargo.toml:24),
so the trait shape below adds no new core dependency.

Confidentiality note (unchanged by this issue): envelopes are signed
only, not payload-encrypted. SignedEnvelope (agent-mesh-protocol/src/envelope.rs:44)
carries a plaintext payload: ByteBuf plus a BLAKE3 payload_cid and an
ed25519 agent_sig over (recipient, nonce, sequence, payload_cid);
confidentiality rides on the iroh QUIC/TLS transport session. A durable store
therefore writes plaintext payloads to disk — the store seam must document
this and an at-rest-encryption knob is a natural follow-up, but it is explicitly
out of scope here.

Proposed design

1. The MeshStore trait (sans-IO, async, associated Error)

Define a single async trait in agent-mesh-bus (it is bus-level coordination
state — agent-mesh-protocol stays pure, no I/O). Modeled on matrix's
StateStore/CryptoStore: an associated Error type that callers convert
into BusError, and methods scoped to exactly the four durable concerns above.

#[async_trait::async_trait]
pub trait MeshStore: Send + Sync + 'static {
    type Error: std::fmt::Debug + Into<BusError>;

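    // Anti-replay nonce cache (bounded; backend enforces capacity).
    // nonce_seen + record_nonce are two calls, so the bus must serialize them
    // (today's check_and_insert is atomic under one Mutex) or a concurrent
    // duplicate could pass both.
    async fn nonce_seen(&self, nonce: &[u8; 24]) -> Result<bool, Self::Error>;
    async fn record_nonce(&self, nonce: [u8; 24]) -> Result<(), Self::Error>;

    // Per-peer inbound high-water mark. Get-then-advance must likewise be
    // serialized per peer to match today's check_and_advance.
    async fn last_seq(&self, peer: &Fingerprint) -> Result<Option<u64>, Self::Error>;
    async fn advance_seq(&self, peer: &Fingerprint, seq: u64) -> Result<(), Self::Error>;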

    // Durable outbound sequence counter (resume, don't reset to 1).
    async fn next_out_seq(&self) -> Result<u64, Self::Error>;

    // Offline outbox: spool, list-due, drop on ack/expiry.
    async fn enqueue_outbox(&self, item: OutboxItem) -> Result<OutboxId, Self::Error>;
    async fn due_outbox(&self) -> Result<Vec<(OutboxId, OutboxItem)>, Self::Error>;
    async fn remove_outbox(&self, id: OutboxId) -> Result<(), Self::Error>;
}
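The associated-error shape can be exercised without async-trait or a runtime. The sketch below is a synchronous stand-in with a single method; BusError, DiskFull and FullDisk are hypothetical. It shows bus code staying generic over the backend and converting the backend's error into BusError at the boundary.

```rust
use std::fmt;

// Hypothetical, simplified BusError; the real enum has more variants.
#[derive(Debug)]
enum BusError {
    Store(String),
}

// Synchronous stand-in for MeshStore (the proposal's trait is #[async_trait]);
// only one method is kept to show the associated-error shape.
trait MeshStore {
    type Error: fmt::Debug + Into<BusError>;
    fn next_out_seq(&self) -> Result<u64, Self::Error>;
}

// A backend-specific error type...
#[derive(Debug)]
struct DiskFull;

// ...that knows how to become a BusError (this provides Into<BusError>).
impl From<DiskFull> for BusError {
    fn from(e: DiskFull) -> Self {
        BusError::Store(format!("{e:?}"))
    }
}

// A toy backend whose every call fails.
struct FullDisk;

impl MeshStore for FullDisk {
    type Error = DiskFull;
    fn next_out_seq(&self) -> Result<u64, DiskFull> {
        Err(DiskFull)
    }
}

// Bus code is generic over the backend and converts at the boundary.
fn stamp_sequence<S: MeshStore>(store: &S) -> Result<u64, BusError> {
    store.next_out_seq().map_err(Into::into)
}

fn main() {
    match stamp_sequence(&FullDisk) {
        Err(BusError::Store(msg)) => println!("store error surfaced as BusError: {msg}"),
        Ok(seq) => println!("seq {seq}"),
    }
}
```

One consequence of an Into<BusError> bound (rather than BusError: From<Self::Error>): a bare ? on a store call inside generic bus code does not compile, because ? converts via From. Callers use .map_err(Into::into) as above, or each generic function adds a where BusError: From<S::Error> clause.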

OutboxItem records { peer_fp, BusMessage, attempts, not_before } — enough to
re-dial and re-send_one after the peer reappears. Keep it minimal and
data-agnostic (the body is already opaque Vec<u8>, consistent with #38).
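A sketch of how attempts and not_before could drive retries: an item is due once now >= not_before, and each failed re-dial pushes not_before out with capped exponential backoff. The 1 s base, the 300 s cap, and the simplified BusMessage and peer-fingerprint types are assumptions, not part of the proposal; due_outbox would return the items for which is_due holds.

```rust
use std::time::{Duration, Instant};

// Hypothetical minimal shapes; the real BusMessage and Fingerprint live in
// the agent-mesh crates and are simplified here.
#[derive(Debug, Clone)]
struct BusMessage {
    topic: String,
    body: Vec<u8>,
}

#[derive(Debug, Clone)]
struct OutboxItem {
    peer_fp: String,
    message: BusMessage,
    attempts: u32,
    not_before: Instant,
}

impl OutboxItem {
    fn new(peer_fp: &str, message: BusMessage, now: Instant) -> Self {
        Self { peer_fp: peer_fp.to_string(), message, attempts: 0, not_before: now }
    }

    /// Due once the backoff window has elapsed.
    fn is_due(&self, now: Instant) -> bool {
        now >= self.not_before
    }

    /// After a failed re-dial: bump attempts and push not_before out with
    /// capped exponential backoff (1s, 2s, 4s, ... up to 300s; assumed values).
    fn reschedule(&mut self, now: Instant) {
        self.attempts += 1;
        let exp = (self.attempts - 1).min(16); // bound the shift
        let backoff = Duration::from_secs(1)
            .saturating_mul(1u32 << exp)
            .min(Duration::from_secs(300));
        self.not_before = now + backoff;
    }
}

fn main() {
    let t0 = Instant::now();
    let msg = BusMessage { topic: "jobs".into(), body: b"hello".to_vec() };
    let mut item = OutboxItem::new("peer-a", msg, t0);
    assert!(item.is_due(t0)); // freshly spooled: try right away

    item.reschedule(t0); // first failed re-dial: wait 1s
    assert!(!item.is_due(t0));
    assert!(item.is_due(t0 + Duration::from_secs(1)));

    item.reschedule(t0); // second failure: wait 2s
    assert!(!item.is_due(t0 + Duration::from_secs(1)));

    println!(
        "{} -> {} ({} bytes) after {} attempts",
        item.peer_fp,
        item.message.topic,
        item.message.body.len(),
        item.attempts
    );
}
```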

2. In-memory default (MemoryMeshStore) — preserves today's behavior

Ship a default backend that wraps the existing NonceCache (cap 4096) and
SequenceTracker so the current semantics are byte-for-byte preserved: same
4096-entry FIFO eviction, same strict-monotonic high-water check, outbound
counter starting at 1, and an outbox that is a bounded VecDeque (or simply
absent — enqueue returns the same Unreachable behavior). Bus::bind
constructs a MemoryMeshStore by default, so all existing tests stay green
and no on-disk file is created unless the operator opts in. This mirrors
matrix's "by default this brings an in-memory store."
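A synchronous sketch of the state a MemoryMeshStore could hold, assuming the async trait methods are thin wrappers over it. It covers the per-peer high-water map, the outbound counter seeded at 1, and a bounded outbox; the nonce ring is omitted because it would wrap the existing NonceCache unchanged. Fingerprint, OutboxId and the raw Vec<u8> outbox payload are simplifications for illustration.

```rust
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;

type Fingerprint = String; // simplified stand-in
type OutboxId = u64;       // hypothetical id type

/// Hypothetical in-memory backend: all state dies with the process,
/// which is exactly today's behavior.
struct MemoryMeshStore {
    high_water: Mutex<HashMap<Fingerprint, u64>>,
    out_seq: AtomicU64,
    outbox: Mutex<VecDeque<(OutboxId, Vec<u8>)>>,
    next_outbox_id: AtomicU64,
    outbox_cap: usize,
}

impl MemoryMeshStore {
    fn new(outbox_cap: usize) -> Self {
        Self {
            high_water: Mutex::new(HashMap::new()),
            out_seq: AtomicU64::new(1), // same seed as bus.rs today
            outbox: Mutex::new(VecDeque::new()),
            next_outbox_id: AtomicU64::new(0),
            outbox_cap,
        }
    }

    fn last_seq(&self, peer: &Fingerprint) -> Option<u64> {
        self.high_water.lock().unwrap().get(peer).copied()
    }

    fn advance_seq(&self, peer: &Fingerprint, seq: u64) {
        let mut map = self.high_water.lock().unwrap();
        let entry = map.entry(peer.clone()).or_insert(0);
        // Never move the mark backwards, even if called out of order.
        *entry = (*entry).max(seq);
    }

    fn next_out_seq(&self) -> u64 {
        // fetch_add returns the previous value: 1, 2, 3, ... as in send_one.
        self.out_seq.fetch_add(1, Ordering::SeqCst)
    }

    /// Bounded spool: when full, refuse (the caller maps this to Unreachable).
    fn enqueue_outbox(&self, body: Vec<u8>) -> Option<OutboxId> {
        let mut q = self.outbox.lock().unwrap();
        if q.len() >= self.outbox_cap {
            return None;
        }
        let id = self.next_outbox_id.fetch_add(1, Ordering::SeqCst);
        q.push_back((id, body));
        Some(id)
    }

    fn remove_outbox(&self, id: OutboxId) {
        self.outbox.lock().unwrap().retain(|(i, _)| *i != id);
    }
}

fn main() {
    let store = MemoryMeshStore::new(1);
    let peer: Fingerprint = "peer-a".into();
    assert_eq!(store.last_seq(&peer), None);
    store.advance_seq(&peer, 7);
    assert_eq!(store.last_seq(&peer), Some(7));
    assert_eq!(store.next_out_seq(), 1);
    assert_eq!(store.next_out_seq(), 2);

    let id = store.enqueue_outbox(b"hi".to_vec()).unwrap();
    assert!(store.enqueue_outbox(b"full".to_vec()).is_none());
    store.remove_outbox(id); // ack or expiry frees the slot
    assert!(store.enqueue_outbox(b"room again".to_vec()).is_some());
}
```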

3. Optional durable backend behind a cargo feature

Add an optional store-sqlite feature (default-off) providing
SqliteMeshStore over rusqlite (or sqlx): a single-file DB holding the
nonce ring (capacity-bounded with the same FIFO eviction), the per-peer
high-water table, the singleton outbound counter, and the outbox table. Wire it
via a Bus::bind_with option (BusOptions already exists, bus.rs:62) that
takes Arc<dyn MeshStore<Error = …>> (or a generic), defaulting to
MemoryMeshStore. No feature → no rusqlite in the dependency tree.
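The property the durable counter must provide is "resume, don't reset to 1". The sketch below uses a plain file as a stand-in for the SqliteMeshStore singleton row, an assumption made only to keep the example dependency-free. It persists the next value before handing out the current one, so a crash can skip a sequence number but never reuse one, which is the failure a peer's SequenceTracker punishes. It assumes one bus owns the file and calls are serialized; a directory fsync after the rename is omitted.

```rust
use std::fs;
use std::io::{self, Write};
use std::path::{Path, PathBuf};

/// Hypothetical durable outbound counter backed by a single file.
struct FileCounter {
    path: PathBuf,
}

impl FileCounter {
    fn open(path: &Path) -> Self {
        Self { path: path.to_path_buf() }
    }

    fn next_out_seq(&self) -> io::Result<u64> {
        // A missing file means a brand-new store: start at 1, as today.
        let current = match fs::read_to_string(&self.path) {
            Ok(s) => s
                .trim()
                .parse::<u64>()
                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?,
            Err(e) if e.kind() == io::ErrorKind::NotFound => 1,
            Err(e) => return Err(e),
        };
        // Reserve before use: durably record current + 1 first.
        // Write-then-rename so a torn write never leaves a corrupt counter.
        let tmp = self.path.with_extension("tmp");
        let mut f = fs::File::create(&tmp)?;
        writeln!(f, "{}", current + 1)?;
        f.sync_all()?;
        fs::rename(&tmp, &self.path)?;
        Ok(current)
    }
}

fn main() -> io::Result<()> {
    let path = std::env::temp_dir().join("amesh-out-seq-demo");
    let _ = fs::remove_file(&path); // start fresh for the demo
    {
        let first_run = FileCounter::open(&path);
        assert_eq!(first_run.next_out_seq()?, 1);
        assert_eq!(first_run.next_out_seq()?, 2);
    } // "restart": drop the handle, keep the file
    let second_run = FileCounter::open(&path);
    assert_eq!(second_run.next_out_seq()?, 3); // resumes instead of resetting to 1
    fs::remove_file(&path)?;
    Ok(())
}
```

The main function above has the same shape as the re-bind regression test named in the acceptance criteria.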

4. What gets persisted (and what doesn't)

  • Persisted: nonce ring, per-peer inbound high-water marks, the outbound
    sequence counter (fixes Motivation item 1), and the outbox of messages to
    peers offline at send time (fixes Motivation item 2).
  • Not persisted: the live oneshot reply waiters (they are tied to an
    awaiting caller and cannot outlive the process). Optionally persist the
    outstanding-request descriptor so a restarted bus can re-issue, but that is
    a follow-up, not part of the MVP.

5. Invariants

  • agent-mesh-protocol stays I/O-free; the seam lives in agent-mesh-bus.
  • Broker-less and LAN-first: the store is per-bus local durability, never a
    shared server. No cross-bus coordination is introduced.
  • Default-safe and backward compatible: opt-in only; the in-memory default is
    the current behavior.
  • Honest degradation documented: the durable store persists plaintext
    payloads (signed-only model); note it and defer at-rest encryption.

Reference: matrix-rust-sdk

matrix-rust-sdk is read here as a reference implementation, not a dependency
— it proves the sans-IO-core + swappable-backend + memory-store-for-tests shape
at production scale. agent-mesh deliberately rejects matrix's homeserver model;
only the store-trait pattern is borrowed.

  • crates/matrix-sdk-base/src/store/traits.rs, where StateStore: AsyncTraitDeps
    (line 71) is an #[async_trait] trait with type Error: fmt::Debug +
    Into<StoreError> + From<serde_json::Error> (line 73). This is the same
    associated-error-type shape proposed for MeshStore.
  • Offline send / send-queue API — the same trait carries
    save_send_queue_request (line 373), load_send_queue_requests (line 416),
    update_send_queue_request_status (line 423), and dependent-request methods.
    This is the proven precedent for our outbox: durable spool of outbound
    messages, listed and drained when delivery becomes possible.
  • crates/matrix-sdk-base/src/store/memory_store.rs, where MemoryStore
    (line 103) implements StateStore (line 154) with type Error = StoreError. The
    module doc (store/mod.rs:19-21) states "By default this brings an in-memory
    store.", which is precisely the default-safe posture proposed here.
  • crates/matrix-sdk-crypto/src/store/traits.rs, where CryptoStore:
    AsyncTraitDeps (line 47) has type Error: fmt::Debug + Into<CryptoStoreError>
    (line 49): a second instance of the same async-trait-with-associated-error
    pattern, for the more security-sensitive key store.
  • crates/matrix-sdk-crypto/src/store/memorystore.rs, where MemoryStore
    (line 82, #[derive(Default)]) implements CryptoStore (line 197): the
    memory-store-for-tests default again, confirming the pattern is uniform
    across both matrix store traits.

The takeaway: a sans-IO trait core, an in-memory default that doubles as the
test backend, and optional persistent backends (SQLite/IndexedDB upstream)
behind features is a proven, low-risk shape we can adopt without taking on any
matrix code or the broker model.

Acceptance criteria

  • A MeshStore async trait with an associated Error: Into<BusError> is
    defined in agent-mesh-bus; agent-mesh-protocol gains no I/O.
  • Trait methods cover: nonce seen/record (bounded), per-peer inbound
    high-water get/advance, a durable outbound sequence counter, and an
    outbox (enqueue / list-due / remove).
  • A default MemoryMeshStore preserves current behavior exactly (4096-entry
    FIFO nonce cache, strict-monotonic per-peer high-water, outbound counter
    starting at 1, no on-disk file created by default), and every existing
    test passes unchanged.
  • Bus::bind uses MemoryMeshStore by default; Bus::bind_with accepts an
    injected MeshStore (default-safe, backward compatible).
  • The outbound sequence counter resumes from the store across a restart
    instead of resetting to 1; a regression test asserts a re-bound bus with a
    persistent store does not restart its sequence at 1.
  • A send to an offline/unannounced peer enqueues to the outbox instead of
    dropping; a test asserts the spooled message is drained and delivered once
    the peer becomes resolvable.
  • An optional store-sqlite feature provides SqliteMeshStore; with the
    feature off, rusqlite/sqlx is absent from the dependency tree
    (verified by cargo tree).
  • Store integration tests run against both MemoryMeshStore and
    SqliteMeshStore (shared test suite, matrix-style).
  • Docs state plainly that a durable store persists plaintext payloads
    (signed-only envelopes; confidentiality is transport-session-only) and
    flag at-rest encryption as a follow-up.
  • just check and just cov-ci (75% floor) pass; hook/pipeline parity
    untouched.
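The offline-peer criterion can be expressed against a small fake. FakeBus, its announced set (standing in for mDNS resolution) and its delivered log (standing in for a successful dial_peer plus send_one) are hypothetical test scaffolding, not the real Bus API. The sketch asserts the two behaviors the criterion names: a send to an unannounced peer is spooled rather than dropped, and a later drain delivers it once the peer resolves.

```rust
use std::collections::{HashSet, VecDeque};

// Hypothetical harness, not the real Bus: `announced` models mDNS
// resolution, `delivered` models a successful dial + send.
#[derive(Default)]
struct FakeBus {
    announced: HashSet<String>,
    outbox: VecDeque<(String, Vec<u8>)>,
    delivered: Vec<(String, Vec<u8>)>,
}

impl FakeBus {
    /// Deliver now if the peer resolves; otherwise spool instead of dropping.
    fn send_to(&mut self, peer: &str, body: Vec<u8>) {
        if self.announced.contains(peer) {
            self.delivered.push((peer.to_string(), body));
        } else {
            self.outbox.push_back((peer.to_string(), body));
        }
    }

    /// Drain pass: deliver spooled items whose peer is now resolvable and
    /// keep the rest, in order, for the next pass.
    fn drain_outbox(&mut self) {
        let pending = std::mem::take(&mut self.outbox);
        for (peer, body) in pending {
            if self.announced.contains(&peer) {
                self.delivered.push((peer, body));
            } else {
                self.outbox.push_back((peer, body));
            }
        }
    }
}

fn main() {
    let mut bus = FakeBus::default();
    bus.send_to("laptop", b"job-1".to_vec()); // laptop asleep: spooled
    assert!(bus.delivered.is_empty());
    assert_eq!(bus.outbox.len(), 1);

    bus.drain_outbox(); // still offline: stays spooled
    assert_eq!(bus.outbox.len(), 1);

    bus.announced.insert("laptop".to_string()); // peer reappears on mDNS
    bus.drain_outbox();
    assert!(bus.outbox.is_empty());
    assert_eq!(bus.delivered, vec![("laptop".to_string(), b"job-1".to_vec())]);
}
```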

Relationships


Meta · risk: low (per repo CLAUDE.md) · follow-up from the matrix-rust-sdk ↔ agent-mesh architectural comparison (matrix-rust-sdk is a reference implementation, not a dependency).

File/line references were drafted against a recent checkout; line numbers are indicative — symbols are authoritative (grep by name). Substance verified against origin/main @ f63c55f.

🤖 Generated with Claude Code

Metadata


Assignees

No one assigned

    Labels

    enhancement: New feature or request

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests
