Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/RocksDb.Extensions/IMergeAccessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System.ComponentModel;

namespace RocksDb.Extensions;

#pragma warning disable CS1591

/// <summary>
/// This interface is not intended to be used directly by the clients of the library.
/// It provides merge operation support with a separate operand type.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
public interface IMergeAccessor<TKey, TValue, in TOperand> : IRocksDbAccessor<TKey, TValue>
{
void Merge(TKey key, TOperand operand);
}

#pragma warning restore CS1591
47 changes: 47 additions & 0 deletions src/RocksDb.Extensions/IMergeOperator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
namespace RocksDb.Extensions;

/// <summary>
/// Defines a merge operator for RocksDB that enables atomic read-modify-write operations.
/// Merge operators allow efficient updates without requiring a separate read before write,
/// which is particularly useful for counters, list appends, set unions, and other accumulative operations.
/// </summary>
/// <typeparam name="TValue">The type of the value stored in the database.</typeparam>
/// <typeparam name="TOperand">The type of the merge operand (the delta/change to apply).</typeparam>
/// <remarks>
/// The separation of <typeparamref name="TValue"/> and <typeparamref name="TOperand"/> allows for flexible merge patterns:
/// <list type="bullet">
/// <item><description>For counters: TValue=long, TOperand=long (same type)</description></item>
/// <item><description>For list append: TValue=IList&lt;T&gt;, TOperand=IList&lt;T&gt; (same type)</description></item>
/// <item><description>For list with add/remove: TValue=IList&lt;T&gt;, TOperand=CollectionOperation&lt;T&gt; (different types)</description></item>
/// </list>
/// </remarks>
public interface IMergeOperator<TValue, TOperand>
{
/// <summary>
/// Gets the name of the merge operator. This name is stored in the database
/// and must remain consistent across database opens.
/// </summary>
string Name { get; }

/// <summary>
/// Performs a full merge of the existing value with one or more operands.
/// Called when a Get operation encounters merge operands and needs to produce the final value.
/// </summary>
/// <param name="existingValue">The existing value in the database. For value types, this will be default if no value exists.</param>
/// <param name="operands">The span of merge operands to apply, in order.</param>
/// <returns>The merged value to store.</returns>
TValue FullMerge(TValue? existingValue, ReadOnlySpan<TOperand> operands);

/// <summary>
/// Performs a partial merge of multiple operands without the existing value.
/// Called during compaction to combine multiple merge operands into a single operand.
/// This is an optimization that reduces the number of operands that need to be stored.
/// </summary>
/// <param name="operands">The span of merge operands to combine, in order.</param>
/// <returns>The combined operand, or null if partial merge is not safe for these operands.</returns>
/// <remarks>
/// Return null when it's not safe to combine operands without knowing the existing value.
/// When null is returned, RocksDB will keep the operands separate and call FullMerge later.
/// </remarks>
TOperand? PartialMerge(ReadOnlySpan<TOperand> operands);
}
33 changes: 33 additions & 0 deletions src/RocksDb.Extensions/IRocksDbBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,37 @@ public interface IRocksDbBuilder
/// Use <c>GetRequiredKeyedService&lt;TStore&gt;(columnFamily)</c> to retrieve a specific store instance.
/// </remarks>
IRocksDbBuilder AddStore<TKey, TValue, TStore>(string columnFamily) where TStore : RocksDbStore<TKey, TValue>;

