Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 168 additions & 51 deletions crates/mesh/src/crdt_kv/crdt.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{
cmp::Reverse,
collections::HashSet,
sync::Arc,
time::{Duration, Instant},
Expand All @@ -9,7 +10,9 @@ use parking_lot::{Mutex, RwLock};
use tracing::{debug, info};

use super::{
epoch_max_wins,
kv_store::KvStore,
merge_strategy::MergeStrategy,
operation::{Operation, OperationLog},
replica::{LamportClock, ReplicaId},
};
Expand Down Expand Up @@ -83,6 +86,7 @@ pub struct CrdtOrMap {
store: KvStore,
metadata: Arc<DashMap<String, Vec<ValueMetadata>>>, // Key to list of versions
key_locks: Arc<DashMap<String, Arc<Mutex<()>>>>, // Per-key critical section lock
merge_strategies: Arc<RwLock<Vec<(String, MergeStrategy)>>>,
replica_id: ReplicaId,
clock: LamportClock,
operation_log: Arc<RwLock<OperationLog>>,
Expand All @@ -101,12 +105,56 @@ impl CrdtOrMap {
store: KvStore::new(),
metadata: Arc::new(DashMap::new()),
key_locks: Arc::new(DashMap::new()),
merge_strategies: Arc::new(RwLock::new(Vec::new())),
replica_id,
clock: LamportClock::new(),
operation_log: Arc::new(RwLock::new(OperationLog::new())),
}
}

/// Register the merge strategy for a key prefix.
pub(crate) fn register_merge_strategy(&self, prefix: String, strategy: MergeStrategy) {
let mut strategies = self.merge_strategies.write();
if let Some((_, existing)) = strategies
.iter_mut()
.find(|(registered_prefix, _)| registered_prefix == &prefix)
{
*existing = strategy;
} else {
strategies.push((prefix, strategy));
}
strategies.sort_by_key(|(prefix, _)| Reverse(prefix.len()));
}

fn merge_strategy_for_key(&self, key: &str) -> MergeStrategy {
let strategies = self.merge_strategies.read();
Self::merge_strategy_for_key_from(&strategies, key)
}

fn merge_strategy_for_key_from(
strategies: &[(String, MergeStrategy)],
key: &str,
) -> MergeStrategy {
strategies
.iter()
.find_map(|(prefix, strategy)| key.starts_with(prefix).then(|| strategy.clone()))
.unwrap_or(MergeStrategy::LastWriterWins)
}

fn compact_operation_log(&self, operation_log: &mut OperationLog) {
let strategies = self.merge_strategies.read().clone();
operation_log
.compact_with_strategy(|key| Self::merge_strategy_for_key_from(&strategies, key));
}
Comment thread
CatherineSue marked this conversation as resolved.

fn append_operation(&self, operation: Operation) {
let mut operation_log = self.operation_log.write();
let strategies = self.merge_strategies.read().clone();
operation_log.append_with_strategy(operation, |key| {
Self::merge_strategy_for_key_from(&strategies, key)
});
}
Comment thread
CatherineSue marked this conversation as resolved.

fn key_lock_for(&self, key: &str) -> Arc<Mutex<()>> {
self.key_locks
.entry(key.to_string())
Expand Down Expand Up @@ -140,25 +188,18 @@ impl CrdtOrMap {
let key_lock = self.key_lock_for(&key);
let key_guard = key_lock.lock();

let previous = self.store.get(&key);
let timestamp = self.clock.tick();
let result = if self.record_insert_metadata(&key, timestamp, self.replica_id) {
let mut prev = None;
let value_for_operation = value.clone();
let _ = self.store.upsert(key.clone(), |current| {
prev = current.map(|bytes| bytes.to_vec());
value
});

let operation =
Operation::insert(key.clone(), value_for_operation, timestamp, self.replica_id);
self.operation_log.write().append(operation);
let operation = Operation::insert(key.clone(), value, timestamp, self.replica_id);
let result = if self.apply_insert_locked(&key, operation.clone()) {
self.append_operation(operation);

debug!(
"Insert: key={}, timestamp={}, replica={}",
key, timestamp, self.replica_id
);

prev
previous
} else {
self.store.get(&key).map(|bytes| bytes.to_vec())
};
Expand All @@ -179,19 +220,16 @@ impl CrdtOrMap {
let current_value = self.store.get(&key);
let updated_value = updater(current_value.as_deref());
let timestamp = self.clock.tick();
let operation = Operation::insert(
key.clone(),
updated_value.clone(),
timestamp,
self.replica_id,
);

let result = if self.record_insert_metadata(&key, timestamp, self.replica_id) {
let operation = Operation::insert(
key.clone(),
updated_value.clone(),
timestamp,
self.replica_id,
);

self.store.insert(key.clone(), updated_value.clone());
self.operation_log.write().append(operation);

updated_value
let result = if self.apply_insert_locked(&key, operation.clone()) {
self.append_operation(operation);
self.store.get(&key).unwrap_or_default()
} else {
self.store.get(&key).unwrap_or_default()
};
Expand Down Expand Up @@ -219,19 +257,16 @@ impl CrdtOrMap {
}
};
let timestamp = self.clock.tick();
let operation = Operation::insert(
key.clone(),
updated_value.clone(),
timestamp,
self.replica_id,
);

let result = if self.record_insert_metadata(&key, timestamp, self.replica_id) {
let operation = Operation::insert(
key.clone(),
updated_value.clone(),
timestamp,
self.replica_id,
);

self.store.insert(key.clone(), updated_value.clone());
self.operation_log.write().append(operation);

updated_value
let result = if self.apply_insert_locked(&key, operation.clone()) {
self.append_operation(operation);
self.store.get(&key).unwrap_or_default()
} else {
self.store.get(&key).unwrap_or_default()
};
Expand Down Expand Up @@ -261,16 +296,15 @@ impl CrdtOrMap {

let (result, changed) = if let Some(updated_value) = maybe_updated_value {
let timestamp = self.clock.tick();
if self.record_insert_metadata(&key, timestamp, self.replica_id) {
let operation = Operation::insert(
key.clone(),
updated_value.clone(),
timestamp,
self.replica_id,
);
self.store.insert(key.clone(), updated_value.clone());
self.operation_log.write().append(operation);
(updated_value, true)
let operation = Operation::insert(
key.clone(),
updated_value.clone(),
timestamp,
self.replica_id,
);
if self.apply_insert_locked(&key, operation.clone()) {
self.append_operation(operation);
(self.store.get(&key).unwrap_or_default(), true)
} else {
(self.store.get(&key).unwrap_or_default(), false)
}
Expand All @@ -297,7 +331,7 @@ impl CrdtOrMap {

let removed = if self.record_remove_metadata(key, timestamp, self.replica_id) {
let operation = Operation::remove(key.to_string(), timestamp, self.replica_id);
self.operation_log.write().append(operation);
self.append_operation(operation);
self.store.remove(key)
} else {
None
Expand Down Expand Up @@ -456,7 +490,7 @@ impl CrdtOrMap {
let unseen_operations: Vec<Operation> = {
let mut local_log = self.operation_log.write();
local_log.merge(log);
local_log.compact();
self.compact_operation_log(&mut local_log);

let mut unseen: Vec<Operation> = local_log
.operations()
Expand Down Expand Up @@ -491,15 +525,47 @@ impl CrdtOrMap {
fn apply_insert(&self, key: &str, value: Vec<u8>, timestamp: u64, replica_id: ReplicaId) {
let key_lock = self.key_lock_for(key);
let key_guard = key_lock.lock();
let operation = Operation::insert(key.to_string(), value, timestamp, replica_id);

if self.record_insert_metadata(key, timestamp, replica_id) {
self.store.insert(key.to_string(), value);
}
self.apply_insert_locked(key, operation);

drop(key_guard);
self.try_cleanup_key_lock(key, &key_lock);
}

fn apply_insert_locked(&self, key: &str, operation: Operation) -> bool {
let Operation::Insert {
value,
timestamp,
replica_id,
..
} = operation
else {
return false;
};

match self.merge_strategy_for_key(key) {
MergeStrategy::EpochMaxWins => {
if let Some(merged) =
self.record_epoch_insert_metadata(key, &value, timestamp, replica_id)
{
self.store.insert(key.to_string(), merged);
true
} else {
false
}
}
MergeStrategy::LastWriterWins => {
if self.record_insert_metadata(key, timestamp, replica_id) {
self.store.insert(key.to_string(), value);
true
} else {
false
}
}
}
}

fn compact_key_metadata(versions: &mut Vec<ValueMetadata>) {
if versions.len() <= 1 {
return;
Expand Down Expand Up @@ -545,6 +611,57 @@ impl CrdtOrMap {
}
}

fn record_epoch_insert_metadata(
&self,
key: &str,
value: &[u8],
timestamp: u64,
replica_id: ReplicaId,
) -> Option<Vec<u8>> {
let new_metadata = ValueMetadata::new(timestamp, replica_id);
let current = self.store.get(key);
let merged = current.as_deref().map_or_else(
|| value.to_vec(),
|local| epoch_max_wins::merge(local, value),
);
let candidate_wins_value = current.as_deref() != Some(merged.as_slice()) || merged == value;
Comment thread
CatherineSue marked this conversation as resolved.
Outdated

match self.metadata.entry(key.to_string()) {
MapEntry::Occupied(mut entry) => {
let versions = entry.get_mut();

let has_existing_entry = versions
.iter()
.any(|v| v.matches_version(timestamp, replica_id));
if has_existing_entry {
Self::compact_key_metadata(versions);
return None;
}

let has_newer_tombstone = versions
.iter()
.any(|v| v.is_tombstone && v.is_newer_than(timestamp, replica_id));
if has_newer_tombstone {
Self::compact_key_metadata(versions);
return None;
}

if current.is_some() && !candidate_wins_value {
Self::compact_key_metadata(versions);
return None;
}

versions.clear();
versions.push(new_metadata);
Some(merged)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
}
MapEntry::Vacant(entry) => {
entry.insert(vec![new_metadata]);
Some(merged)
}
}
}

/// Apply remove
fn apply_remove(&self, key: &str, timestamp: u64, replica_id: ReplicaId) -> Option<Vec<u8>> {
let key_lock = self.key_lock_for(key);
Expand Down
46 changes: 31 additions & 15 deletions crates/mesh/src/crdt_kv/epoch_max_wins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ pub struct EpochCount {
pub count: i64,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum ValueWinner {
Local,
Remote,
Equal,
}

/// Encode `(epoch, count)` to the 16-byte big-endian wire format.
#[must_use]
pub fn encode(epoch: u64, count: i64) -> [u8; EPOCH_MAX_WINS_ENCODED_LEN] {
Expand All @@ -52,6 +59,27 @@ pub fn decode(bytes: &[u8]) -> Option<EpochCount> {
Some(EpochCount { epoch, count })
}

/// Compare two rate-limit values without allocating a merged buffer.
///
/// Malformed handling mirrors [`merge`]: a well-formed value wins over a
/// malformed value, and two malformed values keep local.
#[must_use]
pub(super) fn winner(local: &[u8], remote: &[u8]) -> ValueWinner {
match (decode(local), decode(remote)) {
(Some(l), Some(r)) => match l.epoch.cmp(&r.epoch) {
Ordering::Greater => ValueWinner::Local,
Ordering::Less => ValueWinner::Remote,
Ordering::Equal => match l.count.cmp(&r.count) {
Ordering::Greater => ValueWinner::Local,
Ordering::Less => ValueWinner::Remote,
Ordering::Equal => ValueWinner::Equal,
},
},
(Some(_), None) | (None, None) => ValueWinner::Local,
(None, Some(_)) => ValueWinner::Remote,
}
}

/// Merge two rate-limit values per the epoch-max-wins rule.
///
/// Both decode: higher epoch wins; on equal epochs, max count wins.
Expand All @@ -61,21 +89,9 @@ pub fn decode(bytes: &[u8]) -> Option<EpochCount> {
/// Returned `Vec<u8>` so the caller can write it straight back.
#[must_use]
pub fn merge(local: &[u8], remote: &[u8]) -> Vec<u8> {
match (decode(local), decode(remote)) {
(Some(l), Some(r)) => {
let winner = match l.epoch.cmp(&r.epoch) {
Ordering::Greater => l,
Ordering::Less => r,
Ordering::Equal => EpochCount {
epoch: l.epoch,
count: l.count.max(r.count),
},
};
encode(winner.epoch, winner.count).to_vec()
}
(Some(_), None) => local.to_vec(),
(None, Some(_)) => remote.to_vec(),
(None, None) => local.to_vec(),
match winner(local, remote) {
ValueWinner::Local | ValueWinner::Equal => local.to_vec(),
ValueWinner::Remote => remote.to_vec(),
}
}

Expand Down
Loading
Loading