This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

PIP-1490: Expand Opentracing instrumentation.
Baliedge committed Nov 19, 2021
1 parent 0aac075 commit 702dcd7
Showing 7 changed files with 46 additions and 14 deletions.
12 changes: 6 additions & 6 deletions algorithms.go
@@ -296,7 +296,7 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
Limit: b.Limit,
Remaining: int64(b.Remaining),
Status: Status_UNDER_LIMIT,
ResetTime: now + (b.Limit - int64(b.Remaining)) * int64(rate),
ResetTime: now + (b.Limit-int64(b.Remaining))*int64(rate),
}

if s != nil {
@@ -317,7 +317,7 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
if int64(b.Remaining) == r.Hits {
b.Remaining -= float64(r.Hits)
rl.Remaining = 0
rl.ResetTime = now + (rl.Limit - rl.Remaining) * int64(rate)
rl.ResetTime = now + (rl.Limit-rl.Remaining)*int64(rate)
return rl, nil
}

@@ -335,8 +335,8 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *

b.Remaining -= float64(r.Hits)
rl.Remaining = int64(b.Remaining)
rl.ResetTime = now + (rl.Limit - rl.Remaining) * int64(rate)
c.UpdateExpiration(r.HashKey(), now + duration)
rl.ResetTime = now + (rl.Limit-rl.Remaining)*int64(rate)
c.UpdateExpiration(r.HashKey(), now+duration)
return rl, nil
}

@@ -366,14 +366,14 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
Status: Status_UNDER_LIMIT,
Limit: b.Limit,
Remaining: r.Burst - r.Hits,
ResetTime: now + (b.Limit - (r.Burst - r.Hits)) * int64(rate),
ResetTime: now + (b.Limit-(r.Burst-r.Hits))*int64(rate),
}

// Client could be requesting that we start with the bucket OVER_LIMIT
if r.Hits > r.Burst {
rl.Status = Status_OVER_LIMIT
rl.Remaining = 0
rl.ResetTime = now + (rl.Limit - rl.Remaining) * int64(rate)
rl.ResetTime = now + (rl.Limit-rl.Remaining)*int64(rate)
b.Remaining = 0
}

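The edits in this file are gofmt spacing fixes around one expression: the leaky bucket's reset time is the current time plus the time needed to drain the hits still in the bucket, where rate is the per-hit drain interval in milliseconds. A standalone sketch of that arithmetic (the function, values, and names below are illustrative, not the library's):

```go
package main

import "fmt"

// resetTime mirrors the expression above: a bucket holding limit-remaining
// hits, leaking one hit every rateMs milliseconds, is empty again after
// (limit-remaining)*rateMs milliseconds.
func resetTime(nowMs, limit, remaining, rateMs int64) int64 {
	used := limit - remaining // hits currently in the bucket
	return nowMs + used*rateMs
}

func main() {
	// Example: limit 10, 6 remaining (4 hits in the bucket), 3000 ms per hit.
	fmt.Println(resetTime(1_700_000_000_000, 10, 6, 3000)) // 1700000012000
}
```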
6 changes: 6 additions & 0 deletions cache.go
@@ -21,6 +21,7 @@ package gubernator
import (
"container/list"
"sync"
"sync/atomic"

"github.com/mailgun/holster/v4/clock"
"github.com/mailgun/holster/v4/setter"
@@ -68,6 +69,9 @@ type LRUCache struct {
// Stats
sizeMetric *prometheus.Desc
accessMetric *prometheus.Desc

LockCounter uint64
UnlockCounter uint64
}

type CacheItem struct {
@@ -102,10 +106,12 @@ func NewLRUCache(maxSize int) *LRUCache {
}

func (c *LRUCache) Lock() {
atomic.AddUint64(&c.LockCounter, 1)
c.mutex.Lock()
}

func (c *LRUCache) Unlock() {
atomic.AddUint64(&c.UnlockCounter, 1)
c.mutex.Unlock()
}

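The two new fields turn the cache mutex into a counted lock: Lock() and Unlock() bump their counters atomically before delegating to sync.Mutex, so the difference LockCounter - UnlockCounter at any moment approximates how many Lock() calls are still outstanding (held or waiting). A minimal, self-contained sketch of the same pattern — the type and method names are illustrative, not gubernator's:

```go
// Sketch of the counter-wrapped mutex pattern used above.
// countingMutex is an illustrative stand-in for LRUCache, not the library's API.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type countingMutex struct {
	mu            sync.Mutex
	lockCounter   uint64
	unlockCounter uint64
}

func (m *countingMutex) Lock() {
	atomic.AddUint64(&m.lockCounter, 1) // counted before blocking on the mutex
	m.mu.Lock()
}

func (m *countingMutex) Unlock() {
	atomic.AddUint64(&m.unlockCounter, 1)
	m.mu.Unlock()
}

// pending approximates how many Lock() calls have not yet been matched by Unlock().
func (m *countingMutex) pending() uint64 {
	return atomic.LoadUint64(&m.lockCounter) - atomic.LoadUint64(&m.unlockCounter)
}

func main() {
	var m countingMutex
	m.Lock()
	fmt.Println(m.pending()) // 1 while the lock is held
	m.Unlock()
	fmt.Println(m.pending()) // 0
}
```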
4 changes: 2 additions & 2 deletions functional_test.go
@@ -391,7 +391,7 @@ func TestLeakyBucket(t *testing.T) {
assert.Equal(t, test.Status, rl.Status)
assert.Equal(t, test.Remaining, rl.Remaining)
assert.Equal(t, int64(10), rl.Limit)
assert.Equal(t, clock.Now().Unix() + (rl.Limit - rl.Remaining) * 3, rl.ResetTime/1000)
assert.Equal(t, clock.Now().Unix()+(rl.Limit-rl.Remaining)*3, rl.ResetTime/1000)
clock.Advance(test.Sleep)
})
}
@@ -498,7 +498,7 @@ func TestLeakyBucketWithBurst(t *testing.T) {
assert.Equal(t, test.Status, rl.Status)
assert.Equal(t, test.Remaining, rl.Remaining)
assert.Equal(t, int64(10), rl.Limit)
assert.Equal(t, clock.Now().Unix() + (rl.Limit - rl.Remaining) * 3, rl.ResetTime/1000)
assert.Equal(t, clock.Now().Unix()+(rl.Limit-rl.Remaining)*3, rl.ResetTime/1000)
clock.Advance(test.Sleep)
})
}
22 changes: 21 additions & 1 deletion gubernator.go
@@ -21,6 +21,7 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"

"github.com/mailgun/gubernator/v2/tracing"
"github.com/mailgun/holster/v4/setter"
@@ -80,6 +81,18 @@ var funcTimeMetric = prometheus.NewSummaryVec(prometheus.SummaryOpts{
var asyncRequestsRetriesCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "baliedge_asyncrequests_retries",
}, []string{"name"})
var queueLengthMetric = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Name: "baliedge_queue_length",
Objectives: map[float64]float64{
0.99: 0.001,
},
}, []string{"peerAddr"})
var lockCounterMetric = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "baliedge_lock_counter",
Objectives: map[float64]float64{
0.99: 0.001,
},
})

// NewV1Instance instantiate a single instance of a gubernator peer and registers this
// instance with the provided GRPCServer.
@@ -514,7 +527,10 @@ func (s *V1Instance) getRateLimit(ctx context.Context, r *RateLimitReq) (*RateLi
s.conf.Cache.Lock()
defer s.conf.Cache.Unlock()
lockTimer.ObserveDuration()
tracing.LogInfo(span, "conf.Cache.Lock()")
lruCache := s.conf.Cache.(*LRUCache)
lockCounter := atomic.LoadUint64(&lruCache.LockCounter) - atomic.LoadUint64(&lruCache.UnlockCounter)
lockCounterMetric.Observe(float64(lockCounter))
tracing.LogInfo(span, "conf.Cache.Lock()", "lockCounter", lockCounter)

if HasBehavior(r.Behavior, Behavior_GLOBAL) {
s.global.QueueUpdate(r)
@@ -682,6 +698,8 @@ func (s *V1Instance) Describe(ch chan<- *prometheus.Desc) {
getPeerRateLimitLockDurationMetric.Describe(ch)
funcTimeMetric.Describe(ch)
asyncRequestsRetriesCounter.Describe(ch)
queueLengthMetric.Describe(ch)
lockCounterMetric.Describe(ch)
}

// Collect fetches metrics from the server for use by prometheus
@@ -693,6 +711,8 @@ func (s *V1Instance) Collect(ch chan<- prometheus.Metric) {
getPeerRateLimitLockDurationMetric.Collect(ch)
funcTimeMetric.Collect(ch)
asyncRequestsRetriesCounter.Collect(ch)
queueLengthMetric.Collect(ch)
lockCounterMetric.Collect(ch)
}

// HasBehavior returns true if the provided behavior is set
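gubernator.go defines the two new summaries (a per-peer queue-length summary and a lock-counter summary, each tracking only the 0.99 quantile), observes the lock counter inside getRateLimit as the difference between the cache's LockCounter and UnlockCounter, and wires both metrics into V1Instance's Describe and Collect so they are actually exported. The sketch below shows that Describe/Collect delegation pattern in isolation; the type and metric names are illustrative, not gubernator's API:

```go
// Standalone sketch of the Collector delegation pattern used by V1Instance:
// the server forwards Describe/Collect to each of its metrics, so a newly
// added metric must be wired into both methods (as this commit does).
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

type server struct {
	queueLength prometheus.Summary
	lockCounter prometheus.Summary
}

func (s *server) Describe(ch chan<- *prometheus.Desc) {
	s.queueLength.Describe(ch)
	s.lockCounter.Describe(ch)
}

func (s *server) Collect(ch chan<- prometheus.Metric) {
	s.queueLength.Collect(ch)
	s.lockCounter.Collect(ch)
}

func main() {
	s := &server{
		queueLength: prometheus.NewSummary(prometheus.SummaryOpts{
			Name:       "example_queue_length",
			Objectives: map[float64]float64{0.99: 0.001}, // track p99 with 0.1% error
		}),
		lockCounter: prometheus.NewSummary(prometheus.SummaryOpts{
			Name:       "example_lock_counter",
			Objectives: map[float64]float64{0.99: 0.001},
		}),
	}

	reg := prometheus.NewRegistry()
	reg.MustRegister(s) // works because *server satisfies prometheus.Collector

	s.lockCounter.Observe(1)
	mfs, _ := reg.Gather()
	fmt.Println(len(mfs)) // 2 metric families exported
}
```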
1 change: 1 addition & 0 deletions kubernetesconfig.go
@@ -1,3 +1,4 @@
//go:build !local
// +build !local

package gubernator
1 change: 1 addition & 0 deletions kubernetesconfig_local.go
@@ -1,3 +1,4 @@
//go:build local
// +build local

package gubernator
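Both kubernetesconfig files gain the //go:build form of their build constraint next to the legacy // +build comment; Go 1.17's gofmt adds the newer syntax automatically and requires the two lines to agree. Illustratively (this header just demonstrates the constraint syntax, it is not a new file in the repo), a file guarded by the local tag is compiled only when that tag is passed:

```go
//go:build local
// +build local

// Selected only with `go build -tags local`; the sibling file carrying
// `//go:build !local` is used for every other build.
package gubernator
```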
14 changes: 9 additions & 5 deletions peer_client.go
@@ -195,7 +195,7 @@ func (c *PeerClient) GetPeerRateLimit(ctx context.Context, r *RateLimitReq) (*Ra
logrus.
WithError(errors.WithStack(err)).
WithFields(logrus.Fields{
"request": r,
"request": r,
"peerAddr": c.conf.Info.GRPCAddress,
}).
Error(errPart)
@@ -330,12 +330,16 @@ func (c *PeerClient) getPeerRateLimitsBatch(ctx context.Context, r *RateLimitReq
}
req := request{
request: r,
resp: make(chan *response, 1),
ctx: ctx,
resp:    make(chan *response, 1),
ctx:     ctx,
}

// Enqueue the request to be sent
span2, _ := tracing.StartNamedSpan(ctx, "Enqueue request")
span2.SetTag("queueLen", len(c.queue))
srcPeerAddr := c.Info().GRPCAddress
queueLengthMetric.WithLabelValues(srcPeerAddr).Observe(float64(len(c.queue)))

c.queue <- &req
span2.Finish()

@@ -396,7 +400,7 @@ func (c *PeerClient) run() {
if len(queue) == c.conf.Behavior.BatchLimit {
logMsg := "run() reached batch limit"
logrus.WithFields(logrus.Fields{
"queueLen": len(queue),
"queueLen": len(queue),
"batchLimit": c.conf.Behavior.BatchLimit,
}).Info(logMsg)
tracing.LogInfo(reqSpan, logMsg)
@@ -454,7 +458,7 @@ func (c *PeerClient) sendQueue(ctx context.Context, queue []*request) {
logrus.
WithError(err).
WithFields(logrus.Fields{
"queueLen": len(queue),
"queueLen": len(queue),
"batchTimeout": c.conf.Behavior.BatchTimeout.String(),
}).
Error(logPart)
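In getPeerRateLimitsBatch, the depth of the buffered request channel is now recorded just before each enqueue, both as a span tag and as an observation on queueLengthMetric labelled with the peer's gRPC address, so batching back-pressure shows up in traces and in Prometheus. A standalone sketch of that channel-depth observation (the metric name, label value, and types below are illustrative):

```go
// Sketch of observing buffered-channel depth before enqueueing, mirroring the
// queueLengthMetric usage above. Types and names are illustrative, not gubernator's.
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

type request struct{ key string }

var queueLength = prometheus.NewSummaryVec(prometheus.SummaryOpts{
	Name:       "example_peer_queue_length",
	Objectives: map[float64]float64{0.99: 0.001},
}, []string{"peerAddr"})

func enqueue(queue chan *request, peerAddr string, r *request) {
	// len() on a buffered channel reports how many items are currently waiting.
	queueLength.WithLabelValues(peerAddr).Observe(float64(len(queue)))
	queue <- r
}

func main() {
	prometheus.MustRegister(queueLength)
	q := make(chan *request, 100)
	enqueue(q, "10.0.0.1:81", &request{key: "a"})
	fmt.Println(len(q)) // 1
}
```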
