diff --git a/benchmark_cache_test.go b/benchmark_cache_test.go new file mode 100644 index 00000000..3dfebca5 --- /dev/null +++ b/benchmark_cache_test.go @@ -0,0 +1,167 @@ +package gubernator_test + +import ( + "strconv" + "sync" + "testing" + "time" + + "github.com/mailgun/gubernator/v2" + "github.com/mailgun/holster/v4/clock" +) + +func BenchmarkCache(b *testing.B) { + testCases := []struct{ + Name string + NewTestCache func() gubernator.Cache + LockRequired bool + }{ + { + Name: "LRUCache", + NewTestCache: func() gubernator.Cache{ + return gubernator.NewLRUCache(0) + }, + LockRequired: true, + }, + // { + // Name: "SyncLRUCache", + // NewTestCache: func() gubernator.Cache{ + // return gubernator.NewSyncLRUCache(0) + // }, + // LockRequired: false, + // }, + } + + for _, testCase := range testCases { + b.Run(testCase.Name, func(b *testing.B) { + b.Run("Sequential reads", func(b *testing.B) { + cache := testCase.NewTestCache() + expire := clock.Now().Add(time.Hour).UnixMilli() + + for i := 0; i < b.N; i++ { + key := strconv.Itoa(i) + item := &gubernator.CacheItem{ + Key: key, + Value: i, + ExpireAt: expire, + } + cache.Add(item) + } + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + key := strconv.Itoa(i) + _, _ = cache.GetItem(key) + } + }) + + b.Run("Sequential writes", func(b *testing.B) { + cache := testCase.NewTestCache() + expire := clock.Now().Add(time.Hour).UnixMilli() + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + item := &gubernator.CacheItem{ + Key: strconv.Itoa(i), + Value: i, + ExpireAt: expire, + } + cache.Add(item) + } + }) + + b.Run("Concurrent reads", func(b *testing.B) { + cache := testCase.NewTestCache() + expire := clock.Now().Add(time.Hour).UnixMilli() + + for i := 0; i < b.N; i++ { + key := strconv.Itoa(i) + item := &gubernator.CacheItem{ + Key: key, + Value: i, + ExpireAt: expire, + } + cache.Add(item) + } + + var wg sync.WaitGroup + var mutex sync.Mutex + var task func(i int) + + if testCase.LockRequired { + task = func(i int) { + mutex.Lock() + defer mutex.Unlock() + key := strconv.Itoa(i) + _, _ = cache.GetItem(key) + wg.Done() + } + } else { + task = func(i int) { + key := strconv.Itoa(i) + _, _ = cache.GetItem(key) + wg.Done() + } + } + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + wg.Add(1) + go task(i) + } + + wg.Wait() + }) + + b.Run("Concurrent writes", func(b *testing.B) { + cache := testCase.NewTestCache() + expire := clock.Now().Add(time.Hour).UnixMilli() + + var wg sync.WaitGroup + var mutex sync.Mutex + var task func(i int) + + if testCase.LockRequired { + task = func(i int) { + mutex.Lock() + defer mutex.Unlock() + item := &gubernator.CacheItem{ + Key: strconv.Itoa(i), + Value: i, + ExpireAt: expire, + } + cache.Add(item) + wg.Done() + } + } else { + task = func(i int) { + item := &gubernator.CacheItem{ + Key: strconv.Itoa(i), + Value: i, + ExpireAt: expire, + } + cache.Add(item) + wg.Done() + } + } + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + wg.Add(1) + go task(i) + } + + wg.Wait() + }) + + }) + } +} diff --git a/daemon.go b/daemon.go index b77db11c..c0f343da 100644 --- a/daemon.go +++ b/daemon.go @@ -88,7 +88,7 @@ func (s *Daemon) Start(ctx context.Context) error { s.promRegister.Register(cacheCollector) cacheFactory := func(maxSize int) Cache { - cache := NewSyncLRUCache(maxSize) + cache := NewLRUCache(maxSize) cacheCollector.AddCache(cache) return cache } diff --git a/sync_lrucache.go b/sync_lrucache.go deleted file mode 100644 index 3e20bb88..00000000 --- a/sync_lrucache.go +++ /dev/null @@ -1,171 +0,0 @@ -/* -Copyright 2018-2022 Mailgun Technologies Inc - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package gubernator - -type SyncLRUCache struct { - cache *LRUCache - done chan struct{} - addRequest chan syncLRUCacheAddRequest - addResponse chan syncLRUCacheAddResponse - updateExpirationRequest chan syncLRUCacheUpdateExpirationRequest - updateExpirationResponse chan syncLRUCacheUpdateExpirationResponse - getItemRequest chan syncLRUCacheGetItemRequest - getItemResponse chan syncLRUCacheGetItemResponse - eachRequest chan syncLRUCacheEachRequest - eachResponse chan syncLRUCacheEachResponse - removeRequest chan syncLRUCacheRemoveRequest - removeResponse chan syncLRUCacheRemoveResponse - sizeRequest chan syncLRUCacheSizeRequest - sizeResponse chan syncLRUCacheSizeResponse -} - -type syncLRUCacheAddRequest struct { - Item *CacheItem -} - -type syncLRUCacheAddResponse struct { - Exists bool -} - -type syncLRUCacheUpdateExpirationRequest struct { - Key string - ExpireAt int64 -} - -type syncLRUCacheUpdateExpirationResponse struct { - Ok bool -} - -type syncLRUCacheGetItemRequest struct { - Key string -} - -type syncLRUCacheGetItemResponse struct { - Item *CacheItem - Ok bool -} - -type syncLRUCacheEachRequest struct{} - -type syncLRUCacheEachResponse struct { - Each chan *CacheItem -} - -type syncLRUCacheRemoveRequest struct { - Key string -} - -type syncLRUCacheRemoveResponse struct{} - -type syncLRUCacheSizeRequest struct{} - -type syncLRUCacheSizeResponse struct { - Size int64 -} - -// Thread-safe implementation of `LRUCache`. -func NewSyncLRUCache(maxSize int) *SyncLRUCache { - c := &SyncLRUCache{ - cache: NewLRUCache(maxSize), - done: make(chan struct{}), - addRequest: make(chan syncLRUCacheAddRequest), - addResponse: make(chan syncLRUCacheAddResponse), - updateExpirationRequest: make(chan syncLRUCacheUpdateExpirationRequest), - updateExpirationResponse: make(chan syncLRUCacheUpdateExpirationResponse), - getItemRequest: make(chan syncLRUCacheGetItemRequest), - getItemResponse: make(chan syncLRUCacheGetItemResponse), - eachRequest: make(chan syncLRUCacheEachRequest), - eachResponse: make(chan syncLRUCacheEachResponse), - removeRequest: make(chan syncLRUCacheRemoveRequest), - removeResponse: make(chan syncLRUCacheRemoveResponse), - sizeRequest: make(chan syncLRUCacheSizeRequest), - sizeResponse: make(chan syncLRUCacheSizeResponse), - } - - go c.listen() - - return c -} - -func (c *SyncLRUCache) listen() { - // Listen for requests and dispatch to underlying cache object. - for { - select { - case request := <-c.addRequest: - exists := c.cache.Add(request.Item) - c.addResponse <- syncLRUCacheAddResponse{Exists: exists} - case request := <-c.updateExpirationRequest: - ok := c.cache.UpdateExpiration(request.Key, request.ExpireAt) - c.updateExpirationResponse <- syncLRUCacheUpdateExpirationResponse{Ok: ok} - case request := <-c.getItemRequest: - item, ok := c.cache.GetItem(request.Key) - c.getItemResponse <- syncLRUCacheGetItemResponse{Item: item, Ok: ok} - case <-c.eachRequest: - each := c.cache.Each() - c.eachResponse <- syncLRUCacheEachResponse{Each: each} - case request := <-c.removeRequest: - c.cache.Remove(request.Key) - c.removeResponse <- syncLRUCacheRemoveResponse{} - case <-c.sizeRequest: - size := c.cache.Size() - c.sizeResponse <- syncLRUCacheSizeResponse{Size: size} - case <-c.done: - return - } - } -} - -func (c *SyncLRUCache) Add(item *CacheItem) bool { - c.addRequest <- syncLRUCacheAddRequest{Item: item} - response := <-c.addResponse - return response.Exists -} - -func (c *SyncLRUCache) UpdateExpiration(key string, expireAt int64) bool { - c.updateExpirationRequest <- syncLRUCacheUpdateExpirationRequest{Key: key, ExpireAt: expireAt} - response := <-c.updateExpirationResponse - return response.Ok -} - -func (c *SyncLRUCache) GetItem(key string) (*CacheItem, bool) { - c.getItemRequest <- syncLRUCacheGetItemRequest{Key: key} - response := <-c.getItemResponse - return response.Item, response.Ok -} - -func (c *SyncLRUCache) Each() chan *CacheItem { - c.eachRequest <- syncLRUCacheEachRequest{} - response := <-c.eachResponse - return response.Each -} - -func (c *SyncLRUCache) Remove(key string) { - c.removeRequest <- syncLRUCacheRemoveRequest{Key: key} - <-c.removeResponse -} - -// Returns the number of items in the cache. -func (c *SyncLRUCache) Size() int64 { - c.sizeRequest <- syncLRUCacheSizeRequest{} - response := <-c.sizeResponse - return response.Size -} - -func (c *SyncLRUCache) Close() error { - close(c.done) - return nil -} diff --git a/sync_lrucache_test.go b/sync_lrucache_test.go deleted file mode 100644 index f3621637..00000000 --- a/sync_lrucache_test.go +++ /dev/null @@ -1,405 +0,0 @@ -/* -Copyright 2018-2022 Mailgun Technologies Inc - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package gubernator_test - -import ( - "strconv" - "sync" - "testing" - "time" - - "github.com/mailgun/gubernator/v2" - "github.com/mailgun/holster/v4/clock" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestSyncLRUCache(t *testing.T) { - const iterations = 1000 - const concurrency = 100 - - t.Run("Happy path", func(t *testing.T) { - syncCache := gubernator.NewSyncLRUCache(0) - defer syncCache.Close() - expireAt := clock.Now().Add(1 * time.Hour).UnixMilli() - - // Populate cache. - for i := 0; i < iterations; i++ { - key := strconv.Itoa(i) - item := &gubernator.CacheItem{ - Key: key, - Value: i, - ExpireAt: expireAt, - } - exists := syncCache.Add(item) - assert.False(t, exists) - } - - // Validate cache. - assert.Equal(t, int64(iterations), syncCache.Size()) - - for i := 0; i < iterations; i++ { - key := strconv.Itoa(i) - item, ok := syncCache.GetItem(key) - assert.True(t, ok) - require.NotNil(t, item) - assert.Equal(t, item.Value, i) - } - - // Clear cache. - for i := 0; i < iterations; i++ { - key := strconv.Itoa(i) - syncCache.Remove(key) - } - - assert.Zero(t, syncCache.Size()) - }) - - t.Run("Concurrent reads", func(t *testing.T) { - syncCache := gubernator.NewSyncLRUCache(0) - defer syncCache.Close() - expireAt := clock.Now().Add(1 * time.Hour).UnixMilli() - - // Populate cache. - for i := 0; i < iterations; i++ { - key := strconv.Itoa(i) - item := &gubernator.CacheItem{ - Key: key, - Value: i, - ExpireAt: expireAt, - } - exists := syncCache.Add(item) - assert.False(t, exists) - } - - assert.Equal(t, int64(iterations), syncCache.Size()) - var launchWg, doneWg sync.WaitGroup - launchWg.Add(1) - - for thread := 0; thread < concurrency; thread++ { - doneWg.Add(1) - - go func() { - defer doneWg.Done() - launchWg.Wait() - - for i := 0; i < iterations; i++ { - key := strconv.Itoa(i) - item, ok := syncCache.GetItem(key) - assert.True(t, ok) - require.NotNil(t, item) - assert.Equal(t, item.Value, i) - } - }() - } - - // Wait for goroutines to finish. - launchWg.Done() - doneWg.Wait() - }) - - t.Run("Concurrent writes", func(t *testing.T) { - syncCache := gubernator.NewSyncLRUCache(0) - defer syncCache.Close() - expireAt := clock.Now().Add(1 * time.Hour).UnixMilli() - var launchWg, doneWg sync.WaitGroup - launchWg.Add(1) - - for thread := 0; thread < concurrency; thread++ { - doneWg.Add(1) - - go func() { - defer doneWg.Done() - launchWg.Wait() - - for i := 0; i < iterations; i++ { - key := strconv.Itoa(i) - item := &gubernator.CacheItem{ - Key: key, - Value: i, - ExpireAt: expireAt, - } - syncCache.Add(item) - } - }() - } - - // Wait for goroutines to finish. - launchWg.Done() - doneWg.Wait() - }) - - t.Run("Concurrent reads and writes", func(t *testing.T) { - syncCache := gubernator.NewSyncLRUCache(0) - defer syncCache.Close() - expireAt := clock.Now().Add(1 * time.Hour).UnixMilli() - - // Populate cache. - for i := 0; i < iterations; i++ { - key := strconv.Itoa(i) - item := &gubernator.CacheItem{ - Key: key, - Value: i, - ExpireAt: expireAt, - } - exists := syncCache.Add(item) - assert.False(t, exists) - } - - assert.Equal(t, int64(iterations), syncCache.Size()) - var launchWg, doneWg sync.WaitGroup - launchWg.Add(1) - - for thread := 0; thread < concurrency; thread++ { - doneWg.Add(2) - - go func() { - defer doneWg.Done() - launchWg.Wait() - - for i := 0; i < iterations; i++ { - key := strconv.Itoa(i) - item, ok := syncCache.GetItem(key) - assert.True(t, ok) - require.NotNil(t, item) - assert.Equal(t, item.Value, i) - } - }() - - go func() { - defer doneWg.Done() - launchWg.Wait() - - for i := 0; i < iterations; i++ { - key := strconv.Itoa(i) - item := &gubernator.CacheItem{ - Key: key, - Value: i, - ExpireAt: expireAt, - } - syncCache.Add(item) - } - }() - } - - // Wait for goroutines to finish. - launchWg.Done() - doneWg.Wait() - }) -} - -func BenchmarkSyncLRUCache(b *testing.B) { - b.Run("Sequential reads", func(b *testing.B) { - syncCache := gubernator.NewSyncLRUCache(0) - defer syncCache.Close() - expireAt := clock.Now().Add(1 * time.Hour).UnixMilli() - - // Populate cache. - for i := 0; i < b.N; i++ { - key := strconv.Itoa(i) - item := &gubernator.CacheItem{ - Key: key, - Value: i, - ExpireAt: expireAt, - } - exists := syncCache.Add(item) - assert.False(b, exists) - } - - b.ReportAllocs() - b.ResetTimer() - - for i := 0; i < b.N; i++ { - key := strconv.Itoa(i) - _, _ = syncCache.GetItem(key) - } - }) - - b.Run("Sequential writes", func(b *testing.B) { - syncCache := gubernator.NewSyncLRUCache(0) - defer syncCache.Close() - expireAt := clock.Now().Add(1 * time.Hour).UnixMilli() - - b.ReportAllocs() - b.ResetTimer() - - for i := 0; i < b.N; i++ { - key := strconv.Itoa(i) - item := &gubernator.CacheItem{ - Key: key, - Value: i, - ExpireAt: expireAt, - } - _ = syncCache.Add(item) - } - }) - - b.Run("Concurrent reads", func(b *testing.B) { - syncCache := gubernator.NewSyncLRUCache(0) - defer syncCache.Close() - expireAt := clock.Now().Add(1 * time.Hour).UnixMilli() - - // Populate cache. - for i := 0; i < b.N; i++ { - key := strconv.Itoa(i) - item := &gubernator.CacheItem{ - Key: key, - Value: i, - ExpireAt: expireAt, - } - exists := syncCache.Add(item) - assert.False(b, exists) - } - - var launchWg, doneWg sync.WaitGroup - launchWg.Add(1) - - for i := 0; i < b.N; i++ { - key := strconv.Itoa(i) - doneWg.Add(1) - - go func() { - defer doneWg.Done() - launchWg.Wait() - - _, _ = syncCache.GetItem(key) - }() - } - - b.ReportAllocs() - b.ResetTimer() - launchWg.Done() - doneWg.Wait() - }) - - b.Run("Concurrent writes", func(b *testing.B) { - syncCache := gubernator.NewSyncLRUCache(0) - defer syncCache.Close() - expireAt := clock.Now().Add(1 * time.Hour).UnixMilli() - var launchWg, doneWg sync.WaitGroup - launchWg.Add(1) - - for i := 0; i < b.N; i++ { - key := strconv.Itoa(i) - doneWg.Add(1) - - go func(i int) { - defer doneWg.Done() - launchWg.Wait() - - item := &gubernator.CacheItem{ - Key: key, - Value: i, - ExpireAt: expireAt, - } - _ = syncCache.Add(item) - }(i) - } - - b.ReportAllocs() - b.ResetTimer() - launchWg.Done() - doneWg.Wait() - }) - - b.Run("Concurrent reads and writes of existing keys", func(b *testing.B) { - syncCache := gubernator.NewSyncLRUCache(0) - defer syncCache.Close() - expireAt := clock.Now().Add(1 * time.Hour).UnixMilli() - var launchWg, doneWg sync.WaitGroup - launchWg.Add(1) - - // Populate cache. - for i := 0; i < b.N; i++ { - key := strconv.Itoa(i) - item := &gubernator.CacheItem{ - Key: key, - Value: i, - ExpireAt: expireAt, - } - exists := syncCache.Add(item) - assert.False(b, exists) - } - - for i := 0; i < b.N; i++ { - key := strconv.Itoa(i) - doneWg.Add(2) - - go func() { - defer doneWg.Done() - launchWg.Wait() - - _, _ = syncCache.GetItem(key) - }() - - go func(i int) { - defer doneWg.Done() - launchWg.Wait() - - item := &gubernator.CacheItem{ - Key: key, - Value: i, - ExpireAt: expireAt, - } - _ = syncCache.Add(item) - }(i) - } - - b.ReportAllocs() - b.ResetTimer() - launchWg.Done() - doneWg.Wait() - }) - - b.Run("Concurrent reads and writes of non-existent keys", func(b *testing.B) { - syncCache := gubernator.NewSyncLRUCache(0) - defer syncCache.Close() - expireAt := clock.Now().Add(1 * time.Hour).UnixMilli() - var launchWg, doneWg sync.WaitGroup - launchWg.Add(1) - - for i := 0; i < b.N; i++ { - doneWg.Add(2) - - go func() { - defer doneWg.Done() - launchWg.Wait() - - key := strconv.Itoa(i) - _, _ = syncCache.GetItem(key) - }() - - go func(i int) { - defer doneWg.Done() - launchWg.Wait() - - key := "z" + strconv.Itoa(i) - item := &gubernator.CacheItem{ - Key: key, - Value: i, - ExpireAt: expireAt, - } - _ = syncCache.Add(item) - }(i) - } - - b.ReportAllocs() - b.ResetTimer() - launchWg.Done() - doneWg.Wait() - }) -}