Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
315 changes: 251 additions & 64 deletions crates/mesh/src/crdt_kv/crdt.rs

Large diffs are not rendered by default.

547 changes: 465 additions & 82 deletions crates/mesh/src/crdt_kv/epoch_max_wins.rs

Large diffs are not rendered by default.

22 changes: 1 addition & 21 deletions crates/mesh/src/crdt_kv/kv_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::{
Arc,
};

use dashmap::{mapref::entry::Entry, DashMap};
use dashmap::DashMap;

// ============================================================================
// High-Performance In-Memory KV Storage - Concurrent-Safe Implementation Based on DashMap
Expand Down Expand Up @@ -41,26 +41,6 @@ impl KvStore {
self.store.insert(key, value)
}

/// Read-modify-write a single key through one DashMap entry operation.
///
/// `updater` receives `Some(current_bytes)` when the key is already present and
/// `None` when it is absent; whatever it returns becomes the stored value. The
/// same bytes are handed back to the caller.
///
/// The `generation` counter is bumped (with `Release` ordering) before the map
/// is touched, regardless of which branch is taken.
// NOTE(review): `updater` runs while the entry is held — presumably it must not
// call back into this store; confirm against DashMap's locking rules.
pub fn upsert<F>(&self, key: String, updater: F) -> Vec<u8>
where
    F: FnOnce(Option<&[u8]>) -> Vec<u8>,
{
    self.generation.fetch_add(1, Ordering::Release);

    match self.store.entry(key) {
        Entry::Vacant(vacant) => {
            // Nothing stored yet: compute from `None` and keep one copy for the caller.
            let fresh = updater(None);
            vacant.insert(fresh.clone());
            fresh
        }
        Entry::Occupied(mut occupied) => {
            let fresh = updater(Some(occupied.get().as_slice()));
            // `clone_from` overwrites the stored buffer in place.
            occupied.get_mut().clone_from(&fresh);
            fresh
        }
    }
}

/// Get value by key
pub fn get(&self, key: &str) -> Option<Vec<u8>> {
self.store.get(key).map(|v| v.value().clone())
Expand Down
10 changes: 10 additions & 0 deletions crates/mesh/src/crdt_kv/merge_strategy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/// Merge strategy for CRDT namespaces. Determines how conflicts are resolved
/// when two nodes write the same key concurrently.
///
/// The strategy is selected per key: `OperationLog` accepts a
/// `Fn(&str) -> MergeStrategy` callback and consults it with the key when
/// compacting or snapshotting the log.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MergeStrategy {
/// Higher (version, replica_id) wins. Used for worker:*, policy:*, config:*.
///
/// This is the strategy `OperationLog::append` and `OperationLog::compact`
/// apply to every key when no per-key callback is supplied.
// NOTE(review): log compaction orders by `(timestamp(), replica_id())`;
// presumably "version" above refers to that timestamp — confirm.
LastWriterWins,
/// Compare epochs first, then max within same epoch.
/// Values MUST be exactly 16 bytes: epoch (u64 big-endian) + count (i64 big-endian).
EpochMaxWins,
}
4 changes: 3 additions & 1 deletion crates/mesh/src/crdt_kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
mod crdt;
mod epoch_max_wins;
mod kv_store;
mod merge_strategy;
mod operation;
mod replica;

// Export core types
pub use crdt::CrdtOrMap;
pub use epoch_max_wins::{decode, encode, merge, EpochCount, EPOCH_MAX_WINS_ENCODED_LEN};
pub use epoch_max_wins::{decode, encode, EpochCount, EPOCH_MAX_WINS_ENCODED_LEN};
pub use merge_strategy::MergeStrategy;
pub use operation::{Operation, OperationLog};
pub use replica::ReplicaId;

Expand Down
82 changes: 64 additions & 18 deletions crates/mesh/src/crdt_kv/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::{HashMap, HashSet};

use serde::{Deserialize, Serialize};

use super::replica::ReplicaId;
use super::{epoch_max_wins, merge_strategy::MergeStrategy, replica::ReplicaId};

