Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
PIP-1490: Change back to passing CacheItem by pointers instead of b…
Browse files Browse the repository at this point in the history
…y value.
  • Loading branch information
Baliedge committed Jan 17, 2022
1 parent 2e51c7c commit c142870
Show file tree
Hide file tree
Showing 14 changed files with 105 additions and 119 deletions.
18 changes: 2 additions & 16 deletions algorithms.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq)
now := MillisecondNow()
expire := now + r.Duration

item := CacheItem{
item := &CacheItem{
Algorithm: Algorithm_TOKEN_BUCKET,
Key: r.HashKey(),
Value: &TokenBucketItem{
Expand Down Expand Up @@ -440,20 +440,6 @@ func leakyBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq)
defer span.Finish()

now := MillisecondNow()
expire := now + r.Duration

item := CacheItem{
Algorithm: Algorithm_LEAKY_BUCKET,
Key: r.HashKey(),
Value: &TokenBucketItem{
Limit: r.Limit,
Duration: r.Duration,
Remaining: r.Limit - r.Hits,
CreatedAt: now,
},
ExpireAt: expire,
}

duration := r.Duration
rate := float64(duration) / float64(r.Limit)
if HasBehavior(r.Behavior, Behavior_DURATION_IS_GREGORIAN) {
Expand Down Expand Up @@ -492,7 +478,7 @@ func leakyBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq)
b.Remaining = 0
}

item = CacheItem{
item := &CacheItem{
ExpireAt: now + duration,
Algorithm: r.Algorithm,
Key: r.HashKey(),
Expand Down
6 changes: 3 additions & 3 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ package gubernator

// So algorithms can interface with different cache implementations
type Cache interface {
Add(item CacheItem) bool
Add(item *CacheItem) bool
UpdateExpiration(key string, expireAt int64) bool
GetItem(key string) (value CacheItem, ok bool)
Each() chan CacheItem
GetItem(key string) (value *CacheItem, ok bool)
Each() chan *CacheItem
Remove(key string)
Size() int64
Close() error
Expand Down
2 changes: 1 addition & 1 deletion gubernator.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ func (s *V1Instance) UpdatePeerGlobals(ctx context.Context, r *UpdatePeerGlobals
defer span.Finish()

for _, g := range r.Globals {
item := CacheItem{
item := &CacheItem{
ExpireAt: g.Status.ResetTime,
Algorithm: g.Algorithm,
Value: g.Status,
Expand Down
26 changes: 13 additions & 13 deletions gubernator_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,23 +70,23 @@ type poolHashRingNode struct {
type poolStoreRequest struct {
ctx context.Context
response chan poolStoreResponse
out chan<- CacheItem
out chan<- *CacheItem
}

type poolStoreResponse struct{}

type poolLoadRequest struct {
ctx context.Context
response chan poolLoadResponse
in <-chan CacheItem
in <-chan *CacheItem
}

type poolLoadResponse struct{}

type poolAddCacheItemRequest struct {
ctx context.Context
response chan poolAddCacheItemResponse
item CacheItem
item *CacheItem
}

type poolAddCacheItemResponse struct {
Expand All @@ -100,7 +100,7 @@ type poolGetCacheItemRequest struct {
}

type poolGetCacheItemResponse struct {
item CacheItem
item *CacheItem
ok bool
}

Expand Down Expand Up @@ -340,7 +340,7 @@ func (chp *GubernatorPool) Load(ctx context.Context) error {
}

type loadChannel struct {
ch chan CacheItem
ch chan *CacheItem
worker *poolWorker
respChan chan poolLoadResponse
}
Expand All @@ -351,7 +351,7 @@ func (chp *GubernatorPool) Load(ctx context.Context) error {
// Send each item to assigned channel's cache.
mainloop:
for {
var item CacheItem
var item *CacheItem
var ok bool

select {
Expand All @@ -372,7 +372,7 @@ mainloop:
loadCh, exist := loadChMap[worker]
if !exist {
loadCh = loadChannel{
ch: make(chan CacheItem),
ch: make(chan *CacheItem),
worker: worker,
respChan: make(chan poolLoadResponse),
}
Expand Down Expand Up @@ -422,7 +422,7 @@ func (chp *GubernatorPool) handleLoad(request poolLoadRequest, cache Cache) {

mainloop:
for {
var item CacheItem
var item *CacheItem
var ok bool

select {
Expand Down Expand Up @@ -460,7 +460,7 @@ func (chp *GubernatorPool) Store(ctx context.Context) error {
defer span.Finish()

var wg sync.WaitGroup
out := make(chan CacheItem, 500)
out := make(chan *CacheItem, 500)

// Iterate each worker's cache to `out` channel.
for _, worker := range chp.workers {
Expand Down Expand Up @@ -543,7 +543,7 @@ func (chp *GubernatorPool) handleStore(request poolStoreRequest, cache Cache) {
}

// Add to worker's cache.
func (chp *GubernatorPool) AddCacheItem(ctx context.Context, key string, item CacheItem) error {
func (chp *GubernatorPool) AddCacheItem(ctx context.Context, key string, item *CacheItem) error {
span, ctx := tracing.StartSpan(ctx)
defer span.Finish()

Expand Down Expand Up @@ -596,7 +596,7 @@ func (chp *GubernatorPool) handleAddCacheItem(request poolAddCacheItemRequest, c
}

// Get item from worker's cache.
func (chp *GubernatorPool) GetCacheItem(ctx context.Context, key string) (CacheItem, bool, error) {
func (chp *GubernatorPool) GetCacheItem(ctx context.Context, key string) (*CacheItem, bool, error) {
span, ctx := tracing.StartSpan(ctx)
defer span.Finish()

Expand All @@ -621,13 +621,13 @@ func (chp *GubernatorPool) GetCacheItem(ctx context.Context, key string) (CacheI
case <-ctx.Done():
// Context canceled.
ext.LogError(span, ctx.Err())
return CacheItem{}, false, ctx.Err()
return nil, false, ctx.Err()
}

case <-ctx.Done():
// Context canceled.
ext.LogError(span, ctx.Err())
return CacheItem{}, false, ctx.Err()
return nil, false, ctx.Err()
}
}

Expand Down
12 changes: 6 additions & 6 deletions gubernator_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ func TestGubernatorPool(t *testing.T) {
t.Run(testCase.name, func(t *testing.T) {
// Setup mock data.
const NumCacheItems = 100
cacheItems := []guber.CacheItem{}
cacheItems := []*guber.CacheItem{}
for i := 0; i < NumCacheItems; i++ {
cacheItems = append(cacheItems, guber.CacheItem{
cacheItems = append(cacheItems, &guber.CacheItem{
Key: fmt.Sprintf("Foobar%04d", i),
Value: fmt.Sprintf("Stuff%04d", i),
ExpireAt: 4131978658000,
Expand All @@ -65,7 +65,7 @@ func TestGubernatorPool(t *testing.T) {
chp := guber.NewGubernatorPool(conf, testCase.concurrency)

// Mock Loader.
fakeLoadCh := make(chan guber.CacheItem, NumCacheItems)
fakeLoadCh := make(chan *guber.CacheItem, NumCacheItems)
for _, item := range cacheItems {
fakeLoadCh <- item
}
Expand Down Expand Up @@ -100,8 +100,8 @@ func TestGubernatorPool(t *testing.T) {
mockLoader.On("Save", mock.Anything).Once().Return(nil).
Run(func(args mock.Arguments) {
// Verify items sent over the channel passed to Save().
saveCh := args.Get(0).(chan guber.CacheItem)
savedItems := []guber.CacheItem{}
saveCh := args.Get(0).(chan *guber.CacheItem)
savedItems := []*guber.CacheItem{}
for item := range saveCh {
savedItems = append(savedItems, item)
}
Expand All @@ -114,7 +114,7 @@ func TestGubernatorPool(t *testing.T) {
})

// Mock Cache.
eachCh := make(chan guber.CacheItem, NumCacheItems)
eachCh := make(chan *guber.CacheItem, NumCacheItems)
for _, item := range cacheItems {
eachCh <- item
}
Expand Down
16 changes: 8 additions & 8 deletions lrucache.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,19 @@ func NewLRUCache(maxSize int) *LRUCache {
// It would be safer if this were done using an iterator or delegate pattern
// that doesn't require a goroutine.
// May need to reassess functional requirements.
func (c *LRUCache) Each() chan CacheItem {
out := make(chan CacheItem)
func (c *LRUCache) Each() chan *CacheItem {
out := make(chan *CacheItem)
go func() {
for _, ele := range c.cache {
out <- ele.Value.(CacheItem)
out <- ele.Value.(*CacheItem)
}
close(out)
}()
return out
}

// Adds a value to the cache.
func (c *LRUCache) Add(item CacheItem) bool {
func (c *LRUCache) Add(item *CacheItem) bool {
// If the key already exist, set the new value
if ee, ok := c.cache[item.Key]; ok {
c.ll.MoveToFront(ee)
Expand All @@ -105,9 +105,9 @@ func MillisecondNow() int64 {
}

// GetItem returns the item stored in the cache
func (c *LRUCache) GetItem(key string) (item CacheItem, ok bool) {
func (c *LRUCache) GetItem(key string) (item *CacheItem, ok bool) {
if ele, hit := c.cache[key]; hit {
entry := ele.Value.(CacheItem)
entry := ele.Value.(*CacheItem)

now := MillisecondNow()
// If the entry is invalidated
Expand Down Expand Up @@ -150,7 +150,7 @@ func (c *LRUCache) removeOldest() {

func (c *LRUCache) removeElement(e *list.Element) {
c.ll.Remove(e)
kv := e.Value.(CacheItem)
kv := e.Value.(*CacheItem)
delete(c.cache, kv.Key)
atomic.StoreInt64(&c.cacheLen, int64(c.ll.Len()))
}
Expand All @@ -163,7 +163,7 @@ func (c *LRUCache) Size() int64 {
// Update the expiration time for the key
func (c *LRUCache) UpdateExpiration(key string, expireAt int64) bool {
if ele, hit := c.cache[key]; hit {
entry := ele.Value.(CacheItem)
entry := ele.Value.(*CacheItem)
entry.ExpireAt = expireAt
return true
}
Expand Down
Loading

0 comments on commit c142870

Please sign in to comment.