Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
269 changes: 218 additions & 51 deletions crates/mesh/src/crdt_kv/crdt.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{
cmp::Reverse,
collections::HashSet,
sync::Arc,
time::{Duration, Instant},
Expand All @@ -9,7 +10,9 @@ use parking_lot::{Mutex, RwLock};
use tracing::{debug, info};

use super::{
epoch_max_wins,
kv_store::KvStore,
merge_strategy::MergeStrategy,
operation::{Operation, OperationLog},
replica::{LamportClock, ReplicaId},
};
Expand Down Expand Up @@ -83,6 +86,7 @@ pub struct CrdtOrMap {
store: KvStore,
metadata: Arc<DashMap<String, Vec<ValueMetadata>>>, // Key to list of versions
key_locks: Arc<DashMap<String, Arc<Mutex<()>>>>, // Per-key critical section lock
merge_strategies: Arc<RwLock<Vec<(String, MergeStrategy)>>>,
replica_id: ReplicaId,
clock: LamportClock,
operation_log: Arc<RwLock<OperationLog>>,
Expand All @@ -101,12 +105,56 @@ impl CrdtOrMap {
store: KvStore::new(),
metadata: Arc::new(DashMap::new()),
key_locks: Arc::new(DashMap::new()),
merge_strategies: Arc::new(RwLock::new(Vec::new())),
replica_id,
clock: LamportClock::new(),
operation_log: Arc::new(RwLock::new(OperationLog::new())),
}
}

/// Register (or replace) the merge strategy used for keys that start with `prefix`.
///
/// If `prefix` is already registered its strategy is overwritten; otherwise a
/// new entry is appended. The table is then re-sorted so that longer prefixes
/// come first, which lets lookups return the first match.
pub(crate) fn register_merge_strategy(&self, prefix: String, strategy: MergeStrategy) {
    let mut table = self.merge_strategies.write();

    match table.iter().position(|(known, _)| *known == prefix) {
        Some(slot) => table[slot].1 = strategy,
        None => table.push((prefix, strategy)),
    }

    // Longest prefix first; the sort is stable, so equal-length prefixes keep
    // their relative order.
    table.sort_by_key(|(known, _)| Reverse(known.len()));
}

/// Resolve the merge strategy for `key` against the currently registered prefixes.
///
/// Takes the read lock on the strategy table only for the duration of the lookup.
fn merge_strategy_for_key(&self, key: &str) -> MergeStrategy {
    Self::merge_strategy_for_key_from(self.merge_strategies.read().as_slice(), key)
}

/// Pick the strategy of the first entry in `strategies` whose prefix matches `key`.
///
/// Entries are scanned in slice order, so the caller controls precedence
/// (`register_merge_strategy` keeps the table sorted longest prefix first).
/// Falls back to `MergeStrategy::LastWriterWins` when no prefix matches.
fn merge_strategy_for_key_from(
    strategies: &[(String, MergeStrategy)],
    key: &str,
) -> MergeStrategy {
    for (prefix, strategy) in strategies {
        if key.starts_with(prefix.as_str()) {
            return strategy.clone();
        }
    }
    MergeStrategy::LastWriterWins
}

/// Compact `operation_log`, resolving each key's merge strategy from the
/// registered prefix table.
///
/// The table is cloned up front, so the compaction callback works on a private
/// snapshot rather than on the shared, lock-protected list.
fn compact_operation_log(&self, operation_log: &mut OperationLog) {
    let prefix_table = self.merge_strategies.read().clone();
    operation_log.compact_with_strategy(|key| {
        Self::merge_strategy_for_key_from(&prefix_table, key)
    });
}
Comment thread
CatherineSue marked this conversation as resolved.

/// Append `operation` to the local operation log, letting the log look up the
/// merge strategy of each key it needs through the registered prefix table.
///
/// The log's write lock is taken first; the strategy table is then cloned so
/// the callback works on a private snapshot.
fn append_operation(&self, operation: Operation) {
    let mut log_guard = self.operation_log.write();
    let prefix_table = self.merge_strategies.read().clone();
    log_guard.append_with_strategy(operation, |key| {
        Self::merge_strategy_for_key_from(&prefix_table, key)
    });
}
Comment thread
CatherineSue marked this conversation as resolved.

fn key_lock_for(&self, key: &str) -> Arc<Mutex<()>> {
self.key_locks
.entry(key.to_string())
Expand Down Expand Up @@ -140,25 +188,18 @@ impl CrdtOrMap {
let key_lock = self.key_lock_for(&key);
let key_guard = key_lock.lock();

let previous = self.store.get(&key);
let timestamp = self.clock.tick();
let result = if self.record_insert_metadata(&key, timestamp, self.replica_id) {
let mut prev = None;
let value_for_operation = value.clone();
let _ = self.store.upsert(key.clone(), |current| {
prev = current.map(|bytes| bytes.to_vec());
value
});

let operation =
Operation::insert(key.clone(), value_for_operation, timestamp, self.replica_id);
self.operation_log.write().append(operation);
let operation = Operation::insert(key.clone(), value, timestamp, self.replica_id);
let result = if self.apply_insert_locked(&key, operation.clone()) {
self.append_operation(operation);

debug!(
"Insert: key={}, timestamp={}, replica={}",
key, timestamp, self.replica_id
);

prev
previous
} else {
self.store.get(&key).map(|bytes| bytes.to_vec())
};
Expand All @@ -179,19 +220,16 @@ impl CrdtOrMap {
let current_value = self.store.get(&key);
let updated_value = updater(current_value.as_deref());
let timestamp = self.clock.tick();
let operation = Operation::insert(
key.clone(),
updated_value.clone(),
timestamp,
self.replica_id,
);

let result = if self.record_insert_metadata(&key, timestamp, self.replica_id) {
let operation = Operation::insert(
key.clone(),
updated_value.clone(),
timestamp,
self.replica_id,
);

self.store.insert(key.clone(), updated_value.clone());
self.operation_log.write().append(operation);

updated_value
let result = if self.apply_insert_locked(&key, operation.clone()) {
self.append_operation(operation);
self.store.get(&key).unwrap_or_default()
} else {
self.store.get(&key).unwrap_or_default()
};
Expand Down Expand Up @@ -219,19 +257,16 @@ impl CrdtOrMap {
}
};
let timestamp = self.clock.tick();
let operation = Operation::insert(
key.clone(),
updated_value.clone(),
timestamp,
self.replica_id,
);

let result = if self.record_insert_metadata(&key, timestamp, self.replica_id) {
let operation = Operation::insert(
key.clone(),
updated_value.clone(),
timestamp,
self.replica_id,
);

self.store.insert(key.clone(), updated_value.clone());
self.operation_log.write().append(operation);

updated_value
let result = if self.apply_insert_locked(&key, operation.clone()) {
self.append_operation(operation);
self.store.get(&key).unwrap_or_default()
} else {
self.store.get(&key).unwrap_or_default()
};
Expand Down Expand Up @@ -261,16 +296,15 @@ impl CrdtOrMap {

let (result, changed) = if let Some(updated_value) = maybe_updated_value {
let timestamp = self.clock.tick();
if self.record_insert_metadata(&key, timestamp, self.replica_id) {
let operation = Operation::insert(
key.clone(),
updated_value.clone(),
timestamp,
self.replica_id,
);
self.store.insert(key.clone(), updated_value.clone());
self.operation_log.write().append(operation);
(updated_value, true)
let operation = Operation::insert(
key.clone(),
updated_value.clone(),
timestamp,
self.replica_id,
);
if self.apply_insert_locked(&key, operation.clone()) {
self.append_operation(operation);
(self.store.get(&key).unwrap_or_default(), true)
} else {
(self.store.get(&key).unwrap_or_default(), false)
}
Expand All @@ -297,7 +331,7 @@ impl CrdtOrMap {

let removed = if self.record_remove_metadata(key, timestamp, self.replica_id) {
let operation = Operation::remove(key.to_string(), timestamp, self.replica_id);
self.operation_log.write().append(operation);
self.append_operation(operation);
self.store.remove(key)
} else {
None
Expand Down Expand Up @@ -456,7 +490,7 @@ impl CrdtOrMap {
let unseen_operations: Vec<Operation> = {
let mut local_log = self.operation_log.write();
local_log.merge(log);
local_log.compact();
self.compact_operation_log(&mut local_log);

let mut unseen: Vec<Operation> = local_log
.operations()
Expand Down Expand Up @@ -491,15 +525,47 @@ impl CrdtOrMap {
/// Apply an insert for `key` under the key's critical-section lock.
///
/// The insert is wrapped in an [`Operation`] and handed to
/// `apply_insert_locked`, which is the single place that selects the merge
/// strategy, records version metadata and writes the store. Whether the insert
/// was accepted is not reported to the caller.
fn apply_insert(&self, key: &str, value: Vec<u8>, timestamp: u64, replica_id: ReplicaId) {
    let key_lock = self.key_lock_for(key);
    let key_guard = key_lock.lock();

    // Do not record metadata or touch the store here: doing so in addition to
    // `apply_insert_locked` would apply the insert twice and bypass the
    // per-prefix merge strategy.
    let operation = Operation::insert(key.to_string(), value, timestamp, replica_id);
    self.apply_insert_locked(key, operation);

    // Release the key lock before asking for its cleanup.
    drop(key_guard);
    self.try_cleanup_key_lock(key, &key_lock);
}

/// Apply an insert operation to `key`; the caller is expected to hold the key's lock.
///
/// Returns `true` when the insert was accepted and the store was written,
/// `false` when the metadata check rejected it or when `operation` is not an
/// `Operation::Insert`.
fn apply_insert_locked(&self, key: &str, operation: Operation) -> bool {
    let Operation::Insert {
        value,
        timestamp,
        replica_id,
        ..
    } = operation
    else {
        return false;
    };

    // Each strategy decides whether the insert is accepted and, if so, which
    // bytes end up in the store.
    let accepted: Option<Vec<u8>> = match self.merge_strategy_for_key(key) {
        // Epoch keys store the merge of the local and incoming values.
        MergeStrategy::EpochMaxWins => {
            self.record_epoch_insert_metadata(key, &value, timestamp, replica_id)
        }
        // Last-writer-wins keys store the incoming value verbatim.
        MergeStrategy::LastWriterWins => self
            .record_insert_metadata(key, timestamp, replica_id)
            .then_some(value),
    };

    match accepted {
        Some(bytes) => {
            self.store.insert(key.to_string(), bytes);
            true
        }
        None => false,
    }
}

fn compact_key_metadata(versions: &mut Vec<ValueMetadata>) {
if versions.len() <= 1 {
return;
Expand Down Expand Up @@ -545,6 +611,65 @@ impl CrdtOrMap {
}
}

/// Record version metadata for an insert on an epoch-max-wins key.
///
/// Returns `Some(merged)` with the bytes the caller should write to the store
/// when the insert is accepted, or `None` when it is rejected. `merged` is
/// `epoch_max_wins::merge(local, value)` if the store already holds a value for
/// `key`, otherwise a copy of `value`.
///
/// The insert is rejected when:
/// - the exact `(timestamp, replica_id)` version is already recorded,
/// - a tombstone newer than the insert is recorded, or
/// - the merge leaves the stored value unchanged and a newer version of any
///   kind is already recorded.
fn record_epoch_insert_metadata(
&self,
key: &str,
value: &[u8],
timestamp: u64,
replica_id: ReplicaId,
) -> Option<Vec<u8>> {
let new_metadata = ValueMetadata::new(timestamp, replica_id);
let current = self.store.get(key);
// Merge against the stored value when there is one; otherwise take the
// incoming value as-is.
let merged = current.as_deref().map_or_else(
|| value.to_vec(),
|local| epoch_max_wins::merge(local, value),
);
// True when the store has no value yet or the merge produced different bytes.
let value_changed = current.as_deref() != Some(merged.as_slice());

match self.metadata.entry(key.to_string()) {
MapEntry::Occupied(mut entry) => {
let versions = entry.get_mut();

// This exact version was already recorded: nothing to apply.
let has_existing_entry = versions
.iter()
.any(|v| v.matches_version(timestamp, replica_id));
if has_existing_entry {
Self::compact_key_metadata(versions);
return None;
}

// A newer tombstone wins over this insert.
let has_newer_tombstone = versions
.iter()
.any(|v| v.is_tombstone && v.is_newer_than(timestamp, replica_id));
if has_newer_tombstone {
Self::compact_key_metadata(versions);
return None;
}

// The stored value is unaffected by the merge. Only add this version when
// nothing newer is recorded, so an older equal-value insert does not
// displace newer metadata.
if current.is_some() && !value_changed {
if versions
.iter()
.any(|v| v.is_newer_than(timestamp, replica_id))
{
Self::compact_key_metadata(versions);
return None;
}
versions.push(new_metadata);
Self::compact_key_metadata(versions);
return Some(merged);
}

// The value changes (or the store held none): this version replaces every
// previously recorded version for the key.
versions.clear();
versions.push(new_metadata);
Some(merged)
}
MapEntry::Vacant(entry) => {
// First metadata recorded for this key.
entry.insert(vec![new_metadata]);
Some(merged)
}
}
}

/// Apply remove
fn apply_remove(&self, key: &str, timestamp: u64, replica_id: ReplicaId) -> Option<Vec<u8>> {
let key_lock = self.key_lock_for(key);
Expand Down Expand Up @@ -604,3 +729,45 @@ impl Default for CrdtOrMap {
Self::new()
}
}

#[cfg(test)]
mod tests {
    use super::*;

    /// An older insert carrying the same epoch value as the live one must be
    /// rejected, so that a tombstone timestamped between the two inserts cannot
    /// remove the newer live value.
    #[test]
    fn epoch_equal_value_insert_does_not_rewind_metadata() {
        let replica = CrdtOrMap::new();
        replica.register_merge_strategy("rl:".to_string(), MergeStrategy::EpochMaxWins);

        let key = "rl:global:node-a";
        let newer_insert_replica = ReplicaId::new();
        let older_insert_replica = ReplicaId::new();
        let tombstone_replica = ReplicaId::new();

        // Both inserts carry the same encoded payload; only the version differs.
        let live_value = epoch_max_wins::encode(6, 0).to_vec();
        let insert_at = |timestamp: u64, origin: ReplicaId| {
            Operation::insert(key.to_string(), live_value.clone(), timestamp, origin)
        };

        // The newer insert (t=100) lands first and is accepted.
        let newer_applied = replica.apply_insert_locked(key, insert_at(100, newer_insert_replica));
        assert!(newer_applied);

        // The older equal-value insert (t=10) must be rejected.
        let older_applied = replica.apply_insert_locked(key, insert_at(10, older_insert_replica));
        assert!(!older_applied);

        // A tombstone at t=50 sits between the two inserts and must not win.
        assert_eq!(replica.apply_remove(key, 50, tombstone_replica), None);
        assert_eq!(
            replica.get(key),
            Some(live_value.clone()),
            "older equal-value insert must not let an intermediate tombstone delete the newer live value",
        );
    }
}
Loading
Loading