Skip to content
Closed
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/RocksDb.Extensions/IMergeAccessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System.ComponentModel;

namespace RocksDb.Extensions;

#pragma warning disable CS1591

/// <summary>
/// This interface is not intended to be used directly by the clients of the library.
/// It provides merge operation support with a separate operand type.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
public interface IMergeAccessor<TKey, TValue, in TOperand> : IRocksDbAccessor<TKey, TValue>
{
void Merge(TKey key, TOperand operand);
}

#pragma warning restore CS1591
47 changes: 47 additions & 0 deletions src/RocksDb.Extensions/IMergeOperator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
namespace RocksDb.Extensions;

/// <summary>
/// Defines a merge operator for RocksDB that enables atomic read-modify-write operations.
/// Merge operators allow efficient updates without requiring a separate read before write,
/// which is particularly useful for counters, list appends, set unions, and other accumulative operations.
/// </summary>
/// <typeparam name="TValue">The type of the value stored in the database.</typeparam>
/// <typeparam name="TOperand">The type of the merge operand (the delta/change to apply).</typeparam>
/// <remarks>
/// The separation of <typeparamref name="TValue"/> and <typeparamref name="TOperand"/> allows for flexible merge patterns:
/// <list type="bullet">
/// <item><description>For counters: TValue=long, TOperand=long (same type)</description></item>
/// <item><description>For list append: TValue=IList&lt;T&gt;, TOperand=IList&lt;T&gt; (same type)</description></item>
/// <item><description>For list with add/remove: TValue=IList&lt;T&gt;, TOperand=ListOperation&lt;T&gt; (different types)</description></item>
/// </list>
/// </remarks>
public interface IMergeOperator<TValue, TOperand>
{
/// <summary>
/// Gets the name of the merge operator. This name is stored in the database
/// and must remain consistent across database opens.
/// </summary>
string Name { get; }

/// <summary>
/// Performs a full merge of the existing value with one or more operands.
/// Called when a Get operation encounters merge operands and needs to produce the final value.
/// </summary>
/// <param name="existingValue">The existing value in the database. For value types, this will be default if no value exists.</param>
/// <param name="operands">The span of merge operands to apply, in order.</param>
/// <returns>The merged value to store.</returns>
TValue FullMerge(TValue? existingValue, ReadOnlySpan<TOperand> operands);

/// <summary>
/// Performs a partial merge of multiple operands without the existing value.
/// Called during compaction to combine multiple merge operands into a single operand.
/// This is an optimization that reduces the number of operands that need to be stored.
/// </summary>
/// <param name="operands">The span of merge operands to combine, in order.</param>
/// <returns>The combined operand, or null if partial merge is not safe for these operands.</returns>
/// <remarks>
/// Return null when it's not safe to combine operands without knowing the existing value.
/// When null is returned, RocksDB will keep the operands separate and call FullMerge later.
/// </remarks>
TOperand? PartialMerge(ReadOnlySpan<TOperand> operands);
}
33 changes: 33 additions & 0 deletions src/RocksDb.Extensions/IRocksDbBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,37 @@ public interface IRocksDbBuilder
/// Use <c>GetRequiredKeyedService&lt;TStore&gt;(columnFamily)</c> to retrieve a specific store instance.
/// </remarks>
IRocksDbBuilder AddStore<TKey, TValue, TStore>(string columnFamily) where TStore : RocksDbStore<TKey, TValue>;

/// <summary>
/// Adds a mergeable RocksDB store to the builder for the specified column family.
/// This method enforces that the store inherits from <see cref="MergeableRocksDbStore{TKey,TValue,TOperand}"/>
/// and requires a merge operator, providing compile-time safety for merge operations.
/// </summary>
/// <param name="columnFamily">The name of the column family to associate with the store.</param>
/// <param name="mergeOperator">The merge operator to use for atomic read-modify-write operations.</param>
/// <typeparam name="TKey">The type of the store's key.</typeparam>
/// <typeparam name="TValue">The type of the store's values (the stored state).</typeparam>
/// <typeparam name="TOperand">The type of the merge operand (the delta/change to apply).</typeparam>
/// <typeparam name="TStore">The type of the store to add. Must inherit from <see cref="MergeableRocksDbStore{TKey,TValue,TOperand}"/>.</typeparam>
/// <returns>The builder instance for method chaining.</returns>
/// <exception cref="InvalidOperationException">Thrown if the specified column family is already registered.</exception>
/// <remarks>
/// <para>
/// Use this method when your store needs merge operations. The <typeparamref name="TStore"/> constraint
/// ensures that only stores designed for merge operations can be registered with this method.
/// </para>
/// <para>
/// The separation of <typeparamref name="TValue"/> and <typeparamref name="TOperand"/> allows for flexible patterns:
/// <list type="bullet">
/// <item><description>Counters: TValue=long, TOperand=long (same type)</description></item>
/// <item><description>List append: TValue=IList&lt;T&gt;, TOperand=IList&lt;T&gt; (same type)</description></item>
/// <item><description>List with add/remove: TValue=IList&lt;T&gt;, TOperand=ListOperation&lt;T&gt; (different types)</description></item>
/// </list>
/// </para>
/// <para>
/// For stores that don't need merge operations, use <see cref="AddStore{TKey,TValue,TStore}(string)"/> instead.
/// </para>
/// </remarks>
IRocksDbBuilder AddMergeableStore<TKey, TValue, TStore, TOperand>(string columnFamily, IMergeOperator<TValue, TOperand> mergeOperator)
where TStore : MergeableRocksDbStore<TKey, TValue, TOperand>;
}
108 changes: 108 additions & 0 deletions src/RocksDb.Extensions/ListOperationSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
using System.Buffers;
using RocksDb.Extensions.MergeOperators;