/// <summary>
/// Adds a mergeable RocksDB store to the builder for the specified column family.
/// This method enforces that the store inherits from <see cref="MergeableRocksDbStore{TKey,TValue,TOperand}"/>
/// and requires a merge operator, providing compile-time safety for merge operations.
/// </summary>
/// <param name="columnFamily">The name of the column family to associate with the store.</param>
/// <param name="mergeOperator">The merge operator to use for atomic read-modify-write operations.</param>
/// <typeparam name="TKey">The type of the store's key.</typeparam>
/// <typeparam name="TValue">The type of the store's values (the stored state).</typeparam>
/// <typeparam name="TOperand">The type of the merge operand (the delta/change to apply).</typeparam>
/// <typeparam name="TStore">The type of the store to add. Must inherit from <see cref="MergeableRocksDbStore{TKey,TValue,TOperand}"/>.</typeparam>
/// <returns>The builder instance for method chaining.</returns>
/// <exception cref="InvalidOperationException">Thrown if the specified column family is already registered.</exception>
/// <remarks>
/// <para>
/// Use this method when your store needs merge operations. The <typeparamref name="TStore"/> constraint
/// ensures that only stores designed for merge operations can be registered with this method.
/// </para>
/// <para>
/// The separation of <typeparamref name="TValue"/> and <typeparamref name="TOperand"/> allows for flexible patterns:
/// <list type="bullet">
/// <item><description>Counters: TValue=long, TOperand=long (same type)</description></item>
/// <item><description>List append: TValue=IList&lt;T&gt;, TOperand=IList&lt;T&gt; (same type)</description></item>
/// <item><description>List with add/remove: TValue=IList&lt;T&gt;, TOperand=CollectionOperation&lt;T&gt; (different types)</description></item>
/// </list>
/// </para>
/// <para>
/// For stores that don't need merge operations, use <see cref="AddStore{TKey,TValue,TStore}(string)"/> instead.
/// </para>
/// </remarks>
IRocksDbBuilder AddMergeableStore<TKey, TValue, TStore, TOperand>(string columnFamily, IMergeOperator<TValue, TOperand> mergeOperator)
where TStore : MergeableRocksDbStore<TKey, TValue, TOperand>;
}
84 changes: 84 additions & 0 deletions src/RocksDb.Extensions/ListOperationSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
using System.Buffers;
using RocksDb.Extensions.MergeOperators;

namespace RocksDb.Extensions;

/// <summary>
/// Serializes CollectionOperation&lt;T&gt; which contains an operation type (Add/Remove) and a list of items.
/// </summary>
/// <remarks>
/// <para>
/// The serialized format consists of:
/// - 1 byte: Operation type (0 = Add, 1 = Remove)
/// - Remaining bytes: Serialized list using FixedSizeListSerializer (for primitives) or VariableSizeListSerializer (for complex types)
/// </para>
/// <para>
/// Space efficiency optimization:
/// - For primitive types (int, long, bool, etc.), uses FixedSizeListSerializer which stores:
/// - 4 bytes: list count
/// - N * elementSize bytes: all elements (no per-element size prefix)
/// Example: List&lt;int&gt; with 3 elements = 4 + (3 * 4) = 16 bytes
/// </para>
/// <para>
/// - For non-primitive types (strings, objects, protobuf messages), uses VariableSizeListSerializer which stores:
/// - 4 bytes: list count
/// - For each element: 4 bytes size prefix + element data
/// Example: List&lt;string&gt; with ["ab", "cde"] = 4 + (4+2) + (4+3) = 17 bytes
/// </para>
/// </remarks>
internal class ListOperationSerializer<T> : ISerializer<CollectionOperation<T>>
{
private readonly ISerializer<IList<T>> _listSerializer;

public ListOperationSerializer(ISerializer<T> itemSerializer)
{
// Use FixedSizeListSerializer for primitive types to avoid storing size for each element
// Use VariableSizeListSerializer for non-primitive types where elements may vary in size
_listSerializer = typeof(T).IsPrimitive
? new FixedSizeListSerializer<T>(itemSerializer)
: new VariableSizeListSerializer<T>(itemSerializer);
}

public bool TryCalculateSize(ref CollectionOperation<T> value, out int size)
{
// 1 byte for operation type + size of the list
size = sizeof(byte);

var items = value.Items;
if (_listSerializer.TryCalculateSize(ref items, out var listSize))
{
size += listSize;
return true;
}

return false;
}

public void WriteTo(ref CollectionOperation<T> value, ref Span<byte> span)
{
// Write operation type (1 byte)
span[0] = (byte)value.Type;

// Write the list using the list serializer
var listSpan = span.Slice(sizeof(byte));
var items = value.Items;
_listSerializer.WriteTo(ref items, ref listSpan);
}

public void WriteTo(ref CollectionOperation<T> value, IBufferWriter<byte> buffer)
{
throw new NotImplementedException();
Copy link

Copilot AI Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The WriteTo(ref ListOperation<T> value, IBufferWriter<byte> buffer) method throws NotImplementedException, but this is a required method for the serializer interface. This will cause runtime failures when the buffer writer path is used instead of the span-based path. This method should be implemented to support variable-sized serialization.

Suggested change
throw new NotImplementedException();
// Write operation type (1 byte)
var opSpan = buffer.GetSpan(sizeof(byte));
opSpan[0] = (byte)value.Type;
buffer.Advance(sizeof(byte));
// Write count (4 bytes)
var countSpan = buffer.GetSpan(sizeof(int));
BitConverter.TryWriteBytes(countSpan, value.Items.Count);
buffer.Advance(sizeof(int));
// Write each item with size prefix and data
for (int i = 0; i < value.Items.Count; i++)
{
var item = value.Items[i];
if (_itemSerializer.TryCalculateSize(ref item, out var itemSize))
{
// Write size prefix (4 bytes)
var sizeSpan = buffer.GetSpan(sizeof(int));
BitConverter.TryWriteBytes(sizeSpan, itemSize);
buffer.Advance(sizeof(int));
// Write item data
var itemSpan = buffer.GetSpan(itemSize);
var tmpSpan = itemSpan.Slice(0, itemSize);
_itemSerializer.WriteTo(ref item, ref tmpSpan);
buffer.Advance(itemSize);
}
}

Copilot uses AI. Check for mistakes.
}

public CollectionOperation<T> Deserialize(ReadOnlySpan<byte> buffer)
{
// Read operation type
var operationType = (OperationType)buffer[0];

// Read the list using the list serializer
var listBuffer = buffer.Slice(sizeof(byte));
var items = _listSerializer.Deserialize(listBuffer);

return new CollectionOperation<T>(operationType, items);
}
}
88 changes: 88 additions & 0 deletions src/RocksDb.Extensions/MergeAccessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
using System.Buffers;
using CommunityToolkit.HighPerformance.Buffers;

