Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 101 additions & 8 deletions crates/mesh/src/crdt_kv/crdt.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{
cmp::Reverse,
collections::HashSet,
sync::Arc,
time::{Duration, Instant},
Expand All @@ -9,7 +10,9 @@ use parking_lot::{Mutex, RwLock};
use tracing::{debug, info};

use super::{
epoch_max_wins,
kv_store::KvStore,
merge_strategy::MergeStrategy,
operation::{Operation, OperationLog},
replica::{LamportClock, ReplicaId},
};
Expand Down Expand Up @@ -83,6 +86,7 @@ pub struct CrdtOrMap {
store: KvStore,
metadata: Arc<DashMap<String, Vec<ValueMetadata>>>, // Key to list of versions
key_locks: Arc<DashMap<String, Arc<Mutex<()>>>>, // Per-key critical section lock
merge_strategies: Arc<RwLock<Vec<(String, MergeStrategy)>>>,
replica_id: ReplicaId,
clock: LamportClock,
operation_log: Arc<RwLock<OperationLog>>,
Expand All @@ -101,12 +105,45 @@ impl CrdtOrMap {
store: KvStore::new(),
metadata: Arc::new(DashMap::new()),
key_locks: Arc::new(DashMap::new()),
merge_strategies: Arc::new(RwLock::new(Vec::new())),
replica_id,
clock: LamportClock::new(),
operation_log: Arc::new(RwLock::new(OperationLog::new())),
}
}

/// Registers `strategy` for every key that starts with `prefix`.
///
/// Registering a prefix that is already known overwrites its strategy in place;
/// otherwise a new entry is added. After every registration the list is re-sorted
/// by descending prefix length, so a front-to-back scan reaches the longest
/// matching prefix first.
pub(crate) fn register_merge_strategy(&self, prefix: String, strategy: MergeStrategy) {
    let mut registry = self.merge_strategies.write();

    let existing_slot = registry.iter().position(|(known, _)| *known == prefix);
    match existing_slot {
        Some(slot) => registry[slot].1 = strategy,
        None => registry.push((prefix, strategy)),
    }

    // Stable sort: entries of equal length keep their relative order.
    registry.sort_by_key(|(known, _)| Reverse(known.len()));
}

/// Resolves the merge strategy that applies to `key`.
///
/// Walks the registered prefixes in stored order and returns a clone of the
/// strategy attached to the first prefix that `key` starts with. Falls back to
/// `MergeStrategy::LastWriterWins` when no registered prefix matches.
fn merge_strategy_for_key(&self, key: &str) -> MergeStrategy {
    let registry = self.merge_strategies.read();
    for (prefix, strategy) in registry.iter() {
        if key.starts_with(prefix.as_str()) {
            return strategy.clone();
        }
    }
    MergeStrategy::LastWriterWins
}

/// Compacts `operation_log`, looking up each key's merge strategy in this map's
/// prefix registry.
fn compact_operation_log(&self, operation_log: &mut OperationLog) {
    let strategy_for_key = |key: &str| self.merge_strategy_for_key(key);
    operation_log.compact_with_strategy(strategy_for_key);
}
Comment thread
CatherineSue marked this conversation as resolved.

/// Appends `operation` to the shared operation log while holding its write lock.
///
/// The log is handed a per-key strategy lookup backed by this map's prefix
/// registry, which it can consult if the append triggers compaction.
fn append_operation(&self, operation: Operation) {
    let strategy_for_key = |key: &str| self.merge_strategy_for_key(key);
    let mut log = self.operation_log.write();
    log.append_with_strategy(operation, strategy_for_key);
}
Comment thread
CatherineSue marked this conversation as resolved.

fn key_lock_for(&self, key: &str) -> Arc<Mutex<()>> {
self.key_locks
.entry(key.to_string())
Expand Down Expand Up @@ -151,7 +188,7 @@ impl CrdtOrMap {

let operation =
Operation::insert(key.clone(), value_for_operation, timestamp, self.replica_id);
self.operation_log.write().append(operation);
self.append_operation(operation);

debug!(
"Insert: key={}, timestamp={}, replica={}",
Expand Down Expand Up @@ -189,7 +226,7 @@ impl CrdtOrMap {
);

self.store.insert(key.clone(), updated_value.clone());
self.operation_log.write().append(operation);
self.append_operation(operation);

updated_value
} else {
Expand Down Expand Up @@ -229,7 +266,7 @@ impl CrdtOrMap {
);

self.store.insert(key.clone(), updated_value.clone());
self.operation_log.write().append(operation);
self.append_operation(operation);

updated_value
} else {
Expand Down Expand Up @@ -269,7 +306,7 @@ impl CrdtOrMap {
self.replica_id,
);
self.store.insert(key.clone(), updated_value.clone());
self.operation_log.write().append(operation);
self.append_operation(operation);
(updated_value, true)
} else {
(self.store.get(&key).unwrap_or_default(), false)
Expand Down Expand Up @@ -297,7 +334,7 @@ impl CrdtOrMap {

let removed = if self.record_remove_metadata(key, timestamp, self.replica_id) {
let operation = Operation::remove(key.to_string(), timestamp, self.replica_id);
self.operation_log.write().append(operation);
self.append_operation(operation);
self.store.remove(key)
} else {
None
Expand Down Expand Up @@ -456,7 +493,7 @@ impl CrdtOrMap {
let unseen_operations: Vec<Operation> = {
let mut local_log = self.operation_log.write();
local_log.merge(log);
local_log.compact();
self.compact_operation_log(&mut local_log);

let mut unseen: Vec<Operation> = local_log
.operations()
Expand Down Expand Up @@ -492,8 +529,25 @@ impl CrdtOrMap {
let key_lock = self.key_lock_for(key);
let key_guard = key_lock.lock();

if self.record_insert_metadata(key, timestamp, replica_id) {
self.store.insert(key.to_string(), value);
match self.merge_strategy_for_key(key) {
MergeStrategy::EpochMaxWins => {
if self.record_epoch_insert_metadata(key, timestamp, replica_id) {
let current = self.store.get(key);
let merged = if let Some(local) = current.as_deref() {
epoch_max_wins::merge(local, &value)
} else {
value
};
if current.as_deref() != Some(merged.as_slice()) {
self.store.insert(key.to_string(), merged);
}
}
}
MergeStrategy::LastWriterWins | MergeStrategy::MaxValueWins => {
if self.record_insert_metadata(key, timestamp, replica_id) {
self.store.insert(key.to_string(), value);
}
}
}

drop(key_guard);
Expand Down Expand Up @@ -545,6 +599,45 @@ impl CrdtOrMap {
}
}

fn record_epoch_insert_metadata(
&self,
key: &str,
timestamp: u64,
replica_id: ReplicaId,
) -> bool {
let new_metadata = ValueMetadata::new(timestamp, replica_id);

match self.metadata.entry(key.to_string()) {
MapEntry::Occupied(mut entry) => {
let versions = entry.get_mut();

let has_existing_entry = versions
.iter()
.any(|v| !v.is_tombstone && v.matches_version(timestamp, replica_id));
Comment thread
CatherineSue marked this conversation as resolved.
Outdated
if has_existing_entry {
Self::compact_key_metadata(versions);
return false;
}

let has_newer_tombstone = versions
.iter()
.any(|v| v.is_tombstone && v.is_newer_than(timestamp, replica_id));
if has_newer_tombstone {
Self::compact_key_metadata(versions);
return false;
}

versions.push(new_metadata);
Self::compact_key_metadata(versions);
Comment thread
CatherineSue marked this conversation as resolved.
Outdated
true
}
MapEntry::Vacant(entry) => {
entry.insert(vec![new_metadata]);
true
}
}
}

/// Apply remove
fn apply_remove(&self, key: &str, timestamp: u64, replica_id: ReplicaId) -> Option<Vec<u8>> {
let key_lock = self.key_lock_for(key);
Expand Down
13 changes: 13 additions & 0 deletions crates/mesh/src/crdt_kv/merge_strategy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/// Merge strategy for CRDT namespaces. Determines how conflicts are resolved
/// when two nodes write the same key concurrently.
#[derive(Debug, Clone, PartialEq, Eq)]
// Every variant deliberately ends in `Wins`, which is what this lint flags.
#[expect(clippy::enum_variant_names)]
pub enum MergeStrategy {
    /// Higher (version, replica_id) wins. Used for worker:*, policy:*, config:*.
    LastWriterWins,
    /// Higher numeric value wins (simple max). Reserved for future use.
    MaxValueWins,
    /// Compare epochs first, then max within same epoch.
    /// Values MUST be exactly 16 bytes: epoch (u64 big-endian) + count (i64 big-endian).
    EpochMaxWins,
}
2 changes: 2 additions & 0 deletions crates/mesh/src/crdt_kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
mod crdt;
mod epoch_max_wins;
mod kv_store;
mod merge_strategy;
mod operation;
mod replica;

// Export core types
pub use crdt::CrdtOrMap;
pub use epoch_max_wins::{decode, encode, merge, EpochCount, EPOCH_MAX_WINS_ENCODED_LEN};
pub use merge_strategy::MergeStrategy;
pub use operation::{Operation, OperationLog};
pub use replica::ReplicaId;

Expand Down
66 changes: 58 additions & 8 deletions crates/mesh/src/crdt_kv/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::{HashMap, HashSet};

use serde::{Deserialize, Serialize};

use super::replica::ReplicaId;
use super::{epoch_max_wins, merge_strategy::MergeStrategy, replica::ReplicaId};

// ============================================================================
// Operation Type Definition - Atomic Unit of State Change
Expand Down Expand Up @@ -112,9 +112,16 @@ impl OperationLog {
/// new appends, not on every append. If compaction doesn't reduce below
/// threshold (very high key cardinality), the oldest entries are truncated.
pub fn append(&mut self, operation: Operation) {
    // No per-key registry is available here, so every key uses last-writer-wins.
    let last_writer_wins = |_: &str| MergeStrategy::LastWriterWins;
    self.append_with_strategy(operation, last_writer_wins);
}

pub(super) fn append_with_strategy<F>(&mut self, operation: Operation, strategy_for_key: F)
where
F: Fn(&str) -> MergeStrategy,
{
self.operations.push(operation);
if self.operations.len() > Self::AUTO_COMPACT_THRESHOLD {
self.compact();
self.compact_with_strategy(strategy_for_key);
// If still over threshold after dedup (extremely high key cardinality
// >10K unique keys), truncate oldest entries. This drops state for the
// oldest keys, which will be re-synced from peers on the next merge.
Expand Down Expand Up @@ -159,16 +166,51 @@ impl OperationLog {
self.operations.is_empty()
}

fn latest_operations_by_key(&self) -> HashMap<String, Operation> {
/// Decides whether `candidate` should replace `current` as the retained operation
/// for a key under `strategy`.
///
/// For `MergeStrategy::EpochMaxWins`, when both operations are inserts, their
/// values are merged with `epoch_max_wins::merge`. If the merged value equals
/// exactly one of the two inputs, that side wins. In every other case (the other
/// strategies, non-insert operations, or a merge result that equals both or
/// neither input) the winner is the operation with the strictly greater
/// `(timestamp, replica_id)` pair, so a tie keeps `current`.
fn candidate_wins(current: &Operation, candidate: &Operation, strategy: MergeStrategy) -> bool {
    match strategy {
        MergeStrategy::EpochMaxWins => {
            if let (
                Operation::Insert { value: held, .. },
                Operation::Insert {
                    value: incoming, ..
                },
            ) = (current, candidate)
            {
                let merged = epoch_max_wins::merge(held, incoming);
                let picks_incoming = merged == *incoming;
                let picks_held = merged == *held;
                // Only decide here when the merge unambiguously selected one side.
                if picks_incoming != picks_held {
                    return picks_incoming;
                }
            }
        }
        MergeStrategy::LastWriterWins | MergeStrategy::MaxValueWins => {}
    }

    let candidate_version = (candidate.timestamp(), candidate.replica_id());
    let current_version = (current.timestamp(), current.replica_id());
    candidate_version > current_version
}

fn latest_operations_by_key_with_strategy<F>(
&self,
strategy_for_key: F,
) -> HashMap<String, Operation>
where
F: Fn(&str) -> MergeStrategy,
{
let mut latest_by_key: HashMap<String, Operation> = HashMap::new();

for operation in &self.operations {
let key = operation.key().to_string();
match latest_by_key.get(&key) {
Some(current)
if (current.timestamp(), current.replica_id())
>= (operation.timestamp(), operation.replica_id()) => {}
_ => {
if !Self::candidate_wins(current, operation, strategy_for_key(&key)) => {}
Some(_) | None => {
latest_by_key.insert(key, operation.clone());
}
}
Expand All @@ -185,8 +227,15 @@ impl OperationLog {
/// `apply_operation` and `operation_id` guards keep state semantics safe.
/// Stronger concurrency retention would require vector-clock/version-vector metadata.
pub fn compact(&mut self) {
    // No per-key registry is available here, so every key uses last-writer-wins.
    let last_writer_wins = |_: &str| MergeStrategy::LastWriterWins;
    self.compact_with_strategy(last_writer_wins);
}

pub(super) fn compact_with_strategy<F>(&mut self, strategy_for_key: F)
where
F: Fn(&str) -> MergeStrategy,
{
self.operations = self
.latest_operations_by_key()
.latest_operations_by_key_with_strategy(strategy_for_key)
.into_values()
.collect::<Vec<_>>();
self.operations
Expand All @@ -201,7 +250,8 @@ impl OperationLog {

/// Build a latest-state snapshot and clear the operation log.
///
/// Returns one retained operation per key, then empties `self.operations`.
/// The per-key winner is always chosen with `MergeStrategy::LastWriterWins`.
// NOTE(review): strategies registered per prefix are not consulted here, so keys
// using another strategy are snapshotted as last-writer-wins. Confirm this is intended.
pub fn snapshot_and_truncate(&mut self) -> HashMap<String, Operation> {
    let snapshot =
        self.latest_operations_by_key_with_strategy(|_| MergeStrategy::LastWriterWins);
    self.operations.clear();
    snapshot
}
Expand Down
Loading
Loading