This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Refactor tracing using holster OpenTelemetry tooling. (#125)
* Migrate OpenTracing to OpenTelemetry.

Breaking change to environment variable configuration.
See: jaegertracing.md.
Baliedge authored Aug 1, 2022
1 parent 510d93b commit 8dcc74b
Showing 17 changed files with 786 additions and 764 deletions.
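
In short, the project's previous tracing helpers (tracing.StartSpan, span.Finish, tracing.LogInfo from github.com/mailgun/gubernator/v2/tracing) are replaced by holster's OpenTelemetry scope helpers (tracing.StartScope, tracing.EndScope from github.com/mailgun/holster/v4/tracing) plus standard OpenTelemetry span events. A minimal sketch of the new lifecycle, using only the calls that appear in this diff (doWork is a hypothetical function; tracer-provider setup is omitted, so the calls fall back to no-op spans unless a provider is configured):

package main

import (
    "context"
    "errors"

    "github.com/mailgun/holster/v4/tracing"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/trace"
)

// doWork is a hypothetical function showing the span lifecycle used throughout
// this commit: StartScope opens a span and stores it on ctx, and the deferred
// EndScope reads the named err return so the final error is recorded on the
// span before it ends.
func doWork(ctx context.Context) (err error) {
    ctx = tracing.StartScope(ctx)
    defer func() {
        tracing.EndScope(ctx, err)
    }()

    // Span events with attributes replace the old tracing.LogInfo calls.
    span := trace.SpanFromContext(ctx)
    span.AddEvent("doing work", trace.WithAttributes(
        attribute.String("key", "value"),
    ))

    return errors.New("example failure recorded by EndScope")
}

func main() {
    _ = doWork(context.Background())
}
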
136 changes: 76 additions & 60 deletions algorithms.go
@@ -19,68 +19,74 @@ package gubernator
import (
"context"

"github.com/mailgun/gubernator/v2/tracing"
"github.com/mailgun/holster/v4/clock"
"github.com/mailgun/holster/v4/tracing"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

// Implements token bucket algorithm for rate limiting. https://en.wikipedia.org/wiki/Token_bucket
func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
- span, ctx := tracing.StartSpan(ctx)
- defer span.Finish()
+ ctx = tracing.StartScope(ctx)
+ defer func() {
+ tracing.EndScope(ctx, err)
+ }()
+ span := trace.SpanFromContext(ctx)

tokenBucketTimer := prometheus.NewTimer(funcTimeMetric.WithLabelValues("tokenBucket"))
defer tokenBucketTimer.ObserveDuration()

// Get rate limit from cache.
hashKey := r.HashKey()
item, ok := c.GetItem(hashKey)
tracing.LogInfo(span, "c.GetItem()")
span.AddEvent("c.GetItem()")

if s != nil && !ok {
// Cache miss.
// Check our store for the item.
if item, ok = s.Get(ctx, r); ok {
tracing.LogInfo(span, "Check store for rate limit")
span.AddEvent("Check store for rate limit")
c.Add(item)
tracing.LogInfo(span, "c.Add()")
span.AddEvent("c.Add()")
}
}

// Sanity checks.
if ok {
if item.Value == nil {
msgPart := "tokenBucket: Invalid cache item; Value is nil"
- tracing.LogInfo(span, msgPart,
- "hashKey", hashKey,
- "key", r.UniqueKey,
- "name", r.Name,
- )
+ span.AddEvent(msgPart, trace.WithAttributes(
+ attribute.String("hashKey", hashKey),
+ attribute.String("key", r.UniqueKey),
+ attribute.String("name", r.Name),
+ ))
logrus.Error(msgPart)
ok = false
} else if item.Key != hashKey {
msgPart := "tokenBucket: Invalid cache item; key mismatch"
- tracing.LogInfo(span, msgPart,
- "itemKey", item.Key,
- "hashKey", hashKey,
- "name", r.Name,
- )
+ span.AddEvent(msgPart, trace.WithAttributes(
+ attribute.String("itemKey", item.Key),
+ attribute.String("hashKey", hashKey),
+ attribute.String("name", r.Name),
+ ))
logrus.Error(msgPart)
ok = false
}
}

if ok {
// Item found in cache or store.
tracing.LogInfo(span, "Update existing rate limit")
span.AddEvent("Update existing rate limit")

if HasBehavior(r.Behavior, Behavior_RESET_REMAINING) {
c.Remove(hashKey)
tracing.LogInfo(span, "c.Remove()")
span.AddEvent("c.Remove()")

if s != nil {
s.Remove(ctx, hashKey)
tracing.LogInfo(span, "s.Remove()")
span.AddEvent("s.Remove()")
}
return &RateLimitResp{
Status: Status_UNDER_LIMIT,
@@ -98,21 +104,21 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
t, ok := item.Value.(*TokenBucketItem)
if !ok {
// Client switched algorithms; perhaps due to a migration?
tracing.LogInfo(span, "Client switched algorithms; perhaps due to a migration?")
span.AddEvent("Client switched algorithms; perhaps due to a migration?")

c.Remove(hashKey)
tracing.LogInfo(span, "c.Remove()")
span.AddEvent("c.Remove()")

if s != nil {
s.Remove(ctx, hashKey)
tracing.LogInfo(span, "s.Remove()")
span.AddEvent("s.Remove()")
}

return tokenBucketNewItem(ctx, s, c, r)
}

// Update the limit if it changed.
tracing.LogInfo(span, "Update the limit if changed")
span.AddEvent("Update the limit if changed")
if t.Limit != r.Limit {
// Add difference to remaining.
t.Remaining += r.Limit - t.Limit
@@ -131,7 +137,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *

// If the duration config changed, update the new ExpireAt.
if t.Duration != r.Duration {
tracing.LogInfo(span, "Duration changed")
span.AddEvent("Duration changed")
expire := t.CreatedAt + r.Duration
if HasBehavior(r.Behavior, Behavior_DURATION_IS_GREGORIAN) {
expire, err = GregorianExpiration(clock.Now(), r.Duration)
@@ -144,7 +150,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
now := MillisecondNow()
if expire <= now {
// Renew item.
tracing.LogInfo(span, "Limit has expired")
span.AddEvent("Limit has expired")
expire = now + r.Duration
t.CreatedAt = now
t.Remaining = t.Limit
@@ -158,20 +164,20 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
if s != nil {
defer func() {
s.OnChange(ctx, r, item)
tracing.LogInfo(span, "defer s.OnChange()")
span.AddEvent("defer s.OnChange()")
}()
}

// Client is only interested in retrieving the current status or
// updating the rate limit config.
if r.Hits == 0 {
tracing.LogInfo(span, "Return current status, apply no change")
span.AddEvent("Return current status, apply no change")
return rl, nil
}

// If we are already at the limit.
if rl.Remaining == 0 {
tracing.LogInfo(span, "Already over the limit")
span.AddEvent("Already over the limit")
overLimitCounter.Add(1)
rl.Status = Status_OVER_LIMIT
t.Status = rl.Status
@@ -180,7 +186,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *

// If requested hits takes the remainder.
if t.Remaining == r.Hits {
tracing.LogInfo(span, "At the limit")
span.AddEvent("At the limit")
t.Remaining = 0
rl.Remaining = 0
return rl, nil
@@ -189,13 +195,13 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
// If requested is more than available, then return over the limit
// without updating the cache.
if r.Hits > t.Remaining {
tracing.LogInfo(span, "Over the limit")
span.AddEvent("Over the limit")
overLimitCounter.Add(1)
rl.Status = Status_OVER_LIMIT
return rl, nil
}

tracing.LogInfo(span, "Under the limit")
span.AddEvent("Under the limit")
t.Remaining -= r.Hits
rl.Remaining = t.Remaining
return rl, nil
@@ -207,8 +213,8 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *

// Called by tokenBucket() when adding a new item in the store.
func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
- span, ctx := tracing.StartSpan(ctx)
- defer span.Finish()
+ ctx = tracing.StartScope(ctx)
+ defer func() {
+ tracing.EndScope(ctx, err)
+ }()
+ span := trace.SpanFromContext(ctx)

now := MillisecondNow()
expire := now + r.Duration
@@ -227,7 +236,7 @@ func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq)
}

// Add a new rate limit to the cache.
tracing.LogInfo(span, "Add a new rate limit to the cache")
span.AddEvent("Add a new rate limit to the cache")
if HasBehavior(r.Behavior, Behavior_DURATION_IS_GREGORIAN) {
expire, err = GregorianExpiration(clock.Now(), r.Duration)
if err != nil {
@@ -244,28 +253,32 @@ func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq)

// Client could be requesting that we always return OVER_LIMIT.
if r.Hits > r.Limit {
tracing.LogInfo(span, "Over the limit")
span.AddEvent("Over the limit")
overLimitCounter.Add(1)
rl.Status = Status_OVER_LIMIT
rl.Remaining = r.Limit
t.Remaining = r.Limit
}

c.Add(item)
tracing.LogInfo(span, "c.Add()")
span.AddEvent("c.Add()")

if s != nil {
s.OnChange(ctx, r, item)
tracing.LogInfo(span, "s.OnChange()")
span.AddEvent("s.OnChange()")
}

return rl, nil
}

// Implements leaky bucket algorithm for rate limiting https://en.wikipedia.org/wiki/Leaky_bucket
func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
- span, ctx := tracing.StartSpan(ctx)
- defer span.Finish()
+ ctx = tracing.StartScope(ctx)
+ defer func() {
+ tracing.EndScope(ctx, err)
+ }()
+ span := trace.SpanFromContext(ctx)

leakyBucketTimer := prometheus.NewTimer(funcTimeMetric.WithLabelValues("V1Instance.getRateLimit_leakyBucket"))
defer leakyBucketTimer.ObserveDuration()

Expand All @@ -278,54 +291,54 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
// Get rate limit from cache.
hashKey := r.HashKey()
item, ok := c.GetItem(hashKey)
tracing.LogInfo(span, "c.GetItem()")
span.AddEvent("c.GetItem()")

if s != nil && !ok {
// Cache miss.
// Check our store for the item.
if item, ok = s.Get(ctx, r); ok {
tracing.LogInfo(span, "Check store for rate limit")
span.AddEvent("Check store for rate limit")
c.Add(item)
tracing.LogInfo(span, "c.Add()")
span.AddEvent("c.Add()")
}
}

// Sanity checks.
if ok {
if item.Value == nil {
msgPart := "leakyBucket: Invalid cache item; Value is nil"
- tracing.LogInfo(span, msgPart,
- "hashKey", hashKey,
- "key", r.UniqueKey,
- "name", r.Name,
- )
+ span.AddEvent(msgPart, trace.WithAttributes(
+ attribute.String("hashKey", hashKey),
+ attribute.String("key", r.UniqueKey),
+ attribute.String("name", r.Name),
+ ))
logrus.Error(msgPart)
ok = false
} else if item.Key != hashKey {
msgPart := "leakyBucket: Invalid cache item; key mismatch"
- tracing.LogInfo(span, msgPart,
- "itemKey", item.Key,
- "hashKey", hashKey,
- "name", r.Name,
- )
+ span.AddEvent(msgPart, trace.WithAttributes(
+ attribute.String("itemKey", item.Key),
+ attribute.String("hashKey", hashKey),
+ attribute.String("name", r.Name),
+ ))
logrus.Error(msgPart)
ok = false
}
}

if ok {
// Item found in cache or store.
tracing.LogInfo(span, "Update existing rate limit")
span.AddEvent("Update existing rate limit")

b, ok := item.Value.(*LeakyBucketItem)
if !ok {
// Client switched algorithms; perhaps due to a migration?
c.Remove(hashKey)
tracing.LogInfo(span, "c.Remove()")
span.AddEvent("c.Remove()")

if s != nil {
s.Remove(ctx, hashKey)
tracing.LogInfo(span, "s.Remove()")
span.AddEvent("s.Remove()")
}

return leakyBucketNewItem(ctx, s, c, r)
@@ -396,7 +409,7 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
if s != nil {
defer func() {
s.OnChange(ctx, r, item)
tracing.LogInfo(span, "s.OnChange()")
span.AddEvent("s.OnChange()")
}()
}

@@ -439,8 +452,11 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *

// Called by leakyBucket() when adding a new item in the store.
func leakyBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
- span, ctx := tracing.StartSpan(ctx)
- defer span.Finish()
+ ctx = tracing.StartScope(ctx)
+ defer func() {
+ tracing.EndScope(ctx, err)
+ }()
+ span := trace.SpanFromContext(ctx)

now := MillisecondNow()
duration := r.Duration
@@ -489,11 +505,11 @@ func leakyBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq)
}

c.Add(item)
tracing.LogInfo(span, "c.Add()")
span.AddEvent("c.Add()")

if s != nil {
s.OnChange(ctx, r, item)
tracing.LogInfo(span, "s.OnChange()")
span.AddEvent("s.OnChange()")
}

return &rl, nil
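
As background for the algorithm that tokenBucket() and tokenBucketNewItem() above implement, here is a stripped-down, hypothetical sketch of the token-bucket idea (field and function names are illustrative, not gubernator's): a bucket holds Remaining tokens up to Limit, each request spends tokens, and the bucket refills in full once its window expires. leakyBucket() follows the related leaky-bucket scheme, where capacity is reclaimed continuously at a steady rate rather than all at once at expiry.

package main

import "fmt"

// bucket is a simplified token-bucket state: hits spend Remaining tokens,
// and the bucket is refilled to Limit once the window expires.
type bucket struct {
    Limit     int64
    Remaining int64
    ExpireAt  int64 // Unix milliseconds
}

// take spends hits from the bucket, renewing it first when expired.
// It returns false when the request would exceed the remaining tokens.
func take(b *bucket, hits, nowMs, durationMs int64) bool {
    if nowMs >= b.ExpireAt {
        b.Remaining = b.Limit
        b.ExpireAt = nowMs + durationMs
    }
    if hits > b.Remaining {
        return false // over the limit; Remaining is left untouched
    }
    b.Remaining -= hits
    return true
}

func main() {
    b := &bucket{Limit: 10, Remaining: 10}
    fmt.Println(take(b, 4, 0, 60000), b.Remaining) // true 6
    fmt.Println(take(b, 7, 1, 60000), b.Remaining) // false 6
}
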
13 changes: 4 additions & 9 deletions client.go
@@ -22,9 +22,8 @@ import (
"time"

"github.com/mailgun/holster/v4/clock"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
@@ -45,14 +44,10 @@ func DialV1Server(server string, tls *tls.Config) (V1Client, error) {
return nil, errors.New("server is empty; must provide a server")
}

- // Setup Opentracing interceptor to propagate spans.
- tracer := opentracing.GlobalTracer()
- tracingUnaryInterceptor := otgrpc.OpenTracingClientInterceptor(tracer)
- tracingStreamInterceptor := otgrpc.OpenTracingStreamClientInterceptor(tracer)
-
+ // Setup OpenTelemetry interceptor to propagate spans.
opts := []grpc.DialOption{
- grpc.WithUnaryInterceptor(tracingUnaryInterceptor),
- grpc.WithStreamInterceptor(tracingStreamInterceptor),
+ grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
+ grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()),
}
if tls != nil {
opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(tls)))
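
For illustration, a client dial after this change looks roughly like the following, reusing the otelgrpc interceptor constructors from DialV1Server above (the address is a placeholder, and plaintext credentials are used only to keep the sketch short):

package main

import (
    "log"

    "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
)

func main() {
    // The otelgrpc interceptors inject OpenTelemetry trace context into
    // outgoing unary and streaming calls, replacing the otgrpc/OpenTracing
    // interceptors removed by this commit.
    conn, err := grpc.Dial("localhost:9090", // placeholder address
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
        grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()),
    )
    if err != nil {
        log.Fatalf("dial failed: %v", err)
    }
    defer conn.Close()
}
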
(Diffs for the remaining 15 changed files are not shown.)
