-
Notifications
You must be signed in to change notification settings - Fork 0
Add support for Merge operator #42
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
6f77e2b
81f3746
8563757
0ba1404
36a7381
1143ec4
da93637
95dbd3a
3b1398f
c985414
63b97fc
bd32b5a
d2670f5
ee73d9f
ae6036c
4cc4e27
0d6565c
47f24e1
00fb780
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,17 @@ | ||
| using System.ComponentModel; | ||
|
|
||
| namespace RocksDb.Extensions; | ||
|
|
||
| #pragma warning disable CS1591 | ||
|
|
||
| /// <summary> | ||
| /// This interface is not intended to be used directly by the clients of the library. | ||
| /// It provides merge operation support with a separate operand type. | ||
| /// </summary> | ||
| [EditorBrowsable(EditorBrowsableState.Never)] | ||
| public interface IMergeAccessor<TKey, TValue, in TOperand> : IRocksDbAccessor<TKey, TValue> | ||
| { | ||
| void Merge(TKey key, TOperand operand); | ||
| } | ||
|
|
||
| #pragma warning restore CS1591 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,47 @@ | ||
| namespace RocksDb.Extensions; | ||
|
|
||
| /// <summary> | ||
| /// Defines a merge operator for RocksDB that enables atomic read-modify-write operations. | ||
| /// Merge operators allow efficient updates without requiring a separate read before write, | ||
| /// which is particularly useful for counters, list appends, set unions, and other accumulative operations. | ||
| /// </summary> | ||
| /// <typeparam name="TValue">The type of the value stored in the database.</typeparam> | ||
| /// <typeparam name="TOperand">The type of the merge operand (the delta/change to apply).</typeparam> | ||
| /// <remarks> | ||
| /// The separation of <typeparamref name="TValue"/> and <typeparamref name="TOperand"/> allows for flexible merge patterns: | ||
| /// <list type="bullet"> | ||
| /// <item><description>For counters: TValue=long, TOperand=long (same type)</description></item> | ||
| /// <item><description>For list append: TValue=IList<T>, TOperand=IList<T> (same type)</description></item> | ||
| /// <item><description>For list with add/remove: TValue=IList<T>, TOperand=ListOperation<T> (different types)</description></item> | ||
| /// </list> | ||
| /// </remarks> | ||
| public interface IMergeOperator<TValue, TOperand> | ||
| { | ||
| /// <summary> | ||
| /// Gets the name of the merge operator. This name is stored in the database | ||
| /// and must remain consistent across database opens. | ||
| /// </summary> | ||
| string Name { get; } | ||
|
|
||
| /// <summary> | ||
| /// Performs a full merge of the existing value with one or more operands. | ||
| /// Called when a Get operation encounters merge operands and needs to produce the final value. | ||
| /// </summary> | ||
| /// <param name="existingValue">The existing value in the database. For value types, this will be default if no value exists.</param> | ||
| /// <param name="operands">The list of merge operands to apply, in order.</param> | ||
| /// <returns>The merged value to store.</returns> | ||
| TValue FullMerge(TValue? existingValue, IReadOnlyList<TOperand> operands); | ||
|
|
||
| /// <summary> | ||
| /// Performs a partial merge of multiple operands without the existing value. | ||
| /// Called during compaction to combine multiple merge operands into a single operand. | ||
| /// This is an optimization that reduces the number of operands that need to be stored. | ||
| /// </summary> | ||
| /// <param name="operands">The list of merge operands to combine, in order.</param> | ||
| /// <returns>The combined operand, or null if partial merge is not safe for these operands.</returns> | ||
| /// <remarks> | ||
| /// Return null when it's not safe to combine operands without knowing the existing value. | ||
| /// When null is returned, RocksDB will keep the operands separate and call FullMerge later. | ||
| /// </remarks> | ||
| TOperand? PartialMerge(IReadOnlyList<TOperand> operands); | ||
| } |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,108 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| using System.Buffers; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| using RocksDb.Extensions.MergeOperators; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| namespace RocksDb.Extensions; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// <summary> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// Serializes ListOperation<T> which contains an operation type (Add/Remove) and a list of items. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// </summary> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// <remarks> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// The serialized format consists of: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// - 1 byte: Operation type (0 = Add, 1 = Remove) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// - 4 bytes: Number of items | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// - For each item: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// - 4 bytes: Size of the serialized item | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// - N bytes: Serialized item data | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// </remarks> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| internal class ListOperationSerializer<T> : ISerializer<ListOperation<T>> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private readonly ISerializer<T> _itemSerializer; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public ListOperationSerializer(ISerializer<T> itemSerializer) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _itemSerializer = itemSerializer; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public bool TryCalculateSize(ref ListOperation<T> value, out int size) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // 1 byte for operation type + 4 bytes for count | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| size = sizeof(byte) + sizeof(int); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for (int i = 0; i < value.Items.Count; i++) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var item = value.Items[i]; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (_itemSerializer.TryCalculateSize(ref item, out var itemSize)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| size += sizeof(int); // size prefix for each item | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| size += itemSize; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return true; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public void WriteTo(ref ListOperation<T> value, ref Span<byte> span) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| int offset = 0; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Write operation type (1 byte) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| span[offset] = (byte)value.Type; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| offset += sizeof(byte); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Write count | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var slice = span.Slice(offset, sizeof(int)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| BitConverter.TryWriteBytes(slice, value.Items.Count); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| offset += sizeof(int); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Write each item with size prefix | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for (int i = 0; i < value.Items.Count; i++) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var item = value.Items[i]; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (_itemSerializer.TryCalculateSize(ref item, out var itemSize)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| slice = span.Slice(offset, sizeof(int)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| BitConverter.TryWriteBytes(slice, itemSize); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| offset += sizeof(int); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| slice = span.Slice(offset, itemSize); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _itemSerializer.WriteTo(ref item, ref slice); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| offset += itemSize; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public void WriteTo(ref ListOperation<T> value, IBufferWriter<byte> buffer) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throw new NotImplementedException(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throw new NotImplementedException(); | |
| // Write operation type (1 byte) | |
| var opSpan = buffer.GetSpan(sizeof(byte)); | |
| opSpan[0] = (byte)value.Type; | |
| buffer.Advance(sizeof(byte)); | |
| // Write count (4 bytes) | |
| var countSpan = buffer.GetSpan(sizeof(int)); | |
| BitConverter.TryWriteBytes(countSpan, value.Items.Count); | |
| buffer.Advance(sizeof(int)); | |
| // Write each item with size prefix and data | |
| for (int i = 0; i < value.Items.Count; i++) | |
| { | |
| var item = value.Items[i]; | |
| if (_itemSerializer.TryCalculateSize(ref item, out var itemSize)) | |
| { | |
| // Write size prefix (4 bytes) | |
| var sizeSpan = buffer.GetSpan(sizeof(int)); | |
| BitConverter.TryWriteBytes(sizeSpan, itemSize); | |
| buffer.Advance(sizeof(int)); | |
| // Write item data | |
| var itemSpan = buffer.GetSpan(itemSize); | |
| var tmpSpan = itemSpan.Slice(0, itemSize); | |
| _itemSerializer.WriteTo(ref item, ref tmpSpan); | |
| buffer.Advance(itemSize); | |
| } | |
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,88 @@ | ||
| using System.Buffers; | ||
| using CommunityToolkit.HighPerformance.Buffers; | ||
|
|
||
| namespace RocksDb.Extensions; | ||
|
|
||
| internal class MergeAccessor<TKey, TValue, TOperand> : RocksDbAccessor<TKey, TValue>, IMergeAccessor<TKey, TValue, TOperand> | ||
| { | ||
| private readonly ISerializer<TOperand> _operandSerializer; | ||
|
|
||
| public MergeAccessor( | ||
| RocksDbContext db, | ||
| ColumnFamily columnFamily, | ||
| ISerializer<TKey> keySerializer, | ||
| ISerializer<TValue> valueSerializer, | ||
| ISerializer<TOperand> operandSerializer) : base(db, columnFamily, keySerializer, valueSerializer) | ||
| { | ||
| _operandSerializer = operandSerializer; | ||
| } | ||
|
|
||
| public void Merge(TKey key, TOperand operand) | ||
| { | ||
| byte[]? rentedKeyBuffer = null; | ||
| bool useSpanAsKey; | ||
| // ReSharper disable once AssignmentInConditionalExpression | ||
| Span<byte> keyBuffer = (useSpanAsKey = _keySerializer.TryCalculateSize(ref key, out var keySize)) | ||
| ? keySize < MaxStackSize | ||
| ? stackalloc byte[keySize] | ||
| : (rentedKeyBuffer = ArrayPool<byte>.Shared.Rent(keySize)).AsSpan(0, keySize) | ||
| : Span<byte>.Empty; | ||
|
|
||
| ReadOnlySpan<byte> keySpan = keyBuffer; | ||
| ArrayPoolBufferWriter<byte>? keyBufferWriter = null; | ||
Havret marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| byte[]? rentedOperandBuffer = null; | ||
| bool useSpanAsOperand; | ||
| // ReSharper disable once AssignmentInConditionalExpression | ||
| Span<byte> operandBuffer = (useSpanAsOperand = _operandSerializer.TryCalculateSize(ref operand, out var operandSize)) | ||
| ? operandSize < MaxStackSize | ||
| ? stackalloc byte[operandSize] | ||
| : (rentedOperandBuffer = ArrayPool<byte>.Shared.Rent(operandSize)).AsSpan(0, operandSize) | ||
| : Span<byte>.Empty; | ||
|
|
||
|
|
||
| ReadOnlySpan<byte> operandSpan = operandBuffer; | ||
| ArrayPoolBufferWriter<byte>? operandBufferWriter = null; | ||
Havret marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| try | ||
| { | ||
| if (useSpanAsKey) | ||
| { | ||
| _keySerializer.WriteTo(ref key, ref keyBuffer); | ||
| } | ||
| else | ||
| { | ||
| keyBufferWriter = new ArrayPoolBufferWriter<byte>(); | ||
| _keySerializer.WriteTo(ref key, keyBufferWriter); | ||
| keySpan = keyBufferWriter.WrittenSpan; | ||
| } | ||
|
|
||
| if (useSpanAsOperand) | ||
| { | ||
| _operandSerializer.WriteTo(ref operand, ref operandBuffer); | ||
| } | ||
| else | ||
| { | ||
| operandBufferWriter = new ArrayPoolBufferWriter<byte>(); | ||
| _operandSerializer.WriteTo(ref operand, operandBufferWriter); | ||
| operandSpan = operandBufferWriter.WrittenSpan; | ||
| } | ||
|
|
||
| _rocksDbContext.Db.Merge(keySpan, operandSpan, _columnFamily.Handle); | ||
| } | ||
| finally | ||
| { | ||
| keyBufferWriter?.Dispose(); | ||
| operandBufferWriter?.Dispose(); | ||
| if (rentedKeyBuffer is not null) | ||
| { | ||
| ArrayPool<byte>.Shared.Return(rentedKeyBuffer); | ||
| } | ||
|
|
||
| if (rentedOperandBuffer is not null) | ||
| { | ||
| ArrayPool<byte>.Shared.Return(rentedOperandBuffer); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| using RocksDbSharp; | ||
|
|
||
| namespace RocksDb.Extensions; | ||
|
|
||
| /// <summary> | ||
| /// Internal configuration for a merge operator associated with a column family. | ||
| /// </summary> | ||
| internal class MergeOperatorConfig | ||
| { | ||
| /// <summary> | ||
| /// Gets the name of the merge operator. | ||
| /// </summary> | ||
| public string Name { get; set; } = null!; | ||
|
|
||
| /// <summary> | ||
| /// Gets the full merge callback delegate. | ||
| /// </summary> | ||
| public global::RocksDbSharp.MergeOperators.FullMergeFunc FullMerge { get; set; } = null!; | ||
|
|
||
| /// <summary> | ||
| /// Gets the partial merge callback delegate. | ||
| /// </summary> | ||
| public global::RocksDbSharp.MergeOperators.PartialMergeFunc PartialMerge { get; set; } = null!; | ||
|
|
||
| /// <summary> | ||
| /// Gets the value serializer for deserializing and serializing values. | ||
| /// </summary> | ||
| public object ValueSerializer { get; set; } = null!; | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| namespace RocksDb.Extensions.MergeOperators; | ||
|
|
||
| /// <summary> | ||
| /// A merge operator that appends items to a list. | ||
| /// Useful for implementing atomic list append operations without requiring a read before write. | ||
| /// </summary> | ||
| /// <typeparam name="T">The type of elements in the list.</typeparam> | ||
| /// <example> | ||
| /// <code> | ||
| /// public class EventLogStore : MergeableRocksDbStore<string, IList<string>, IList<string>> | ||
| /// { | ||
| /// public EventLogStore(IRocksDbAccessor<string, IList<string>> accessor, IMergeAccessor<string, IList<string>> mergeAccessor) | ||
| /// : base(accessor, mergeAccessor) { } | ||
| /// | ||
| /// public void AppendEvent(string key, string eventData) | ||
| /// { | ||
| /// Merge(key, new List<string> { eventData }); | ||
| /// } | ||
| /// } | ||
| /// | ||
| /// // Registration: | ||
| /// builder.AddMergeableStore<string, IList<string>, IList<string>, EventLogStore>("events", new ListAppendMergeOperator<string>()); | ||
| /// </code> | ||
| /// </example> | ||
| public class ListAppendMergeOperator<T> : IMergeOperator<IList<T>, IList<T>> | ||
| { | ||
| /// <inheritdoc /> | ||
| public string Name => $"ListAppendMergeOperator<{typeof(T).Name}>"; | ||
|
|
||
| /// <inheritdoc /> | ||
| public IList<T> FullMerge(IList<T>? existingValue, IReadOnlyList<IList<T>> operands) | ||
| { | ||
| var result = existingValue != null ? new List<T>(existingValue) : new List<T>(); | ||
|
|
||
| foreach (var operand in operands) | ||
| { | ||
| foreach (var item in operand) | ||
| { | ||
| result.Add(item); | ||
| } | ||
| } | ||
|
|
||
| return result; | ||
| } | ||
|
|
||
| /// <inheritdoc /> | ||
| public IList<T> PartialMerge(IReadOnlyList<IList<T>> operands) | ||
| { | ||
| var result = new List<T>(); | ||
|
|
||
| foreach (var operand in operands) | ||
| { | ||
| foreach (var item in operand) | ||
| { | ||
| result.Add(item); | ||
| } | ||
| } | ||
|
|
||
| return result; | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
TryCalculateSizemethod should returnfalsewhen it cannot calculate the size for variable-sized items, but currently it always returnstrue. This will cause incorrect size calculations when items cannot have their size pre-calculated, leading to buffer allocation issues.