This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Performance optimizations around batching behavior (#126)
* Async calls to `sendQueue()` to reduce the possibility of a backlog of clients trying to enqueue batched rate checks.
* Async processing in `V1Instance.GetPeerRateLimits()`.
* Add Prometheus metric for batch send timings.
* `GetPeerRateLimits` now returns its response with rate checks in the same order as the request (see the sketch below).
* `GetPeerRateLimits` concurrency limit now matches that of `GubernatorPool` so as not to overload it.
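For illustration, a minimal standalone sketch of the stable-ordering fan-out idea described above, using only the Go standard library. The names `checkOne` and `requests` and the concurrency bound of 2 are hypothetical stand-ins; the actual change in `gubernator.go` below uses `syncutil.NewFanOut` bounded by the pool worker count.

```go
package main

import (
	"fmt"
	"sync"
)

// checkOne stands in for a single rate-limit check (hypothetical helper).
func checkOne(req string) string { return "resp:" + req }

func main() {
	requests := []string{"a", "b", "c", "d"}
	responses := make([]string, len(requests)) // pre-sized slice keeps request order

	sem := make(chan struct{}, 2) // bound concurrency, analogous to the pool worker limit
	var wg sync.WaitGroup

	for i, req := range requests {
		wg.Add(1)
		go func(i int, req string) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a worker slot
			defer func() { <-sem }() // release it
			responses[i] = checkOne(req) // write by index: order stays stable regardless of completion order
		}(i, req)
	}

	wg.Wait()
	fmt.Println(responses) // [resp:a resp:b resp:c resp:d]
}
```

Writing each result into a pre-sized slice by index is what keeps the response order independent of goroutine completion order.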
Baliedge authored Feb 17, 2022
1 parent 0feb47f commit 47b7b2d
Showing 9 changed files with 181 additions and 50 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG
@@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [2.0.0-rc.14] - 2022-02-17
## Changes
* Added performance optimizations to ensure batching behavior does not cause
  additional performance bottlenecks at scale.

## [2.0.0-rc.13] - 2022-01-19
## Changes
* Added Opentracing support in gRPC service and various critical functions.
34 changes: 19 additions & 15 deletions cmd/gubernator-cli/main.go
@@ -23,6 +23,7 @@ import (
"math/rand"
"os"
"strings"
"time"

"github.com/davecgh/go-spew/spew"
guber "github.com/mailgun/gubernator/v2"
@@ -38,23 +39,26 @@ import (
"golang.org/x/time/rate"
)

var log *logrus.Logger
var configFile, grpcAddress string
var concurrency uint64
var checksPerRequest uint64
var reqRate float64
var quiet bool
var (
	log                     *logrus.Logger
	configFile, grpcAddress string
	concurrency             uint64
	timeout                 time.Duration
	checksPerRequest        uint64
	reqRate                 float64
	quiet                   bool
)

func main() {
	log = logrus.StandardLogger()
	flags := flag.NewFlagSet("gubernator", flag.ContinueOnError)
	flags.StringVar(&configFile, "config", "", "Environment config file")
	flags.StringVar(&grpcAddress, "e", "", "Gubernator GRPC endpoint address")
	flags.Uint64Var(&concurrency, "concurrency", 1, "Concurrent threads (default 1)")
	flags.Uint64Var(&checksPerRequest, "checks", 1, "Rate checks per request (default 1)")
	flags.Float64Var(&reqRate, "rate", 0, "Request rate overall, 0 = no rate limit")
	flags.BoolVar(&quiet, "q", false, "Quiet logging")
	checkErr(flags.Parse(os.Args[1:]))
	flag.StringVar(&configFile, "config", "", "Environment config file")
	flag.StringVar(&grpcAddress, "e", "", "Gubernator GRPC endpoint address")
	flag.Uint64Var(&concurrency, "concurrency", 1, "Concurrent threads (default 1)")
	flag.DurationVar(&timeout, "timeout", 100*time.Millisecond, "Request timeout (default 100ms)")
	flag.Uint64Var(&checksPerRequest, "checks", 1, "Rate checks per request (default 1)")
	flag.Float64Var(&reqRate, "rate", 0, "Request rate overall, 0 = no rate limit")
	flag.BoolVar(&quiet, "q", false, "Quiet logging")
	flag.Parse()

	ctx := context.Background()
	err := initTracing()
@@ -154,7 +158,7 @@ func randInt(min, max int) int {
func sendRequest(ctx context.Context, client guber.V1Client, req *guber.GetRateLimitsReq) {
	span, ctx := tracing.StartSpan(ctx)
	defer span.Finish()
	ctx, cancel := tracing.ContextWithTimeout(ctx, clock.Millisecond*500)
	ctx, cancel := tracing.ContextWithTimeout(ctx, timeout)

	// Now hit our cluster with the rate limits
	resp, err := client.GetRateLimits(ctx, req)
47 changes: 47 additions & 0 deletions functional_test.go
@@ -27,6 +27,7 @@ import (
"strings"
"testing"

"github.com/mailgun/gubernator/v2"
guber "github.com/mailgun/gubernator/v2"
"github.com/mailgun/gubernator/v2/cluster"
"github.com/mailgun/holster/v4/clock"
@@ -1028,6 +1029,52 @@ func TestGRPCGateway(t *testing.T) {
	require.NoError(t, err)
}

func TestGetPeerRateLimits(t *testing.T) {
	ctx := context.Background()
	peerClient := gubernator.NewPeerClient(gubernator.PeerConfig{
		Info: cluster.GetRandomPeer(cluster.DataCenterNone),
	})

	t.Run("Stable rate check request order", func(t *testing.T) {
		// Ensure response order matches rate check request order.
		// Try various batch sizes.
		testCases := []int{1, 2, 5, 10, 100, 1000}

		for _, n := range testCases {
			t.Run(fmt.Sprintf("Batch size %d", n), func(t *testing.T) {
				// Build request.
				req := &gubernator.GetPeerRateLimitsReq{
					Requests: make([]*gubernator.RateLimitReq, n),
				}
				for i := 0; i < n; i++ {
					req.Requests[i] = &gubernator.RateLimitReq{
						Name:      "Foobar",
						UniqueKey: fmt.Sprintf("%08x", i),
						Hits:      0,
						Limit:     1000 + int64(i),
						Duration:  1000,
						Algorithm: gubernator.Algorithm_TOKEN_BUCKET,
						Behavior:  gubernator.Behavior_BATCHING,
					}
				}

				// Send request.
				resp, err := peerClient.GetPeerRateLimits(ctx, req)

				// Verify.
				require.NoError(t, err)
				require.NotNil(t, resp)
				assert.Len(t, resp.RateLimits, n)

				for i, item := range resp.RateLimits {
					// Identify response by its unique limit.
					assert.Equal(t, req.Requests[i].Limit, item.Limit)
				}
			})
		}
	})
}

// TODO: Add a test for sending no rate limits RateLimitReqList.RateLimits = nil

func getMetric(t testutil.TestingT, in io.Reader, name string) *model.Sample {
87 changes: 69 additions & 18 deletions gubernator.go
@@ -71,7 +71,7 @@ var asyncRequestRetriesCounter = prometheus.NewCounterVec(prometheus.CounterOpts
}, []string{"name"})
var queueLengthMetric = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Name: "gubernator_queue_length",
Help: "The getRateLimitsBatch() queue length in PeerClient.",
Help: "The getRateLimitsBatch() queue length in PeerClient. This represents rate checks queued by for batching to a remote peer.",
Objectives: map[float64]float64{
0.99: 0.001,
},
@@ -102,6 +102,13 @@ var poolWorkerQueueLength = prometheus.NewSummaryVec(prometheus.SummaryOpts{
		0.99: 0.001,
	},
}, []string{"method", "worker"})
var batchSendDurationMetric = prometheus.NewSummaryVec(prometheus.SummaryOpts{
	Name: "gubernator_batch_send_duration",
	Help: "The timings of batch send operations to a remote peer.",
	Objectives: map[float64]float64{
		0.99: 0.001,
	},
}, []string{"peerAddr"})

// NewV1Instance instantiate a single instance of a gubernator peer and registers this
// instance with the provided GRPCServer.
@@ -228,6 +235,15 @@ func (s *V1Instance) GetRateLimits(ctx context.Context, r *GetRateLimitsReq) (*G
return
}

if ctx2.Err() != nil {
err = errors.Wrap(ctx2.Err(), "Error while iterating request items")
ext.LogError(span, err)
resp.Responses[i] = &RateLimitResp{
Error: err.Error(),
}
return
}

peer, err = s.GetPeer(ctx2, key)
if err != nil {
countError(err, "Error in GetPeer")
@@ -481,27 +497,65 @@ func (s *V1Instance) GetPeerRateLimits(ctx context.Context, r *GetPeerRateLimits

span.SetTag("numRequests", len(r.Requests))

var resp GetPeerRateLimitsResp

if len(r.Requests) > maxBatchSize {
err2 := fmt.Errorf("'PeerRequest.rate_limits' list too large; max size is '%d'", maxBatchSize)
ext.LogError(span, err2)
checkErrorCounter.WithLabelValues("Request too large").Add(1)
return nil, status.Error(codes.OutOfRange, err2.Error())
}

for _, req := range r.Requests {
rl, err := s.getRateLimit(ctx, req)
if err != nil {
// Return the error for this request
err2 := errors.Wrap(err, "Error in getRateLimit")
ext.LogError(span, err2)
rl = &RateLimitResp{Error: err2.Error()}
// checkErrorCounter is updated within getRateLimit().
// Invoke each rate limit request.
type reqIn struct {
idx int
req *RateLimitReq
}
type respOut struct {
idx int
rl *RateLimitResp
}

resp := &GetPeerRateLimitsResp{
RateLimits: make([]*RateLimitResp, len(r.Requests)),
}
respChan := make(chan respOut)
var respWg sync.WaitGroup
respWg.Add(1)

go func() {
// Capture each response and keep in stable order.
for out := range respChan {
resp.RateLimits[out.idx] = out.rl
}
resp.RateLimits = append(resp.RateLimits, rl)

respWg.Done()
}()

// Fan out requests.
concurrencyLimit := s.conf.PoolWorkers
fan := syncutil.NewFanOut(concurrencyLimit)
for idx, req := range r.Requests {
fan.Run(func(in interface{}) error {
rin := in.(reqIn)
rl, err := s.getRateLimit(ctx, rin.req)
if err != nil {
// Return the error for this request
err2 := errors.Wrap(err, "Error in getRateLimit")
ext.LogError(span, err2)
rl = &RateLimitResp{Error: err2.Error()}
// checkErrorCounter is updated within getRateLimit().
}

respChan <- respOut{rin.idx, rl}
return nil
}, reqIn{idx, req})
}
return &resp, nil

// Wait for all requests to be handled, then clean up.
_ = fan.Wait()
close(respChan)
respWg.Wait()

return resp, nil
}

// HealthCheck Returns the health of our instance.
@@ -682,11 +736,6 @@ func (s *V1Instance) GetPeer(ctx context.Context, key string) (*PeerClient, erro
	defer funcTimer.ObserveDuration()
	lockTimer := prometheus.NewTimer(funcTimeMetric.WithLabelValues("V1Instance.GetPeer_RLock"))

	if ctx.Err() != nil {
		ext.LogError(span, ctx.Err())
		return nil, ctx.Err()
	}

	s.peerMutex.RLock()
	defer s.peerMutex.RUnlock()
	tracing.LogInfo(span, "peerMutex.RLock()")
@@ -727,6 +776,7 @@ func (s *V1Instance) Describe(ch chan<- *prometheus.Desc) {
	overLimitCounter.Describe(ch)
	checkCounter.Describe(ch)
	poolWorkerQueueLength.Describe(ch)
	batchSendDurationMetric.Describe(ch)
}

// Collect fetches metrics from the server for use by prometheus
@@ -742,6 +792,7 @@ func (s *V1Instance) Collect(ch chan<- prometheus.Metric) {
	overLimitCounter.Collect(ch)
	checkCounter.Collect(ch)
	poolWorkerQueueLength.Collect(ch)
	batchSendDurationMetric.Collect(ch)
}

// HasBehavior returns true if the provided behavior is set
2 changes: 2 additions & 0 deletions interval.go
@@ -24,6 +24,8 @@ import (
"github.com/mailgun/holster/v4/syncutil"
)

// Interval is a one-shot ticker. Call `Next()` to trigger the start of an
// interval. Read the `C` channel for the tick event.
type Interval struct {
	C  chan struct{}
	in chan struct{}
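A minimal usage sketch of the one-shot ticker contract described in the doc comment above (illustrative only; `NewInterval`, `Next`, `Stop`, and the `C` channel are taken from this file and the test below, and the durations are arbitrary):

```go
package main

import (
	"fmt"
	"time"

	"github.com/mailgun/gubernator/v2"
)

func main() {
	// One-shot ticker: C only fires after an explicit Next() call.
	interval := gubernator.NewInterval(100 * time.Millisecond)
	defer interval.Stop()

	interval.Next() // arm the next tick
	select {
	case <-interval.C:
		fmt.Println("tick") // interval elapsed; call Next() again to re-arm
	case <-time.After(time.Second):
		fmt.Println("timed out waiting for tick")
	}
}
```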
21 changes: 21 additions & 0 deletions interval_test.go
@@ -18,12 +18,33 @@ package gubernator_test

import (
"testing"
"time"

"github.com/mailgun/gubernator/v2"
"github.com/mailgun/holster/v4/clock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestInterval(t *testing.T) {
	t.Run("Happy path", func(t *testing.T) {
		interval := gubernator.NewInterval(10 * time.Millisecond)
		defer interval.Stop()
		interval.Next()

		assert.Empty(t, interval.C)

		time.Sleep(10 * time.Millisecond)

		// Wait for tick.
		select {
		case <-interval.C:
		case <-time.After(100 * time.Millisecond):
			require.Fail(t, "timeout")
		}
	})
}

func TestGregorianExpirationMinute(t *testing.T) {
	// Validate calculation assumption
	now := clock.Date(2019, clock.November, 11, 00, 00, 00, 00, clock.UTC)
32 changes: 16 additions & 16 deletions peer_client.go
@@ -362,8 +362,9 @@ func (c *PeerClient) getPeerRateLimitsBatch(ctx context.Context, r *RateLimitReq
		}
		return resp.rl, nil
	case <-ctx2.Done():
		ext.LogError(span, ctx2.Err())
		return nil, ctx2.Err()
		err := errors.Wrap(ctx2.Err(), "Error while waiting for response")
		ext.LogError(span, err)
		return nil, err
	}
}

@@ -391,42 +392,38 @@ func (c *PeerClient) run() {
			// Wrap logic in anon function so we can use defer.
			func() {
				// Use context of the request for opentracing span.
				reqSpan, reqCtx := tracing.StartSpan(r.ctx)
				defer reqSpan.Finish()
				flushSpan, _ := tracing.StartNamedSpan(r.ctx, "Enqueue batched request")
				defer flushSpan.Finish()
				flushSpan.SetTag("peer.grpcAddress", c.conf.Info.GRPCAddress)

				queue = append(queue, r)

				// Send the queue if we reached our batch limit
				if len(queue) == c.conf.Behavior.BatchLimit {
				if len(queue) >= c.conf.Behavior.BatchLimit {
					logMsg := "run() reached batch limit"
					logrus.WithFields(logrus.Fields{
						"queueLen":   len(queue),
						"batchLimit": c.conf.Behavior.BatchLimit,
					}).Info(logMsg)
					tracing.LogInfo(reqSpan, logMsg)
					tracing.LogInfo(flushSpan, logMsg)

					c.sendQueue(reqCtx, queue)
					go c.sendQueue(ctx, queue)
					queue = nil
					interval.Next()
					return
				}

				// If this is our first queued item since last
				// queue the next interval
				// If this is our first enqueued item since last
				// sendQueue, reset interval timer.
				if len(queue) == 1 {
					interval.Next()
				}
			}()

		case <-interval.C:
			if len(queue) != 0 {
				intervalSpan, ctx2 := tracing.StartSpan(ctx)
				intervalSpan.SetTag("queueLen", len(queue))
				intervalSpan.SetTag("batchWait", c.conf.Behavior.BatchWait.String())

				c.sendQueue(ctx2, queue)
				go c.sendQueue(ctx, queue)
				queue = nil

				intervalSpan.Finish()
			}
		}
	}
@@ -438,7 +435,10 @@ func (c *PeerClient) sendQueue(ctx context.Context, queue []*request) {
	span, ctx := tracing.StartSpan(ctx)
	defer span.Finish()
	span.SetTag("queueLen", len(queue))
	span.SetTag("peer.grpcAddress", c.conf.Info.GRPCAddress)

	batchSendTimer := prometheus.NewTimer(batchSendDurationMetric.WithLabelValues(c.conf.Info.GRPCAddress))
	defer batchSendTimer.ObserveDuration()
	funcTimer := prometheus.NewTimer(funcTimeMetric.WithLabelValues("PeerClient.sendQueue"))
	defer funcTimer.ObserveDuration()
