diff --git a/src/RocksDb.Extensions/IMergeAccessor.cs b/src/RocksDb.Extensions/IMergeAccessor.cs new file mode 100644 index 0000000..9c5605e --- /dev/null +++ b/src/RocksDb.Extensions/IMergeAccessor.cs @@ -0,0 +1,17 @@ +using System.ComponentModel; + +namespace RocksDb.Extensions; + +#pragma warning disable CS1591 + +/// +/// This interface is not intended to be used directly by the clients of the library. +/// It provides merge operation support with a separate operand type. +/// +[EditorBrowsable(EditorBrowsableState.Never)] +public interface IMergeAccessor : IRocksDbAccessor +{ + void Merge(TKey key, TOperand operand); +} + +#pragma warning restore CS1591 diff --git a/src/RocksDb.Extensions/IMergeOperator.cs b/src/RocksDb.Extensions/IMergeOperator.cs new file mode 100644 index 0000000..014de46 --- /dev/null +++ b/src/RocksDb.Extensions/IMergeOperator.cs @@ -0,0 +1,47 @@ +namespace RocksDb.Extensions; + +/// +/// Defines a merge operator for RocksDB that enables atomic read-modify-write operations. +/// Merge operators allow efficient updates without requiring a separate read before write, +/// which is particularly useful for counters, list appends, set unions, and other accumulative operations. +/// +/// The type of the value stored in the database. +/// The type of the merge operand (the delta/change to apply). +/// +/// The separation of and allows for flexible merge patterns: +/// +/// For counters: TValue=long, TOperand=long (same type) +/// For list append: TValue=IList<T>, TOperand=IList<T> (same type) +/// For list with add/remove: TValue=IList<T>, TOperand=CollectionOperation<T> (different types) +/// +/// +public interface IMergeOperator +{ + /// + /// Gets the name of the merge operator. This name is stored in the database + /// and must remain consistent across database opens. + /// + string Name { get; } + + /// + /// Performs a full merge of the existing value with one or more operands. + /// Called when a Get operation encounters merge operands and needs to produce the final value. + /// + /// The existing value in the database. For value types, this will be default if no value exists. + /// The span of merge operands to apply, in order. + /// The merged value to store. + TValue FullMerge(TValue? existingValue, ReadOnlySpan operands); + + /// + /// Performs a partial merge of multiple operands without the existing value. + /// Called during compaction to combine multiple merge operands into a single operand. + /// This is an optimization that reduces the number of operands that need to be stored. + /// + /// The span of merge operands to combine, in order. + /// The combined operand, or null if partial merge is not safe for these operands. + /// + /// Return null when it's not safe to combine operands without knowing the existing value. + /// When null is returned, RocksDB will keep the operands separate and call FullMerge later. + /// + TOperand? PartialMerge(ReadOnlySpan operands); +} diff --git a/src/RocksDb.Extensions/IRocksDbBuilder.cs b/src/RocksDb.Extensions/IRocksDbBuilder.cs index 7d1124c..2edf242 100644 --- a/src/RocksDb.Extensions/IRocksDbBuilder.cs +++ b/src/RocksDb.Extensions/IRocksDbBuilder.cs @@ -22,4 +22,37 @@ public interface IRocksDbBuilder /// Use GetRequiredKeyedService<TStore>(columnFamily) to retrieve a specific store instance. /// IRocksDbBuilder AddStore(string columnFamily) where TStore : RocksDbStore; + + /// + /// Adds a mergeable RocksDB store to the builder for the specified column family. + /// This method enforces that the store inherits from + /// and requires a merge operator, providing compile-time safety for merge operations. + /// + /// The name of the column family to associate with the store. + /// The merge operator to use for atomic read-modify-write operations. + /// The type of the store's key. + /// The type of the store's values (the stored state). + /// The type of the merge operand (the delta/change to apply). + /// The type of the store to add. Must inherit from . + /// The builder instance for method chaining. + /// Thrown if the specified column family is already registered. + /// + /// + /// Use this method when your store needs merge operations. The constraint + /// ensures that only stores designed for merge operations can be registered with this method. + /// + /// + /// The separation of and allows for flexible patterns: + /// + /// Counters: TValue=long, TOperand=long (same type) + /// List append: TValue=IList<T>, TOperand=IList<T> (same type) + /// List with add/remove: TValue=IList<T>, TOperand=CollectionOperation<T> (different types) + /// + /// + /// + /// For stores that don't need merge operations, use instead. + /// + /// + IRocksDbBuilder AddMergeableStore(string columnFamily, IMergeOperator mergeOperator) + where TStore : MergeableRocksDbStore; } \ No newline at end of file diff --git a/src/RocksDb.Extensions/ListOperationSerializer.cs b/src/RocksDb.Extensions/ListOperationSerializer.cs new file mode 100644 index 0000000..1a7953f --- /dev/null +++ b/src/RocksDb.Extensions/ListOperationSerializer.cs @@ -0,0 +1,84 @@ +using System.Buffers; +using RocksDb.Extensions.MergeOperators; + +namespace RocksDb.Extensions; + +/// +/// Serializes CollectionOperation<T> which contains an operation type (Add/Remove) and a list of items. +/// +/// +/// +/// The serialized format consists of: +/// - 1 byte: Operation type (0 = Add, 1 = Remove) +/// - Remaining bytes: Serialized list using FixedSizeListSerializer (for primitives) or VariableSizeListSerializer (for complex types) +/// +/// +/// Space efficiency optimization: +/// - For primitive types (int, long, bool, etc.), uses FixedSizeListSerializer which stores: +/// - 4 bytes: list count +/// - N * elementSize bytes: all elements (no per-element size prefix) +/// Example: List<int> with 3 elements = 4 + (3 * 4) = 16 bytes +/// +/// +/// - For non-primitive types (strings, objects, protobuf messages), uses VariableSizeListSerializer which stores: +/// - 4 bytes: list count +/// - For each element: 4 bytes size prefix + element data +/// Example: List<string> with ["ab", "cde"] = 4 + (4+2) + (4+3) = 17 bytes +/// +/// +internal class ListOperationSerializer : ISerializer> +{ + private readonly ISerializer> _listSerializer; + + public ListOperationSerializer(ISerializer itemSerializer) + { + // Use FixedSizeListSerializer for primitive types to avoid storing size for each element + // Use VariableSizeListSerializer for non-primitive types where elements may vary in size + _listSerializer = typeof(T).IsPrimitive + ? new FixedSizeListSerializer(itemSerializer) + : new VariableSizeListSerializer(itemSerializer); + } + + public bool TryCalculateSize(ref CollectionOperation value, out int size) + { + // 1 byte for operation type + size of the list + size = sizeof(byte); + + var items = value.Items; + if (_listSerializer.TryCalculateSize(ref items, out var listSize)) + { + size += listSize; + return true; + } + + return false; + } + + public void WriteTo(ref CollectionOperation value, ref Span span) + { + // Write operation type (1 byte) + span[0] = (byte)value.Type; + + // Write the list using the list serializer + var listSpan = span.Slice(sizeof(byte)); + var items = value.Items; + _listSerializer.WriteTo(ref items, ref listSpan); + } + + public void WriteTo(ref CollectionOperation value, IBufferWriter buffer) + { + throw new NotImplementedException(); + } + + public CollectionOperation Deserialize(ReadOnlySpan buffer) + { + // Read operation type + var operationType = (OperationType)buffer[0]; + + // Read the list using the list serializer + var listBuffer = buffer.Slice(sizeof(byte)); + var items = _listSerializer.Deserialize(listBuffer); + + return new CollectionOperation(operationType, items); + } +} diff --git a/src/RocksDb.Extensions/MergeAccessor.cs b/src/RocksDb.Extensions/MergeAccessor.cs new file mode 100644 index 0000000..6825a2e --- /dev/null +++ b/src/RocksDb.Extensions/MergeAccessor.cs @@ -0,0 +1,88 @@ +using System.Buffers; +using CommunityToolkit.HighPerformance.Buffers; + +namespace RocksDb.Extensions; + +internal class MergeAccessor : RocksDbAccessor, IMergeAccessor +{ + private readonly ISerializer _operandSerializer; + + public MergeAccessor( + RocksDbContext db, + ColumnFamily columnFamily, + ISerializer keySerializer, + ISerializer valueSerializer, + ISerializer operandSerializer) : base(db, columnFamily, keySerializer, valueSerializer) + { + _operandSerializer = operandSerializer; + } + + public void Merge(TKey key, TOperand operand) + { + byte[]? rentedKeyBuffer = null; + bool useSpanAsKey; + // ReSharper disable once AssignmentInConditionalExpression + Span keyBuffer = (useSpanAsKey = KeySerializer.TryCalculateSize(ref key, out var keySize)) + ? keySize < MaxStackSize + ? stackalloc byte[keySize] + : (rentedKeyBuffer = ArrayPool.Shared.Rent(keySize)).AsSpan(0, keySize) + : Span.Empty; + + ReadOnlySpan keySpan = keyBuffer; + ArrayPoolBufferWriter? keyBufferWriter = null; + + byte[]? rentedOperandBuffer = null; + bool useSpanAsOperand; + // ReSharper disable once AssignmentInConditionalExpression + Span operandBuffer = (useSpanAsOperand = _operandSerializer.TryCalculateSize(ref operand, out var operandSize)) + ? operandSize < MaxStackSize + ? stackalloc byte[operandSize] + : (rentedOperandBuffer = ArrayPool.Shared.Rent(operandSize)).AsSpan(0, operandSize) + : Span.Empty; + + + ReadOnlySpan operandSpan = operandBuffer; + ArrayPoolBufferWriter? operandBufferWriter = null; + + try + { + if (useSpanAsKey) + { + KeySerializer.WriteTo(ref key, ref keyBuffer); + } + else + { + keyBufferWriter = new ArrayPoolBufferWriter(); + KeySerializer.WriteTo(ref key, keyBufferWriter); + keySpan = keyBufferWriter.WrittenSpan; + } + + if (useSpanAsOperand) + { + _operandSerializer.WriteTo(ref operand, ref operandBuffer); + } + else + { + operandBufferWriter = new ArrayPoolBufferWriter(); + _operandSerializer.WriteTo(ref operand, operandBufferWriter); + operandSpan = operandBufferWriter.WrittenSpan; + } + + RocksDbContext.Db.Merge(keySpan, operandSpan, ColumnFamily.Handle, RocksDbContext.WriteOptions); + } + finally + { + keyBufferWriter?.Dispose(); + operandBufferWriter?.Dispose(); + if (rentedKeyBuffer is not null) + { + ArrayPool.Shared.Return(rentedKeyBuffer); + } + + if (rentedOperandBuffer is not null) + { + ArrayPool.Shared.Return(rentedOperandBuffer); + } + } + } +} diff --git a/src/RocksDb.Extensions/MergeOperatorConfig.cs b/src/RocksDb.Extensions/MergeOperatorConfig.cs new file mode 100644 index 0000000..c0bfd71 --- /dev/null +++ b/src/RocksDb.Extensions/MergeOperatorConfig.cs @@ -0,0 +1,134 @@ +using System.Buffers; +using System.Runtime.CompilerServices; +using CommunityToolkit.HighPerformance.Buffers; + +namespace RocksDb.Extensions; + +/// +/// Internal configuration for a merge operator associated with a column family. +/// +internal class MergeOperatorConfig +{ + /// + /// Gets the name of the merge operator. + /// + public string Name { get; set; } = null!; + + /// + /// Gets the full merge callback delegate. + /// + public global::RocksDbSharp.MergeOperators.FullMergeFunc FullMerge { get; set; } = null!; + + /// + /// Gets the partial merge callback delegate. + /// + public global::RocksDbSharp.MergeOperators.PartialMergeFunc PartialMerge { get; set; } = null!; + + internal static MergeOperatorConfig CreateMergeOperatorConfig( + IMergeOperator mergeOperator, + ISerializer valueSerializer, + ISerializer operandSerializer) + { + return new MergeOperatorConfig + { + Name = mergeOperator.Name, + FullMerge = (ReadOnlySpan _, bool hasExistingValue, ReadOnlySpan existingValue, global::RocksDbSharp.MergeOperators.OperandsEnumerator operands, out bool success) => + { + return FullMergeCallback(hasExistingValue, existingValue, operands, mergeOperator, valueSerializer, operandSerializer, out success); + }, + PartialMerge = (ReadOnlySpan _, global::RocksDbSharp.MergeOperators.OperandsEnumerator operands, out bool success) => + { + return PartialMergeCallback(operands, mergeOperator, operandSerializer, out success); + } + }; + } + + private static byte[] FullMergeCallback(bool hasExistingValue, + ReadOnlySpan existingValue, + global::RocksDbSharp.MergeOperators.OperandsEnumerator operands, + IMergeOperator mergeOperator, + ISerializer valueSerializer, + ISerializer operandSerializer, + out bool success) + { + success = true; + + var existing = hasExistingValue ? valueSerializer.Deserialize(existingValue) : default!; + + var operandArray = ArrayPool.Shared.Rent(operands.Count); + try + { + for (int i = 0; i < operands.Count; i++) + { + operandArray[i] = operandSerializer.Deserialize(operands.Get(i)); + } + + var operandSpan = operandArray.AsSpan(0, operands.Count); + var result = mergeOperator.FullMerge(existing, operandSpan); + + return SerializeValue(result, valueSerializer); + } + catch + { + success = false; + return Array.Empty(); + } + finally + { + ArrayPool.Shared.Return(operandArray, clearArray: RuntimeHelpers.IsReferenceOrContainsReferences()); + } + } + + private static byte[] PartialMergeCallback(global::RocksDbSharp.MergeOperators.OperandsEnumerator operands, + IMergeOperator mergeOperator, + ISerializer operandSerializer, + out bool success) + { + var operandArray = ArrayPool.Shared.Rent(operands.Count); + try + { + for (int i = 0; i < operands.Count; i++) + { + operandArray[i] = operandSerializer.Deserialize(operands.Get(i)); + } + + var operandSpan = operandArray.AsSpan(0, operands.Count); + var result = mergeOperator.PartialMerge(operandSpan); + + if (result == null) + { + success = false; + return Array.Empty(); + } + + success = true; + return SerializeValue(result, operandSerializer); + } + catch + { + success = false; + return Array.Empty(); + } + finally + { + ArrayPool.Shared.Return(operandArray, clearArray: RuntimeHelpers.IsReferenceOrContainsReferences()); + } + } + + private static byte[] SerializeValue(T value, ISerializer serializer) + { + if (serializer.TryCalculateSize(ref value, out var size)) + { + var buffer = new byte[size]; + var span = buffer.AsSpan(); + serializer.WriteTo(ref value, ref span); + return buffer; + } + else + { + using var bufferWriter = new ArrayPoolBufferWriter(); + serializer.WriteTo(ref value, bufferWriter); + return bufferWriter.WrittenSpan.ToArray(); + } + } +} diff --git a/src/RocksDb.Extensions/MergeOperators/ListMergeOperator.cs b/src/RocksDb.Extensions/MergeOperators/ListMergeOperator.cs new file mode 100644 index 0000000..06c42d8 --- /dev/null +++ b/src/RocksDb.Extensions/MergeOperators/ListMergeOperator.cs @@ -0,0 +1,102 @@ +namespace RocksDb.Extensions.MergeOperators; + +/// +/// A merge operator that supports both adding and removing items from a list. +/// Each merge operand is a CollectionOperation that specifies whether to add or remove items. +/// Operations are applied in order, enabling atomic list modifications without read-before-write. +/// +/// The type of elements in the list. +/// +/// +/// public class TagsStore : MergeableRocksDbStore<string, IList<string>, CollectionOperation<string>> +/// { +/// public TagsStore(IMergeAccessor<string, IList<string>, CollectionOperation<string>> mergeAccessor) +/// : base(mergeAccessor) { } +/// +/// public void AddTags(string key, params string[] tags) => Merge(key, CollectionOperation<string>.Add(tags)); +/// public void RemoveTags(string key, params string[] tags) => Merge(key, CollectionOperation<string>.Remove(tags)); +/// } +/// +/// // Registration: +/// builder.AddMergeableStore<string, IList<string>, CollectionOperation<string>, TagsStore>("tags", new ListMergeOperator<string>()); +/// +/// +/// +/// +/// The value type stored in RocksDB is IList<T> (the actual list contents), +/// while merge operands are CollectionOperation<T> (the operations to apply). +/// +/// +/// Remove operations delete the first occurrence of each item (same as ). +/// If an item to remove doesn't exist in the list, the operation is silently ignored. +/// +/// +public class ListMergeOperator : IMergeOperator, CollectionOperation> +{ + /// + public string Name => $"ListMergeOperator<{typeof(T).Name}>"; + + /// + public IList FullMerge( + IList? existingValue, + ReadOnlySpan> operands) + { + // Start with existing items or empty list + var result = existingValue != null ? new List(existingValue) : new List(); + + // Apply all operands in order + foreach (var operand in operands) + { + ApplyOperation(result, operand); + } + + return result; + } + + /// + public CollectionOperation? PartialMerge(ReadOnlySpan> operands) + { + var allAdds = new List(); + + foreach (var operand in operands) + { + if (operand.Type == OperationType.Remove) + { + // If there are any removes, we can't safely combine without knowing the existing state + // Return null to signal that RocksDB should keep operands separate + return null; + } + } + + foreach (var operand in operands) + { + foreach (var item in operand.Items) + { + allAdds.Add(item); + } + } + + // Only adds present - safe to combine + return CollectionOperation.Add(allAdds); + } + + private static void ApplyOperation(List result, CollectionOperation operation) + { + switch (operation.Type) + { + case OperationType.Add: + foreach (var item in operation.Items) + { + result.Add(item); + } + break; + + case OperationType.Remove: + foreach (var item in operation.Items) + { + result.Remove(item); // Removes first occurrence + } + break; + } + } +} diff --git a/src/RocksDb.Extensions/MergeOperators/ListOperation.cs b/src/RocksDb.Extensions/MergeOperators/ListOperation.cs new file mode 100644 index 0000000..a0c8c34 --- /dev/null +++ b/src/RocksDb.Extensions/MergeOperators/ListOperation.cs @@ -0,0 +1,49 @@ +namespace RocksDb.Extensions.MergeOperators; + +/// +/// Represents an operation (add or remove) to apply to a collection via merge. +/// +/// The type of elements in the collection. +public class CollectionOperation +{ + /// + /// Gets the type of operation to perform. + /// + public OperationType Type { get; } + + /// + /// Gets the items to add or remove. + /// + public IList Items { get; } + + /// + /// Creates a new collection operation. + /// + /// The type of operation. + /// The items to add or remove. + public CollectionOperation(OperationType type, IList items) + { + Type = type; + Items = items ?? throw new ArgumentNullException(nameof(items)); + } + + /// + /// Creates an Add operation for the specified items. + /// + public static CollectionOperation Add(params T[] items) => new(OperationType.Add, items); + + /// + /// Creates an Add operation for the specified items. + /// + public static CollectionOperation Add(IList items) => new(OperationType.Add, items); + + /// + /// Creates a Remove operation for the specified items. + /// + public static CollectionOperation Remove(params T[] items) => new(OperationType.Remove, items); + + /// + /// Creates a Remove operation for the specified items. + /// + public static CollectionOperation Remove(IList items) => new(OperationType.Remove, items); +} diff --git a/src/RocksDb.Extensions/MergeOperators/OperationType.cs b/src/RocksDb.Extensions/MergeOperators/OperationType.cs new file mode 100644 index 0000000..d6305f6 --- /dev/null +++ b/src/RocksDb.Extensions/MergeOperators/OperationType.cs @@ -0,0 +1,17 @@ +namespace RocksDb.Extensions.MergeOperators; + +/// +/// Specifies the type of operation to perform on a collection. +/// +public enum OperationType +{ + /// + /// Add items to the collection. + /// + Add, + + /// + /// Remove items from the collection. + /// + Remove +} \ No newline at end of file diff --git a/src/RocksDb.Extensions/MergeableRocksDbStore.cs b/src/RocksDb.Extensions/MergeableRocksDbStore.cs new file mode 100644 index 0000000..99d684a --- /dev/null +++ b/src/RocksDb.Extensions/MergeableRocksDbStore.cs @@ -0,0 +1,66 @@ +namespace RocksDb.Extensions; + +/// +/// Base class for a RocksDB store that supports merge operations. +/// Inherit from this class when you need to use RocksDB's merge operator functionality +/// for efficient atomic read-modify-write operations. +/// +/// The type of the store's keys. +/// The type of the store's values. +/// The type of the merge operand. +/// +/// +/// Merge operations are useful for: +/// - Counters: Increment/decrement without reading current value (TValue=long, TOperand=long) +/// - Lists: Append items without reading the entire list (TValue=IList<T>, TOperand=IList<T>) +/// - Lists with add/remove: Modify lists atomically (TValue=IList<T>, TOperand=CollectionOperation<T>) +/// +/// +/// When using this base class, you must register the store with a merge operator using +/// . +/// +/// +/// +/// +/// // Counter store where value and operand are the same type +/// public class CounterStore : MergeableRocksDbStore<string, long, long> +/// { +/// public CounterStore(IMergeAccessor<string, long, long> mergeAccessor) +/// : base(mergeAccessor) { } +/// +/// public void Increment(string key, long delta = 1) => Merge(key, delta); +/// } +/// +/// // Tags store where value is IList<string> but operand is CollectionOperation<string> +/// public class TagsStore : MergeableRocksDbStore<string, IList<string>, CollectionOperation<string>> +/// { +/// public TagsStore(IMergeAccessor<string, IList<string>, CollectionOperation<string>> mergeAccessor) +/// : base(mergeAccessor) { } +/// +/// public void AddTag(string key, string tag) => Merge(key, CollectionOperation<string>.Add(tag)); +/// public void RemoveTag(string key, string tag) => Merge(key, CollectionOperation<string>.Remove(tag)); +/// } +/// +/// +public abstract class MergeableRocksDbStore : RocksDbStoreBase +{ + private readonly IMergeAccessor _mergeAccessor; + + /// + /// Initializes a new instance of the class. + /// + /// The RocksDB accessor to use for database operations. + protected MergeableRocksDbStore(IMergeAccessor rocksDbAccessor) : base(rocksDbAccessor) + { + _mergeAccessor = rocksDbAccessor; + } + + /// + /// Performs an atomic merge operation on the value associated with the specified key. + /// This operation uses RocksDB's merge operator to combine the operand with the existing value + /// without requiring a separate read operation. + /// + /// The key to merge the operand with. + /// The operand to merge with the existing value. + public void Merge(TKey key, TOperand operand) => _mergeAccessor.Merge(key, operand); +} diff --git a/src/RocksDb.Extensions/RocksDb.Extensions.csproj b/src/RocksDb.Extensions/RocksDb.Extensions.csproj index a7e0c85..87ef619 100644 --- a/src/RocksDb.Extensions/RocksDb.Extensions.csproj +++ b/src/RocksDb.Extensions/RocksDb.Extensions.csproj @@ -27,6 +27,6 @@ - + diff --git a/src/RocksDb.Extensions/RocksDbAccessor.cs b/src/RocksDb.Extensions/RocksDbAccessor.cs index 8ad8fe5..dfa0016 100644 --- a/src/RocksDb.Extensions/RocksDbAccessor.cs +++ b/src/RocksDb.Extensions/RocksDbAccessor.cs @@ -7,12 +7,12 @@ namespace RocksDb.Extensions; internal class RocksDbAccessor : IRocksDbAccessor, ISpanDeserializer { - private const int MaxStackSize = 256; + private protected const int MaxStackSize = 256; - private readonly ISerializer _keySerializer; + protected readonly ISerializer KeySerializer; private readonly ISerializer _valueSerializer; - private readonly RocksDbContext _rocksDbContext; - private readonly ColumnFamily _columnFamily; + private protected readonly RocksDbContext RocksDbContext; + private protected readonly ColumnFamily ColumnFamily; private readonly bool _checkIfExists; public RocksDbAccessor(RocksDbContext rocksDbContext, @@ -20,9 +20,9 @@ public RocksDbAccessor(RocksDbContext rocksDbContext, ISerializer keySerializer, ISerializer valueSerializer) { - _rocksDbContext = rocksDbContext; - _columnFamily = columnFamily; - _keySerializer = keySerializer; + RocksDbContext = rocksDbContext; + ColumnFamily = columnFamily; + KeySerializer = keySerializer; _valueSerializer = valueSerializer; _checkIfExists = typeof(TValue).IsValueType; @@ -34,7 +34,7 @@ public void Remove(TKey key) bool useSpan; // ReSharper disable once AssignmentInConditionalExpression - Span keyBuffer = (useSpan = _keySerializer.TryCalculateSize(ref key, out var keySize)) + Span keyBuffer = (useSpan = KeySerializer.TryCalculateSize(ref key, out var keySize)) ? keySize < MaxStackSize ? stackalloc byte[keySize] : (rentedKeyBuffer = ArrayPool.Shared.Rent(keySize)).AsSpan(0, keySize) @@ -47,16 +47,16 @@ public void Remove(TKey key) { if (useSpan) { - _keySerializer.WriteTo(ref key, ref keyBuffer); + KeySerializer.WriteTo(ref key, ref keyBuffer); } else { keyBufferWriter = new ArrayPoolBufferWriter(); - _keySerializer.WriteTo(ref key, keyBufferWriter); + KeySerializer.WriteTo(ref key, keyBufferWriter); keySpan = keyBufferWriter.WrittenSpan; } - _rocksDbContext.Db.Remove(keySpan, _columnFamily.Handle); + RocksDbContext.Db.Remove(keySpan, ColumnFamily.Handle, RocksDbContext.WriteOptions); } finally { @@ -73,7 +73,7 @@ public void Put(TKey key, TValue value) byte[]? rentedKeyBuffer = null; bool useSpanAsKey; // ReSharper disable once AssignmentInConditionalExpression - Span keyBuffer = (useSpanAsKey = _keySerializer.TryCalculateSize(ref key, out var keySize)) + Span keyBuffer = (useSpanAsKey = KeySerializer.TryCalculateSize(ref key, out var keySize)) ? keySize < MaxStackSize ? stackalloc byte[keySize] : (rentedKeyBuffer = ArrayPool.Shared.Rent(keySize)).AsSpan(0, keySize) @@ -99,12 +99,12 @@ public void Put(TKey key, TValue value) { if (useSpanAsKey) { - _keySerializer.WriteTo(ref key, ref keyBuffer); + KeySerializer.WriteTo(ref key, ref keyBuffer); } else { keyBufferWriter = new ArrayPoolBufferWriter(); - _keySerializer.WriteTo(ref key, keyBufferWriter); + KeySerializer.WriteTo(ref key, keyBufferWriter); keySpan = keyBufferWriter.WrittenSpan; } @@ -119,7 +119,7 @@ public void Put(TKey key, TValue value) valueSpan = valueBufferWriter.WrittenSpan; } - _rocksDbContext.Db.Put(keySpan, valueSpan, _columnFamily.Handle); + RocksDbContext.Db.Put(keySpan, valueSpan, ColumnFamily.Handle, RocksDbContext.WriteOptions); } finally { @@ -143,7 +143,7 @@ public bool TryGet(TKey key, [MaybeNullWhen(false)] out TValue value) bool useSpan; // ReSharper disable once AssignmentInConditionalExpression - Span keyBuffer = (useSpan = _keySerializer.TryCalculateSize(ref key, out var keySize)) + Span keyBuffer = (useSpan = KeySerializer.TryCalculateSize(ref key, out var keySize)) ? keySize < MaxStackSize ? stackalloc byte[keySize] : (rentedKeyBuffer = ArrayPool.Shared.Rent(keySize)).AsSpan(0, keySize) @@ -156,22 +156,22 @@ public bool TryGet(TKey key, [MaybeNullWhen(false)] out TValue value) { if (useSpan) { - _keySerializer.WriteTo(ref key, ref keyBuffer); + KeySerializer.WriteTo(ref key, ref keyBuffer); } else { keyBufferWriter = new ArrayPoolBufferWriter(); - _keySerializer.WriteTo(ref key, keyBufferWriter); + KeySerializer.WriteTo(ref key, keyBufferWriter); keySpan = keyBufferWriter.WrittenSpan; } - if (_checkIfExists && _rocksDbContext.Db.HasKey(keySpan, _columnFamily.Handle) == false) + if (_checkIfExists && RocksDbContext.Db.HasKey(keySpan, ColumnFamily.Handle) == false) { value = default; return false; } - value = _rocksDbContext.Db.Get(keySpan, this, _columnFamily.Handle); + value = RocksDbContext.Db.Get(keySpan, this, ColumnFamily.Handle); return value != null; } finally @@ -202,7 +202,7 @@ public void PutRange(ReadOnlySpan keys, ReadOnlySpan values) AddToBatch(keys[i], values[i], batch); } - _rocksDbContext.Db.Write(batch); + RocksDbContext.Db.Write(batch, RocksDbContext.WriteOptions); } public void PutRange(ReadOnlySpan values, Func keySelector) @@ -215,7 +215,7 @@ public void PutRange(ReadOnlySpan values, Func keySelector AddToBatch(key, value, batch); } - _rocksDbContext.Db.Write(batch); + RocksDbContext.Db.Write(batch, RocksDbContext.WriteOptions); } public void PutRange(IReadOnlyList<(TKey key, TValue value)> items) @@ -227,7 +227,7 @@ public void PutRange(IReadOnlyList<(TKey key, TValue value)> items) AddToBatch(key, value, batch); } - _rocksDbContext.Db.Write(batch); + RocksDbContext.Db.Write(batch, RocksDbContext.WriteOptions); } private void AddToBatch(TKey key, TValue value, WriteBatch batch) @@ -235,7 +235,7 @@ private void AddToBatch(TKey key, TValue value, WriteBatch batch) byte[]? rentedKeyBuffer = null; bool useSpanAsKey; // ReSharper disable once AssignmentInConditionalExpression - Span keyBuffer = (useSpanAsKey = _keySerializer.TryCalculateSize(ref key, out var keySize)) + Span keyBuffer = (useSpanAsKey = KeySerializer.TryCalculateSize(ref key, out var keySize)) ? keySize < MaxStackSize ? stackalloc byte[keySize] : (rentedKeyBuffer = ArrayPool.Shared.Rent(keySize)).AsSpan(0, keySize) @@ -261,12 +261,12 @@ private void AddToBatch(TKey key, TValue value, WriteBatch batch) { if (useSpanAsKey) { - _keySerializer.WriteTo(ref key, ref keyBuffer); + KeySerializer.WriteTo(ref key, ref keyBuffer); } else { keyBufferWriter = new ArrayPoolBufferWriter(); - _keySerializer.WriteTo(ref key, keyBufferWriter); + KeySerializer.WriteTo(ref key, keyBufferWriter); keySpan = keyBufferWriter.WrittenSpan; } @@ -280,8 +280,8 @@ private void AddToBatch(TKey key, TValue value, WriteBatch batch) _valueSerializer.WriteTo(ref value, valueBufferWriter); valueSpan = valueBufferWriter.WrittenSpan; } - - _ = batch.Put(keySpan, valueSpan, _columnFamily.Handle); + + _ = batch.Put(keySpan, valueSpan, ColumnFamily.Handle); } finally { @@ -298,21 +298,21 @@ private void AddToBatch(TKey key, TValue value, WriteBatch batch) } } } - + public IEnumerable GetAllKeys() { - using var iterator = _rocksDbContext.Db.NewIterator(_columnFamily.Handle); + using var iterator = RocksDbContext.Db.NewIterator(ColumnFamily.Handle); _ = iterator.SeekToFirst(); while (iterator.Valid()) { - yield return _keySerializer.Deserialize(iterator.Key()); + yield return KeySerializer.Deserialize(iterator.Key()); _ = iterator.Next(); } } public IEnumerable GetAllValues() { - using var iterator = _rocksDbContext.Db.NewIterator(_columnFamily.Handle); + using var iterator = RocksDbContext.Db.NewIterator(ColumnFamily.Handle); _ = iterator.SeekToFirst(); while (iterator.Valid()) { @@ -320,10 +320,10 @@ public IEnumerable GetAllValues() _ = iterator.Next(); } } - + public int Count() { - using var iterator = _rocksDbContext.Db.NewIterator(_columnFamily.Handle); + using var iterator = RocksDbContext.Db.NewIterator(ColumnFamily.Handle); _ = iterator.SeekToFirst(); var count = 0; while (iterator.Valid()) @@ -341,7 +341,7 @@ public bool HasKey(TKey key) bool useSpan; // ReSharper disable once AssignmentInConditionalExpression - Span keyBuffer = (useSpan = _keySerializer.TryCalculateSize(ref key, out var keySize)) + Span keyBuffer = (useSpan = KeySerializer.TryCalculateSize(ref key, out var keySize)) ? keySize < MaxStackSize ? stackalloc byte[keySize] : (rentedKeyBuffer = ArrayPool.Shared.Rent(keySize)).AsSpan(0, keySize) @@ -354,16 +354,16 @@ public bool HasKey(TKey key) { if (useSpan) { - _keySerializer.WriteTo(ref key, ref keyBuffer); + KeySerializer.WriteTo(ref key, ref keyBuffer); } else { keyBufferWriter = new ArrayPoolBufferWriter(); - _keySerializer.WriteTo(ref key, keyBufferWriter); + KeySerializer.WriteTo(ref key, keyBufferWriter); keySpan = keyBufferWriter.WrittenSpan; } - return _rocksDbContext.Db.HasKey(keySpan, _columnFamily.Handle); + return RocksDbContext.Db.HasKey(keySpan, ColumnFamily.Handle); } finally { @@ -374,13 +374,13 @@ public bool HasKey(TKey key) } } } - + public void Clear() { - var prevColumnFamilyHandle = _columnFamily.Handle; - _rocksDbContext.Db.DropColumnFamily(_columnFamily.Name); - _columnFamily.Handle = _rocksDbContext.Db.CreateColumnFamily(_rocksDbContext.ColumnFamilyOptions, _columnFamily.Name); - + var prevColumnFamilyHandle = ColumnFamily.Handle; + RocksDbContext.Db.DropColumnFamily(ColumnFamily.Name); + ColumnFamily.Handle = RocksDbContext.Db.CreateColumnFamily(RocksDbContext.ColumnFamilyOptions, ColumnFamily.Name); + Native.Instance.rocksdb_column_family_handle_destroy(prevColumnFamilyHandle.Handle); } } diff --git a/src/RocksDb.Extensions/RocksDbBuilder.cs b/src/RocksDb.Extensions/RocksDbBuilder.cs index 4123075..82f1c38 100644 --- a/src/RocksDb.Extensions/RocksDbBuilder.cs +++ b/src/RocksDb.Extensions/RocksDbBuilder.cs @@ -22,7 +22,10 @@ public IRocksDbBuilder AddStore(string columnFamily) where throw new InvalidOperationException($"{columnFamily} is already registered."); } - _ = _serviceCollection.Configure(options => { options.ColumnFamilies.Add(columnFamily); }); + _ = _serviceCollection.Configure(options => + { + options.ColumnFamilies.Add(columnFamily); + }); _serviceCollection.AddKeyedSingleton(columnFamily, (provider, _) => { @@ -45,6 +48,49 @@ public IRocksDbBuilder AddStore(string columnFamily) where return this; } + public IRocksDbBuilder AddMergeableStore(string columnFamily, IMergeOperator mergeOperator) + where TStore : MergeableRocksDbStore + { + if (!_columnFamilyLookup.Add(columnFamily)) + { + throw new InvalidOperationException($"{columnFamily} is already registered."); + } + + _ = _serviceCollection.Configure(options => + { + options.ColumnFamilies.Add(columnFamily); + + var valueSerializer = CreateSerializer(options.SerializerFactories); + var operandSerializer = CreateSerializer(options.SerializerFactories); + var config = MergeOperatorConfig.CreateMergeOperatorConfig(mergeOperator, valueSerializer, operandSerializer); + options.MergeOperators[columnFamily] = config; + }); + + _serviceCollection.AddKeyedSingleton(columnFamily, (provider, _) => + { + var rocksDbContext = provider.GetRequiredService(); + var columnFamilyHandle = rocksDbContext.Db.GetColumnFamily(columnFamily); + var rocksDbOptions = provider.GetRequiredService>(); + var keySerializer = CreateSerializer(rocksDbOptions.Value.SerializerFactories); + var valueSerializer = CreateSerializer(rocksDbOptions.Value.SerializerFactories); + var operandSerializer = CreateSerializer(rocksDbOptions.Value.SerializerFactories); + + var rocksDbAccessor = new MergeAccessor( + rocksDbContext, + new ColumnFamily(columnFamilyHandle, columnFamily), + keySerializer, + valueSerializer, + operandSerializer + ); + + return ActivatorUtilities.CreateInstance(provider, rocksDbAccessor); + }); + + _serviceCollection.TryAddSingleton(typeof(TStore), provider => provider.GetRequiredKeyedService(columnFamily)); + + return this; + } + private static ISerializer CreateSerializer(IReadOnlyList serializerFactories) { var type = typeof(T); @@ -70,11 +116,25 @@ private static ISerializer CreateSerializer(IReadOnlyList) Activator.CreateInstance(typeof(FixedSizeListSerializer<>).MakeGenericType(elementType), scalarSerializer); + return (ISerializer) Activator.CreateInstance(typeof(FixedSizeListSerializer<>).MakeGenericType(elementType), scalarSerializer)!; } // Use variable size list serializer for non-primitive types - return (ISerializer) Activator.CreateInstance(typeof(VariableSizeListSerializer<>).MakeGenericType(elementType), scalarSerializer); + return (ISerializer) Activator.CreateInstance(typeof(VariableSizeListSerializer<>).MakeGenericType(elementType), scalarSerializer)!; + } + + // Handle CollectionOperation for the ListMergeOperator + if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(MergeOperators.CollectionOperation<>)) + { + var itemType = type.GetGenericArguments()[0]; + + // Create the item serializer + var itemSerializer = typeof(RocksDbBuilder).GetMethod(nameof(CreateSerializer), BindingFlags.NonPublic | BindingFlags.Static) + ?.MakeGenericMethod(itemType) + .Invoke(null, new object[] { serializerFactories }); + + // Create ListOperationSerializer + return (ISerializer) Activator.CreateInstance(typeof(ListOperationSerializer<>).MakeGenericType(itemType), itemSerializer)!; } throw new InvalidOperationException($"Type {type.FullName} cannot be used as RocksDbStore key/value. " + diff --git a/src/RocksDb.Extensions/RocksDbContext.cs b/src/RocksDb.Extensions/RocksDbContext.cs index 4cc489e..e616582 100644 --- a/src/RocksDb.Extensions/RocksDbContext.cs +++ b/src/RocksDb.Extensions/RocksDbContext.cs @@ -8,6 +8,7 @@ internal class RocksDbContext : IDisposable private readonly RocksDbSharp.RocksDb _rocksDb; private readonly Cache _cache; private readonly ColumnFamilyOptions _userSpecifiedOptions; + private readonly WriteOptions _writeOptions; private const long BlockCacheSize = 50 * 1024 * 1024L; private const long BlockSize = 4096L; @@ -18,22 +19,9 @@ public RocksDbContext(IOptions options) { var dbOptions = new DbOptions(); _userSpecifiedOptions = new ColumnFamilyOptions(); - var tableConfig = new BlockBasedTableOptions(); _cache = Cache.CreateLru(BlockCacheSize); - tableConfig.SetBlockCache(_cache); - tableConfig.SetBlockSize(BlockSize); - - var filter = BloomFilterPolicy.Create(); - tableConfig.SetFilterPolicy(filter); - _userSpecifiedOptions.SetBlockBasedTableFactory(tableConfig); - _userSpecifiedOptions.SetWriteBufferSize(WriteBufferSize); - _userSpecifiedOptions.SetCompression(Compression.No); - _userSpecifiedOptions.SetCompactionStyle(Compaction.Universal); - _userSpecifiedOptions.SetMaxWriteBufferNumberToMaintain(MaxWriteBuffers); - _userSpecifiedOptions.SetCreateIfMissing(); - _userSpecifiedOptions.SetCreateMissingColumnFamilies(); - _userSpecifiedOptions.SetErrorIfExists(false); - _userSpecifiedOptions.SetInfoLogLevel(InfoLogLevel.Error); + + ConfigureColumnFamilyOptions(_userSpecifiedOptions, _cache); // this is the recommended way to increase parallelism in RocksDb // note that the current implementation of setIncreaseParallelism affects the number @@ -48,15 +36,13 @@ public RocksDbContext(IOptions options) dbOptions.SetUseDirectReads(options.Value.UseDirectReads); dbOptions.SetUseDirectIoForFlushAndCompaction(options.Value.UseDirectIoForFlushAndCompaction); - var fOptions = new FlushOptions(); - fOptions.SetWaitForFlush(options.Value.WaitForFlush); - var writeOptions = new WriteOptions(); - writeOptions.DisableWal(1); + _writeOptions = new WriteOptions(); + _writeOptions.DisableWal(1); _userSpecifiedOptions.EnableStatistics(); - var columnFamilies = CreateColumnFamilies(options.Value.ColumnFamilies, _userSpecifiedOptions); + var columnFamilies = CreateColumnFamilies(options.Value.ColumnFamilies, options.Value.MergeOperators, _userSpecifiedOptions); if (options.Value.DeleteExistingDatabaseOnStartup) { @@ -72,17 +58,61 @@ private static void DestroyDatabase(string path) Native.Instance.rocksdb_destroy_db(dbOptions.Handle, path); } + private static void ConfigureColumnFamilyOptions(ColumnFamilyOptions cfOptions, Cache cache) + { + var tableConfig = new BlockBasedTableOptions(); + tableConfig.SetBlockCache(cache); + tableConfig.SetBlockSize(BlockSize); + + var filter = BloomFilterPolicy.Create(); + tableConfig.SetFilterPolicy(filter); + + cfOptions.SetBlockBasedTableFactory(tableConfig); + cfOptions.SetWriteBufferSize(WriteBufferSize); + cfOptions.SetCompression(Compression.No); + cfOptions.SetCompactionStyle(Compaction.Universal); + cfOptions.SetMaxWriteBufferNumber(MaxWriteBuffers); + cfOptions.SetCreateIfMissing(); + cfOptions.SetCreateMissingColumnFamilies(); + cfOptions.SetErrorIfExists(false); + cfOptions.SetInfoLogLevel(InfoLogLevel.Error); + cfOptions.EnableStatistics(); + } + public RocksDbSharp.RocksDb Db => _rocksDb; public ColumnFamilyOptions ColumnFamilyOptions => _userSpecifiedOptions; - private static ColumnFamilies CreateColumnFamilies(IReadOnlyList columnFamilyNames, - ColumnFamilyOptions columnFamilyOptions) + public WriteOptions WriteOptions => _writeOptions; + + + private ColumnFamilies CreateColumnFamilies( + IReadOnlyList columnFamilyNames, + IReadOnlyDictionary mergeOperators, + ColumnFamilyOptions defaultColumnFamilyOptions) { - var columnFamilies = new ColumnFamilies(columnFamilyOptions); + var columnFamilies = new ColumnFamilies(defaultColumnFamilyOptions); foreach (var columnFamilyName in columnFamilyNames) { - columnFamilies.Add(columnFamilyName, columnFamilyOptions); + if (mergeOperators.TryGetValue(columnFamilyName, out var mergeOperatorConfig)) + { + // Create a copy of the default options for this column family + var cfOptions = new ColumnFamilyOptions(); + ConfigureColumnFamilyOptions(cfOptions, _cache); + + // Create and set the merge operator + var mergeOp = global::RocksDbSharp.MergeOperators.Create( + mergeOperatorConfig.Name, + mergeOperatorConfig.PartialMerge, + mergeOperatorConfig.FullMerge); + + cfOptions.SetMergeOperator(mergeOp); + columnFamilies.Add(columnFamilyName, cfOptions); + } + else + { + columnFamilies.Add(columnFamilyName, defaultColumnFamilyOptions); + } } return columnFamilies; diff --git a/src/RocksDb.Extensions/RocksDbOptions.cs b/src/RocksDb.Extensions/RocksDbOptions.cs index f8c9a60..09f5202 100644 --- a/src/RocksDb.Extensions/RocksDbOptions.cs +++ b/src/RocksDb.Extensions/RocksDbOptions.cs @@ -33,6 +33,12 @@ public class RocksDbOptions internal List ColumnFamilies { get; } = new(); + /// + /// Internal dictionary of merge operators per column family. + /// Column family names are case-sensitive, matching RocksDB's behavior. + /// + internal Dictionary MergeOperators { get; } = new(); + /// /// Enables direct I/O mode for reads, which bypasses the OS page cache. /// diff --git a/src/RocksDb.Extensions/RocksDbStore.cs b/src/RocksDb.Extensions/RocksDbStore.cs new file mode 100644 index 0000000..3b573e6 --- /dev/null +++ b/src/RocksDb.Extensions/RocksDbStore.cs @@ -0,0 +1,24 @@ +namespace RocksDb.Extensions; + +/// +/// Base class for a RocksDB store that provides basic operations such as add, update, remove, get and get all. +/// +/// The type of the store's keys. +/// The type of the store's values. +public abstract class RocksDbStore : RocksDbStoreBase +{ + /// + /// Initializes a new instance of the class with the specified RocksDB accessor. + /// + /// The RocksDB accessor to use for database operations. + protected RocksDbStore(IRocksDbAccessor rocksDbAccessor) : base(rocksDbAccessor) + { + } + + /// + /// Gets all the values in the store. (Obsolete, use GetAllValues instead) + /// + /// An enumerable collection of all the values in the store. + [Obsolete("Use GetAllValues() instead.")] + public IEnumerable GetAll() => GetAllValues(); +} diff --git a/src/RocksDb.Extensions/IRocksDbStore.cs b/src/RocksDb.Extensions/RocksDbStoreBase.cs similarity index 88% rename from src/RocksDb.Extensions/IRocksDbStore.cs rename to src/RocksDb.Extensions/RocksDbStoreBase.cs index c39f6b9..0d04dbe 100644 --- a/src/RocksDb.Extensions/IRocksDbStore.cs +++ b/src/RocksDb.Extensions/RocksDbStoreBase.cs @@ -1,21 +1,28 @@ +using System.ComponentModel; using System.Diagnostics.CodeAnalysis; namespace RocksDb.Extensions; /// -/// Base class for a RocksDB store that provides basic operations such as add, update, remove, get and get all. +/// Base class containing common operations for RocksDB stores. +/// This class is not intended for direct use by library consumers. +/// Use or instead. /// /// The type of the store's keys. /// The type of the store's values. -public abstract class RocksDbStore +[EditorBrowsable(EditorBrowsableState.Never)] +public abstract class RocksDbStoreBase { private readonly IRocksDbAccessor _rocksDbAccessor; /// - /// Initializes a new instance of the class with the specified RocksDB accessor. + /// Initializes a new instance of the class. /// /// The RocksDB accessor to use for database operations. - protected RocksDbStore(IRocksDbAccessor rocksDbAccessor) => _rocksDbAccessor = rocksDbAccessor; + protected internal RocksDbStoreBase(IRocksDbAccessor rocksDbAccessor) + { + _rocksDbAccessor = rocksDbAccessor; + } /// /// Removes the specified key and its associated value from the store. @@ -64,13 +71,6 @@ public abstract class RocksDbStore /// An enumerable collection of all the values in the store. public IEnumerable GetAllValues() => _rocksDbAccessor.GetAllValues(); - /// - /// Gets all the values in the store. (Obsolete, use GetAllValues instead) - /// - /// An enumerable collection of all the values in the store. - [Obsolete("Use GetAllValues() instead.")] - public IEnumerable GetAll() => GetAllValues(); - /// /// Determines whether the store contains a value for a specific key. /// @@ -108,3 +108,4 @@ public abstract class RocksDbStore /// An enumerable collection of all the keys in the store. public IEnumerable GetAllKeys() => _rocksDbAccessor.GetAllKeys(); } + diff --git a/test/RocksDb.Extensions.Tests/MergeOperatorTests.cs b/test/RocksDb.Extensions.Tests/MergeOperatorTests.cs new file mode 100644 index 0000000..3db2f3f --- /dev/null +++ b/test/RocksDb.Extensions.Tests/MergeOperatorTests.cs @@ -0,0 +1,250 @@ +using NUnit.Framework; +using RocksDb.Extensions.MergeOperators; +using RocksDb.Extensions.Tests.Utils; +using RocksDb.Extensions.Tests.Protos; +using RocksDb.Extensions.Protobuf; + +namespace RocksDb.Extensions.Tests; + +public class TagsStore : MergeableRocksDbStore, CollectionOperation> +{ + public TagsStore(IMergeAccessor, CollectionOperation> mergeAccessor) + : base(mergeAccessor) + { + } + + public void AddTags(string key, params string[] tags) + { + Merge(key, CollectionOperation.Add(tags)); + } + + public void RemoveTags(string key, params string[] tags) + { + Merge(key, CollectionOperation.Remove(tags)); + } +} + +public class ScoresStore : MergeableRocksDbStore, CollectionOperation> +{ + public ScoresStore(IMergeAccessor, CollectionOperation> mergeAccessor) + : base(mergeAccessor) + { + } + + public void AddScores(string key, params int[] scores) + { + Merge(key, CollectionOperation.Add(scores)); + } + + public void RemoveScores(string key, params int[] scores) + { + Merge(key, CollectionOperation.Remove(scores)); + } +} + +public class MergeOperatorTests +{ + [Test] + public void should_add_to_existing_list_using_merge_operation() + { + // Arrange + using var testFixture = TestFixture.Create(rockDb => + { + rockDb.AddMergeableStore, TagsStore, CollectionOperation>("tags", new ListMergeOperator()); + }); + + var store = testFixture.GetStore(); + var key = "article-1"; + + // Act + store.Put(key, new List { "csharp", "dotnet" }); + store.AddTags(key, "rocksdb"); + + // Assert + Assert.That(store.TryGet(key, out var tags), Is.True); + Assert.That(tags, Is.Not.Null); + Assert.That(tags!.Count, Is.EqualTo(3)); + Assert.That(tags, Does.Contain("csharp")); + Assert.That(tags, Does.Contain("dotnet")); + Assert.That(tags, Does.Contain("rocksdb")); + } + + [Test] + public void should_add_items_to_list_using_list_merge_operator() + { + // Arrange + using var testFixture = TestFixture.Create(rockDb => + { + rockDb.AddMergeableStore, TagsStore, CollectionOperation>("tags", new ListMergeOperator()); + }); + + var store = testFixture.GetStore(); + var key = "article-1"; + + // Act + store.AddTags(key, "csharp", "dotnet"); + store.AddTags(key, "rocksdb"); + + // Assert + Assert.That(store.TryGet(key, out var tags), Is.True); + Assert.That(tags, Is.Not.Null); + Assert.That(tags!.Count, Is.EqualTo(3)); + Assert.That(tags, Does.Contain("csharp")); + Assert.That(tags, Does.Contain("dotnet")); + Assert.That(tags, Does.Contain("rocksdb")); + } + + [Test] + public void should_remove_items_from_list_using_list_merge_operator() + { + // Arrange + using var testFixture = TestFixture.Create(rockDb => + { + rockDb.AddMergeableStore, TagsStore, CollectionOperation>("tags", new ListMergeOperator()); + }); + + var store = testFixture.GetStore(); + var key = "article-1"; + + // Act - Add items, then remove some + store.Merge(key, CollectionOperation.Add("csharp", "dotnet", "java", "python")); + store.RemoveTags(key, "java", "python"); + + // Assert + Assert.That(store.TryGet(key, out var tags), Is.True); + Assert.That(tags, Is.Not.Null); + Assert.That(tags!.Count, Is.EqualTo(2)); + Assert.That(tags, Does.Contain("csharp")); + Assert.That(tags, Does.Contain("dotnet")); + Assert.That(tags, Does.Not.Contain("java")); + Assert.That(tags, Does.Not.Contain("python")); + } + + [Test] + public void should_handle_mixed_add_and_remove_operations() + { + // Arrange + using var testFixture = TestFixture.Create(rockDb => + { + rockDb.AddMergeableStore, TagsStore, CollectionOperation>("tags", new ListMergeOperator()); + }); + + var store = testFixture.GetStore(); + var key = "article-1"; + + // Act - Interleave adds and removes + store.AddTags(key, "a", "b", "c"); + store.RemoveTags(key, "b"); + store.AddTags(key, "d", "e"); + store.RemoveTags(key, "a", "e"); + + // Assert - Should have: c, d + Assert.That(store.TryGet(key, out var tags), Is.True); + Assert.That(tags, Is.Not.Null); + Assert.That(tags!.Count, Is.EqualTo(2)); + Assert.That(tags, Does.Contain("c")); + Assert.That(tags, Does.Contain("d")); + } + + [Test] + public void should_handle_remove_nonexistent_item_gracefully() + { + // Arrange + using var testFixture = TestFixture.Create(rockDb => + { + rockDb.AddMergeableStore, TagsStore, CollectionOperation>("tags", new ListMergeOperator()); + }); + + var store = testFixture.GetStore(); + var key = "article-1"; + + // Act - Try to remove items that don't exist + store.AddTags(key, "csharp"); + store.RemoveTags(key, "nonexistent"); + + // Assert - Original item should still be there + Assert.That(store.TryGet(key, out var tags), Is.True); + Assert.That(tags, Is.Not.Null); + Assert.That(tags!.Count, Is.EqualTo(1)); + Assert.That(tags, Does.Contain("csharp")); + } + + [Test] + public void should_remove_only_first_occurrence_of_duplicate_items() + { + // Arrange + using var testFixture = TestFixture.Create(rockDb => + { + rockDb.AddMergeableStore, TagsStore, CollectionOperation>("tags", new ListMergeOperator()); + }); + + var store = testFixture.GetStore(); + var key = "article-1"; + + // Act - Add duplicate items, then remove one + store.AddTags(key, "tag", "tag", "tag"); + store.RemoveTags(key, "tag"); + + // Assert - Should have 2 remaining + Assert.That(store.TryGet(key, out var tags), Is.True); + Assert.That(tags, Is.Not.Null); + Assert.That(tags!.Count, Is.EqualTo(2)); + Assert.That(tags[0], Is.EqualTo("tag")); + Assert.That(tags[1], Is.EqualTo("tag")); + } + + [Test] + public void should_add_primitive_types_to_list_using_list_merge_operator() + { + // Arrange + using var testFixture = TestFixture.Create(rockDb => + { + rockDb.AddMergeableStore, ScoresStore, CollectionOperation>( + "scores", + new ListMergeOperator()); + }); + + var store = testFixture.GetStore(); + var key = "player-1"; + + // Act + store.AddScores(key, 100, 200); + store.AddScores(key, 300); + + // Assert + Assert.That(store.TryGet(key, out var scores), Is.True); + Assert.That(scores, Is.Not.Null); + Assert.That(scores!.Count, Is.EqualTo(3)); + Assert.That(scores, Does.Contain(100)); + Assert.That(scores, Does.Contain(200)); + Assert.That(scores, Does.Contain(300)); + } + + [Test] + public void should_add_and_remove_primitive_types_using_list_merge_operator() + { + // Arrange + using var testFixture = TestFixture.Create(rockDb => + { + rockDb.AddMergeableStore, ScoresStore, CollectionOperation>( + "scores", + new ListMergeOperator()); + }); + + var store = testFixture.GetStore(); + var key = "player-1"; + + // Act - Add items, then remove some + store.AddScores(key, 100, 200, 300, 400); + store.RemoveScores(key, 200, 400); // Remove middle values + + // Assert + Assert.That(store.TryGet(key, out var scores), Is.True); + Assert.That(scores, Is.Not.Null); + Assert.That(scores!.Count, Is.EqualTo(2)); + Assert.That(scores, Does.Contain(100)); + Assert.That(scores, Does.Contain(300)); + Assert.That(scores, Does.Not.Contain(200)); + Assert.That(scores, Does.Not.Contain(400)); + } +} \ No newline at end of file