// ============================================================================
// Operation Type Definition - Atomic Unit of State Change
Expand Down Expand Up @@ -112,9 +112,16 @@ impl OperationLog {
/// new appends, not on every append. If compaction doesn't reduce below
/// threshold (very high key cardinality), the oldest entries are truncated.
pub fn append(&mut self, operation: Operation) {
    // No per-key strategy supplied: treat every key as last-writer-wins.
    let lww_for_all = |_: &str| MergeStrategy::LastWriterWins;
    self.append_with_strategy(operation, lww_for_all);
}

pub(super) fn append_with_strategy<F>(&mut self, operation: Operation, strategy_for_key: F)
where
F: Fn(&str) -> MergeStrategy,
{
self.operations.push(operation);
if self.operations.len() > Self::AUTO_COMPACT_THRESHOLD {
self.compact();
self.compact_with_strategy(strategy_for_key);
// If still over threshold after dedup (extremely high key cardinality
// >10K unique keys), truncate oldest entries. This drops state for the
// oldest keys, which will be re-synced from peers on the next merge.
Expand Down Expand Up @@ -159,22 +166,51 @@ impl OperationLog {
self.operations.is_empty()
}

fn latest_operations_by_key(&self) -> HashMap<String, Operation> {
let mut latest_by_key: HashMap<String, Operation> = HashMap::new();
/// Selects the last-writer-wins survivor from a set of operations.
///
/// Operations are ranked by the pair `(timestamp(), replica_id())`; the one with
/// the greatest rank is returned as a borrow of the input. When several
/// operations share the greatest rank, the one yielded last by the iterator is
/// chosen. Returns `None` for an empty input.
fn latest_lww_operation<'a, I>(operations: I) -> Option<&'a Operation>
where
    I: IntoIterator<Item = &'a Operation>,
{
    operations.into_iter().max_by(|lhs, rhs| {
        let lhs_rank = (lhs.timestamp(), lhs.replica_id());
        let rhs_rank = (rhs.timestamp(), rhs.replica_id());
        lhs_rank.cmp(&rhs_rank)
    })
}

/// Picks the surviving operation for a key governed by `MergeStrategy::EpochMaxWins`.
///
/// Thin wrapper: forwards the key's operations to
/// `epoch_max_wins::compact_operations` and returns its result unchanged.
/// Unlike `latest_lww_operation`, the result is an owned `Operation` rather
/// than a borrow of one of the inputs.
// NOTE(review): the conditions under which `compact_operations` returns `None`
// are not visible from here — confirm against `epoch_max_wins`.
fn latest_epoch_max_wins_operation<'a>(
operations: impl IntoIterator<Item = &'a Operation>,
) -> Option<Operation> {
epoch_max_wins::compact_operations(operations)
}

/// Reduces the log to one surviving operation per key, honoring each key's merge strategy.
///
/// Operations are first grouped by `operation.key()` (keeping log order within each
/// group). `strategy_for_key` is then asked once per distinct key which rule applies:
///
/// - `MergeStrategy::LastWriterWins`: the survivor is chosen by
///   `Self::latest_lww_operation` and cloned.
/// - `MergeStrategy::EpochMaxWins`: the group is handed to
///   `Self::latest_epoch_max_wins_operation`, which returns an owned operation.
///
/// A key whose strategy yields no operation is omitted from the result.
/// The log itself is left untouched.
fn latest_operations_by_key_with_strategy<F>(
    &self,
    strategy_for_key: F,
) -> HashMap<String, Operation>
where
    F: Fn(&str) -> MergeStrategy,
{
    // Group by borrow so that only the per-key survivors are ever cloned.
    let mut operations_by_key: HashMap<String, Vec<&Operation>> = HashMap::new();

    for operation in &self.operations {
        operations_by_key
            .entry(operation.key().to_string())
            .or_default()
            .push(operation);
    }

    operations_by_key
        .into_iter()
        .filter_map(|(key, operations)| {
            let latest = match strategy_for_key(&key) {
                MergeStrategy::LastWriterWins => {
                    Self::latest_lww_operation(operations).cloned()
                }
                MergeStrategy::EpochMaxWins => {
                    Self::latest_epoch_max_wins_operation(operations)
                }
            }?;
            Some((key, latest))
        })
        .collect()
}

/// Keep only latest operation per key to bound log growth.
Expand All @@ -185,8 +221,15 @@ impl OperationLog {
/// `apply_operation` and `operation_id` guards keep state semantics safe.
/// Stronger concurrency retention would require vector-clock/version-vector metadata.
pub fn compact(&mut self) {
    // No per-key strategy supplied: every key is resolved as last-writer-wins.
    let lww_for_all = |_: &str| MergeStrategy::LastWriterWins;
    self.compact_with_strategy(lww_for_all);
}

pub(super) fn compact_with_strategy<F>(&mut self, strategy_for_key: F)
where
F: Fn(&str) -> MergeStrategy,
{
self.operations = self
.latest_operations_by_key()
.latest_operations_by_key_with_strategy(strategy_for_key)
.into_values()
.collect::<Vec<_>>();
self.operations
Expand All @@ -199,9 +242,12 @@ impl OperationLog {
.retain(|operation| operation.timestamp() > watermark);
}

/// Build a latest-state snapshot and clear the operation log.
pub fn snapshot_and_truncate(&mut self) -> HashMap<String, Operation> {
let snapshot = self.latest_operations_by_key();
/// Capture the per-key latest state, then empty the operation log.
///
/// `strategy_for_key` is forwarded to `latest_operations_by_key_with_strategy`
/// to decide, per key, which operation survives. The resulting map is computed
/// before the log is cleared and is returned to the caller.
pub fn snapshot_and_truncate<F>(&mut self, strategy_for_key: F) -> HashMap<String, Operation>
where
    F: Fn(&str) -> MergeStrategy,
{
    let latest_by_key = self.latest_operations_by_key_with_strategy(strategy_for_key);
    // The snapshot owns its operations, so the log can be dropped safely now.
    self.operations.clear();
    latest_by_key
}
Expand Down
Loading
Loading