namespace RocksDb.Extensions;

internal class MergeAccessor<TKey, TValue, TOperand> : RocksDbAccessor<TKey, TValue>, IMergeAccessor<TKey, TValue, TOperand>
{
private readonly ISerializer<TOperand> _operandSerializer;

public MergeAccessor(
RocksDbContext db,
ColumnFamily columnFamily,
ISerializer<TKey> keySerializer,
ISerializer<TValue> valueSerializer,
ISerializer<TOperand> operandSerializer) : base(db, columnFamily, keySerializer, valueSerializer)
{
_operandSerializer = operandSerializer;
}

public void Merge(TKey key, TOperand operand)
{
byte[]? rentedKeyBuffer = null;
bool useSpanAsKey;
// ReSharper disable once AssignmentInConditionalExpression
Span<byte> keyBuffer = (useSpanAsKey = KeySerializer.TryCalculateSize(ref key, out var keySize))
? keySize < MaxStackSize
? stackalloc byte[keySize]
: (rentedKeyBuffer = ArrayPool<byte>.Shared.Rent(keySize)).AsSpan(0, keySize)
: Span<byte>.Empty;

ReadOnlySpan<byte> keySpan = keyBuffer;
ArrayPoolBufferWriter<byte>? keyBufferWriter = null;

byte[]? rentedOperandBuffer = null;
bool useSpanAsOperand;
// ReSharper disable once AssignmentInConditionalExpression
Span<byte> operandBuffer = (useSpanAsOperand = _operandSerializer.TryCalculateSize(ref operand, out var operandSize))
? operandSize < MaxStackSize
? stackalloc byte[operandSize]
: (rentedOperandBuffer = ArrayPool<byte>.Shared.Rent(operandSize)).AsSpan(0, operandSize)
: Span<byte>.Empty;


ReadOnlySpan<byte> operandSpan = operandBuffer;
ArrayPoolBufferWriter<byte>? operandBufferWriter = null;

try
{
if (useSpanAsKey)
{
KeySerializer.WriteTo(ref key, ref keyBuffer);
}
else
{
keyBufferWriter = new ArrayPoolBufferWriter<byte>();
KeySerializer.WriteTo(ref key, keyBufferWriter);
keySpan = keyBufferWriter.WrittenSpan;
}

if (useSpanAsOperand)
{
_operandSerializer.WriteTo(ref operand, ref operandBuffer);
}
else
{
operandBufferWriter = new ArrayPoolBufferWriter<byte>();
_operandSerializer.WriteTo(ref operand, operandBufferWriter);
operandSpan = operandBufferWriter.WrittenSpan;
}

RocksDbContext.Db.Merge(keySpan, operandSpan, ColumnFamily.Handle, RocksDbContext.WriteOptions);
}
finally
{
keyBufferWriter?.Dispose();
operandBufferWriter?.Dispose();
if (rentedKeyBuffer is not null)
{
ArrayPool<byte>.Shared.Return(rentedKeyBuffer);
}

if (rentedOperandBuffer is not null)
{
ArrayPool<byte>.Shared.Return(rentedOperandBuffer);
}
}
}
}
Loading