Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/RocksDb.Extensions/ColumnFamily.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using RocksDbSharp;

namespace RocksDb.Extensions;

internal class ColumnFamily
{
public ColumnFamilyHandle Handle { get; set; }
public string Name { get; }

public ColumnFamily(ColumnFamilyHandle handle, string name)
{
Handle = handle;
Name = name;
}
}
1 change: 1 addition & 0 deletions src/RocksDb.Extensions/IRocksDbAccessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public interface IRocksDbAccessor<TKey, TValue>
void PutRange(IReadOnlyList<(TKey key, TValue value)> items);
IEnumerable<TValue> GetAll();
bool HasKey(TKey key);
void Clear();
}

#pragma warning restore CS1591
11 changes: 11 additions & 0 deletions src/RocksDb.Extensions/IRocksDbStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,15 @@ public abstract class RocksDbStore<TKey, TValue>
/// <param name="key">The key to check in the store for an associated value.</param>
/// <returns><c>true</c> if the store contains an element with the specified key; otherwise, <c>false</c>.</returns>
public bool HasKey(TKey key) => _rocksDbAccessor.HasKey(key);

/// <summary>
/// Resets the column family associated with the store.
/// This operation destroys the current column family and creates a new one,
/// effectively removing all stored key-value pairs.
///
/// Note: This method is intended for scenarios where a complete reset of the column family
/// is required. The operation may involve internal reallocation and metadata changes, which
/// can impact performance during execution. Use with caution in high-frequency workflows.
/// </summary>
public void Clear() => _rocksDbAccessor.Clear();
}
38 changes: 22 additions & 16 deletions src/RocksDb.Extensions/RocksDbAccessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@ internal class RocksDbAccessor<TKey, TValue> : IRocksDbAccessor<TKey, TValue>, I

private readonly ISerializer<TKey> _keySerializer;
private readonly ISerializer<TValue> _valueSerializer;
private readonly RocksDbSharp.RocksDb _rocksDb;
private readonly ColumnFamilyHandle _columnFamilyHandle;
private readonly RocksDbContext _rocksDbContext;
private readonly ColumnFamily _columnFamily;
private readonly bool _checkIfExists;

public RocksDbAccessor(RocksDbSharp.RocksDb rocksDb,
ColumnFamilyHandle columnFamilyHandle,
public RocksDbAccessor(RocksDbContext rocksDbContext,
ColumnFamily columnFamily,
ISerializer<TKey> keySerializer,
ISerializer<TValue> valueSerializer)
{
_rocksDb = rocksDb;
_columnFamilyHandle = columnFamilyHandle;
_rocksDbContext = rocksDbContext;
_columnFamily = columnFamily;
_keySerializer = keySerializer;
_valueSerializer = valueSerializer;

Expand Down Expand Up @@ -56,7 +56,7 @@ public void Remove(TKey key)
keySpan = keyBufferWriter.WrittenSpan;
}

_rocksDb.Remove(keySpan, _columnFamilyHandle);
_rocksDbContext.Db.Remove(keySpan, _columnFamily.Handle);
}
finally
{
Expand Down Expand Up @@ -119,7 +119,7 @@ public void Put(TKey key, TValue value)
valueSpan = valueBufferWriter.WrittenSpan;
}

_rocksDb.Put(keySpan, valueSpan, _columnFamilyHandle);
_rocksDbContext.Db.Put(keySpan, valueSpan, _columnFamily.Handle);
}
finally
{
Expand Down Expand Up @@ -165,13 +165,13 @@ public bool TryGet(TKey key, [MaybeNullWhen(false)] out TValue value)
keySpan = keyBufferWriter.WrittenSpan;
}

if (_checkIfExists && _rocksDb.HasKey(keySpan, _columnFamilyHandle) == false)
if (_checkIfExists && _rocksDbContext.Db.HasKey(keySpan, _columnFamily.Handle) == false)
{
value = default;
return false;
}

value = _rocksDb.Get(keySpan, this, _columnFamilyHandle);
value = _rocksDbContext.Db.Get(keySpan, this, _columnFamily.Handle);
return value != null;
}
finally
Expand Down Expand Up @@ -202,7 +202,7 @@ public void PutRange(ReadOnlySpan<TKey> keys, ReadOnlySpan<TValue> values)
AddToBatch(keys[i], values[i], batch);
}

