This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Reduce tracing detail (#177)
* PIP-2499: Update holster.

* PIP-2499: Simplify tracing.

* PIP-2499: Remove unnecessary `AddEvent()` calls.

* PIP-2499: Reduce detail.

---------

Co-authored-by: Shawn Poulson <[email protected]>
Baliedge authored Jun 5, 2023
1 parent ea50d91 commit eabc5e6
Showing 5 changed files with 10 additions and 45 deletions.
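The commit message above describes the intent: keep the scope-level spans but drop the per-call span.AddEvent() annotations that cluttered traces. As a rough, self-contained sketch (not code from this repository; the Cache/CacheItem stand-ins are simplified from identifiers visible in the diff, and the tracing calls are the standard go.opentelemetry.io/otel API), the pattern being removed versus kept looks like this:

package example

import (
	"context"

	"go.opentelemetry.io/otel/trace"
)

// Simplified stand-ins for the cache types used in algorithms.go.
type CacheItem struct{ Value interface{} }

type Cache interface {
	GetItem(hashKey string) (*CacheItem, bool)
}

// Before: every low-level call was annotated with its own span event.
func getItemVerbose(ctx context.Context, c Cache, hashKey string) (*CacheItem, bool) {
	span := trace.SpanFromContext(ctx)
	item, ok := c.GetItem(hashKey)
	span.AddEvent("c.GetItem()") // per-call detail; this is what the commit strips out
	return item, ok
}

// After: the enclosing span opened by the caller still records timing and
// errors, so the call site no longer needs its own event.
func getItem(ctx context.Context, c Cache, hashKey string) (*CacheItem, bool) {
	return c.GetItem(hashKey)
}

Fewer events means smaller spans and cheaper exports, while the span hierarchy created by the tracing scopes keeps the timing information.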
25 changes: 0 additions & 25 deletions algorithms.go
@@ -41,15 +41,12 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
// Get rate limit from cache.
hashKey := r.HashKey()
item, ok := c.GetItem(hashKey)
span.AddEvent("c.GetItem()")

if s != nil && !ok {
// Cache miss.
// Check our store for the item.
if item, ok = s.Get(ctx, r); ok {
span.AddEvent("Check store for rate limit")
c.Add(item)
span.AddEvent("c.Add()")
}
}

@@ -78,15 +75,11 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *

if ok {
// Item found in cache or store.
span.AddEvent("Update existing rate limit")

if HasBehavior(r.Behavior, Behavior_RESET_REMAINING) {
c.Remove(hashKey)
span.AddEvent("c.Remove()")

if s != nil {
s.Remove(ctx, hashKey)
span.AddEvent("s.Remove()")
}
return &RateLimitResp{
Status: Status_UNDER_LIMIT,
Expand All @@ -107,18 +100,15 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
span.AddEvent("Client switched algorithms; perhaps due to a migration?")

c.Remove(hashKey)
span.AddEvent("c.Remove()")

if s != nil {
s.Remove(ctx, hashKey)
span.AddEvent("s.Remove()")
}

return tokenBucketNewItem(ctx, s, c, r)
}

// Update the limit if it changed.
span.AddEvent("Update the limit if changed")
if t.Limit != r.Limit {
// Add difference to remaining.
t.Remaining += r.Limit - t.Limit
@@ -164,14 +154,12 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
if s != nil {
defer func() {
s.OnChange(ctx, r, item)
span.AddEvent("defer s.OnChange()")
}()
}

// Client is only interested in retrieving the current status or
// updating the rate limit config.
if r.Hits == 0 {
span.AddEvent("Return current status, apply no change")
return rl, nil
}

@@ -230,7 +218,6 @@ func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq)
}

// Add a new rate limit to the cache.
span.AddEvent("Add a new rate limit to the cache")
if HasBehavior(r.Behavior, Behavior_DURATION_IS_GREGORIAN) {
expire, err = GregorianExpiration(clock.Now(), r.Duration)
if err != nil {
@@ -262,11 +249,9 @@ func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq)
}

c.Add(item)
span.AddEvent("c.Add()")

if s != nil {
s.OnChange(ctx, r, item)
span.AddEvent("s.OnChange()")
}

return rl, nil
@@ -292,15 +277,12 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
// Get rate limit from cache.
hashKey := r.HashKey()
item, ok := c.GetItem(hashKey)
span.AddEvent("c.GetItem()")

if s != nil && !ok {
// Cache miss.
// Check our store for the item.
if item, ok = s.Get(ctx, r); ok {
span.AddEvent("Check store for rate limit")
c.Add(item)
span.AddEvent("c.Add()")
}
}

@@ -329,17 +311,14 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *

if ok {
// Item found in cache or store.
span.AddEvent("Update existing rate limit")

b, ok := item.Value.(*LeakyBucketItem)
if !ok {
// Client switched algorithms; perhaps due to a migration?
c.Remove(hashKey)
span.AddEvent("c.Remove()")

if s != nil {
s.Remove(ctx, hashKey)
span.AddEvent("s.Remove()")
}

return leakyBucketNewItem(ctx, s, c, r)
@@ -410,7 +389,6 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
if s != nil {
defer func() {
s.OnChange(ctx, r, item)
span.AddEvent("s.OnChange()")
}()
}

@@ -457,7 +435,6 @@ func leakyBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq)
defer func() {
tracing.EndScope(ctx, err)
}()
- span := trace.SpanFromContext(ctx)

now := MillisecondNow()
duration := r.Duration
@@ -506,11 +483,9 @@ func leakyBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq)
}

c.Add(item)
span.AddEvent("c.Add()")

if s != nil {
s.OnChange(ctx, r, item)
span.AddEvent("s.OnChange()")
}

return &rl, nil
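The unchanged context in these hunks also shows the read path that tokenBucket and leakyBucket share: consult the in-memory cache first and fall back to the store on a miss, re-caching the result. Below is a minimal sketch of that pattern, with hypothetical interfaces standing in for gubernator's Cache and Store (the real signatures differ slightly; for example, Store.Get takes the rate-limit request rather than a key).

package example

import "context"

// Hypothetical, simplified stand-ins for gubernator's cache and store types.
type Item struct {
	Key   string
	Value interface{}
}

type Cache interface {
	GetItem(key string) (*Item, bool)
	Add(item *Item)
}

type Store interface {
	Get(ctx context.Context, key string) (*Item, bool)
}

// lookup mirrors the cache-then-store read used by both bucket algorithms.
func lookup(ctx context.Context, c Cache, s Store, key string) (*Item, bool) {
	if item, ok := c.GetItem(key); ok {
		return item, true // cache hit: nothing else to do
	}
	if s == nil {
		return nil, false // no durable store configured
	}
	if item, ok := s.Get(ctx, key); ok {
		c.Add(item) // repopulate the cache so the next request stays local
		return item, true
	}
	return nil, false
}

After this commit, that path runs without any per-step span events; only the function-level scope span remains.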
2 changes: 1 addition & 1 deletion go.mod
@@ -7,7 +7,7 @@ require (
github.com/davecgh/go-spew v1.1.1
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3
github.com/hashicorp/memberlist v0.5.0
- github.com/mailgun/holster/v4 v4.12.0
+ github.com/mailgun/holster/v4 v4.12.4
github.com/miekg/dns v1.1.50
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.13.0
4 changes: 2 additions & 2 deletions go.sum
@@ -296,8 +296,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
- github.com/mailgun/holster/v4 v4.12.0 h1:SsorK7fkIdkyW7pE1Vg4aCXWiwZEz15cf8FT31IPlZU=
- github.com/mailgun/holster/v4 v4.12.0/go.mod h1:s2H6HX3iMCub1K1Y7qjf0RgpzS2+mRxyj+i3vlb4FK0=
+ github.com/mailgun/holster/v4 v4.12.4 h1:U9wHgacx+s1/mPhl9ELMhdnMofg2HZHBmeMWnur3HJ0=
+ github.com/mailgun/holster/v4 v4.12.4/go.mod h1:s2H6HX3iMCub1K1Y7qjf0RgpzS2+mRxyj+i3vlb4FK0=
github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
13 changes: 5 additions & 8 deletions gubernator.go
@@ -193,6 +193,9 @@ func (s *V1Instance) Close() (reterr error) {
// peer that does.
func (s *V1Instance) GetRateLimits(ctx context.Context, r *GetRateLimitsReq) (retval *GetRateLimitsResp, reterr error) {
span := trace.SpanFromContext(ctx)
+ span.SetAttributes(
+     attribute.Int("num.items", len(r.Requests)),
+ )

funcTimer := prometheus.NewTimer(funcTimeMetric.WithLabelValues("V1Instance.GetRateLimits"))
defer funcTimer.ObserveDuration()
@@ -217,7 +220,7 @@ func (s *V1Instance) GetRateLimits(ctx context.Context, r *GetRateLimitsReq) (re

// For each item in the request body
for i, req := range r.Requests {
- _ = tracing.NamedScope(ctx, "Iterate requests", func(ctx context.Context) error {
+ _ = tracing.NamedScopeDebug(ctx, "Iterate requests", func(ctx context.Context) error {
key := req.Name + "_" + req.UniqueKey
var peer *PeerClient
var err error
@@ -325,7 +328,7 @@ func (s *V1Instance) asyncRequests(ctx context.Context, req *AsyncReq) {
var attempts int
var err error

- ctx = tracing.StartScope(ctx)
+ ctx = tracing.StartScopeDebug(ctx)
defer tracing.EndScope(ctx, nil)
span := trace.SpanFromContext(ctx)
span.SetAttributes(
@@ -548,7 +551,6 @@ func (s *V1Instance) HealthCheck(ctx context.Context, r *HealthCheckReq) (retval

s.peerMutex.RLock()
defer s.peerMutex.RUnlock()
span.AddEvent("peerMutex.RLock()")

// Iterate through local peers and get their last errors
localPeers := s.conf.LocalPicker.Peers()
@@ -615,12 +617,10 @@ func (s *V1Instance) getRateLimit(ctx context.Context, r *RateLimitReq) (retval

if HasBehavior(r.Behavior, Behavior_GLOBAL) {
s.global.QueueUpdate(r)
span.AddEvent("s.global.QueueUpdate(r)")
}

if HasBehavior(r.Behavior, Behavior_MULTI_REGION) {
s.mutliRegion.QueueHits(r)
span.AddEvent("s.mutliRegion.QueueHits(r)")
}

resp, err := s.gubernatorPool.GetRateLimit(ctx, r)
@@ -719,15 +719,12 @@ func (s *V1Instance) SetPeers(peerInfo []PeerInfo) {

// GetPeer returns a peer client for the hash key provided
func (s *V1Instance) GetPeer(ctx context.Context, key string) (retval *PeerClient, reterr error) {
- span := trace.SpanFromContext(ctx)

funcTimer := prometheus.NewTimer(funcTimeMetric.WithLabelValues("V1Instance.GetPeer"))
defer funcTimer.ObserveDuration()
lockTimer := prometheus.NewTimer(funcTimeMetric.WithLabelValues("V1Instance.GetPeer_RLock"))

s.peerMutex.RLock()
defer s.peerMutex.RUnlock()
span.AddEvent("peerMutex.RLock()")
lockTimer.ObserveDuration()

peer, err := s.conf.LocalPicker.Get(key)
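Two patterns recur in the gubernator.go hunks: recording the batch size once as an attribute on the request-level span (instead of one event per item), and switching the holster tracing.StartScope/NamedScope helpers to their *Debug variants, which, judging by the naming, only produce spans when more verbose tracing is enabled. The attribute half uses the standard OpenTelemetry API; a sketch is below (the helper name is made up for illustration).

package example

import (
	"context"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

// annotateBatchSize is a hypothetical helper showing the pattern added in
// GetRateLimits: one attribute on the span that already covers the request,
// rather than an event for every item in the batch.
func annotateBatchSize(ctx context.Context, numItems int) {
	span := trace.SpanFromContext(ctx)
	span.SetAttributes(
		attribute.Int("num.items", numItems),
	)
}

An attribute like this stays queryable in the trace backend while adding a fixed, small cost per request regardless of batch size.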
11 changes: 2 additions & 9 deletions peer_client.go
@@ -98,7 +98,6 @@ func (c *PeerClient) connect(ctx context.Context) (reterr error) {
defer func() {
tracing.EndScope(ctx, reterr)
}()
- span := trace.SpanFromContext(ctx)

// NOTE: To future self, this mutex is used here because we need to know if the peer is disconnecting and
// handle ErrClosing. Since this mutex MUST be here we take this opportunity to also see if we are connected.
@@ -112,7 +111,6 @@ func (c *PeerClient) connect(ctx context.Context) (reterr error) {

c.mutex.RLock()
lockTimer.ObserveDuration()
span.AddEvent("mutex.RLock()")

if c.status == peerClosing {
c.mutex.RUnlock()
@@ -128,7 +126,6 @@ func (c *PeerClient) connect(ctx context.Context) (reterr error) {
c.mutex.RUnlock()
c.mutex.Lock()
defer c.mutex.Unlock()
span.AddEvent("mutex.Lock()")

// Now that we have the RW lock, ensure no else got here ahead of us.
if c.status == peerConnected {
@@ -169,7 +166,7 @@ func (c *PeerClient) Info() PeerInfo {
// GetPeerRateLimit forwards a rate limit request to a peer. If the rate limit has `behavior == BATCHING` configured
// this method will attempt to batch the rate limits
func (c *PeerClient) GetPeerRateLimit(ctx context.Context, r *RateLimitReq) (retval *RateLimitResp, reterr error) {
- ctx = tracing.StartScope(ctx)
+ ctx = tracing.StartScopeDebug(ctx)
defer func() {
tracing.EndScope(ctx, reterr)
}()
@@ -222,7 +219,6 @@ func (c *PeerClient) GetPeerRateLimits(ctx context.Context, r *GetPeerRateLimits
// a race condition if called within a separate go routine if the internal wg is `0`
// when Wait() is called then Add(1) is called concurrently.
c.mutex.RLock()
span.AddEvent("mutex.RLock()")
c.wg.Add(1)
defer func() {
c.mutex.RUnlock()
@@ -251,15 +247,13 @@ func (c *PeerClient) UpdatePeerGlobals(ctx context.Context, r *UpdatePeerGlobals
defer func() {
tracing.EndScope(ctx, reterr)
}()
- span := trace.SpanFromContext(ctx)

if err := c.connect(ctx); err != nil {
return nil, c.setLastErr(err)
}

// See NOTE above about RLock and wg.Add(1)
c.mutex.RLock()
span.AddEvent("mutex.RLock()")
c.wg.Add(1)
defer func() {
c.mutex.RUnlock()
@@ -328,7 +322,6 @@ func (c *PeerClient) getPeerRateLimitsBatch(ctx context.Context, r *RateLimitReq

// See NOTE above about RLock and wg.Add(1)
c.mutex.RLock()
span.AddEvent("mutex.RLock()")
if c.status == peerClosing {
err := &PeerErr{err: errors.New("already disconnecting")}
return nil, c.setLastErr(err)
@@ -396,7 +389,7 @@ func (c *PeerClient) run() {
return
}

- _ = tracing.CallNamedScope(r.ctx, "Send batch", func(reqCtx context.Context) error {
+ _ = tracing.CallNamedScopeDebug(r.ctx, "Send batch", func(reqCtx context.Context) error {
span := trace.SpanFromContext(reqCtx)
span.SetAttributes(
attribute.String("peer.grpcAddress", c.conf.Info.GRPCAddress),
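The peer_client.go hunks remove the events that annotated an RLock-before-wg.Add(1) sequence; the sequence itself stays, and the NOTE in the original code explains the race it avoids. A self-contained sketch of that concurrency pattern follows, with a hypothetical connGuard type standing in for the relevant PeerClient fields.

package example

import "sync"

// connGuard is a hypothetical stand-in for the PeerClient fields involved:
// a RWMutex guarding connection state and a WaitGroup tracking in-flight requests.
type connGuard struct {
	mutex sync.RWMutex
	wg    sync.WaitGroup
}

// beginRequest takes the read lock before wg.Add(1). A shutdown that takes
// the write lock and then calls wg.Wait() therefore never sees Add(1) racing
// with a Wait() that started while the counter was zero, which the
// sync.WaitGroup documentation forbids.
func (g *connGuard) beginRequest() (done func()) {
	g.mutex.RLock()
	g.wg.Add(1)
	return func() {
		g.mutex.RUnlock()
		g.wg.Done()
	}
}

// shutdown blocks new requests (they queue on RLock), then waits for the
// in-flight ones to call done().
func (g *connGuard) shutdown() {
	g.mutex.Lock()
	defer g.mutex.Unlock()
	g.wg.Wait()
}

This is only the locking skeleton; the real PeerClient also checks a status flag under the lock so requests started after shutdown bail out with an error.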
