
Commit 129e250

PIP-1490: Respect cache max size as total size across all workers, not per worker.
Baliedge committed Jan 19, 2022
1 parent 5bae456 commit 129e250
Showing 7 changed files with 24 additions and 19 deletions.
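
The gist of the change: Config.CacheFactory now takes a maxSize argument, NewGubernatorPool accepts a total cacheSize (defaulting to 50,000 when 0 is passed), and each worker's cache is built with cacheSize / concurrency entries instead of every worker being sized at the full configured value. A minimal sketch of the sizing rule follows, using simplified names rather than the real gubernator types:

package main

import "fmt"

// perWorkerCacheSize mirrors the sizing logic introduced in this commit:
// the configured cache size is treated as a total budget and divided
// evenly across the pool's workers. Simplified sketch, not the actual code.
func perWorkerCacheSize(totalCacheSize, concurrency int) int {
	if totalCacheSize <= 0 {
		totalCacheSize = 50_000 // default applied by NewGubernatorPool when 0 is passed
	}
	return totalCacheSize / concurrency
}

func main() {
	// With the 50,000 default and 4 workers, each worker's LRU cache holds at most 12,500 entries.
	fmt.Println(perWorkerCacheSize(0, 4))
}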
2 changes: 1 addition & 1 deletion algorithms.go
@@ -223,7 +223,7 @@ func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq)
 		Algorithm: Algorithm_TOKEN_BUCKET,
 		Key:       r.HashKey(),
 		Value:     t,
-		ExpireAt:  expire,
+		ExpireAt:  expire,
 	}

 	// Add a new rate limit to the cache.
6 changes: 3 additions & 3 deletions config.go
@@ -73,7 +73,7 @@ type Config struct {
 	Behaviors BehaviorConfig

 	// (Optional) The cache implementation
-	CacheFactory func() Cache
+	CacheFactory func(maxSize int) Cache

 	// (Optional) A persistent store implementation. Allows the implementor the ability to store the rate limits this
 	// instance of gubernator owns. It's up to the implementor to decide what rate limits to persist.
@@ -134,8 +134,8 @@ func (c *Config) SetDefaults() error {
 	setter.SetDefault(&c.PoolWorkers, numCpus)

 	if c.CacheFactory == nil {
-		c.CacheFactory = func() Cache {
-			return NewSyncLRUCache(0)
+		c.CacheFactory = func(maxSize int) Cache {
+			return NewSyncLRUCache(maxSize)
 		}
 	}
4 changes: 2 additions & 2 deletions daemon.go
@@ -87,8 +87,8 @@ func (s *Daemon) Start(ctx context.Context) error {
 	cacheCollector := NewLRUCacheCollector()
 	s.promRegister.Register(cacheCollector)

-	cacheFactory := func() Cache {
-		cache := NewSyncLRUCache(s.conf.CacheSize)
+	cacheFactory := func(maxSize int) Cache {
+		cache := NewSyncLRUCache(maxSize)
 		cacheCollector.AddCache(cache)
 		return cache
 	}
6 changes: 3 additions & 3 deletions grpc_stats.go
@@ -53,10 +53,10 @@ func NewGRPCStatsHandler() *GRPCStatsHandler {
 			Help: "The count of gRPC requests.",
 		}, []string{"status", "method"}),
 		grpcRequestDuration: prometheus.NewSummaryVec(prometheus.SummaryOpts{
-			Name: "gubernator_grpc_request_duration",
-			Help: "The timings of gRPC requests in seconds",
+			Name:       "gubernator_grpc_request_duration",
+			Help:       "The timings of gRPC requests in seconds",
 			Objectives: map[float64]float64{
-				0.5: 0.05,
+				0.5:  0.05,
 				0.99: 0.001,
 			},
 		}, []string{"method"}),
2 changes: 1 addition & 1 deletion gubernator.go
@@ -123,7 +123,7 @@ func NewV1Instance(conf Config) (*V1Instance, error) {
 	}
 	setter.SetDefault(&s.log, logrus.WithField("category", "gubernator"))

-	s.gubernatorPool = NewGubernatorPool(&conf, conf.PoolWorkers)
+	s.gubernatorPool = NewGubernatorPool(&conf, conf.PoolWorkers, 0)
 	s.global = newGlobalManager(conf.Behaviors, &s)
 	s.mutliRegion = newMultiRegionManager(conf.Behaviors, &s)
15 changes: 10 additions & 5 deletions gubernator_pool.go
@@ -35,13 +35,15 @@ import (

 	"github.com/OneOfOne/xxhash"
 	"github.com/mailgun/gubernator/v2/tracing"
+	"github.com/mailgun/holster/v4/setter"
 	"github.com/opentracing/opentracing-go/ext"
 	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
 )

 type GubernatorPool struct {
-	workers []*poolWorker
+	workers         []*poolWorker
+	workerCacheSize int

 	// Workers in the hash ring. Must be sorted by hash value.
 	hashRing []poolHashRingNode
@@ -107,10 +109,13 @@ type poolGetCacheItemResponse struct {
 var _ io.Closer = &GubernatorPool{}
 var poolWorkerCounter int64

-func NewGubernatorPool(conf *Config, concurrency int) *GubernatorPool {
+func NewGubernatorPool(conf *Config, concurrency int, cacheSize int) *GubernatorPool {
+	setter.SetDefault(&cacheSize, 50_000)
+
 	chp := &GubernatorPool{
-		conf: conf,
-		done: make(chan struct{}),
+		workerCacheSize: cacheSize / concurrency,
+		conf:            conf,
+		done:            make(chan struct{}),
 	}

 	for i := 0; i < concurrency; i++ {
@@ -131,7 +136,7 @@ func (chp *GubernatorPool) addWorker() *poolWorker {
 	const commandChannelSize = 10000

 	worker := &poolWorker{
-		cache:               chp.conf.CacheFactory(),
+		cache:               chp.conf.CacheFactory(chp.workerCacheSize),
 		getRateLimitRequest: make(chan *request, commandChannelSize),
 		storeRequest:        make(chan poolStoreRequest, commandChannelSize),
 		loadRequest:         make(chan poolLoadRequest, commandChannelSize),
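
For callers, the constructor change is the only visible difference: pass the total cache budget as the new third argument, or 0 to take the 50,000 default that is then split across workers. A hypothetical caller is sketched below; the guber import alias and module path follow the test file, so treat the exact path as an assumption:

package main

import (
	guber "github.com/mailgun/gubernator/v2"
)

func main() {
	conf := guber.Config{}
	conf.SetDefaults() // installs the maxSize-aware NewSyncLRUCache factory and the PoolWorkers default

	// Passing 0 falls back to 50,000 total cache entries, divided across conf.PoolWorkers workers.
	pool := guber.NewGubernatorPool(&conf, conf.PoolWorkers, 0)
	defer pool.Close()
}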
8 changes: 4 additions & 4 deletions gubernator_pool_test.go
@@ -56,13 +56,13 @@ func TestGubernatorPool(t *testing.T) {
 			mockLoader := &MockLoader2{}
 			mockCache := &MockCache{}
 			conf := &guber.Config{
-				CacheFactory: func() guber.Cache {
+				CacheFactory: func(maxSize int) guber.Cache {
 					return mockCache
 				},
 				Loader: mockLoader,
 			}
 			conf.SetDefaults()
-			chp := guber.NewGubernatorPool(conf, testCase.concurrency)
+			chp := guber.NewGubernatorPool(conf, testCase.concurrency, 0)

 			// Mock Loader.
 			fakeLoadCh := make(chan *guber.CacheItem, NumCacheItems)
@@ -88,13 +88,13 @@ func TestGubernatorPool(t *testing.T) {
 			mockLoader := &MockLoader2{}
 			mockCache := &MockCache{}
 			conf := &guber.Config{
-				CacheFactory: func() guber.Cache {
+				CacheFactory: func(maxSize int) guber.Cache {
 					return mockCache
 				},
 				Loader: mockLoader,
 			}
 			conf.SetDefaults()
-			chp := guber.NewGubernatorPool(conf, testCase.concurrency)
+			chp := guber.NewGubernatorPool(conf, testCase.concurrency, 0)

 			// Mock Loader.
 			mockLoader.On("Save", mock.Anything).Once().Return(nil).