_rocksDb.Write(batch);
_rocksDbContext.Db.Write(batch);
}

public void PutRange(ReadOnlySpan<TValue> values, Func<TValue, TKey> keySelector)
Expand All @@ -215,7 +215,7 @@ public void PutRange(ReadOnlySpan<TValue> values, Func<TValue, TKey> keySelector
AddToBatch(key, value, batch);
}

_rocksDb.Write(batch);
_rocksDbContext.Db.Write(batch);
}

public void PutRange(IReadOnlyList<(TKey key, TValue value)> items)
Expand All @@ -227,7 +227,7 @@ public void PutRange(IReadOnlyList<(TKey key, TValue value)> items)
AddToBatch(key, value, batch);
}

_rocksDb.Write(batch);
_rocksDbContext.Db.Write(batch);
}

private void AddToBatch(TKey key, TValue value, WriteBatch batch)
Expand Down Expand Up @@ -281,7 +281,7 @@ private void AddToBatch(TKey key, TValue value, WriteBatch batch)
valueSpan = valueBufferWriter.WrittenSpan;
}

_ = batch.Put(keySpan, valueSpan, _columnFamilyHandle);
_ = batch.Put(keySpan, valueSpan, _columnFamily.Handle);
}
finally
{
Expand All @@ -301,7 +301,7 @@ private void AddToBatch(TKey key, TValue value, WriteBatch batch)

public IEnumerable<TValue> GetAll()
{
using var iterator = _rocksDb.NewIterator(_columnFamilyHandle);
using var iterator = _rocksDbContext.Db.NewIterator(_columnFamily.Handle);
_ = iterator.SeekToFirst();
while (iterator.Valid())
{
Expand Down Expand Up @@ -338,7 +338,7 @@ public bool HasKey(TKey key)
keySpan = keyBufferWriter.WrittenSpan;
}

return _rocksDb.HasKey(keySpan, _columnFamilyHandle);
return _rocksDbContext.Db.HasKey(keySpan, _columnFamily.Handle);
}
finally
{
Expand All @@ -349,5 +349,11 @@ public bool HasKey(TKey key)
}
}
}

public void Clear()
{
_rocksDbContext.Db.DropColumnFamily(_columnFamily.Name);
_columnFamily.Handle = _rocksDbContext.Db.CreateColumnFamily(_rocksDbContext.ColumnFamilyOptions, _columnFamily.Name);
}
}