namespace RocksDb.Extensions;

/// <summary>
/// Serializes ListOperation&lt;T&gt; which contains an operation type (Add/Remove) and a list of items.
/// </summary>
/// <remarks>
/// The serialized format consists of:
/// - 1 byte: Operation type (0 = Add, 1 = Remove)
/// - 4 bytes: Number of items
/// - For each item:
/// - 4 bytes: Size of the serialized item
/// - N bytes: Serialized item data
/// </remarks>
internal class ListOperationSerializer<T> : ISerializer<ListOperation<T>>
{
private readonly ISerializer<T> _itemSerializer;

public ListOperationSerializer(ISerializer<T> itemSerializer)
{
_itemSerializer = itemSerializer;
}

public bool TryCalculateSize(ref ListOperation<T> value, out int size)
{
// 1 byte for operation type + 4 bytes for count
size = sizeof(byte) + sizeof(int);

for (int i = 0; i < value.Items.Count; i++)
{
var item = value.Items[i];
if (_itemSerializer.TryCalculateSize(ref item, out var itemSize))
{
size += sizeof(int); // size prefix for each item
size += itemSize;
}
}

return true;
}
Copy link

Copilot AI Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The TryCalculateSize method should return false when it cannot calculate the size for variable-sized items, but currently it always returns true. This will cause incorrect size calculations when items cannot have their size pre-calculated, leading to buffer allocation issues.

Copilot uses AI. Check for mistakes.

public void WriteTo(ref ListOperation<T> value, ref Span<byte> span)
{
int offset = 0;

// Write operation type (1 byte)
span[offset] = (byte)value.Type;
offset += sizeof(byte);

// Write count
var slice = span.Slice(offset, sizeof(int));
BitConverter.TryWriteBytes(slice, value.Items.Count);
offset += sizeof(int);

// Write each item with size prefix
for (int i = 0; i < value.Items.Count; i++)
{
var item = value.Items[i];
if (_itemSerializer.TryCalculateSize(ref item, out var itemSize))
{
slice = span.Slice(offset, sizeof(int));
BitConverter.TryWriteBytes(slice, itemSize);
offset += sizeof(int);

slice = span.Slice(offset, itemSize);
_itemSerializer.WriteTo(ref item, ref slice);
offset += itemSize;
}
}
Copy link

Copilot AI Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The WriteTo method doesn't handle the case where _itemSerializer.TryCalculateSize returns false. When this happens, the item cannot be serialized using the span-based approach, but the method silently skips these items, leading to data loss. The method should fall back to using a buffer writer for items that don't support fixed-size serialization.

Copilot uses AI. Check for mistakes.
}
Copy link

Copilot AI Dec 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The WriteTo method only writes items when TryCalculateSize succeeds for each item, but there's no handling for the case when TryCalculateSize returns false. This creates an inconsistency with the TryCalculateSize method (which also has this issue) and could result in incomplete serialization where the count written doesn't match the actual number of items serialized.

Copilot uses AI. Check for mistakes.

public void WriteTo(ref ListOperation<T> value, IBufferWriter<byte> buffer)
{
throw new NotImplementedException();
Copy link

Copilot AI Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The WriteTo(ref ListOperation<T> value, IBufferWriter<byte> buffer) method throws NotImplementedException, but this is a required method for the serializer interface. This will cause runtime failures when the buffer writer path is used instead of the span-based path. This method should be implemented to support variable-sized serialization.

Suggested change
throw new NotImplementedException();
// Write operation type (1 byte)
var opSpan = buffer.GetSpan(sizeof(byte));
opSpan[0] = (byte)value.Type;
buffer.Advance(sizeof(byte));
// Write count (4 bytes)
var countSpan = buffer.GetSpan(sizeof(int));
BitConverter.TryWriteBytes(countSpan, value.Items.Count);
buffer.Advance(sizeof(int));
// Write each item with size prefix and data
for (int i = 0; i < value.Items.Count; i++)
{
var item = value.Items[i];
if (_itemSerializer.TryCalculateSize(ref item, out var itemSize))
{
// Write size prefix (4 bytes)
var sizeSpan = buffer.GetSpan(sizeof(int));
BitConverter.TryWriteBytes(sizeSpan, itemSize);
buffer.Advance(sizeof(int));
// Write item data
var itemSpan = buffer.GetSpan(itemSize);
var tmpSpan = itemSpan.Slice(0, itemSize);
_itemSerializer.WriteTo(ref item, ref tmpSpan);
buffer.Advance(itemSize);
}
}

Copilot uses AI. Check for mistakes.
}

public ListOperation<T> Deserialize(ReadOnlySpan<byte> buffer)
{
int offset = 0;

// Read operation type
var operationType = (OperationType)buffer[offset];
offset += sizeof(byte);

// Read count
var slice = buffer.Slice(offset, sizeof(int));
var count = BitConverter.ToInt32(slice);
offset += sizeof(int);

// Read items
var items = new List<T>(count);
for (int i = 0; i < count; i++)
{
slice = buffer.Slice(offset, sizeof(int));
var itemSize = BitConverter.ToInt32(slice);
offset += sizeof(int);

slice = buffer.Slice(offset, itemSize);
var item = _itemSerializer.Deserialize(slice);
items.Add(item);
offset += itemSize;
}

return new ListOperation<T>(operationType, items);
}
}
88 changes: 88 additions & 0 deletions src/RocksDb.Extensions/MergeAccessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
using System.Buffers;
using CommunityToolkit.HighPerformance.Buffers;

