Skip to content

RedisTableCache #30

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 20 additions & 23 deletions Sources/Linq2DynamoDb.DataContext/Caching/EnyimIndexCreator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,52 +8,49 @@ public partial class EnyimTableCache
/// <summary>
/// Implements the process of creating and filling the index
/// </summary>
private class EnyimIndexCreator : IIndexCreator
private class EnyimIndexCreator : IndexCreator
{
private readonly EnyimTableCache _parent;
private EnyimTableCache _tableCache
{
get { return (EnyimTableCache) _parent; }
}

private TableIndex _index;
private readonly string _indexKey;
private readonly string _indexKeyInCache;
private ulong _indexVersionInCache;

internal EnyimIndexCreator(EnyimTableCache parent, string indexKeyInCache, SearchConditions searchConditions)
: base(parent, indexKeyInCache, searchConditions)
{
this._parent = parent;
this._index = new TableIndex(searchConditions);
this._indexKey = searchConditions.Key;
this._indexKeyInCache = indexKeyInCache;
}

public bool StartCreatingIndex()
public override bool StartCreatingIndex()
{
// Marking the index as being rebuilt.
// Note: we're using Set mode. This means, that if an index exists in cache, it will be
// overwritten. That's OK, because if an index exists in cache, then in most cases we
// will not be in this place (the data will be simply read from cache).
if (!this._parent._cacheClient.Store(StoreMode.Set, this._indexKeyInCache, this._index, this._parent._ttl))
if (!_tableCache._cacheClient.SetValue(this._indexKeyInCache, this._index))
{
this._parent._cacheClient.Remove(this._indexKeyInCache);
_tableCache._cacheClient.Remove(this._indexKeyInCache);
return false;
}

// (re)registering it in the list of indexes (it should be visible for update operations)
if (!this._parent.PutIndexToList(this._indexKey))
if (!_tableCache.PutIndexToList(this._indexKey))
{
this._parent._cacheClient.Remove(this._indexKeyInCache);
_tableCache._cacheClient.Remove(this._indexKeyInCache);
return false;
}

// remembering the index's current version
var casResult = this._parent._cacheClient.GetWithCas<TableIndex>(this._indexKeyInCache);
var casResult = _tableCache._memcachedClient.GetWithCas<TableIndex>(this._indexKeyInCache);
if
(
(casResult.StatusCode != 0)
||
(casResult.Result == null)
)
{
this._parent._cacheClient.Remove(this._indexKeyInCache);
_tableCache._cacheClient.Remove(this._indexKeyInCache);
return false;
}

Expand All @@ -64,8 +61,8 @@ public bool StartCreatingIndex()

public void AddEntityToIndex(EntityKey entityKey, Document doc)
{
string key = this._parent.GetEntityKeyInCache(entityKey);
if (key.Length > MaxKeyLength)
string key = _tableCache.GetEntityKeyInCache(entityKey);
if (key == null)
{
this._index = null;
return;
Expand All @@ -76,26 +73,26 @@ public void AddEntityToIndex(EntityKey entityKey, Document doc)

// Putting the entity to cache, but only if it doesn't exist there.
// That's because when loading from DynamoDb whe should never overwrite local updates.
this._parent._cacheClient.Store(StoreMode.Add, key, new CacheDocumentWrapper(doc), this._parent._ttl);
_tableCache._cacheClient.AddValue(key, new CacheDocumentWrapper(doc));
}

public void Dispose()
{
if (this._index == null)
{
this._parent._cacheClient.Remove(this._indexKeyInCache);
_tableCache._cacheClient.Remove(this._indexKeyInCache);
return;
}

this._index.IsBeingRebuilt = false;

// saving the index to cache only if it's version didn't change since we started reading results from DynamoDb
var casResult = this._parent._cacheClient.Cas
var casResult = _tableCache._memcachedClient.Cas
(
StoreMode.Replace,
this._indexKeyInCache,
this._index,
this._parent._ttl,
this._index,
_tableCache._cacheClient.DefaultTimeToLive,
this._indexVersionInCache
);

Expand Down
103 changes: 103 additions & 0 deletions Sources/Linq2DynamoDb.DataContext/Caching/EnyimMemcachedClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Enyim.Caching;
using Enyim.Caching.Memcached;

namespace Linq2DynamoDb.DataContext.Caching
{
public class EnyimMemcachedClient : ICacheClient
{
MemcachedClient _cacheClient;
public TimeSpan DefaultTimeToLive { get; set; }

public EnyimMemcachedClient(MemcachedClient client, TimeSpan? defaultTtl = null)
{
_cacheClient = client;
DefaultTimeToLive = defaultTtl ?? TimeSpan.FromMinutes(15);
}

public bool Remove(string key)
{
return _cacheClient.Remove(key);
}

public bool TryRemove(string key)
{
var removeResult = this._cacheClient.ExecuteRemove(key);
return (removeResult.InnerResult == null) ||
(removeResult.InnerResult.Exception == null);
}

public bool TryGetValue<T>(string key, out T value)
{
value = default(T);
var result = _cacheClient.ExecuteGet<T>(key);
if (result.Success)
value = result.Value;

return result.Success;
}

public bool AddValue<T>(string key, T value)
{
return _cacheClient.Store(StoreMode.Add, key, value, DefaultTimeToLive);
}

public bool AddValue<T>(string key, T value, TimeSpan? timeToLive)
{
return _cacheClient.Store(StoreMode.Add, key, value, timeToLive ?? DefaultTimeToLive);
}

public bool AddValue<T>(string key, T value, DateTime? expiration)
{
return StoreWithExpiration(StoreMode.Add, key, value, expiration);
}

public bool SetValue<T>(string key, T value)
{
return _cacheClient.Store(StoreMode.Set, key, value, DefaultTimeToLive);
}

public bool SetValue<T>(string key, T value, TimeSpan? timeToLive)
{
return _cacheClient.Store(StoreMode.Set, key, value, timeToLive ?? DefaultTimeToLive);
}

public bool SetValue<T>(string key, T value, DateTime? expiration)
{
return StoreWithExpiration(StoreMode.Set, key, value, expiration);
}

public bool ReplaceValue<T>(string key, T value)
{
return _cacheClient.Store(StoreMode.Replace, key, value, DefaultTimeToLive);
}

public bool ReplaceValue<T>(string key, T value, TimeSpan? timeToLive)
{
return _cacheClient.Store(StoreMode.Replace, key, value, timeToLive ?? DefaultTimeToLive);
}

public bool ReplaceValue<T>(string key, T value, DateTime? expiration)
{
return StoreWithExpiration(StoreMode.Replace, key, value, expiration);
}

private bool StoreWithExpiration<T>(StoreMode mode, string key, T value, DateTime? expiration)
{
if (expiration.HasValue)
{
if (expiration.Value.Kind != DateTimeKind.Utc)
expiration = expiration.Value.ToUniversalTime();
}
else
{
expiration = DateTime.UtcNow + DefaultTimeToLive;
}
return _cacheClient.Store(mode, key, value, expiration.Value);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,61 +9,58 @@ public partial class EnyimTableCache
/// <summary>
/// Implements the process of creating and filling the projection (readonly) index
/// </summary>
private class EnyimProjectionIndexCreator : IIndexCreator
private class EnyimProjectionIndexCreator : ProjectionIndexCreator
{
private readonly EnyimTableCache _parent;
private EnyimTableCache _tableCache
{
get { return (EnyimTableCache) _parent; }
}

private readonly TableProjectionIndex _index;
private readonly string _indexKey;
private readonly string _indexKeyInCache;
private ulong _indexVersionInCache;

internal EnyimProjectionIndexCreator(EnyimTableCache parent, string indexKey, string indexKeyInCache, SearchConditions searchConditions)
: base(parent, indexKey, indexKeyInCache, searchConditions)
{
this._parent = parent;
this._index = new TableProjectionIndex(searchConditions);
this._indexKey = indexKey;
this._indexKeyInCache = indexKeyInCache;
}

public bool StartCreatingIndex()
public override bool StartCreatingIndex()
{
// Marking the index as being rebuilt.
// Note: we're using Set mode. This means, that if an index exists in cache, it will be
// overwritten. That's OK, because if an index exists in cache, then in most cases we
// will not be in this place (the data will be simply read from cache).
if (!this._parent._cacheClient.Store(StoreMode.Set, this._indexKeyInCache, this._index, this._parent._ttl))
if (!this._tableCache._cacheClient.SetValue(this._indexKeyInCache, this._index))
{
this._parent._cacheClient.Remove(this._indexKeyInCache);
this._tableCache._cacheClient.Remove(this._indexKeyInCache);
return false;
}

// (re)registering it in the list of indexes (it should be visible for update operations)
if (!this._parent.PutIndexToList(this._indexKey))
if (!this._tableCache.PutIndexToList(this._indexKey))
{
this._parent._cacheClient.Remove(this._indexKeyInCache);
this._tableCache._cacheClient.Remove(this._indexKeyInCache);
return false;
}

// remembering the index's current version
var casResult = this._parent._cacheClient.GetWithCas<TableIndex>(this._indexKeyInCache);
var casResult = this._tableCache._memcachedClient.GetWithCas<TableIndex>(this._indexKeyInCache);
if
(
(casResult.StatusCode != 0)
||
(casResult.Result == null)
)
{
this._parent._cacheClient.Remove(this._indexKeyInCache);
this._tableCache._cacheClient.Remove(this._indexKeyInCache);
return false;
}

this._indexVersionInCache = casResult.Cas;
this._parent.Log("Index ({0}) was marked as being rebuilt", (object)this._indexKey);
this._tableCache.Log("Index ({0}) was marked as being rebuilt", (object)this._indexKey);
return true;
}

public void AddEntityToIndex(EntityKey entityKey, Document doc)
public override void AddEntityToIndex(EntityKey entityKey, Document doc)
{
// when creating a projection index, the entity key should not be passed
Debug.Assert(entityKey == null);
Expand All @@ -72,27 +69,27 @@ public void AddEntityToIndex(EntityKey entityKey, Document doc)
this._index.AddEntity(doc);
}

public void Dispose()
public override void Dispose()
{
this._index.IsBeingRebuilt = false;

// saving the index to cache only if it's version didn't change since we started reading results from DynamoDb
var casResult = this._parent._cacheClient.Cas
var casResult = this._tableCache._memcachedClient.Cas
(
StoreMode.Replace,
this._indexKeyInCache,
this._index,
this._parent._ttl,
this._tableCache._cacheClient.DefaultTimeToLive,
this._indexVersionInCache
);

if (casResult.Result)
{
this._parent.Log("Index ({0}) with {1} entities saved to cache", (object)this._indexKey, this._index.Entities.Length);
this._tableCache.Log("Index ({0}) with {1} entities saved to cache", (object)this._indexKey, this._index.Entities.Length);
}
else
{
this._parent.Log("Index ({0}) wasn't saved to cache due to version conflict", (object)this._indexKey);
this._tableCache.Log("Index ({0}) wasn't saved to cache due to version conflict", (object)this._indexKey);
}
}
}
Expand Down
Loading