This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

PIP-1490: Refactor getRateLimits() to call checkHandlerPool.
`checkHandlerPool` is a worker pool equivalent of `getRateLimits()` that uses a consistent hash to assign keys to workers.
The workers don't share data and process requests sequentially, so there is no need for mutex locking.
Baliedge committed Dec 7, 2021
1 parent c6dfa8a commit 26fb787
Showing 10 changed files with 249 additions and 40 deletions.
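The commit message describes the routing idea behind the new pool: hash each rate-limit key onto a sorted ring of worker hashes and hand the request to the first worker at or past that point, so the same key always reaches the same worker. A minimal standalone sketch of that assignment step (this uses the standard library's hash/fnv in place of the xxhash dependency the commit adds, and the worker hash values are made up for illustration):

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// hashKey maps a rate-limit key to a 64-bit hash. The commit uses
// github.com/OneOfOne/xxhash; hash/fnv stands in here to keep the
// sketch dependency-free.
func hashKey(key string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(key))
	return h.Sum64()
}

// pickWorker returns the index of the worker that owns key: the first
// worker whose ring hash is >= the key's hash, wrapping to index 0.
// workerHashes must be sorted ascending.
func pickWorker(workerHashes []uint64, key string) int {
	h := hashKey(key)
	idx := sort.Search(len(workerHashes), func(i int) bool {
		return workerHashes[i] >= h
	})
	if idx == len(workerHashes) {
		idx = 0 // wrapped around the ring
	}
	return idx
}

func main() {
	// Pretend each worker was assigned a point on the ring.
	workerHashes := []uint64{1 << 20, 1 << 40, 1 << 60}
	for _, key := range []string{"account_1/login", "account_2/login"} {
		fmt.Printf("%s -> worker %d\n", key, pickWorker(workerHashes, key))
	}
}
```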
2 changes: 2 additions & 0 deletions .dockerignore
@@ -0,0 +1,2 @@
gubernator
gubernator-cli
5 changes: 5 additions & 0 deletions algorithms.go
@@ -21,12 +21,15 @@ import (

"github.com/mailgun/gubernator/v2/tracing"
"github.com/mailgun/holster/v4/clock"
"github.com/prometheus/client_golang/prometheus"
)

// Implements token bucket algorithm for rate limiting. https://en.wikipedia.org/wiki/Token_bucket
func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
span, ctx := tracing.StartSpan(ctx)
defer span.Finish()
tokenBucketTimer := prometheus.NewTimer(funcTimeMetric.WithLabelValues("tokenBucket"))
defer tokenBucketTimer.ObserveDuration()

getSpan, _ := tracing.StartNamedSpan(ctx, "c.GetItem()")
item, ok := c.GetItem(r.HashKey())
@@ -231,6 +234,8 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
span, ctx := tracing.StartSpan(ctx)
defer span.Finish()
leakyBucketTimer := prometheus.NewTimer(funcTimeMetric.WithLabelValues("V1Instance.getRateLimit_leakyBucket"))
defer leakyBucketTimer.ObserveDuration()

if r.Burst == 0 {
r.Burst = r.Limit
3 changes: 3 additions & 0 deletions cache.go
@@ -29,6 +29,9 @@ type Cache interface {
Lock()

Close() error

// Create a new instance of the current cache implementation.
New() Cache
}

type CacheItem struct {
196 changes: 196 additions & 0 deletions check_handler_pool.go
@@ -0,0 +1,196 @@
package gubernator

// Threadsafe worker pool for handling concurrent GetRateLimit requests.
// It avoids cache conflicts by sharding the cache key space across multiple
// workers: a hash ring assigns each key to a worker, so identical keys are
// always handled by the same worker.
// No mutex locking is necessary because each worker owns its own cache and
// processes requests sequentially.

import (
"context"
"fmt"
"io"
"sort"

"github.com/OneOfOne/xxhash"
"github.com/mailgun/gubernator/v2/tracing"
"github.com/opentracing/opentracing-go/ext"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

type checkHandlerPool struct {
// List of channels. Must be sorted by hash value.
reqChans []checkRequestChannel

done chan struct{}
}

type checkRequestChannel struct {
hash uint64
ch *chan *request
}

var _ io.Closer = &checkHandlerPool{}

func newCheckHandlerPool(conf Config, concurrency int) *checkHandlerPool {
const HandlerChannelSize = 1000

chp := &checkHandlerPool{
done: make(chan struct{}),
}

for i := 0; i < concurrency; i++ {
ch := make(chan *request, HandlerChannelSize)
go chp.worker(conf, ch)
chp.addChannel(&ch)
}

return chp
}

func (chp *checkHandlerPool) Close() error {
close(chp.done)
return nil
}

// Pool worker for processing GetRateLimit requests.
// Each worker maintains its own cache.
// A hash ring will distribute requests to an assigned worker by key.
func (chp *checkHandlerPool) worker(conf Config, ch chan *request) {
localCache := conf.Cache.New()

for {
select {
case handlerReq, ok := <-ch:
if !ok {
// Channel closed.
// Unexpected, but should be handled.
logrus.Error("checkHandlerPool worker stopped because channel closed")
return
}

chp.handleRequest(&conf, localCache, handlerReq)

case <-chp.done:
// Clean up.
return
}
}
}

// Handle request received by worker.
func (chp *checkHandlerPool) handleRequest(conf *Config, cache Cache, handlerRequest *request) {
span, ctx := tracing.StartSpan(handlerRequest.ctx)
defer span.Finish()

var rlResponse *RateLimitResp
var err error

switch handlerRequest.request.Algorithm {
case Algorithm_TOKEN_BUCKET:
rlResponse, err = tokenBucket(ctx, conf.Store, cache, handlerRequest.request)
if err != nil {
msg := "Error in tokenBucket"
countCheckError(err, msg)
err = errors.Wrap(err, msg)
ext.LogError(span, err)
}

case Algorithm_LEAKY_BUCKET:
rlResponse, err = leakyBucket(ctx, conf.Store, cache, handlerRequest.request)
if err != nil {
msg := "Error in leakyBucket"
countCheckError(err, msg)
err = errors.Wrap(err, msg)
ext.LogError(span, err)
}

default:
err = errors.Errorf("Invalid rate limit algorithm '%d'", handlerRequest.request.Algorithm)
ext.LogError(span, err)
checkErrorCounter.WithLabelValues("Invalid algorithm").Add(1)
}

handlerResponse := &response{
rl: rlResponse,
err: err,
}

select {
case handlerRequest.resp <- handlerResponse:
// Success.
case <-ctx.Done():
// Silently exit on context cancel. Caller must handle error.
}
}

// Send a GetRateLimit request to worker pool.
func (chp *checkHandlerPool) Handle(ctx context.Context, rlRequest *RateLimitReq) (*RateLimitResp, error) {
span, ctx := tracing.StartSpan(ctx)
defer span.Finish()

// Delegate request to assigned channel based on request key.
ch := chp.getChannel(rlRequest.UniqueKey)
handlerRequest := &request{
resp: make(chan *response, 1),
request: rlRequest,
ctx: ctx,
}

// Send request.
select {
case *ch <- handlerRequest:
// Successfully sent request.
case <-ctx.Done():
checkErrorCounter.WithLabelValues("Timeout").Add(1)
return nil, errors.Wrap(ctx.Err(), "Error sending request to checkHandlerPool")
}

// Wait for response.
select {
case handlerResponse := <-handlerRequest.resp:
// Successfully read response.
return handlerResponse.rl, handlerResponse.err
case <-ctx.Done():
checkErrorCounter.WithLabelValues("Timeout").Add(1)
return nil, errors.Wrap(ctx.Err(), "Error reading response from checkHandlerPool")
}
}

// Add a request channel to the worker pool.
func (chp *checkHandlerPool) addChannel(ch *chan *request) {
// Generate a hash based off the channel pointer.
// This hash value is the beginning range of a hash ring node.
key := fmt.Sprintf("%x", ch)
hash := xxhash.ChecksumString64S(key, 0)
chp.reqChans = append(chp.reqChans, checkRequestChannel{
hash: hash,
ch: ch,
})

// Ensure keys array is sorted by hash value.
sort.Slice(chp.reqChans, func(a, b int) bool {
return chp.reqChans[a].hash < chp.reqChans[b].hash
})
}

// Returns the request channel associated with the key.
// Hash the key, then lookup hash ring to find the channel.
func (chp *checkHandlerPool) getChannel(key string) *chan *request {
hash := xxhash.ChecksumString64S(key, 0)

// Binary search for appropriate channel.
idx := sort.Search(len(chp.reqChans), func(i int) bool {
return chp.reqChans[i].hash >= hash
})

// Means we have cycled back to the first.
if idx == len(chp.reqChans) {
idx = 0
}

return chp.reqChans[idx].ch
}
1 change: 1 addition & 0 deletions go.mod
@@ -26,6 +26,7 @@ require (
)

require (
github.com/OneOfOne/xxhash v1.2.8 // indirect
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.1 // indirect
2 changes: 2 additions & 0 deletions go.sum
@@ -34,6 +34,8 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7
github.com/Azure/go-autorest v11.1.2+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/ahmetb/go-linq v3.0.0+incompatible h1:qQkjjOXKrKOTy83X8OpRmnKflXKQIL/mC/gMVVDMhOA=
65 changes: 28 additions & 37 deletions gubernator.go
@@ -19,6 +19,7 @@ package gubernator
import (
"context"
"fmt"
"runtime"
"strings"
"sync"
"sync/atomic"
@@ -51,6 +52,7 @@ type V1Instance struct {
conf Config
isClosed bool
getRateLimitsCounter int64
checkHandlerPool *checkHandlerPool
}

var getRateLimitCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
@@ -130,6 +132,8 @@ func NewV1Instance(conf Config) (*V1Instance, error) {
}
setter.SetDefault(&s.log, logrus.WithField("category", "gubernator"))

numCpus := runtime.NumCPU()
s.checkHandlerPool = newCheckHandlerPool(conf, numCpus)
s.global = newGlobalManager(conf.Behaviors, &s)
s.mutliRegion = newMultiRegionManager(conf.Behaviors, &s)

@@ -167,15 +171,34 @@ func (s *V1Instance) Close() error {
s.global.Close()
s.mutliRegion.Close()

// Write cache to store.
err := s.storeCache()
if err != nil {
logrus.WithError(err).Error("Error in storeCache")
return errors.Wrap(err, "Error in storeCache")
}

err = s.checkHandlerPool.Close()
if err != nil {
logrus.WithError(err).Error("Error in checkHandlerPool.Close")
return errors.Wrap(err, "Error in checkHandlerPool.Close")
}

s.isClosed = true
return nil
}

// Write cache to store.
func (s *V1Instance) storeCache() error {
// TODO: Update for checkHandlerPool.
out := make(chan CacheItem, 500)

go func() {
for item := range s.conf.Cache.Each() {
out <- item
}
close(out)
}()
s.isClosed = true

return s.conf.Loader.Save(out)
}

@@ -399,6 +422,7 @@ func (s *V1Instance) asyncRequests(ctx context.Context, req *AsyncReq) {
// getGlobalRateLimit handles rate limits that are marked as `Behavior = GLOBAL`. Rate limit responses
// are returned from the local cache and the hits are queued to be sent to the owning peer.
func (s *V1Instance) getGlobalRateLimit(ctx context.Context, req *RateLimitReq) (*RateLimitResp, error) {
// TODO: Update for checkHandlerPool.
span, ctx := tracing.StartSpan(ctx)
defer span.Finish()

@@ -441,6 +465,7 @@ func (s *V1Instance) getGlobalRateLimit(ctx context.Context, req *RateLimitReq)
// UpdatePeerGlobals updates the local cache with a list of global rate limits. This method should only
// be called by a peer who is the owner of a global rate limit.
func (s *V1Instance) UpdatePeerGlobals(ctx context.Context, r *UpdatePeerGlobalsReq) (*UpdatePeerGlobalsResp, error) {
// TODO: Update for checkHandlerPool.
span, ctx := tracing.StartSpan(ctx)
defer span.Finish()

@@ -556,9 +581,6 @@ func (s *V1Instance) getRateLimit(ctx context.Context, r *RateLimitReq) (*RateLi
defer requestTimer.ObserveDuration()
checkCounter.Add(1)

s.conf.Cache.Lock()
defer s.conf.Cache.Unlock()

if HasBehavior(r.Behavior, Behavior_GLOBAL) {
s.global.QueueUpdate(r)
tracing.LogInfo(span, "s.global.QueueUpdate(r)")
@@ -569,38 +591,7 @@ func (s *V1Instance) getRateLimit(ctx context.Context, r *RateLimitReq) (*RateLi
tracing.LogInfo(span, "s.mutliRegion.QueueHits(r)")
}

switch r.Algorithm {
case Algorithm_TOKEN_BUCKET:
tokenBucketTimer := prometheus.NewTimer(funcTimeMetric.WithLabelValues("V1Instance.getRateLimit_tokenBucket"))
defer tokenBucketTimer.ObserveDuration()
resp, err := tokenBucket(ctx, s.conf.Store, s.conf.Cache, r)
if err != nil {
msg := "Error in tokenBucket"
err2 := errors.Wrap(err, msg)
ext.LogError(span, err2)
countCheckError(err, msg)
return nil, err2
}
return resp, nil

case Algorithm_LEAKY_BUCKET:
leakyBucketTimer := prometheus.NewTimer(funcTimeMetric.WithLabelValues("V1Instance.getRateLimit_leakyBucket"))
defer leakyBucketTimer.ObserveDuration()
resp, err := leakyBucket(ctx, s.conf.Store, s.conf.Cache, r)
if err != nil {
msg := "Error in leakyBucket"
err2 := errors.Wrap(err, msg)
ext.LogError(span, err2)
countCheckError(err, msg)
return nil, err2
}
return resp, nil
}

err := fmt.Errorf("Invalid rate limit algorithm '%d'", r.Algorithm)
ext.LogError(span, err)
checkErrorCounter.WithLabelValues("Invalid algorithm").Add(1)
return nil, err
return s.checkHandlerPool.Handle(ctx, r)
}

// SetPeers is called by the implementor to indicate the pool of peers has changed
4 changes: 4 additions & 0 deletions lrucache.go
@@ -191,3 +191,7 @@ func (c *LRUCache) Close() error {
c.ll = nil
return nil
}

func (c *LRUCache) New() Cache {
return NewLRUCache(c.cacheSize)
}