This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

PIP-1490: Normalize unit tests to use "_test" package suffix.
Baliedge committed Jan 14, 2022
1 parent 58432d5 commit ea4af60
Showing 6 changed files with 71 additions and 59 deletions.
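For context on the commit title: the "_test" suffix makes these files compile as a separate, black-box test package, so they can only reach gubernator through its exported API, imported under the guber alias. A minimal sketch of the convention (the test body itself is hypothetical):

package gubernator_test

import (
	"testing"

	guber "github.com/mailgun/gubernator/v2"
)

// Only exported identifiers (guber.Config, guber.CacheItem, ...) are visible
// from an external "_test" package, which is why this commit also exports
// GubernatorPool and NewGubernatorPool.
func TestExternalPackageExample(t *testing.T) {
	item := guber.CacheItem{
		Key:      "Foobar0001",
		Value:    "Stuff0001",
		ExpireAt: 4131978658000,
	}
	if item.Key == "" {
		t.Fatal("expected a key")
	}
}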
27 changes: 17 additions & 10 deletions algorithms.go
@@ -74,12 +74,12 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
tracing.LogInfo(span, "Update existing rate limit")

if HasBehavior(r.Behavior, Behavior_RESET_REMAINING) {
c.Remove(r.HashKey())
c.Remove(hashKey)
tracing.LogInfo(span, "c.Remove()")

if s != nil {
s.Remove(r.HashKey())
tracing.LogInfo(span, "c.Remove()")
s.Remove(hashKey)
tracing.LogInfo(span, "s.Remove()")
}
return &RateLimitResp{
Status: Status_UNDER_LIMIT,
@@ -99,14 +99,15 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
// Client switched algorithms; perhaps due to a migration?
tracing.LogInfo(span, "Client switched algorithms; perhaps due to a migration?")

c.Remove(r.HashKey())
c.Remove(hashKey)
tracing.LogInfo(span, "c.Remove()")

if s != nil {
s.Remove(r.HashKey())
s.Remove(hashKey)
tracing.LogInfo(span, "s.Remove()")
}

// FIXME: Eliminate recursion.
return tokenBucket(ctx, s, c, r)
}

@@ -145,16 +146,21 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
return nil, err
}
}

// If our new duration means we are currently expired
if expire <= MillisecondNow() {
// Update this so s.OnChange() will get the new expire change
tracing.LogInfo(span, "Limit has expired")
item.ExpireAt = expire

c.Remove(item.Key)
tracing.LogInfo(span, "c.Remove()")

return tokenBucketNewItem(ctx, s, c, r, item)
// FIXME: tokenBucketNewItem creates a new item. But, we want to
// preserve this item for its CreatedAt timestamp.
return tokenBucketNewItem(ctx, s, c, r)
}

item.ExpireAt = expire
rl.ResetTime = expire
}
@@ -196,10 +202,11 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
return rl, nil
}

return tokenBucketNewItem(ctx, s, c, r, item)
return tokenBucketNewItem(ctx, s, c, r)
}

func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq, item CacheItem) (resp *RateLimitResp, err error) {
// Called by tokenBucket() when the requested item is not found in cache or store.
func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
span, ctx := tracing.StartSpan(ctx)
defer span.Finish()

@@ -237,7 +244,7 @@ func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq,
t.Remaining = r.Limit
}

item = CacheItem{
item := CacheItem{
Algorithm: r.Algorithm,
Key: r.HashKey(),
Value: t,
@@ -289,7 +296,7 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *

if s != nil {
s.Remove(r.HashKey())
tracing.LogInfo(span, "c.Remove()")
tracing.LogInfo(span, "s.Remove()")
}

return leakyBucket(ctx, s, c, r)
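Aside: the algorithms.go hunks above mostly hash the request key once and then drop the limit from the cache and, when one is configured, from the persistent store before recreating it. A self-contained sketch of that removal pattern, using simplified stand-ins rather than the real Cache and Store interfaces:

package main

import (
	"fmt"
	"time"
)

// remover stands in for the subset of the Cache and Store interfaces that
// the reset paths in tokenBucket rely on.
type remover interface{ Remove(key string) }

type mapCache map[string]int64 // key -> expiration, epoch milliseconds

func (m mapCache) Remove(key string) { delete(m, key) }

func millisecondNow() int64 { return time.Now().UnixMilli() }

// resetBucket reuses a hash key the caller computed once (as the diff now
// does with r.HashKey()) for both removals.
func resetBucket(c, s remover, hashKey string) {
	c.Remove(hashKey)
	if s != nil { // a persistent store is optional
		s.Remove(hashKey)
	}
}

func main() {
	c := mapCache{"account_1_requests": millisecondNow() + 60_000}
	resetBucket(c, nil, "account_1_requests")
	fmt.Println(len(c)) // 0: the entry was removed from the cache
}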
4 changes: 2 additions & 2 deletions gubernator.go
@@ -52,7 +52,7 @@ type V1Instance struct {
conf Config
isClosed bool
getRateLimitsCounter int64
gubernatorPool *gubernatorPool
gubernatorPool *GubernatorPool
}

var getRateLimitCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
@@ -125,7 +125,7 @@ func NewV1Instance(conf Config) (*V1Instance, error) {
setter.SetDefault(&s.log, logrus.WithField("category", "gubernator"))

numCpus := runtime.NumCPU()
s.gubernatorPool = newGubernatorPool(&conf, numCpus)
s.gubernatorPool = NewGubernatorPool(&conf, numCpus)
s.global = newGlobalManager(conf.Behaviors, &s)
s.mutliRegion = newMultiRegionManager(conf.Behaviors, &s)

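Exporting gubernatorPool as GubernatorPool (and newGubernatorPool as NewGubernatorPool) is what lets the relocated tests build a pool from outside the package. A hypothetical external-package snippet, assuming Config.SetDefaults supplies usable defaults; the real tests below also wire in an explicit CacheFactory and Loader:

package gubernator_test

import (
	"runtime"
	"testing"

	guber "github.com/mailgun/gubernator/v2"
)

func TestConstructPoolExternally(t *testing.T) {
	conf := &guber.Config{}
	conf.SetDefaults()

	// NewGubernatorPool is reachable here only because it is now exported.
	pool := guber.NewGubernatorPool(conf, runtime.NumCPU())
	defer pool.Close()
}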
36 changes: 18 additions & 18 deletions gubernator_pool.go
@@ -40,7 +40,7 @@ import (
"github.com/sirupsen/logrus"
)

type gubernatorPool struct {
type GubernatorPool struct {
workers []*poolWorker

// Workers in the hash ring. Must be sorted by hash value.
@@ -104,11 +104,11 @@ type poolGetCacheItemResponse struct {
ok bool
}

var _ io.Closer = &gubernatorPool{}
var _ io.Closer = &GubernatorPool{}
var poolWorkerCounter int64

func newGubernatorPool(conf *Config, concurrency int) *gubernatorPool {
chp := &gubernatorPool{
func NewGubernatorPool(conf *Config, concurrency int) *GubernatorPool {
chp := &GubernatorPool{
conf: conf,
done: make(chan struct{}),
}
@@ -121,13 +121,13 @@ func newGubernatorPool(conf *Config, concurrency int) *gubernatorPool {
return chp
}

func (chp *gubernatorPool) Close() error {
func (chp *GubernatorPool) Close() error {
close(chp.done)
return nil
}

// Add a request channel to the worker pool.
func (chp *gubernatorPool) addWorker() *poolWorker {
func (chp *GubernatorPool) addWorker() *poolWorker {
const commandChannelSize = 10000

worker := &poolWorker{
@@ -166,7 +166,7 @@ func (chp *gubernatorPool) addWorker() *poolWorker {

// Returns the request channel associated with the key.
// Hash the key, then lookup hash ring to find the channel.
func (chp *gubernatorPool) getWorker(key string) *poolWorker {
func (chp *GubernatorPool) getWorker(key string) *poolWorker {
hash := xxhash.ChecksumString64S(key, 0)

// Binary search for appropriate channel.
@@ -186,7 +186,7 @@ func (chp *gubernatorPool) getWorker(key string) *poolWorker {
// Each worker maintains its own state.
// A hash ring will distribute requests to an assigned worker by key.
// See: getWorker()
func (chp *gubernatorPool) worker(worker *poolWorker) {
func (chp *GubernatorPool) worker(worker *poolWorker) {
for {
// Dispatch requests from each channel.
select {
@@ -243,7 +243,7 @@ func (chp *gubernatorPool) worker(worker *poolWorker) {
}

// Send a GetRateLimit request to worker pool.
func (chp *gubernatorPool) GetRateLimit(ctx context.Context, rlRequest *RateLimitReq) (*RateLimitResp, error) {
func (chp *GubernatorPool) GetRateLimit(ctx context.Context, rlRequest *RateLimitReq) (*RateLimitResp, error) {
span, ctx := tracing.StartSpan(ctx)
defer span.Finish()

@@ -280,7 +280,7 @@ func (chp *gubernatorPool) GetRateLimit(ctx context.Context, rlRequest *RateLimi
}

// Handle request received by worker.
func (chp *gubernatorPool) handleGetRateLimit(handlerRequest *request, cache Cache) {
func (chp *GubernatorPool) handleGetRateLimit(handlerRequest *request, cache Cache) {
span, ctx := tracing.StartSpan(handlerRequest.ctx)
defer span.Finish()

@@ -330,7 +330,7 @@ func (chp *gubernatorPool) handleGetRateLimit(handlerRequest *request, cache Cac
// Atomically load cache from persistent storage.
// Read from persistent storage. Load into each appropriate worker's cache.
// Workers are locked during this load operation to prevent race conditions.
func (chp *gubernatorPool) Load(ctx context.Context) error {
func (chp *GubernatorPool) Load(ctx context.Context) error {
span, ctx := tracing.StartSpan(ctx)
defer span.Finish()

@@ -416,7 +416,7 @@
return nil
}

func (chp *gubernatorPool) handleLoad(request poolLoadRequest, cache Cache) {
func (chp *GubernatorPool) handleLoad(request poolLoadRequest, cache Cache) {
span, ctx := tracing.StartSpan(request.ctx)
defer span.Finish()

@@ -455,7 +455,7 @@
// Atomically store cache to persistent storage.
// Save all workers' caches to persistent storage.
// Workers are locked during this store operation to prevent race conditions.
func (chp *gubernatorPool) Store(ctx context.Context) error {
func (chp *GubernatorPool) Store(ctx context.Context) error {
span, ctx := tracing.StartSpan(ctx)
defer span.Finish()

@@ -514,7 +514,7 @@ func (chp *gubernatorPool) Store(ctx context.Context) error {
return chp.conf.Loader.Save(out)
}

func (chp *gubernatorPool) handleStore(request poolStoreRequest, cache Cache) {
func (chp *GubernatorPool) handleStore(request poolStoreRequest, cache Cache) {
span, ctx := tracing.StartSpan(request.ctx)
defer span.Finish()

@@ -543,7 +543,7 @@ func (chp *gubernatorPool) handleStore(request poolStoreRequest, cache Cache) {
}

// Add to worker's cache.
func (chp *gubernatorPool) AddCacheItem(ctx context.Context, key string, item CacheItem) error {
func (chp *GubernatorPool) AddCacheItem(ctx context.Context, key string, item CacheItem) error {
span, ctx := tracing.StartSpan(ctx)
defer span.Finish()

@@ -578,7 +578,7 @@ func (chp *gubernatorPool) AddCacheItem(ctx context.Context, key string, item Ca
}
}

func (chp *gubernatorPool) handleAddCacheItem(request poolAddCacheItemRequest, cache Cache) {
func (chp *GubernatorPool) handleAddCacheItem(request poolAddCacheItemRequest, cache Cache) {
span, ctx := tracing.StartSpan(request.ctx)
defer span.Finish()

@@ -596,7 +596,7 @@ func (chp *gubernatorPool) handleAddCacheItem(request poolAddCacheItemRequest, c
}

// Get item from worker's cache.
func (chp *gubernatorPool) GetCacheItem(ctx context.Context, key string) (CacheItem, bool, error) {
func (chp *GubernatorPool) GetCacheItem(ctx context.Context, key string) (CacheItem, bool, error) {
span, ctx := tracing.StartSpan(ctx)
defer span.Finish()

@@ -631,7 +631,7 @@ func (chp *gubernatorPool) GetCacheItem(ctx context.Context, key string) (CacheI
}
}

func (chp *gubernatorPool) handleGetCacheItem(request poolGetCacheItemRequest, cache Cache) {
func (chp *GubernatorPool) handleGetCacheItem(request poolGetCacheItemRequest, cache Cache) {
span, ctx := tracing.StartSpan(request.ctx)
defer span.Finish()

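The getWorker comments in the diff above describe the dispatch scheme: hash the key, then binary-search a hash ring (kept sorted by hash value) for the owning worker. A standalone sketch of that lookup, with FNV standing in for the pool's xxhash call and simplified ring and worker types:

package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// Simplified stand-ins for the pool's worker and hash-ring entry types.
type worker struct{ name string }

type ringEntry struct {
	hash uint64
	w    *worker
}

// hashKey stands in for the xxhash checksum used by the real pool.
func hashKey(key string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(key))
	return h.Sum64()
}

// lookupWorker hashes the key, then binary-searches the ring for the first
// entry at or above that hash, wrapping to the first entry when the key
// hashes past the end.
func lookupWorker(ring []ringEntry, key string) *worker {
	if len(ring) == 0 {
		return nil // no workers registered
	}
	h := hashKey(key)
	i := sort.Search(len(ring), func(i int) bool { return ring[i].hash >= h })
	if i == len(ring) {
		i = 0
	}
	return ring[i].w
}

func main() {
	ring := []ringEntry{
		{hash: 1 << 20, w: &worker{"worker-0"}},
		{hash: 1 << 40, w: &worker{"worker-1"}},
		{hash: 1 << 60, w: &worker{"worker-2"}},
	}
	fmt.Println(lookupWorker(ring, "account_1:get_page").name)
}

The wrap-around is what makes the sorted slice behave as a ring, so every key maps to some worker and keys stay on the same worker as long as the ring membership does not change.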
27 changes: 14 additions & 13 deletions gubernator_pool_test.go
@@ -14,14 +14,15 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package gubernator
package gubernator_test

import (
"context"
"fmt"
"sort"
"testing"

guber "github.com/mailgun/gubernator/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
@@ -42,9 +43,9 @@ func TestGubernatorPool(t *testing.T) {
t.Run(testCase.name, func(t *testing.T) {
// Setup mock data.
const NumCacheItems = 100
cacheItems := []CacheItem{}
cacheItems := []guber.CacheItem{}
for i := 0; i < NumCacheItems; i++ {
cacheItems = append(cacheItems, CacheItem{
cacheItems = append(cacheItems, guber.CacheItem{
Key: fmt.Sprintf("Foobar%04d", i),
Value: fmt.Sprintf("Stuff%04d", i),
ExpireAt: 4131978658000,
@@ -54,17 +55,17 @@
t.Run("Load()", func(t *testing.T) {
mockLoader := &MockLoader2{}
mockCache := &MockCache{}
conf := &Config{
CacheFactory: func() Cache {
conf := &guber.Config{
CacheFactory: func() guber.Cache {
return mockCache
},
Loader: mockLoader,
}
conf.SetDefaults()
chp := newGubernatorPool(conf, testCase.concurrency)
chp := guber.NewGubernatorPool(conf, testCase.concurrency)

// Mock Loader.
fakeLoadCh := make(chan CacheItem, NumCacheItems)
fakeLoadCh := make(chan guber.CacheItem, NumCacheItems)
for _, item := range cacheItems {
fakeLoadCh <- item
}
@@ -86,21 +87,21 @@
t.Run("Store()", func(t *testing.T) {
mockLoader := &MockLoader2{}
mockCache := &MockCache{}
conf := &Config{
CacheFactory: func() Cache {
conf := &guber.Config{
CacheFactory: func() guber.Cache {
return mockCache
},
Loader: mockLoader,
}
conf.SetDefaults()
chp := newGubernatorPool(conf, testCase.concurrency)
chp := guber.NewGubernatorPool(conf, testCase.concurrency)

// Mock Loader.
mockLoader.On("Save", mock.Anything).Once().Return(nil).
Run(func(args mock.Arguments) {
// Verify items sent over the channel passed to Save().
saveCh := args.Get(0).(chan CacheItem)
savedItems := []CacheItem{}
saveCh := args.Get(0).(chan guber.CacheItem)
savedItems := []guber.CacheItem{}
for item := range saveCh {
savedItems = append(savedItems, item)
}
@@ -113,7 +114,7 @@ func TestGubernatorPool(t *testing.T) {
})

// Mock Cache.
eachCh := make(chan CacheItem, NumCacheItems)
eachCh := make(chan guber.CacheItem, NumCacheItems)
for _, item := range cacheItems {
eachCh <- item
}
23 changes: 13 additions & 10 deletions mock_cache_test.go
@@ -14,19 +14,22 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package gubernator
package gubernator_test

// Mock implementation of Cache.

import "github.com/stretchr/testify/mock"
import (
guber "github.com/mailgun/gubernator/v2"
"github.com/stretchr/testify/mock"
)

type MockCache struct {
mock.Mock
}

var _ Cache = &MockCache{}
var _ guber.Cache = &MockCache{}

func (m *MockCache) Add(item CacheItem) bool {
func (m *MockCache) Add(item guber.CacheItem) bool {
args := m.Called(item)
return args.Bool(0)
}
@@ -36,19 +39,19 @@ func (m *MockCache) UpdateExpiration(key string, expireAt int64) bool {
return args.Bool(0)
}

func (m *MockCache) GetItem(key string) (value CacheItem, ok bool) {
func (m *MockCache) GetItem(key string) (value guber.CacheItem, ok bool) {
args := m.Called(key)
var retval CacheItem
if retval2, ok := args.Get(0).(CacheItem); ok {
var retval guber.CacheItem
if retval2, ok := args.Get(0).(guber.CacheItem); ok {
retval = retval2
}
return retval, args.Bool(1)
}

func (m *MockCache) Each() chan CacheItem {
func (m *MockCache) Each() chan guber.CacheItem {
args := m.Called()
var retval chan CacheItem
if retval2, ok := args.Get(0).(chan CacheItem); ok {
var retval chan guber.CacheItem
if retval2, ok := args.Get(0).(chan guber.CacheItem); ok {
retval = retval2
}
return retval
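With MockCache now in gubernator_test, tests in that package can program it with testify expectations and hand it out wherever a guber.Cache is needed. A hypothetical usage sketch; the test name and values are illustrative:

package gubernator_test

import (
	"testing"

	guber "github.com/mailgun/gubernator/v2"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/mock"
)

func TestMockCacheUsageExample(t *testing.T) {
	mockCache := &MockCache{}
	item := guber.CacheItem{Key: "Foobar0001", Value: "Stuff0001", ExpireAt: 4131978658000}

	// Program the expectations, then exercise the mock through the
	// guber.Cache methods it implements.
	mockCache.On("Add", mock.Anything).Once().Return(true)
	mockCache.On("GetItem", "Foobar0001").Once().Return(item, true)

	assert.True(t, mockCache.Add(item))
	got, ok := mockCache.GetItem("Foobar0001")
	assert.True(t, ok)
	assert.Equal(t, item, got)
	mockCache.AssertExpectations(t)
}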