Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
PIP-1490: Refactor unit tests on tokenBucket() in store_test.go.
Browse files Browse the repository at this point in the history
TODO: Need to build equivelent tests for `leakyBucket()`.
  • Loading branch information
Baliedge committed Jan 14, 2022
1 parent ea4af60 commit 9b2b8ec
Show file tree
Hide file tree
Showing 4 changed files with 432 additions and 177 deletions.
81 changes: 42 additions & 39 deletions algorithms.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
}

if ok {
// Item found in cache or store.
tracing.LogInfo(span, "Update existing rate limit")

if HasBehavior(r.Behavior, Behavior_RESET_REMAINING) {
Expand Down Expand Up @@ -107,8 +108,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
tracing.LogInfo(span, "s.Remove()")
}

// FIXME: Eliminate recursion.
return tokenBucket(ctx, s, c, r)
return tokenBucketNewItem(ctx, s, c, r)
}

if s != nil {
Expand All @@ -118,10 +118,10 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
}()
}

// Update the limit if it changed
// Update the limit if it changed.
tracing.LogInfo(span, "Update the limit if changed")
if t.Limit != r.Limit {
// Add difference to remaining
// Add difference to remaining.
t.Remaining += r.Limit - t.Limit
if t.Remaining < 0 {
t.Remaining = 0
Expand All @@ -136,9 +136,9 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
ResetTime: item.ExpireAt,
}

// If the duration config changed, update the new ExpireAt
// If the duration config changed, update the new ExpireAt.
if t.Duration != r.Duration {
tracing.LogInfo(span, "Duration config changed")
tracing.LogInfo(span, "Duration changed")
expire := t.CreatedAt + r.Duration
if HasBehavior(r.Behavior, Behavior_DURATION_IS_GREGORIAN) {
expire, err = GregorianExpiration(clock.Now(), r.Duration)
Expand All @@ -147,31 +147,29 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
}
}

// If our new duration means we are currently expired
if expire <= MillisecondNow() {
// Update this so s.OnChange() will get the new expire change
// If our new duration means we are currently expired.
now := MillisecondNow()
if expire <= now {
// Renew item.
tracing.LogInfo(span, "Limit has expired")
item.ExpireAt = expire

c.Remove(item.Key)
tracing.LogInfo(span, "c.Remove()")

// FIXME: tokenBucketNewItem creates a new item. But, we want to
// preserve this item for its CreatedAt timestamp.
return tokenBucketNewItem(ctx, s, c, r)
expire = now + r.Duration
t.CreatedAt = now
t.Remaining = t.Limit
}

item.ExpireAt = expire
t.Duration = r.Duration
rl.ResetTime = expire
}

// Client is only interested in retrieving the current status or updating the rate limit config
// Client is only interested in retrieving the current status or
// updating the rate limit config.
if r.Hits == 0 {
tracing.LogInfo(span, "Return current status, apply no change")
return rl, nil
}

// If we are already at the limit
// If we are already at the limit.
if rl.Remaining == 0 {
tracing.LogInfo(span, "Already over the limit")
overLimitCounter.Add(1)
Expand All @@ -180,15 +178,16 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
return rl, nil
}

// If requested hits takes the remainder
// If requested hits takes the remainder.
if t.Remaining == r.Hits {
tracing.LogInfo(span, "At the limit")
t.Remaining = 0
rl.Remaining = 0
return rl, nil
}

// If requested is more than available, then return over the limit without updating the cache.
// If requested is more than available, then return over the limit
// without updating the cache.
if r.Hits > t.Remaining {
tracing.LogInfo(span, "Over the limit")
overLimitCounter.Add(1)
Expand All @@ -202,40 +201,48 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
return rl, nil
}

// Item is not found in cache or store, create new.
return tokenBucketNewItem(ctx, s, c, r)
}

// Called by tokenBucket() when the requested item is not found in cache or store.
// Called by tokenBucket() when adding a new item in the store.
func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
span, ctx := tracing.StartSpan(ctx)
defer span.Finish()

// Add a new rate limit to the cache
tracing.LogInfo(span, "Add a new rate limit to the cache")
now := MillisecondNow()
expire := now + r.Duration

item := CacheItem{
Algorithm: r.Algorithm,
Key: r.HashKey(),
Value: &TokenBucketItem{
Limit: r.Limit,
Duration: r.Duration,
Remaining: r.Limit - r.Hits,
CreatedAt: now,
},
ExpireAt: expire,
}

// Add a new rate limit to the cache.
tracing.LogInfo(span, "Add a new rate limit to the cache")
if HasBehavior(r.Behavior, Behavior_DURATION_IS_GREGORIAN) {
expire, err = GregorianExpiration(clock.Now(), r.Duration)
if err != nil {
return nil, err
}
}

t := &TokenBucketItem{
Limit: r.Limit,
Duration: r.Duration,
Remaining: r.Limit - r.Hits,
CreatedAt: now,
}

t := item.Value.(*TokenBucketItem)
rl := &RateLimitResp{
Status: Status_UNDER_LIMIT,
Limit: r.Limit,
Remaining: t.Remaining,
ResetTime: expire,
}

// Client could be requesting that we always return OVER_LIMIT
// Client could be requesting that we always return OVER_LIMIT.
if r.Hits > r.Limit {
tracing.LogInfo(span, "Over the limit")
overLimitCounter.Add(1)
Expand All @@ -244,20 +251,14 @@ func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq)
t.Remaining = r.Limit
}

item := CacheItem{
Algorithm: r.Algorithm,
Key: r.HashKey(),
Value: t,
ExpireAt: expire,
}

c.Add(item)
tracing.LogInfo(span, "c.Add()")

if s != nil {
s.OnChange(r, item)
tracing.LogInfo(span, "s.OnChange()")
}

return rl, nil
}

Expand Down Expand Up @@ -358,6 +359,8 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
ResetTime: now + (b.Limit-int64(b.Remaining))*int64(rate),
}

// TODO: Feature missing: check for Duration change between item/request.

if s != nil {
defer func() {
s.OnChange(r, item)
Expand Down
2 changes: 1 addition & 1 deletion gubernator_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestGubernatorPool(t *testing.T) {
CacheFactory: func() guber.Cache {
return mockCache
},
Loader: mockLoader,
Loader: mockLoader,
}
conf.SetDefaults()
chp := guber.NewGubernatorPool(conf, testCase.concurrency)
Expand Down
47 changes: 47 additions & 0 deletions mock_store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
Copyright 2018-2022 Mailgun Technologies Inc
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package gubernator_test

// Mock implementation of Store.

import (
guber "github.com/mailgun/gubernator/v2"
"github.com/stretchr/testify/mock"
)

type MockStore2 struct {
mock.Mock
}

var _ guber.Store = &MockStore2{}

func (m *MockStore2) OnChange(r *guber.RateLimitReq, item guber.CacheItem) {
m.Called(r, item)
}

func (m *MockStore2) Get(r *guber.RateLimitReq) (guber.CacheItem, bool) {
args := m.Called(r)
var retval guber.CacheItem
if retval2, ok := args.Get(0).(guber.CacheItem); ok {
retval = retval2
}
return retval, args.Bool(1)
}

func (m *MockStore2) Remove(key string) {
m.Called(key)
}
Loading

0 comments on commit 9b2b8ec

Please sign in to comment.