diff --git a/crates/mesh/src/crdt_kv/crdt.rs b/crates/mesh/src/crdt_kv/crdt.rs index 941e50504..b92dc7a9f 100644 --- a/crates/mesh/src/crdt_kv/crdt.rs +++ b/crates/mesh/src/crdt_kv/crdt.rs @@ -1,4 +1,5 @@ use std::{ + cmp::Reverse, collections::HashSet, sync::Arc, time::{Duration, Instant}, @@ -9,7 +10,9 @@ use parking_lot::{Mutex, RwLock}; use tracing::{debug, info}; use super::{ + epoch_max_wins, kv_store::KvStore, + merge_strategy::MergeStrategy, operation::{Operation, OperationLog}, replica::{LamportClock, ReplicaId}, }; @@ -55,6 +58,10 @@ impl ValueMetadata { } } + fn from_rate_limit_live_version(version: epoch_max_wins::RateLimitVersion) -> Self { + Self::new(version.timestamp, version.replica_id) + } + fn tombstone(timestamp: u64, replica_id: ReplicaId) -> Self { Self { timestamp, @@ -68,6 +75,10 @@ impl ValueMetadata { (self.timestamp, self.replica_id) } + fn as_rate_limit_version(&self) -> epoch_max_wins::RateLimitVersion { + epoch_max_wins::RateLimitVersion::new(self.timestamp, self.replica_id) + } + fn matches_version(&self, timestamp: u64, replica_id: ReplicaId) -> bool { self.timestamp == timestamp && self.replica_id == replica_id } @@ -83,6 +94,7 @@ pub struct CrdtOrMap { store: KvStore, metadata: Arc>>, // Key to list of versions key_locks: Arc>>>, // Per-key critical section lock + merge_strategies: Arc>>, replica_id: ReplicaId, clock: LamportClock, operation_log: Arc>, @@ -101,12 +113,56 @@ impl CrdtOrMap { store: KvStore::new(), metadata: Arc::new(DashMap::new()), key_locks: Arc::new(DashMap::new()), + merge_strategies: Arc::new(RwLock::new(Vec::new())), replica_id, clock: LamportClock::new(), operation_log: Arc::new(RwLock::new(OperationLog::new())), } } + /// Register the merge strategy for a key prefix. + pub(crate) fn register_merge_strategy(&self, prefix: String, strategy: MergeStrategy) { + let mut strategies = self.merge_strategies.write(); + if let Some((_, existing)) = strategies + .iter_mut() + .find(|(registered_prefix, _)| registered_prefix == &prefix) + { + *existing = strategy; + } else { + strategies.push((prefix, strategy)); + } + strategies.sort_by_key(|(prefix, _)| Reverse(prefix.len())); + } + + fn merge_strategy_for_key(&self, key: &str) -> MergeStrategy { + let strategies = self.merge_strategies.read(); + Self::merge_strategy_for_key_from(&strategies, key) + } + + fn merge_strategy_for_key_from( + strategies: &[(String, MergeStrategy)], + key: &str, + ) -> MergeStrategy { + strategies + .iter() + .find_map(|(prefix, strategy)| key.starts_with(prefix).then(|| strategy.clone())) + .unwrap_or(MergeStrategy::LastWriterWins) + } + + fn compact_operation_log(&self, operation_log: &mut OperationLog) { + let strategies = self.merge_strategies.read().clone(); + operation_log + .compact_with_strategy(|key| Self::merge_strategy_for_key_from(&strategies, key)); + } + + fn append_operation(&self, operation: Operation) { + let mut operation_log = self.operation_log.write(); + let strategies = self.merge_strategies.read().clone(); + operation_log.append_with_strategy(operation, |key| { + Self::merge_strategy_for_key_from(&strategies, key) + }); + } + fn key_lock_for(&self, key: &str) -> Arc> { self.key_locks .entry(key.to_string()) @@ -140,25 +196,18 @@ impl CrdtOrMap { let key_lock = self.key_lock_for(&key); let key_guard = key_lock.lock(); + let previous = self.store.get(&key); let timestamp = self.clock.tick(); - let result = if self.record_insert_metadata(&key, timestamp, self.replica_id) { - let mut prev = None; - let value_for_operation = value.clone(); - let _ = self.store.upsert(key.clone(), |current| { - prev = current.map(|bytes| bytes.to_vec()); - value - }); - - let operation = - Operation::insert(key.clone(), value_for_operation, timestamp, self.replica_id); - self.operation_log.write().append(operation); + let operation = Operation::insert(key.clone(), value, timestamp, self.replica_id); + let result = if self.apply_insert_locked(&key, operation.clone()) { + self.append_operation(operation); debug!( "Insert: key={}, timestamp={}, replica={}", key, timestamp, self.replica_id ); - prev + previous } else { self.store.get(&key).map(|bytes| bytes.to_vec()) }; @@ -179,19 +228,16 @@ impl CrdtOrMap { let current_value = self.store.get(&key); let updated_value = updater(current_value.as_deref()); let timestamp = self.clock.tick(); + let operation = Operation::insert( + key.clone(), + updated_value.clone(), + timestamp, + self.replica_id, + ); - let result = if self.record_insert_metadata(&key, timestamp, self.replica_id) { - let operation = Operation::insert( - key.clone(), - updated_value.clone(), - timestamp, - self.replica_id, - ); - - self.store.insert(key.clone(), updated_value.clone()); - self.operation_log.write().append(operation); - - updated_value + let result = if self.apply_insert_locked(&key, operation.clone()) { + self.append_operation(operation); + self.store.get(&key).unwrap_or_default() } else { self.store.get(&key).unwrap_or_default() }; @@ -219,19 +265,16 @@ impl CrdtOrMap { } }; let timestamp = self.clock.tick(); + let operation = Operation::insert( + key.clone(), + updated_value.clone(), + timestamp, + self.replica_id, + ); - let result = if self.record_insert_metadata(&key, timestamp, self.replica_id) { - let operation = Operation::insert( - key.clone(), - updated_value.clone(), - timestamp, - self.replica_id, - ); - - self.store.insert(key.clone(), updated_value.clone()); - self.operation_log.write().append(operation); - - updated_value + let result = if self.apply_insert_locked(&key, operation.clone()) { + self.append_operation(operation); + self.store.get(&key).unwrap_or_default() } else { self.store.get(&key).unwrap_or_default() }; @@ -261,16 +304,15 @@ impl CrdtOrMap { let (result, changed) = if let Some(updated_value) = maybe_updated_value { let timestamp = self.clock.tick(); - if self.record_insert_metadata(&key, timestamp, self.replica_id) { - let operation = Operation::insert( - key.clone(), - updated_value.clone(), - timestamp, - self.replica_id, - ); - self.store.insert(key.clone(), updated_value.clone()); - self.operation_log.write().append(operation); - (updated_value, true) + let operation = Operation::insert( + key.clone(), + updated_value.clone(), + timestamp, + self.replica_id, + ); + if self.apply_insert_locked(&key, operation.clone()) { + self.append_operation(operation); + (self.store.get(&key).unwrap_or_default(), true) } else { (self.store.get(&key).unwrap_or_default(), false) } @@ -297,7 +339,7 @@ impl CrdtOrMap { let removed = if self.record_remove_metadata(key, timestamp, self.replica_id) { let operation = Operation::remove(key.to_string(), timestamp, self.replica_id); - self.operation_log.write().append(operation); + self.append_operation(operation); self.store.remove(key) } else { None @@ -453,22 +495,21 @@ impl CrdtOrMap { .collect() }; - let unseen_operations: Vec = { + let mut unseen_operations: Vec = log + .operations() + .iter() + .filter(|operation| { + !seen_operations.contains(&(operation.replica_id(), operation.timestamp())) + }) + .cloned() + .collect(); + unseen_operations.sort_by_key(|operation| (operation.timestamp(), operation.replica_id())); + + { let mut local_log = self.operation_log.write(); local_log.merge(log); - local_log.compact(); - - let mut unseen: Vec = local_log - .operations() - .iter() - .filter(|operation| { - !seen_operations.contains(&(operation.replica_id(), operation.timestamp())) - }) - .cloned() - .collect(); - unseen.sort_by_key(|operation| (operation.timestamp(), operation.replica_id())); - unseen - }; + self.compact_operation_log(&mut local_log); + } // Apply only new operations in deterministic order. for operation in &unseen_operations { @@ -491,15 +532,47 @@ impl CrdtOrMap { fn apply_insert(&self, key: &str, value: Vec, timestamp: u64, replica_id: ReplicaId) { let key_lock = self.key_lock_for(key); let key_guard = key_lock.lock(); + let operation = Operation::insert(key.to_string(), value, timestamp, replica_id); - if self.record_insert_metadata(key, timestamp, replica_id) { - self.store.insert(key.to_string(), value); - } + self.apply_insert_locked(key, operation); drop(key_guard); self.try_cleanup_key_lock(key, &key_lock); } + fn apply_insert_locked(&self, key: &str, operation: Operation) -> bool { + let Operation::Insert { + value, + timestamp, + replica_id, + .. + } = operation + else { + return false; + }; + + match self.merge_strategy_for_key(key) { + MergeStrategy::EpochMaxWins => { + if let Some(merged) = + self.record_epoch_insert_metadata(key, &value, timestamp, replica_id) + { + self.store.insert(key.to_string(), merged); + true + } else { + false + } + } + MergeStrategy::LastWriterWins => { + if self.record_insert_metadata(key, timestamp, replica_id) { + self.store.insert(key.to_string(), value); + true + } else { + false + } + } + } + } + fn compact_key_metadata(versions: &mut Vec) { if versions.len() <= 1 { return; @@ -511,6 +584,16 @@ impl CrdtOrMap { } } + fn newest_rate_limit_tombstone_version( + versions: &[ValueMetadata], + ) -> Option { + versions + .iter() + .filter(|version| version.is_tombstone) + .max_by_key(|version| version.version_key()) + .map(ValueMetadata::as_rate_limit_version) + } + fn record_insert_metadata(&self, key: &str, timestamp: u64, replica_id: ReplicaId) -> bool { let new_metadata = ValueMetadata::new(timestamp, replica_id); @@ -545,6 +628,68 @@ impl CrdtOrMap { } } + // Tombstones for EpochMaxWins keys are tracked in two places: + // (a) `ValueMetadata { is_tombstone: true, .. }` in `metadata` + // — used locally for LWW ordering + tombstone GC. + // (b) `tombstone_version` embedded in the stored shard payload + // — propagates across replicas via snapshot/compaction so + // a peer that receives only the post-tombstone Insert + // (the Remove op gone after compaction) still filters + // pre-tombstone inserts. See + // `test_epoch_max_wins_snapshot_only_propagation_preserves_tombstone_boundary`. + fn record_epoch_insert_metadata( + &self, + key: &str, + value: &[u8], + timestamp: u64, + replica_id: ReplicaId, + ) -> Option> { + let incoming_version = epoch_max_wins::RateLimitVersion::new(timestamp, replica_id); + let current = self.store.get(key); + + match self.metadata.entry(key.to_string()) { + MapEntry::Occupied(mut entry) => { + let versions = entry.get_mut(); + + let has_existing_entry = versions + .iter() + .any(|v| v.matches_version(timestamp, replica_id)); + if has_existing_entry { + Self::compact_key_metadata(versions); + return None; + } + + let current_tombstone = Self::newest_rate_limit_tombstone_version(versions); + let Some(merged) = epoch_max_wins::merge_live_value( + current.as_deref(), + current_tombstone, + value, + incoming_version, + ) else { + Self::compact_key_metadata(versions); + return None; + }; + + if !merged.changed { + Self::compact_key_metadata(versions); + return None; + } + versions.clear(); + versions.push(ValueMetadata::from_rate_limit_live_version( + merged.live_version, + )); + Some(merged.value) + } + MapEntry::Vacant(entry) => { + let merged = epoch_max_wins::merge_live_value(None, None, value, incoming_version)?; + entry.insert(vec![ValueMetadata::from_rate_limit_live_version( + merged.live_version, + )]); + Some(merged.value) + } + } + } + /// Apply remove fn apply_remove(&self, key: &str, timestamp: u64, replica_id: ReplicaId) -> Option> { let key_lock = self.key_lock_for(key); @@ -604,3 +749,45 @@ impl Default for CrdtOrMap { Self::new() } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn epoch_equal_value_insert_does_not_rewind_metadata() { + let replica = CrdtOrMap::new(); + replica.register_merge_strategy("rl:".to_string(), MergeStrategy::EpochMaxWins); + + let key = "rl:global:node-a"; + let newer_insert_replica = ReplicaId::new(); + let older_insert_replica = ReplicaId::new(); + let tombstone_replica = ReplicaId::new(); + + assert!(replica.apply_insert_locked( + key, + Operation::insert( + key.to_string(), + epoch_max_wins::encode(6, 0).to_vec(), + 100, + newer_insert_replica + ), + )); + assert!(!replica.apply_insert_locked( + key, + Operation::insert( + key.to_string(), + epoch_max_wins::encode(6, 0).to_vec(), + 10, + older_insert_replica + ), + )); + + assert_eq!(replica.apply_remove(key, 50, tombstone_replica), None); + assert_eq!( + replica.get(key).and_then(|value| epoch_max_wins::decode(&value)), + Some(epoch_max_wins::EpochCount { epoch: 6, count: 0 }), + "older equal-value insert must not let an intermediate tombstone delete the newer live value", + ); + } +} diff --git a/crates/mesh/src/crdt_kv/epoch_max_wins.rs b/crates/mesh/src/crdt_kv/epoch_max_wins.rs index c4f74b45d..b4b109000 100644 --- a/crates/mesh/src/crdt_kv/epoch_max_wins.rs +++ b/crates/mesh/src/crdt_kv/epoch_max_wins.rs @@ -1,37 +1,88 @@ -//! Epoch-aware max-wins merge for rate-limit counter values. +//! Rate-limit shard merge for epoch-aware counters. //! -//! Plain max-wins undoes window resets: A resets to 0, B still has -//! 100, max(0, 100) reverts the reset. This merge compares epoch -//! first, then max-count within the same epoch — a reset (higher -//! epoch, count = 0) always beats a higher count at an older epoch. +//! Gateway code writes the simple application payload `(epoch, count)` as +//! 16 bytes: `u64` big-endian epoch followed by `i64` big-endian count. +//! Inside the CRDT, `rl:` values are normalized into a rate-limit shard +//! state that also carries a normalized frontier of live points plus the +//! newest tombstone boundary. That extra metadata is what lets operation-log +//! compaction keep deletes meaningful: a delayed insert from before a +//! tombstone cannot be resurrected just because the log compacted to one live +//! value. //! -//! Wire format: 16 bytes, `u64` big-endian epoch in bytes 0..8, -//! `i64` big-endian count in bytes 8..16. Fixed-size + big-endian so -//! the mesh crate can compare values without an application -//! callback. Signed count leaves room for future sentinels. -//! -//! Malformed input (length ≠ 16): if one side decodes, it wins. If -//! both fail, keep `local` per the `MergeStrategy::EpochMaxWins` -//! contract in `kv.rs` — a no-op on the store. This sacrifices -//! commutativity for the malformed/malformed case, but rate-limit -//! counters write on every increment and reset every window, so a -//! well-formed write restores clean state before the non-convergence -//! matters. +//! Stored and gossiped `rl:` values are always serialized [`RateLimitShard`] +//! states. Raw epoch/count payloads are accepted at the insert boundary and by +//! the public decoder because local namespace subscribers can observe the +//! pre-normalized write payload. Malformed stored input: if one side decodes, +//! it wins. If both fail, keep `local` per the `MergeStrategy::EpochMaxWins` +//! contract in `kv.rs` - a no-op on the store. use std::cmp::Ordering; -/// Fixed wire size: 8-byte big-endian epoch + 8-byte big-endian count. +use serde::{Deserialize, Serialize}; + +use super::{operation::Operation, replica::ReplicaId}; + +/// Fixed application payload size: 8-byte epoch + 8-byte count. pub const EPOCH_MAX_WINS_ENCODED_LEN: usize = 16; -/// Parsed value returned owned so callers don't need to keep the -/// source slice alive across the merge. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +/// Parsed value returned owned so callers don't need to keep the source slice +/// alive across the merge. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] pub struct EpochCount { pub epoch: u64, pub count: i64, } -/// Encode `(epoch, count)` to the 16-byte big-endian wire format. +/// Lamport version for a rate-limit shard state component. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] +pub(super) struct RateLimitVersion { + pub timestamp: u64, + pub replica_id: ReplicaId, +} + +impl RateLimitVersion { + pub(super) fn new(timestamp: u64, replica_id: ReplicaId) -> Self { + Self { + timestamp, + replica_id, + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +struct LivePoint { + value: EpochCount, + version: RateLimitVersion, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +struct RateLimitShard { + live_points: Vec, + tombstone_version: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +enum RateLimitState { + Live(RateLimitShard), + Tombstone(RateLimitVersion), +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(super) enum ValueWinner { + Local, + Remote, + Equal, +} + +pub(super) struct LiveMerge { + pub value: Vec, + pub live_version: RateLimitVersion, + pub changed: bool, +} + +/// Encode `(epoch, count)` to the 16-byte application payload. `rl:` CRDT +/// inserts normalize this payload into a [`RateLimitShard`] state before +/// storing it. #[must_use] pub fn encode(epoch: u64, count: i64) -> [u8; EPOCH_MAX_WINS_ENCODED_LEN] { let mut buf = [0u8; EPOCH_MAX_WINS_ENCODED_LEN]; @@ -40,10 +91,16 @@ pub fn encode(epoch: u64, count: i64) -> [u8; EPOCH_MAX_WINS_ENCODED_LEN] { buf } -/// Decode 16 bytes. `None` on any other length (caller treats as -/// malformed). +/// Decode a normalized CRDT shard state or raw application payload. +/// `None` means malformed. #[must_use] pub fn decode(bytes: &[u8]) -> Option { + decode_shard(bytes) + .and_then(|shard| shard.current_value()) + .or_else(|| decode_raw_epoch_count(bytes)) +} + +fn decode_raw_epoch_count(bytes: &[u8]) -> Option { if bytes.len() != EPOCH_MAX_WINS_ENCODED_LEN { return None; } @@ -52,30 +109,255 @@ pub fn decode(bytes: &[u8]) -> Option { Some(EpochCount { epoch, count }) } -/// Merge two rate-limit values per the epoch-max-wins rule. -/// -/// Both decode: higher epoch wins; on equal epochs, max count wins. -/// One decodes: the well-formed side wins. Neither decodes: keep -/// `local` (no-op, per the `EpochMaxWins` contract in `kv.rs`). -/// -/// Returned `Vec` so the caller can write it straight back. -#[must_use] -pub fn merge(local: &[u8], remote: &[u8]) -> Vec { - match (decode(local), decode(remote)) { - (Some(l), Some(r)) => { - let winner = match l.epoch.cmp(&r.epoch) { - Ordering::Greater => l, - Ordering::Less => r, - Ordering::Equal => EpochCount { - epoch: l.epoch, - count: l.count.max(r.count), +fn encode_shard(shard: &RateLimitShard) -> Option> { + bincode::serialize(shard).ok() +} + +fn decode_shard(bytes: &[u8]) -> Option { + let shard: RateLimitShard = bincode::deserialize(bytes).ok()?; + (!shard.live_points.is_empty()).then_some(shard) +} + +fn decode_stored_live(bytes: &[u8]) -> Option { + decode_shard(bytes) +} + +fn compare_epoch_count(local: EpochCount, remote: EpochCount) -> ValueWinner { + match local.epoch.cmp(&remote.epoch) { + Ordering::Greater => ValueWinner::Local, + Ordering::Less => ValueWinner::Remote, + Ordering::Equal => match local.count.cmp(&remote.count) { + Ordering::Greater => ValueWinner::Local, + Ordering::Less => ValueWinner::Remote, + Ordering::Equal => ValueWinner::Equal, + }, + } +} + +impl RateLimitShard { + fn from_live_point(point: LivePoint) -> Self { + Self { + live_points: vec![point], + tombstone_version: None, + } + } + + fn current_value(&self) -> Option { + self.live_points + .iter() + .map(|point| point.value) + .reduce( + |current, candidate| match compare_epoch_count(current, candidate) { + ValueWinner::Remote => candidate, + ValueWinner::Local | ValueWinner::Equal => current, }, + ) + } + + fn newest_live_version(&self) -> Option { + self.live_points.iter().map(|point| point.version).max() + } + + fn merged( + mut points: Vec, + tombstone_version: Option, + ) -> Option { + points.retain(|point| tombstone_version.is_none_or(|tombstone| point.version > tombstone)); + if points.is_empty() { + return None; + } + + points.sort_by_key(|point| std::cmp::Reverse(point.version)); + let mut suffix_best: Option = None; + let mut frontier = Vec::new(); + for point in points { + let keep = suffix_best.is_none_or(|best| { + matches!(compare_epoch_count(point.value, best), ValueWinner::Local) + }); + if keep { + suffix_best = Some(match suffix_best { + Some(best) => match compare_epoch_count(best, point.value) { + ValueWinner::Remote => point.value, + ValueWinner::Local | ValueWinner::Equal => best, + }, + None => point.value, + }); + frontier.push(point); + } + } + frontier.sort_by_key(|point| point.version); + + Some(Self { + live_points: frontier, + tombstone_version, + }) + } + + fn live_points_after_tombstone( + &self, + tombstone_version: Option, + ) -> Vec { + self.live_points + .iter() + .filter(|point| tombstone_version.is_none_or(|tombstone| point.version > tombstone)) + .cloned() + .collect() + } +} + +impl RateLimitState { + fn tombstone_version(&self) -> Option { + match self { + Self::Live(shard) => shard.tombstone_version, + Self::Tombstone(version) => Some(*version), + } + } + + fn live_points_after_tombstone( + &self, + tombstone_version: Option, + ) -> Vec { + match self { + Self::Live(shard) => shard.live_points_after_tombstone(tombstone_version), + Self::Tombstone(_) => Vec::new(), + } + } + + fn merge(self, other: Self) -> Option { + let tombstone_version = self.tombstone_version().max(other.tombstone_version()); + let mut live_points = self.live_points_after_tombstone(tombstone_version); + live_points.extend(other.live_points_after_tombstone(tombstone_version)); + + match RateLimitShard::merged(live_points, tombstone_version) { + Some(shard) => Some(Self::Live(shard)), + None => tombstone_version.map(Self::Tombstone), + } + } + + fn into_operation(self, key: String) -> Option { + match self { + Self::Live(shard) => { + let live_version = shard.newest_live_version()?; + Some(Operation::insert( + key, + encode_shard(&shard)?, + live_version.timestamp, + live_version.replica_id, + )) + } + Self::Tombstone(version) => Some(Operation::remove( + key, + version.timestamp, + version.replica_id, + )), + } + } +} + +fn state_from_insert_value(value: &[u8], version: RateLimitVersion) -> Option { + if let Some(shard) = decode_shard(value) { + return Some(RateLimitState::Live(shard)); + } + decode_raw_epoch_count(value).map(|value| { + RateLimitState::Live(RateLimitShard::from_live_point(LivePoint { + value, + version, + })) + }) +} + +fn state_from_stored_value(value: &[u8]) -> Option { + decode_stored_live(value).map(RateLimitState::Live) +} + +pub(super) fn merge_live_value( + current_value: Option<&[u8]>, + current_tombstone_version: Option, + incoming_value: &[u8], + incoming_version: RateLimitVersion, +) -> Option { + let incoming = state_from_insert_value(incoming_value, incoming_version)?; + let current = current_value.and_then(state_from_stored_value); + let current = match (current, current_tombstone_version) { + (Some(current), Some(tombstone_version)) => { + current.merge(RateLimitState::Tombstone(tombstone_version)) + } + (Some(current), None) => Some(current), + (None, Some(tombstone_version)) => Some(RateLimitState::Tombstone(tombstone_version)), + (None, None) => None, + }; + + let merged = match current { + Some(current) => current.merge(incoming)?, + None => incoming, + }; + let RateLimitState::Live(shard) = merged else { + return None; + }; + let value = encode_shard(&shard)?; + let changed = current_value != Some(value.as_slice()); + let live_version = shard.newest_live_version()?; + Some(LiveMerge { + value, + live_version, + changed, + }) +} + +pub(super) fn compact_operations<'a>( + operations: impl IntoIterator, +) -> Option { + let mut key = None; + let mut state: Option = None; + + for operation in operations { + key.get_or_insert_with(|| operation.key().to_string()); + let operation_state = match operation { + Operation::Insert { + value, + timestamp, + replica_id, + .. + } => { + match state_from_insert_value(value, RateLimitVersion::new(*timestamp, *replica_id)) + { + Some(state) => state, + None => continue, + } + } + Operation::Remove { + timestamp, + replica_id, + .. + } => RateLimitState::Tombstone(RateLimitVersion::new(*timestamp, *replica_id)), + }; + state = Some(match state { + Some(current) => current.merge(operation_state)?, + None => operation_state, + }); + } + + state.and_then(|state| state.into_operation(key?)) +} + +/// Merge two stored rate-limit shard payloads. Production merges go +/// through [`merge_live_value`] (it carries the live-version metadata +/// the CRDT needs). This byte-only form exists solely for the unit +/// tests below. +#[cfg(test)] +#[must_use] +fn merge(local: &[u8], remote: &[u8]) -> Vec { + match (decode_stored_live(local), decode_stored_live(remote)) { + (Some(local_state), Some(remote_state)) => { + let Some(RateLimitState::Live(shard)) = + RateLimitState::Live(local_state).merge(RateLimitState::Live(remote_state)) + else { + return local.to_vec(); }; - encode(winner.epoch, winner.count).to_vec() + encode_shard(&shard).unwrap_or_else(|| local.to_vec()) } - (Some(_), None) => local.to_vec(), + (Some(_), None) | (None, None) => local.to_vec(), (None, Some(_)) => remote.to_vec(), - (None, None) => local.to_vec(), } } @@ -83,8 +365,23 @@ pub fn merge(local: &[u8], remote: &[u8]) -> Vec { mod tests { use super::*; + fn rate_limit_version(timestamp: u64) -> RateLimitVersion { + RateLimitVersion::new(timestamp, ReplicaId::new()) + } + + fn stored(epoch: u64, count: i64, timestamp: u64) -> Vec { + merge_live_value( + None, + None, + &encode(epoch, count), + rate_limit_version(timestamp), + ) + .expect("raw epoch/count insert normalizes") + .value + } + #[test] - fn encode_decode_round_trip() { + fn raw_epoch_count_payload_round_trip() { for (epoch, count) in [ (0_u64, 0_i64), (1, 1), @@ -95,26 +392,45 @@ mod tests { ] { let buf = encode(epoch, count); assert_eq!(buf.len(), EPOCH_MAX_WINS_ENCODED_LEN); - let decoded = decode(&buf).expect("encoded buffer is 16 bytes"); + let decoded = decode_raw_epoch_count(&buf).expect("encoded buffer is 16 bytes"); assert_eq!(decoded, EpochCount { epoch, count }); } } #[test] - fn decode_rejects_wrong_lengths() { - assert_eq!(decode(&[]), None); - assert_eq!(decode(&[0u8; 15]), None); - assert_eq!(decode(&[0u8; 17]), None); - // Just inside is fine, one byte off is not. - assert!(decode(&[0u8; 16]).is_some()); + fn public_decode_accepts_raw_epoch_count_payload() { + assert_eq!( + decode(&encode(1, 2)), + Some(EpochCount { epoch: 1, count: 2 }) + ); + } + + #[test] + fn raw_epoch_count_decode_rejects_wrong_lengths() { + assert_eq!(decode_raw_epoch_count(&[]), None); + assert_eq!(decode_raw_epoch_count(&[0u8; 15]), None); + assert_eq!(decode_raw_epoch_count(&[0u8; 17]), None); + assert!(decode_raw_epoch_count(&[0u8; 16]).is_some()); + } + + #[test] + fn normalized_shard_decodes_to_epoch_count() { + let merged = merge_live_value(None, None, &encode(7, 42), rate_limit_version(10)) + .expect("raw epoch/count insert normalizes to shard state"); + assert_ne!(merged.value.len(), EPOCH_MAX_WINS_ENCODED_LEN); + assert_eq!( + decode(&merged.value), + Some(EpochCount { + epoch: 7, + count: 42 + }) + ); } #[test] fn same_epoch_max_count_wins() { - // Normal counting within a window; highest observed count is - // the cluster-wide truth. Also asserts commutativity. - let local = encode(5, 30); - let remote = encode(5, 42); + let local = stored(5, 30, 1); + let remote = stored(5, 42, 2); let merged = merge(&local, &remote); assert_eq!( decode(&merged).unwrap(), @@ -128,16 +444,13 @@ mod tests { #[test] fn higher_epoch_wins_even_with_lower_count() { - // Reset must propagate: epoch 6 count 0 beats epoch 5 count 30. - let merged = merge(&encode(5, 30), &encode(6, 0)); + let merged = merge(&stored(5, 30, 1), &stored(6, 0, 2)); assert_eq!(decode(&merged).unwrap(), EpochCount { epoch: 6, count: 0 }); } #[test] fn lower_epoch_loses_to_local_newer_window() { - // Stale remote from old window is dropped; local window-6 - // state survives. - let merged = merge(&encode(6, 10), &encode(5, 100)); + let merged = merge(&stored(6, 10, 1), &stored(5, 100, 2)); assert_eq!( decode(&merged).unwrap(), EpochCount { @@ -149,31 +462,26 @@ mod tests { #[test] fn near_simultaneous_reset_both_at_zero() { - // Both sides at epoch 5 count 0. max(0, 0) = 0. - let merged = merge(&encode(5, 0), &encode(5, 0)); + let merged = merge(&stored(5, 0, 1), &stored(5, 0, 2)); assert_eq!(decode(&merged).unwrap(), EpochCount { epoch: 5, count: 0 }); } #[test] fn malformed_remote_keeps_local() { - // Corrupt remote must not overwrite healthy local. - let local = encode(5, 30); + let local = stored(5, 30, 1); let merged = merge(&local, &[0xFFu8; 15]); - assert_eq!(merged, local.to_vec()); + assert_eq!(merged, local); } #[test] fn malformed_local_is_replaced_by_remote() { - // Healthy remote recovers a corrupt local. - let remote = encode(5, 30); + let remote = stored(5, 30, 1); let merged = merge(&[], &remote); - assert_eq!(merged, remote.to_vec()); + assert_eq!(merged, remote); } #[test] fn both_malformed_keeps_local_no_panic() { - // Per EpochMaxWins contract, both-malformed is a no-op that - // keeps local. Non-commutative by design — see module docs. let corrupt_local = vec![1u8, 2, 3]; let merged = merge(&corrupt_local, &[0xFFu8; 17]); assert_eq!(merged, corrupt_local); @@ -181,9 +489,7 @@ mod tests { #[test] fn signed_count_preserves_sign() { - // Negative counts round-trip; the merge must not silently - // reinterpret as unsigned. - let merged = merge(&encode(5, -10), &encode(5, -5)); + let merged = merge(&stored(5, -10, 1), &stored(5, -5, 2)); assert_eq!( decode(&merged).unwrap(), EpochCount { @@ -195,18 +501,15 @@ mod tests { #[test] fn merge_is_idempotent() { - // merge(v, v) == v — gossip re-delivery must not drift. - let value = encode(42, 7); - assert_eq!(merge(&value, &value), value.to_vec()); + let value = stored(42, 7, 1); + assert_eq!(merge(&value, &value), value); } #[test] fn merge_is_associative_on_three_values() { - // ((a ⊕ b) ⊕ c) == (a ⊕ (b ⊕ c)). Required for eventual - // consistency under reordering. - let a = encode(5, 10); - let b = encode(6, 3); - let c = encode(6, 9); + let a = stored(5, 10, 1); + let b = stored(6, 3, 2); + let c = stored(6, 9, 3); let ab_then_c = merge(&merge(&a, &b), &c); let a_then_bc = merge(&a, &merge(&b, &c)); assert_eq!(ab_then_c, a_then_bc); @@ -215,4 +518,84 @@ mod tests { EpochCount { epoch: 6, count: 9 } ); } + + #[test] + fn compacted_live_state_remembers_tombstone_boundary() { + let key = "rl:global:node-a".to_string(); + let ops = [ + Operation::insert(key.clone(), encode(9, 99).to_vec(), 10, ReplicaId::new()), + Operation::remove(key.clone(), 20, ReplicaId::new()), + Operation::insert(key.clone(), encode(1, 1).to_vec(), 30, ReplicaId::new()), + ]; + + let compacted = + compact_operations(ops.iter()).expect("post-tombstone live insert remains live"); + assert!(matches!(compacted, Operation::Insert { .. })); + + let delayed = Operation::insert(key.clone(), encode(9, 99).to_vec(), 10, ReplicaId::new()); + let compacted_again = compact_operations([compacted, delayed].iter()) + .expect("compacted live shard remains live"); + let Operation::Insert { value, .. } = compacted_again else { + panic!("expected live compacted shard"); + }; + assert_eq!( + decode(&value), + Some(EpochCount { epoch: 1, count: 1 }), + "pre-tombstone high-epoch insert must stay suppressed after compaction", + ); + } + + #[test] + fn compacted_live_state_uses_newest_live_version() { + let key = "rl:global:node-a".to_string(); + let ops = [ + Operation::remove(key.clone(), 50, ReplicaId::new()), + Operation::insert(key.clone(), encode(7, 100).to_vec(), 60, ReplicaId::new()), + Operation::insert(key.clone(), encode(6, 1).to_vec(), 70, ReplicaId::new()), + ]; + + let compacted = compact_operations(ops.iter()).expect("live state wins"); + let Operation::Insert { + value, timestamp, .. + } = compacted + else { + panic!("expected live compacted shard"); + }; + assert_eq!(timestamp, 70); + assert_eq!( + decode(&value), + Some(EpochCount { + epoch: 7, + count: 100 + }) + ); + } + + #[test] + fn compact_operations_skips_malformed_inserts() { + let key = "rl:global:node-a".to_string(); + let malformed = Operation::insert(key.clone(), vec![1, 2, 3], 100, ReplicaId::new()); + let valid = Operation::insert(key.clone(), encode(5, 42).to_vec(), 10, ReplicaId::new()); + let compacted = + compact_operations([malformed.clone(), valid].iter()).expect("valid insert survives"); + + let Operation::Insert { value, .. } = compacted else { + panic!("valid insert should remain after skipping malformed insert"); + }; + assert_eq!( + decode(&value), + Some(EpochCount { + epoch: 5, + count: 42 + }) + ); + + let tombstone = Operation::remove(key.clone(), 110, ReplicaId::new()); + let compacted = + compact_operations([malformed, tombstone].iter()).expect("tombstone survives"); + let Operation::Remove { timestamp, .. } = compacted else { + panic!("tombstone should remain after skipping malformed insert"); + }; + assert_eq!(timestamp, 110); + } } diff --git a/crates/mesh/src/crdt_kv/kv_store.rs b/crates/mesh/src/crdt_kv/kv_store.rs index a8e95a98c..d1bb99fdb 100644 --- a/crates/mesh/src/crdt_kv/kv_store.rs +++ b/crates/mesh/src/crdt_kv/kv_store.rs @@ -3,7 +3,7 @@ use std::sync::{ Arc, }; -use dashmap::{mapref::entry::Entry, DashMap}; +use dashmap::DashMap; // ============================================================================ // High-Performance In-Memory KV Storage - Concurrent-Safe Implementation Based on DashMap @@ -41,26 +41,6 @@ impl KvStore { self.store.insert(key, value) } - /// Atomically compute and update a key in a single DashMap entry operation. - pub fn upsert(&self, key: String, updater: F) -> Vec - where - F: FnOnce(Option<&[u8]>) -> Vec, - { - self.generation.fetch_add(1, Ordering::Release); - match self.store.entry(key) { - Entry::Occupied(mut entry) => { - let new_value = updater(Some(entry.get().as_slice())); - entry.get_mut().clone_from(&new_value); - new_value - } - Entry::Vacant(entry) => { - let new_value = updater(None); - entry.insert(new_value.clone()); - new_value - } - } - } - /// Get value by key pub fn get(&self, key: &str) -> Option> { self.store.get(key).map(|v| v.value().clone()) diff --git a/crates/mesh/src/crdt_kv/merge_strategy.rs b/crates/mesh/src/crdt_kv/merge_strategy.rs new file mode 100644 index 000000000..01f3e7bd1 --- /dev/null +++ b/crates/mesh/src/crdt_kv/merge_strategy.rs @@ -0,0 +1,10 @@ +/// Merge strategy for CRDT namespaces. Determines how conflicts are resolved +/// when two nodes write the same key concurrently. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum MergeStrategy { + /// Higher (version, replica_id) wins. Used for worker:*, policy:*, config:*. + LastWriterWins, + /// Compare epochs first, then max within same epoch. + /// Values MUST be exactly 16 bytes: epoch (u64 big-endian) + count (i64 big-endian). + EpochMaxWins, +} diff --git a/crates/mesh/src/crdt_kv/mod.rs b/crates/mesh/src/crdt_kv/mod.rs index c7570b405..c70e73a15 100644 --- a/crates/mesh/src/crdt_kv/mod.rs +++ b/crates/mesh/src/crdt_kv/mod.rs @@ -5,12 +5,14 @@ mod crdt; mod epoch_max_wins; mod kv_store; +mod merge_strategy; mod operation; mod replica; // Export core types pub use crdt::CrdtOrMap; -pub use epoch_max_wins::{decode, encode, merge, EpochCount, EPOCH_MAX_WINS_ENCODED_LEN}; +pub use epoch_max_wins::{decode, encode, EpochCount, EPOCH_MAX_WINS_ENCODED_LEN}; +pub use merge_strategy::MergeStrategy; pub use operation::{Operation, OperationLog}; pub use replica::ReplicaId; diff --git a/crates/mesh/src/crdt_kv/operation.rs b/crates/mesh/src/crdt_kv/operation.rs index 4bad001c1..609a2db6c 100644 --- a/crates/mesh/src/crdt_kv/operation.rs +++ b/crates/mesh/src/crdt_kv/operation.rs @@ -2,7 +2,7 @@ use std::collections::{HashMap, HashSet}; use serde::{Deserialize, Serialize}; -use super::replica::ReplicaId; +use super::{epoch_max_wins, merge_strategy::MergeStrategy, replica::ReplicaId}; // ============================================================================ // Operation Type Definition - Atomic Unit of State Change @@ -112,9 +112,16 @@ impl OperationLog { /// new appends, not on every append. If compaction doesn't reduce below /// threshold (very high key cardinality), the oldest entries are truncated. pub fn append(&mut self, operation: Operation) { + self.append_with_strategy(operation, |_| MergeStrategy::LastWriterWins); + } + + pub(super) fn append_with_strategy(&mut self, operation: Operation, strategy_for_key: F) + where + F: Fn(&str) -> MergeStrategy, + { self.operations.push(operation); if self.operations.len() > Self::AUTO_COMPACT_THRESHOLD { - self.compact(); + self.compact_with_strategy(strategy_for_key); // If still over threshold after dedup (extremely high key cardinality // >10K unique keys), truncate oldest entries. This drops state for the // oldest keys, which will be re-synced from peers on the next merge. @@ -159,22 +166,51 @@ impl OperationLog { self.operations.is_empty() } - fn latest_operations_by_key(&self) -> HashMap { - let mut latest_by_key: HashMap = HashMap::new(); + fn latest_lww_operation<'a, I>(operations: I) -> Option<&'a Operation> + where + I: IntoIterator, + { + operations + .into_iter() + .max_by_key(|operation| (operation.timestamp(), operation.replica_id())) + } + + fn latest_epoch_max_wins_operation<'a>( + operations: impl IntoIterator, + ) -> Option { + epoch_max_wins::compact_operations(operations) + } + + fn latest_operations_by_key_with_strategy( + &self, + strategy_for_key: F, + ) -> HashMap + where + F: Fn(&str) -> MergeStrategy, + { + let mut operations_by_key: HashMap> = HashMap::new(); for operation in &self.operations { - let key = operation.key().to_string(); - match latest_by_key.get(&key) { - Some(current) - if (current.timestamp(), current.replica_id()) - >= (operation.timestamp(), operation.replica_id()) => {} - _ => { - latest_by_key.insert(key, operation.clone()); - } - } + operations_by_key + .entry(operation.key().to_string()) + .or_default() + .push(operation); } - latest_by_key + operations_by_key + .into_iter() + .filter_map(|(key, operations)| { + let latest = match strategy_for_key(&key) { + MergeStrategy::LastWriterWins => { + Self::latest_lww_operation(operations).cloned() + } + MergeStrategy::EpochMaxWins => { + Self::latest_epoch_max_wins_operation(operations) + } + }?; + Some((key, latest)) + }) + .collect() } /// Keep only latest operation per key to bound log growth. @@ -185,8 +221,15 @@ impl OperationLog { /// `apply_operation` and `operation_id` guards keep state semantics safe. /// Stronger concurrency retention would require vector-clock/version-vector metadata. pub fn compact(&mut self) { + self.compact_with_strategy(|_| MergeStrategy::LastWriterWins); + } + + pub(super) fn compact_with_strategy(&mut self, strategy_for_key: F) + where + F: Fn(&str) -> MergeStrategy, + { self.operations = self - .latest_operations_by_key() + .latest_operations_by_key_with_strategy(strategy_for_key) .into_values() .collect::>(); self.operations @@ -199,9 +242,12 @@ impl OperationLog { .retain(|operation| operation.timestamp() > watermark); } - /// Build a latest-state snapshot and clear the operation log. - pub fn snapshot_and_truncate(&mut self) -> HashMap { - let snapshot = self.latest_operations_by_key(); + /// Build a latest-state snapshot with the configured merge strategy and clear the operation log. + pub fn snapshot_and_truncate(&mut self, strategy_for_key: F) -> HashMap + where + F: Fn(&str) -> MergeStrategy, + { + let snapshot = self.latest_operations_by_key_with_strategy(strategy_for_key); self.operations.clear(); snapshot } diff --git a/crates/mesh/src/crdt_kv/tests.rs b/crates/mesh/src/crdt_kv/tests.rs index ab8b114db..5865b677e 100644 --- a/crates/mesh/src/crdt_kv/tests.rs +++ b/crates/mesh/src/crdt_kv/tests.rs @@ -7,7 +7,10 @@ use tracing_subscriber::{ use super::{ crdt::CrdtOrMap, + epoch_max_wins::{decode, encode, EpochCount}, + merge_strategy::MergeStrategy, operation::{Operation, OperationLog}, + replica::ReplicaId, }; static INIT: Once = Once::new(); @@ -219,6 +222,182 @@ fn test_older_insert_applied_later_does_not_overwrite_winner() { assert_eq!(replica.get("key1"), Some(b"newer_value".to_vec())); } +#[test] +fn test_epoch_max_wins_compaction_uses_value_epoch() { + init_test_logging(); + let replica = CrdtOrMap::new(); + replica.register_merge_strategy("rl:".to_string(), MergeStrategy::EpochMaxWins); + + let key = "rl:global:node-a"; + let older_reset = + Operation::insert(key.to_string(), encode(6, 0).to_vec(), 1, ReplicaId::new()); + let newer_stale_count = Operation::insert( + key.to_string(), + encode(5, 100).to_vec(), + 2, + ReplicaId::new(), + ); + + let mut log = OperationLog::new(); + log.append(newer_stale_count); + log.append(older_reset); + + replica.merge(&log); + + let value = replica.get(key).expect("rate-limit shard should exist"); + assert_eq!(decode(&value), Some(EpochCount { epoch: 6, count: 0 })); +} + +#[test] +fn test_epoch_max_wins_preserves_newer_tombstone() { + init_test_logging(); + let replica = CrdtOrMap::new(); + replica.register_merge_strategy("rl:".to_string(), MergeStrategy::EpochMaxWins); + + let key = "rl:global:dead-node"; + let stale_insert = + Operation::insert(key.to_string(), encode(6, 50).to_vec(), 1, ReplicaId::new()); + let tombstone = Operation::remove(key.to_string(), 2, ReplicaId::new()); + + let mut log = OperationLog::new(); + log.append(stale_insert); + log.append(tombstone); + + replica.merge(&log); + + assert_eq!(replica.get(key), None); +} + +#[test] +fn test_epoch_max_wins_local_write_cannot_rewind_epoch() { + init_test_logging(); + let replica = CrdtOrMap::new(); + replica.register_merge_strategy("rl:".to_string(), MergeStrategy::EpochMaxWins); + + let key = "rl:global:node-a"; + replica.insert(key.to_string(), encode(6, 0).to_vec()); + replica.insert(key.to_string(), encode(5, 100).to_vec()); + + let value = replica.get(key).expect("rate-limit shard should exist"); + assert_eq!(decode(&value), Some(EpochCount { epoch: 6, count: 0 })); +} + +#[test] +fn test_epoch_max_wins_tombstone_compares_against_newest_live_version() { + init_test_logging(); + let replica = CrdtOrMap::new(); + replica.register_merge_strategy("rl:".to_string(), MergeStrategy::EpochMaxWins); + + let key = "rl:global:node-a"; + let stale_newer_timestamp = Operation::insert( + key.to_string(), + encode(5, 100).to_vec(), + 100, + ReplicaId::new(), + ); + let epoch_winner_older_timestamp = + Operation::insert(key.to_string(), encode(6, 0).to_vec(), 90, ReplicaId::new()); + let tombstone_after_epoch_winner = Operation::remove(key.to_string(), 95, ReplicaId::new()); + + let mut stale_log = OperationLog::new(); + stale_log.append(stale_newer_timestamp); + replica.merge(&stale_log); + + let mut reset_log = OperationLog::new(); + reset_log.append(epoch_winner_older_timestamp); + replica.merge(&reset_log); + assert_eq!( + decode(&replica.get(key).expect("reset should win stale count")), + Some(EpochCount { epoch: 6, count: 0 }), + ); + + let mut tombstone_log = OperationLog::new(); + tombstone_log.append(tombstone_after_epoch_winner); + replica.merge(&tombstone_log); + + assert_eq!( + decode( + &replica + .get(key) + .expect("newer live version suppresses tombstone") + ), + Some(EpochCount { epoch: 6, count: 0 }), + ); +} + +#[test] +fn test_epoch_max_wins_snapshot_only_propagation_preserves_tombstone_boundary() { + // Snapshot-only path: the source replica compacts its log so a + // peer receives just one Insert per key (with the shard's + // `tombstone_version` embedded), never the original Remove op. + // A late peer that still holds the pre-tombstone high-epoch + // insert must not be able to resurrect it. + init_test_logging(); + let key = "rl:global:node-a"; + + // Source: pre-tombstone high-epoch insert, then tombstone, then + // post-tombstone lower-epoch insert. After merge+compact, the + // log holds a single shard insert with tombstone_version=65. + let source = CrdtOrMap::new(); + source.register_merge_strategy("rl:".to_string(), MergeStrategy::EpochMaxWins); + let mut source_log = OperationLog::new(); + source_log.append(Operation::insert( + key.to_string(), + encode(7, 99).to_vec(), + 60, + ReplicaId::new(), + )); + source_log.append(Operation::remove(key.to_string(), 65, ReplicaId::new())); + source_log.append(Operation::insert( + key.to_string(), + encode(6, 1).to_vec(), + 70, + ReplicaId::new(), + )); + source.merge(&source_log); + + let snapshot_log = source.get_operation_log(); + assert_eq!( + snapshot_log.operations().len(), + 1, + "compaction must reduce to a single shard insert", + ); + + // Receiver applies the snapshot — gets the shard with + // tombstone_version embedded but no Remove op in its log. + let receiver = CrdtOrMap::new(); + receiver.register_merge_strategy("rl:".to_string(), MergeStrategy::EpochMaxWins); + receiver.merge(&snapshot_log); + assert_eq!( + decode(&receiver.get(key).expect("post-tombstone insert applied")), + Some(EpochCount { epoch: 6, count: 1 }), + ); + + // Late peer that never saw the Remove gossips the original + // pre-tombstone high-epoch insert. The receiver must reject it + // — the shard's embedded tombstone_version (65) > the late + // insert's version (60), so it gets filtered. + let mut late_log = OperationLog::new(); + late_log.append(Operation::insert( + key.to_string(), + encode(7, 99).to_vec(), + 60, + ReplicaId::new(), + )); + receiver.merge(&late_log); + + assert_eq!( + decode( + &receiver + .get(key) + .expect("post-tombstone state must survive late pre-tombstone insert") + ), + Some(EpochCount { epoch: 6, count: 1 }), + "pre-tombstone insert must not resurrect when only the snapshot \ + (no Remove op) has reached the receiver", + ); +} + // ============================================================================ // Serialization Tests // ============================================================================ @@ -282,6 +461,210 @@ fn test_operation_log_merge_deduplicates() { assert_eq!(merged_log.len(), merged_once_len); } +#[test] +fn test_operation_log_snapshot_uses_merge_strategy() { + let key = "rl:global:node-a"; + let stale_newer_timestamp = Operation::insert( + key.to_string(), + encode(5, 100).to_vec(), + 2, + ReplicaId::new(), + ); + let epoch_winner_older_timestamp = + Operation::insert(key.to_string(), encode(6, 0).to_vec(), 1, ReplicaId::new()); + + let mut log = OperationLog::new(); + log.append(stale_newer_timestamp); + log.append(epoch_winner_older_timestamp); + + let snapshot = log.snapshot_and_truncate(|key| { + if key.starts_with("rl:") { + MergeStrategy::EpochMaxWins + } else { + MergeStrategy::LastWriterWins + } + }); + + let Operation::Insert { value, .. } = snapshot.get(key).expect("snapshot keeps rl shard") + else { + panic!("snapshot should keep an insert"); + }; + assert_eq!(decode(value), Some(EpochCount { epoch: 6, count: 0 })); + assert!(log.is_empty(), "snapshot truncates the source log"); +} + +#[test] +fn test_operation_log_epoch_max_wins_tombstone_selection_is_order_independent() { + let key = "rl:global:node-a"; + let stale_lower_epoch = Operation::insert( + key.to_string(), + encode(5, 100).to_vec(), + 80, + ReplicaId::new(), + ); + let epoch_winner_older_timestamp = + Operation::insert(key.to_string(), encode(6, 0).to_vec(), 90, ReplicaId::new()); + let tombstone_after_epoch_winner = Operation::remove(key.to_string(), 95, ReplicaId::new()); + let orders = [ + [ + stale_lower_epoch.clone(), + epoch_winner_older_timestamp.clone(), + tombstone_after_epoch_winner.clone(), + ], + [ + stale_lower_epoch.clone(), + tombstone_after_epoch_winner.clone(), + epoch_winner_older_timestamp.clone(), + ], + [ + epoch_winner_older_timestamp.clone(), + stale_lower_epoch.clone(), + tombstone_after_epoch_winner.clone(), + ], + [ + epoch_winner_older_timestamp.clone(), + tombstone_after_epoch_winner.clone(), + stale_lower_epoch.clone(), + ], + [ + tombstone_after_epoch_winner.clone(), + stale_lower_epoch.clone(), + epoch_winner_older_timestamp.clone(), + ], + [ + tombstone_after_epoch_winner.clone(), + epoch_winner_older_timestamp.clone(), + stale_lower_epoch.clone(), + ], + ]; + + for order in orders { + let mut log = OperationLog::new(); + for operation in order { + log.append(operation); + } + + let snapshot = log.snapshot_and_truncate(|key| { + if key.starts_with("rl:") { + MergeStrategy::EpochMaxWins + } else { + MergeStrategy::LastWriterWins + } + }); + + let Some(Operation::Remove { timestamp, .. }) = snapshot.get(key) else { + panic!("tombstone should win consistently for order {snapshot:?}"); + }; + assert_eq!(*timestamp, 95); + } +} + +#[test] +fn test_operation_log_epoch_max_wins_post_tombstone_insert_revives_key() { + let key = "rl:global:node-a"; + let pre_tombstone_higher_epoch = + Operation::insert(key.to_string(), encode(7, 0).to_vec(), 90, ReplicaId::new()); + let tombstone = Operation::remove(key.to_string(), 95, ReplicaId::new()); + let post_tombstone_lower_epoch = Operation::insert( + key.to_string(), + encode(6, 0).to_vec(), + 100, + ReplicaId::new(), + ); + let orders = [ + [ + pre_tombstone_higher_epoch.clone(), + tombstone.clone(), + post_tombstone_lower_epoch.clone(), + ], + [ + pre_tombstone_higher_epoch.clone(), + post_tombstone_lower_epoch.clone(), + tombstone.clone(), + ], + [ + tombstone.clone(), + pre_tombstone_higher_epoch.clone(), + post_tombstone_lower_epoch.clone(), + ], + [ + tombstone.clone(), + post_tombstone_lower_epoch.clone(), + pre_tombstone_higher_epoch.clone(), + ], + [ + post_tombstone_lower_epoch.clone(), + pre_tombstone_higher_epoch.clone(), + tombstone.clone(), + ], + [ + post_tombstone_lower_epoch.clone(), + tombstone.clone(), + pre_tombstone_higher_epoch.clone(), + ], + ]; + + for order in orders { + let mut log = OperationLog::new(); + for operation in order { + log.append(operation); + } + + let snapshot = log.snapshot_and_truncate(|key| { + if key.starts_with("rl:") { + MergeStrategy::EpochMaxWins + } else { + MergeStrategy::LastWriterWins + } + }); + + let Some(Operation::Insert { + value, timestamp, .. + }) = snapshot.get(key) + else { + panic!("post-tombstone insert should revive key for order {snapshot:?}"); + }; + assert_eq!(*timestamp, 100); + assert_eq!(decode(value), Some(EpochCount { epoch: 6, count: 0 })); + } +} + +#[test] +fn test_operation_log_epoch_max_wins_post_tombstone_insert_wins_over_pre_tombstone_equal_epoch() { + let key = "rl:global:node-a"; + let newer_insert = Operation::insert( + key.to_string(), + encode(6, 0).to_vec(), + 100, + ReplicaId::new(), + ); + let older_equal_insert = + Operation::insert(key.to_string(), encode(6, 0).to_vec(), 10, ReplicaId::new()); + let tombstone_between = Operation::remove(key.to_string(), 50, ReplicaId::new()); + + let mut log = OperationLog::new(); + log.append(older_equal_insert); + log.append(tombstone_between); + log.append(newer_insert); + + let snapshot = log.snapshot_and_truncate(|key| { + if key.starts_with("rl:") { + MergeStrategy::EpochMaxWins + } else { + MergeStrategy::LastWriterWins + } + }); + + let Some(Operation::Insert { + value, timestamp, .. + }) = snapshot.get(key) + else { + panic!("newer equal-value insert should win over intermediate tombstone"); + }; + assert_eq!(*timestamp, 100); + assert_eq!(decode(value), Some(EpochCount { epoch: 6, count: 0 })); +} + #[test] fn test_apply_operation_log() { init_test_logging(); diff --git a/crates/mesh/src/kv.rs b/crates/mesh/src/kv.rs index 19efde7c2..dc1799681 100644 --- a/crates/mesh/src/kv.rs +++ b/crates/mesh/src/kv.rs @@ -19,31 +19,15 @@ use dashmap::{mapref::entry::Entry as DashMapEntry, DashMap}; use parking_lot::RwLock; use tokio::sync::mpsc; -use crate::{chunk_assembler::ChunkAssembler, crdt_kv::CrdtOrMap}; +use crate::{ + chunk_assembler::ChunkAssembler, + crdt_kv::{CrdtOrMap, MergeStrategy}, +}; // ============================================================================ // Type Definitions // ============================================================================ -/// Merge strategy for CRDT namespaces. Determines how conflicts are resolved -/// when two nodes write the same key concurrently. -#[derive(Debug, Clone)] -#[expect(clippy::enum_variant_names)] -pub enum MergeStrategy { - /// Higher (version, replica_id) wins. Used for worker:*, policy:*, config:*. - LastWriterWins, - /// Higher numeric value wins (simple max). Reserved for future use. - MaxValueWins, - /// Compare epochs first, then max within same epoch. - /// The mesh crate implements this internally — no application callback needed. - /// Values MUST be exactly 16 bytes: epoch (u64 big-endian) + count (i64 big-endian). - /// The adapter is responsible for serializing RateLimitValue to this fixed format. - /// - /// If either local or remote value is not exactly 16 bytes (corrupt/truncated message), - /// the merge keeps the well-formed value. If both are malformed, keeps local. - EpochMaxWins, -} - /// Routing mode for stream namespaces. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum StreamRouting { @@ -495,6 +479,7 @@ impl MeshKV { pub fn new(server_name: String) -> Self { let replica_id = Self::derive_replica_id(&server_name); let store = Arc::new(CrdtOrMap::new()); + store.register_merge_strategy("config:".to_string(), MergeStrategy::LastWriterWins); let subscriber_registry = Arc::new(SubscriberRegistry::new()); let mut configured_prefixes = HashMap::new(); configured_prefixes.insert( @@ -577,6 +562,8 @@ impl MeshKV { }, ); } + self.store + .register_merge_strategy(prefix.to_string(), merge_strategy.clone()); Arc::new(CrdtNamespace { prefix: prefix.to_string(), diff --git a/crates/mesh/src/lib.rs b/crates/mesh/src/lib.rs index 7eab1017c..2cff6e831 100644 --- a/crates/mesh/src/lib.rs +++ b/crates/mesh/src/lib.rs @@ -34,12 +34,12 @@ mod tests; // v2 API pub use chunking::MAX_STREAM_CHUNK_BYTES; pub use crdt_kv::{ - decode as decode_epoch_count, encode as encode_epoch_count, merge as merge_epoch_max_wins, - CrdtOrMap, EpochCount, OperationLog, EPOCH_MAX_WINS_ENCODED_LEN, + decode as decode_epoch_count, encode as encode_epoch_count, CrdtOrMap, EpochCount, + MergeStrategy, OperationLog, EPOCH_MAX_WINS_ENCODED_LEN, }; pub use kv::{ - CrdtNamespace, DrainHandle, MergeStrategy, MeshKV, StreamConfig, StreamDrainFn, - StreamNamespace, StreamRouting, Subscription, + CrdtNamespace, DrainHandle, MeshKV, StreamConfig, StreamDrainFn, StreamNamespace, + StreamRouting, Subscription, }; pub use metrics::init_mesh_metrics; pub use mtls::{MTLSConfig, MTLSManager, OptionalMTLSManager}; diff --git a/model_gateway/src/mesh/adapters/rate_limit_sync.rs b/model_gateway/src/mesh/adapters/rate_limit_sync.rs index e6c65e06e..136d7a798 100644 --- a/model_gateway/src/mesh/adapters/rate_limit_sync.rs +++ b/model_gateway/src/mesh/adapters/rate_limit_sync.rs @@ -8,11 +8,14 @@ //! window resets propagate without undoing the reset via a naive //! `max(old, new)`. //! -//! Wire format is the shared 16-byte layout from the mesh crate: +//! The gateway writes the shared 16-byte application payload: //! `u64` big-endian epoch in bytes 0..8, `i64` big-endian count in -//! bytes 8..16. The helpers [`encode_epoch_count`] / -//! [`decode_epoch_count`] match what the mesh merge itself reads, so -//! this adapter never drifts from the merge format. +//! bytes 8..16. The mesh CRDT normalizes stored `rl:` values into a +//! rate-limit shard state that also remembers live/tombstone merge +//! metadata. [`decode_epoch_count`] reads that shard state back into +//! plain epoch/count and also accepts the raw write payload that local +//! namespace subscribers can observe before CRDT normalization, so the +//! adapter never handles CRDT internals. //! //! The caller owns the epoch clock — typically //! `now.as_secs() / window.as_secs()`. `sync_counter` writes the @@ -29,9 +32,7 @@ use std::sync::Arc; use bytes::Bytes; -use smg_mesh::{ - decode_epoch_count, encode_epoch_count, CrdtNamespace, EpochCount, EPOCH_MAX_WINS_ENCODED_LEN, -}; +use smg_mesh::{decode_epoch_count, encode_epoch_count, CrdtNamespace, EpochCount}; use tracing::{debug, warn}; const PREFIX: &str = "rl:"; @@ -87,7 +88,7 @@ impl RateLimitSyncAdapter { /// Aggregate reads always hit the CRDT store directly, so /// subscription is observability-only today: it logs remote /// shards at debug and warns when a value doesn't match the - /// 16-byte wire format. + /// rate-limit shard format. pub fn start(self: &Arc) { let mut sub = self.rate_limits.subscribe(""); #[expect( @@ -141,7 +142,7 @@ impl RateLimitSyncAdapter { None => warn!( shard, len = bytes.len(), - "rate-limit value must be exactly {EPOCH_MAX_WINS_ENCODED_LEN} bytes", + "rate-limit value must decode as epoch/count or normalized shard state", ), } } @@ -210,7 +211,7 @@ mod tests { } #[tokio::test] - async fn sync_counter_writes_sixteen_byte_big_endian_value() { + async fn sync_counter_writes_decodable_epoch_count_value() { let mesh = MeshKV::new("node-a".into()); let ns = rl_namespace(&mesh); let adapter = RateLimitSyncAdapter::new(ns.clone(), "node-a".into()); @@ -220,11 +221,6 @@ mod tests { let raw = ns .get("rl:global:node-a") .expect("shard is written under rl:{counter}:{node}"); - assert_eq!( - raw.len(), - EPOCH_MAX_WINS_ENCODED_LEN, - "wire format is fixed-size" - ); let decoded = decode_epoch_count(&raw).unwrap(); assert_eq!( decoded,