Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 35 additions & 9 deletions db/revision_cache_lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,10 @@ func (rc *LRURevisionCache) removeFromCacheByCV(ctx context.Context, docID strin
return
}
// grab the revid key from the value to enable us to remove the reference from the rev lookup map too
elem := element.Value.(*revCacheValue)
elem, ok := element.Value.(*revCacheValue)
if !ok {
return
}

legacyKey := IDAndRev{DocID: docID, RevID: elem.revID, CollectionID: collectionID}
rc.lruList.Remove(element)
Expand All @@ -582,7 +585,10 @@ func (rc *LRURevisionCache) removeFromCacheByRev(ctx context.Context, docID, rev
return
}
// grab the cv key from the value to enable us to remove the reference from the rev lookup map too
elem := element.Value.(*revCacheValue)
elem, ok := element.Value.(*revCacheValue)
if !ok {
return
}

hlvKey := IDandCV{DocID: docID, Source: elem.cv.SourceID, Version: elem.cv.Value, CollectionID: collectionID}
rc.lruList.Remove(element)
Expand Down Expand Up @@ -848,6 +854,7 @@ func (value *revCacheValue) store(docRev DocumentRevision) {

func (value *revCacheValue) updateDelta(toDelta RevisionDelta) (diffInBytes int64) {
value.lock.Lock()
defer value.lock.Unlock()
var previousDeltaBytes int64
if value.delta != nil {
// delta exists, need to pull this to update overall memory size correctly
Expand All @@ -858,7 +865,6 @@ func (value *revCacheValue) updateDelta(toDelta RevisionDelta) (diffInBytes int6
if diffInBytes != 0 {
value.itemBytes.Add(diffInBytes)
}
value.lock.Unlock()
return diffInBytes
}

Expand Down Expand Up @@ -912,17 +918,32 @@ func (rc *LRURevisionCache) revCacheMemoryBasedEviction(ctx context.Context) {

// performEviction will evict the oldest items in the cache till we are below the memory threshold
func (rc *LRURevisionCache) performEviction(ctx context.Context) {
numItemsRemoved := rc.evictBasedOffMemoryUsage(ctx)
rc.cacheNumItems.Add(-numItemsRemoved)
}

func (rc *LRURevisionCache) evictBasedOffMemoryUsage(ctx context.Context) int64 {
var numItemsRemoved, numBytesRemoved int64
rc.lock.Lock() // hold rev cache lock to remove items from cache until we're below memory threshold for the shard
rc.lock.Lock()
defer rc.lock.Unlock()
// check if we are over memory capacity after holding rev cache mutex (protect against another goroutine evicting whilst waiting for mutex above)
if currMemoryUsage := rc.currMemoryUsage.Value(); currMemoryUsage > rc.memoryCapacity {
// find amount of bytes needed to evict till below threshold
bytesNeededToRemove := currMemoryUsage - rc.memoryCapacity
for bytesNeededToRemove > numBytesRemoved {
value := rc._findEvictionValue()
if value == nil {
// no more values ready for eviction
break
if rc.lruList.Len() > 0 {
// no more values ready for eviction
break
} else {
// list is empty, nothing more to evict but stats are wrong so zero stats and return
base.DebugfCtx(ctx, base.KeyCache, "Revision cache memory stats inconsistent for this shard, resetting to zero")
correctionVal := rc.currMemoryUsage.Value()
rc.currMemoryUsage.Add(-correctionVal)
rc.cacheMemoryBytesStat.Add(-correctionVal)
Comment on lines 942 to 945
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be much clearer and potentially safer to use Set(0) for this than doing two separate Value and Add IMO

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have changed this, I have added some comments on this area, we can;t quite set cacheMemoryBytesStat to 0 given this is the memory usage across all shards.

break
}
}
revKey := IDAndRev{DocID: value.id, RevID: value.revID, CollectionID: value.collectionID}
hlvKey := IDandCV{DocID: value.id, Source: value.cv.SourceID, Version: value.cv.Value, CollectionID: value.collectionID}
Expand Down Expand Up @@ -953,8 +974,7 @@ func (rc *LRURevisionCache) performEviction(ctx context.Context) {
}
}
rc._decrRevCacheMemoryUsage(ctx, -numBytesRemoved) // need update rev cache memory stats before release lock to stop other goroutines evicting based on outdated stats
rc.lock.Unlock() // release lock after removing items from cache
rc.cacheNumItems.Add(-numItemsRemoved)
return numItemsRemoved
}

// _decrRevCacheMemoryUsage atomically decreases overall memory usage for cache and the actual rev cache objects usage.
Expand Down Expand Up @@ -991,7 +1011,13 @@ func (rc *LRURevisionCache) incrRevCacheMemoryUsage(ctx context.Context, bytesCo

func (rc *LRURevisionCache) _findEvictionValue() *revCacheValue {
evictionCandidate := rc.lruList.Back()
revItem := evictionCandidate.Value.(*revCacheValue)
if evictionCandidate == nil {
return nil
}
revItem, ok := evictionCandidate.Value.(*revCacheValue)
if !ok {
return nil
}

if revItem.canEvict.Load() {
rc.lruList.Remove(evictionCandidate)
Expand Down
39 changes: 39 additions & 0 deletions db/revision_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ licenses/APL2.txt.
package db

import (
"bytes"
"context"
"fmt"
"log"
Expand Down Expand Up @@ -2442,3 +2443,41 @@ func TestEvictionWhenStaleCVRemoved(t *testing.T) {
revValueRevEntry := valRevEntry.Value.(*revCacheValue)
assert.Equal(t, "2-abc", revValueRevEntry.revID)
}

func TestUpdateDeltaRevCacheMemoryStatPanic(t *testing.T) {
cacheHitCounter, cacheMissCounter, getDocumentCounter, getRevisionCounter, cacheNumItems, memoryBytesCounted := base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}
backingStoreMap := CreateTestSingleBackingStoreMap(&testBackingStore{nil, &getDocumentCounter, &getRevisionCounter}, testCollectionID)
cacheOptions := &RevisionCacheOptions{
MaxItemCount: 5000,
MaxBytes: 125,
}
cache := NewLRURevisionCache(cacheOptions, backingStoreMap, &cacheHitCounter, &cacheMissCounter, &cacheNumItems, &memoryBytesCounted)

firstDelta := bytes.Repeat([]byte("a"), 1000)
ctx := base.TestCtx(t)

// Trigger load into cache
docRev, err := cache.GetWithRev(ctx, "doc1", "1-abc", testCollectionID, RevCacheIncludeDelta)
require.NoError(t, err, "Error adding to cache")

revCacheDelta := newRevCacheDelta(firstDelta, "1-abc", docRev, false, nil)

// UpdateDelta - start
value := cache.getValue(ctx, "doc1", "1-abc", testCollectionID, false)
if value != nil {
// Remove - start - drop value underneath UpdateDelta
cache.RemoveWithRev(ctx, "doc1", "1-abc", testCollectionID)
// Remove - end
outGoingBytes := value.updateDelta(revCacheDelta)
if outGoingBytes != 0 {
cache.currMemoryUsage.Add(outGoingBytes)
cache.cacheMemoryBytesStat.Add(outGoingBytes)
}
// check for memory based eviction
cache.revCacheMemoryBasedEviction(ctx)
}
// UpdateDelta - end

assert.Equal(t, 0, cache.lruList.Len())
assert.Equal(t, int64(0), memoryBytesCounted.Value())
}