4 changes: 2 additions & 2 deletions src/RocksDb.Extensions/RocksDbBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ public IRocksDbBuilder AddStore<TKey, TValue, TStore>(string columnFamily) where
var keySerializer = CreateSerializer<TKey>(rocksDbOptions.Value.SerializerFactories);
var valueSerializer = CreateSerializer<TValue>(rocksDbOptions.Value.SerializerFactories);
var rocksDbAccessor = new RocksDbAccessor<TKey, TValue>(
rocksDbContext.Db,
columnFamilyHandle,
rocksDbContext,
new ColumnFamily(columnFamilyHandle, columnFamily),
keySerializer,
valueSerializer
);
Expand Down
27 changes: 15 additions & 12 deletions src/RocksDb.Extensions/RocksDbContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ internal class RocksDbContext : IDisposable
{
private readonly RocksDbSharp.RocksDb _rocksDb;
private readonly Cache _cache;
private readonly ColumnFamilyOptions _userSpecifiedOptions;

private const long BlockCacheSize = 50 * 1024 * 1024L;
private const long BlockSize = 4096L;
Expand All @@ -16,23 +17,23 @@ internal class RocksDbContext : IDisposable
public RocksDbContext(IOptions<RocksDbOptions> options)
{
var dbOptions = new DbOptions();
var userSpecifiedOptions = new ColumnFamilyOptions();
_userSpecifiedOptions = new ColumnFamilyOptions();
var tableConfig = new BlockBasedTableOptions();
_cache = Cache.CreateLru(BlockCacheSize);
tableConfig.SetBlockCache(_cache);
tableConfig.SetBlockSize(BlockSize);

var filter = BloomFilterPolicy.Create();
tableConfig.SetFilterPolicy(filter);
userSpecifiedOptions.SetBlockBasedTableFactory(tableConfig);
userSpecifiedOptions.SetWriteBufferSize(WriteBufferSize);
userSpecifiedOptions.SetCompression(Compression.No);
userSpecifiedOptions.SetCompactionStyle(Compaction.Universal);
userSpecifiedOptions.SetMaxWriteBufferNumberToMaintain(MaxWriteBuffers);
userSpecifiedOptions.SetCreateIfMissing();
userSpecifiedOptions.SetCreateMissingColumnFamilies();
userSpecifiedOptions.SetErrorIfExists(false);
userSpecifiedOptions.SetInfoLogLevel(InfoLogLevel.Error);
_userSpecifiedOptions.SetBlockBasedTableFactory(tableConfig);
_userSpecifiedOptions.SetWriteBufferSize(WriteBufferSize);
_userSpecifiedOptions.SetCompression(Compression.No);
_userSpecifiedOptions.SetCompactionStyle(Compaction.Universal);
_userSpecifiedOptions.SetMaxWriteBufferNumberToMaintain(MaxWriteBuffers);
_userSpecifiedOptions.SetCreateIfMissing();
_userSpecifiedOptions.SetCreateMissingColumnFamilies();
_userSpecifiedOptions.SetErrorIfExists(false);
_userSpecifiedOptions.SetInfoLogLevel(InfoLogLevel.Error);

// this is the recommended way to increase parallelism in RocksDb
// note that the current implementation of setIncreaseParallelism affects the number
Expand All @@ -51,9 +52,9 @@ public RocksDbContext(IOptions<RocksDbOptions> options)
var writeOptions = new WriteOptions();
writeOptions.DisableWal(1);

userSpecifiedOptions.EnableStatistics();
_userSpecifiedOptions.EnableStatistics();

var columnFamilies = CreateColumnFamilies(options.Value.ColumnFamilies, userSpecifiedOptions);
var columnFamilies = CreateColumnFamilies(options.Value.ColumnFamilies, _userSpecifiedOptions);

if (options.Value.DeleteExistingDatabaseOnStartup)
{
Expand All @@ -71,6 +72,8 @@ private static void DestroyDatabase(string path)

public RocksDbSharp.RocksDb Db => _rocksDb;

public ColumnFamilyOptions ColumnFamilyOptions => _userSpecifiedOptions;

private static ColumnFamilies CreateColumnFamilies(IReadOnlyList<string> columnFamilyNames,
ColumnFamilyOptions columnFamilyOptions)
{
Expand Down
61 changes: 61 additions & 0 deletions test/RocksDb.Extensions.Tests/ClearStoreTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
using NUnit.Framework;
using RocksDb.Extensions.Tests.Utils;
using Shouldly;

namespace RocksDb.Extensions.Tests;

public class ClearStoreTests
{
[Test]
public void should_reset_store_range_data_to_store()
{
// Setup RocksDbStore
using var testFixture = CreateTestFixture<int, string>();

// Put some data
var store = testFixture.GetStore<RocksDbGenericStore<int, string>>();
var cacheKeys = Enumerable.Range(0, 100)
.Select(x => (key: x, value: x.ToString()))
.ToList();

store.PutRange(cacheKeys);

// Verify that data was added
foreach (var (key, expectedValue) in cacheKeys)
{
store.HasKey(key).ShouldBeTrue();
store.TryGet(key, out var value).ShouldBeTrue();
value.ShouldBe(expectedValue);
}

// Clear the store
store.Clear();

// Verify that data is no longer there
foreach (var (key, expectedValue) in cacheKeys)
{
store.HasKey(key).ShouldBeFalse();
store.TryGet(key, out _).ShouldBeFalse();
}

// Try to put the data again
store.PutRange(cacheKeys);

// Verify that data is available again
foreach (var (key, expectedValue) in cacheKeys)
{
store.HasKey(key).ShouldBeTrue();
store.TryGet(key, out var value).ShouldBeTrue();
value.ShouldBe(expectedValue);
}
}

private static TestFixture CreateTestFixture<TKey, TValue>()
{
var testFixture = TestFixture.Create(rockDb =>
{
_ = rockDb.AddStore<TKey, TValue, RocksDbGenericStore<TKey, TValue>>("my-store");
});
return testFixture;
}
}
Loading