Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
PIP-1490: Rewrite unit tests for tokenBucket() and leakyBucket().
Browse files Browse the repository at this point in the history
Uses improved mocking tools.
  • Loading branch information
Baliedge committed Jan 17, 2022
1 parent 9b2b8ec commit 2e51c7c
Show file tree
Hide file tree
Showing 3 changed files with 406 additions and 314 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@ Examples when using `Behavior = DURATION_IS_GREGORIAN`
* If `Duration = 0` (Minutes) then the rate limit will reset to `Current = 0` at the end of the minute the rate limit was created.
* If `Duration = 4` (Months) then the rate limit will reset to `Current = 0` at the end of the month the rate limit was created.

## Reset Remaining Behavior
Users may add behavior `Behavior_RESET_REMAINING` to the rate check request.
This will reset the rate limit as if created new on first use.

When using Reset Remaining, the `Hits` field should be 0.

## Gubernator as a library
If you are using golang, you can use Gubernator as a library. This is useful if
you wish to implement a rate limit service with your own company specific model
Expand Down
83 changes: 67 additions & 16 deletions algorithms.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
tokenBucketTimer := prometheus.NewTimer(funcTimeMetric.WithLabelValues("tokenBucket"))
defer tokenBucketTimer.ObserveDuration()

// Get definition from cache.
// Get rate limit from cache.
hashKey := r.HashKey()
item, ok := c.GetItem(hashKey)
tracing.LogInfo(span, "c.GetItem()")
Expand Down Expand Up @@ -214,7 +214,7 @@ func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq)
expire := now + r.Duration

item := CacheItem{
Algorithm: r.Algorithm,
Algorithm: Algorithm_TOKEN_BUCKET,
Key: r.HashKey(),
Value: &TokenBucketItem{
Limit: r.Limit,
Expand Down Expand Up @@ -274,33 +274,61 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
}

now := MillisecondNow()
item, ok := c.GetItem(r.HashKey())

if s != nil {
if !ok {
// Check our store for the item
item, ok = s.Get(r)
tracing.LogInfo(span, "c.Get()")
if ok {
c.Add(item)
tracing.LogInfo(span, "c.Add()")
}
// Get rate limit from cache.
hashKey := r.HashKey()
item, ok := c.GetItem(hashKey)
tracing.LogInfo(span, "c.GetItem()")

if s != nil && !ok {
// Cache miss.
// Check our store for the item.
if item, ok = s.Get(r); ok {
tracing.LogInfo(span, "Check store for rate limit")
c.Add(item)
tracing.LogInfo(span, "c.Add()")
}
}

// Sanity checks.
if ok {
if item.Value == nil {
msgPart := "leakyBucket: Invalid cache item; Value is nil"
tracing.LogInfo(span, msgPart,
"hashKey", hashKey,
"key", r.UniqueKey,
"name", r.Name,
)
logrus.Error(msgPart)
ok = false
} else if item.Key != hashKey {
msgPart := "leakyBucket: Invalid cache item; key mismatch"
tracing.LogInfo(span, msgPart,
"itemKey", item.Key,
"hashKey", hashKey,
"name", r.Name,
)
logrus.Error(msgPart)
ok = false
}
}

if ok {
// Item found in cache or store.
tracing.LogInfo(span, "Update existing rate limit")

b, ok := item.Value.(*LeakyBucketItem)
if !ok {
// Client switched algorithms; perhaps due to a migration?
c.Remove(r.HashKey())
c.Remove(hashKey)
tracing.LogInfo(span, "c.Remove()")

if s != nil {
s.Remove(r.HashKey())
s.Remove(hashKey)
tracing.LogInfo(span, "s.Remove()")
}

return leakyBucket(ctx, s, c, r)
return leakyBucketNewItem(ctx, s, c, r)
}

if HasBehavior(r.Behavior, Behavior_RESET_REMAINING) {
Expand Down Expand Up @@ -399,10 +427,33 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
b.Remaining -= float64(r.Hits)
rl.Remaining = int64(b.Remaining)
rl.ResetTime = now + (rl.Limit-rl.Remaining)*int64(rate)
c.UpdateExpiration(r.HashKey(), now+duration)
c.UpdateExpiration(hashKey, now+duration)
return rl, nil
}

return leakyBucketNewItem(ctx, s, c, r)
}

// Called by leakyBucket() when adding a new item in the store.
func leakyBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
span, ctx := tracing.StartSpan(ctx)
defer span.Finish()

now := MillisecondNow()
expire := now + r.Duration

item := CacheItem{
Algorithm: Algorithm_LEAKY_BUCKET,
Key: r.HashKey(),
Value: &TokenBucketItem{
Limit: r.Limit,
Duration: r.Duration,
Remaining: r.Limit - r.Hits,
CreatedAt: now,
},
ExpireAt: expire,
}

duration := r.Duration
rate := float64(duration) / float64(r.Limit)
if HasBehavior(r.Behavior, Behavior_DURATION_IS_GREGORIAN) {
Expand Down
Loading

0 comments on commit 2e51c7c

Please sign in to comment.