namespace RocksDb.Extensions;

internal class MergeAccessor<TKey, TValue, TOperand> : RocksDbAccessor<TKey, TValue>, IMergeAccessor<TKey, TValue, TOperand>
{
private readonly ISerializer<TOperand> _operandSerializer;

public MergeAccessor(
RocksDbContext db,
ColumnFamily columnFamily,
ISerializer<TKey> keySerializer,
ISerializer<TValue> valueSerializer,
ISerializer<TOperand> operandSerializer) : base(db, columnFamily, keySerializer, valueSerializer)
{
_operandSerializer = operandSerializer;
}

public void Merge(TKey key, TOperand operand)
{
byte[]? rentedKeyBuffer = null;
bool useSpanAsKey;
// ReSharper disable once AssignmentInConditionalExpression
Span<byte> keyBuffer = (useSpanAsKey = KeySerializer.TryCalculateSize(ref key, out var keySize))
? keySize < MaxStackSize
? stackalloc byte[keySize]
: (rentedKeyBuffer = ArrayPool<byte>.Shared.Rent(keySize)).AsSpan(0, keySize)
: Span<byte>.Empty;

ReadOnlySpan<byte> keySpan = keyBuffer;
ArrayPoolBufferWriter<byte>? keyBufferWriter = null;

byte[]? rentedOperandBuffer = null;
bool useSpanAsOperand;
// ReSharper disable once AssignmentInConditionalExpression
Span<byte> operandBuffer = (useSpanAsOperand = _operandSerializer.TryCalculateSize(ref operand, out var operandSize))
? operandSize < MaxStackSize
? stackalloc byte[operandSize]
: (rentedOperandBuffer = ArrayPool<byte>.Shared.Rent(operandSize)).AsSpan(0, operandSize)
: Span<byte>.Empty;


ReadOnlySpan<byte> operandSpan = operandBuffer;
ArrayPoolBufferWriter<byte>? operandBufferWriter = null;

try
{
if (useSpanAsKey)
{
KeySerializer.WriteTo(ref key, ref keyBuffer);
}
else
{
keyBufferWriter = new ArrayPoolBufferWriter<byte>();
KeySerializer.WriteTo(ref key, keyBufferWriter);
keySpan = keyBufferWriter.WrittenSpan;
}

if (useSpanAsOperand)
{
_operandSerializer.WriteTo(ref operand, ref operandBuffer);
}
else
{
operandBufferWriter = new ArrayPoolBufferWriter<byte>();
_operandSerializer.WriteTo(ref operand, operandBufferWriter);
operandSpan = operandBufferWriter.WrittenSpan;
}

RocksDbContext.Db.Merge(keySpan, operandSpan, ColumnFamily.Handle);
}
finally
{
keyBufferWriter?.Dispose();
operandBufferWriter?.Dispose();
if (rentedKeyBuffer is not null)
{
ArrayPool<byte>.Shared.Return(rentedKeyBuffer);
}

if (rentedOperandBuffer is not null)
{
ArrayPool<byte>.Shared.Return(rentedOperandBuffer);
}
}
}
}
29 changes: 29 additions & 0 deletions src/RocksDb.Extensions/MergeOperatorConfig.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using RocksDbSharp;

namespace RocksDb.Extensions;

/// <summary>
/// Internal configuration for a merge operator associated with a column family.
/// </summary>
internal class MergeOperatorConfig
{
/// <summary>
/// Gets the name of the merge operator.
/// </summary>
public string Name { get; set; } = null!;

/// <summary>
/// Gets the full merge callback delegate.
/// </summary>
public global::RocksDbSharp.MergeOperators.FullMergeFunc FullMerge { get; set; } = null!;

/// <summary>
/// Gets the partial merge callback delegate.
/// </summary>
public global::RocksDbSharp.MergeOperators.PartialMergeFunc PartialMerge { get; set; } = null!;

/// <summary>
/// Gets the value serializer for deserializing and serializing values.
/// </summary>
public object ValueSerializer { get; set; } = null!;
}
Loading