PIP-2675: Disable Batching and Force Global configuration (#190)
Baliedge authored Oct 10, 2023
1 parent 835fddb commit d6d8f17
Showing 16 changed files with 487 additions and 397 deletions.
112 changes: 112 additions & 0 deletions .golangci.yml
@@ -0,0 +1,112 @@
linters:
# Please, do not use `enable-all`: it's deprecated and will be removed soon.
# Inverted configuration with `enable-all` and `disable` is not scalable during updates of golangci-lint.
# Full list of linters - https://golangci-lint.run/usage/linters
disable-all: true
enable:
- bodyclose # https://github.com/timakin/bodyclose
# - gomodguard
- errcheck # Mandatory. Do not disable.
# - gocritic
- goimports
# - gosec
- gosimple
- govet
# - noctx
- nolintlint
- ineffassign # Mandatory. Do not disable.
- staticcheck # Mandatory. Do not disable.
- stylecheck
- typecheck
- unused

# Other linters:
# - dogsled
# - dupl
# - exportloopref
# - exhaustive # e.g. missing cases in a type switch
# - funlen
# - gochecknoinits
# - gocognit
# - goconst
# - gocyclo
# - goerr113
# - gofmt
# - goprintffuncname
# - lll
# - misspell
# - nakedret
# - nlreturn
# - prealloc
# - revive
# - rowserrcheck
# - stylecheck
# - unconvert
# - unparam

linters-settings:
gocritic:
enabled-tags:
- diagnostic
- experimental
- opinionated
- performance
- style
disabled-checks:
- dupImport # https://github.com/go-critic/go-critic/issues/845
- whyNoLint # checked by nolintlint linter
- hugeParam # TODO(vtopc): configure (80 bytes is probably not that much) and enable.
- rangeValCopy # TODO(vtopc): configure (disable for tests) and enable.

errcheck:
# List of functions to exclude from checking, where each entry is a single function to exclude.
# See https://github.com/kisielk/errcheck#excluding-functions for details.
exclude-functions:
- (io.Closer).Close
- (io.ReadCloser).Close

govet:
enable-all: true
disable:
- shadow
- fieldalignment

gomodguard:
blocked:
# List of blocked modules.
# Default: []
modules:
- github.com/golang/protobuf:
recommendations:
- google.golang.org/protobuf
reason: "see https://developers.google.com/protocol-buffers/docs/reference/go/faq#modules"
- github.com/pkg/errors:
recommendations:
- errors
- github.com/mailgun/errors
reason: "Deprecated"

stylecheck:
# Select the Go version to target.
# Default: 1.13
go: "1.19"
# https://staticcheck.io/docs/options#checks
checks: ["all"]

issues:
# Maximum issues count per one linter. Set to 0 to disable. Default is 50.
max-issues-per-linter: 0

# Maximum count of issues with the same text. Set to 0 to disable. Default is 3.
max-same-issues: 50

run:
# include test files or not, default is true
tests: true

# Timeout for analysis, e.g. 30s, 5m.
# Default: 1m
timeout: 5m

skip-dirs:
- googleapis
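
For orientation, a minimal sketch of what two of the enabled linters act on, assuming a stand-alone program (the function and URL below are illustrative, not part of this change): `errcheck` flags silently dropped errors, `bodyclose` flags an unclosed HTTP response body, and the `exclude-functions` entries above keep the deferred `Close()` itself from being reported.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"os"
)

// fetch is a hypothetical helper used only to illustrate the lint rules.
func fetch(url string) ([]byte, error) {
	resp, err := http.Get(url)
	if err != nil {
		return nil, err
	}
	// Required by bodyclose; the unchecked error from Close() is not reported
	// because (io.ReadCloser).Close is listed under errcheck exclude-functions.
	defer resp.Body.Close()
	return io.ReadAll(resp.Body)
}

func main() {
	// errcheck would flag this call: the returned error is silently dropped.
	os.Remove("stale.lock")

	// Placeholder URL for illustration only.
	body, err := fetch("http://localhost:8080/")
	if err != nil {
		panic(err)
	}
	fmt.Println(len(body))
}
```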
11 changes: 6 additions & 5 deletions Makefile
@@ -1,14 +1,15 @@
.DEFAULT_GOAL := release
VERSION=$(shell cat version)
LDFLAGS="-X main.Version=$(VERSION)"
GOLINT = $(GOPATH)/bin/golangci-lint
GOLANGCI_LINT = $(GOPATH)/bin/golangci-lint
GOLANGCI_LINT_VERSION = 1.54.2

$(GOLINT):
curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(GOPATH)/bin v1.54.1
$(GOLANGCI_LINT):
curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(GOPATH)/bin $(GOLANGCI_LINT_VERSION)

.PHONY: lint
lint: $(GOLINT)
$(GOLINT) run --out-format tab --path-prefix `pwd`
lint: $(GOLANGCI_LINT)
$(GOLANGCI_LINT) run

.PHONY: test
test:
35 changes: 10 additions & 25 deletions algorithms.go
@@ -20,7 +20,6 @@ import (
"context"

"github.com/mailgun/holster/v4/clock"
"github.com/mailgun/holster/v4/tracing"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
@@ -29,9 +28,6 @@

// Implements token bucket algorithm for rate limiting. https://en.wikipedia.org/wiki/Token_bucket
func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
ctx = tracing.StartNamedScopeDebug(ctx, "tokenBucket")
defer func() { tracing.EndScope(ctx, err) }()
span := trace.SpanFromContext(ctx)

tokenBucketTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("tokenBucket"))
defer tokenBucketTimer.ObserveDuration()
@@ -52,7 +48,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
if ok {
if item.Value == nil {
msgPart := "tokenBucket: Invalid cache item; Value is nil"
span.AddEvent(msgPart, trace.WithAttributes(
trace.SpanFromContext(ctx).AddEvent(msgPart, trace.WithAttributes(
attribute.String("hashKey", hashKey),
attribute.String("key", r.UniqueKey),
attribute.String("name", r.Name),
@@ -61,7 +57,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
ok = false
} else if item.Key != hashKey {
msgPart := "tokenBucket: Invalid cache item; key mismatch"
span.AddEvent(msgPart, trace.WithAttributes(
trace.SpanFromContext(ctx).AddEvent(msgPart, trace.WithAttributes(
attribute.String("itemKey", item.Key),
attribute.String("hashKey", hashKey),
attribute.String("name", r.Name),
@@ -95,7 +91,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
t, ok := item.Value.(*TokenBucketItem)
if !ok {
// Client switched algorithms; perhaps due to a migration?
span.AddEvent("Client switched algorithms; perhaps due to a migration?")
trace.SpanFromContext(ctx).AddEvent("Client switched algorithms; perhaps due to a migration?")

c.Remove(hashKey)

@@ -125,6 +121,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *

// If the duration config changed, update the new ExpireAt.
if t.Duration != r.Duration {
span := trace.SpanFromContext(ctx)
span.AddEvent("Duration changed")
expire := t.CreatedAt + r.Duration
if HasBehavior(r.Behavior, Behavior_DURATION_IS_GREGORIAN) {
@@ -163,7 +160,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *

// If we are already at the limit.
if rl.Remaining == 0 && r.Hits > 0 {
span.AddEvent("Already over the limit")
trace.SpanFromContext(ctx).AddEvent("Already over the limit")
metricOverLimitCounter.Add(1)
rl.Status = Status_OVER_LIMIT
t.Status = rl.Status
@@ -172,7 +169,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *

// If requested hits takes the remainder.
if t.Remaining == r.Hits {
span.AddEvent("At the limit")
trace.SpanFromContext(ctx).AddEvent("At the limit")
t.Remaining = 0
rl.Remaining = 0
return rl, nil
@@ -181,13 +178,12 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
// If requested is more than available, then return over the limit
// without updating the cache.
if r.Hits > t.Remaining {
span.AddEvent("Over the limit")
trace.SpanFromContext(ctx).AddEvent("Over the limit")
metricOverLimitCounter.Add(1)
rl.Status = Status_OVER_LIMIT
return rl, nil
}

span.AddEvent("Under the limit")
t.Remaining -= r.Hits
rl.Remaining = t.Remaining
return rl, nil
@@ -199,10 +195,6 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *

// Called by tokenBucket() when adding a new item in the store.
func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
ctx = tracing.StartNamedScopeDebug(ctx, "tokenBucketNewItem")
defer func() { tracing.EndScope(ctx, err) }()
span := trace.SpanFromContext(ctx)

now := MillisecondNow()
expire := now + r.Duration

@@ -237,7 +229,7 @@ func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq)

// Client could be requesting that we always return OVER_LIMIT.
if r.Hits > r.Limit {
span.AddEvent("Over the limit")
trace.SpanFromContext(ctx).AddEvent("Over the limit")
metricOverLimitCounter.Add(1)
rl.Status = Status_OVER_LIMIT
rl.Remaining = r.Limit
@@ -255,10 +247,6 @@ func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq)

// Implements leaky bucket algorithm for rate limiting https://en.wikipedia.org/wiki/Leaky_bucket
func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
ctx = tracing.StartNamedScopeDebug(ctx, "leakyBucket")
defer func() { tracing.EndScope(ctx, err) }()
span := trace.SpanFromContext(ctx)

leakyBucketTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.getRateLimit_leakyBucket"))
defer leakyBucketTimer.ObserveDuration()

@@ -284,7 +272,7 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
if ok {
if item.Value == nil {
msgPart := "leakyBucket: Invalid cache item; Value is nil"
span.AddEvent(msgPart, trace.WithAttributes(
trace.SpanFromContext(ctx).AddEvent(msgPart, trace.WithAttributes(
attribute.String("hashKey", hashKey),
attribute.String("key", r.UniqueKey),
attribute.String("name", r.Name),
@@ -293,7 +281,7 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
ok = false
} else if item.Key != hashKey {
msgPart := "leakyBucket: Invalid cache item; key mismatch"
span.AddEvent(msgPart, trace.WithAttributes(
trace.SpanFromContext(ctx).AddEvent(msgPart, trace.WithAttributes(
attribute.String("itemKey", item.Key),
attribute.String("hashKey", hashKey),
attribute.String("name", r.Name),
@@ -425,9 +413,6 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *

// Called by leakyBucket() when adding a new item in the store.
func leakyBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
ctx = tracing.StartNamedScopeDebug(ctx, "leakyBucketNewItem")
defer func() { tracing.EndScope(ctx, err) }()

now := MillisecondNow()
duration := r.Duration
rate := float64(duration) / float64(r.Limit)
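
The recurring change in this file is the removal of the per-function tracing scopes (`tracing.StartNamedScopeDebug` / `tracing.EndScope`) and of the cached `span` variable; events are now recorded by fetching the ambient span inline with `trace.SpanFromContext(ctx)` only where needed. A minimal sketch of the resulting pattern, using a placeholder function and key name rather than gubernator's actual logic:

```go
package main

import (
	"context"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

// recordOverLimit is a placeholder illustrating the inline-span pattern;
// it is not gubernator code.
func recordOverLimit(ctx context.Context, key string) {
	// No dedicated scope is started anymore; the ambient span (a no-op span
	// if none is set on the context) is fetched only at the event site.
	trace.SpanFromContext(ctx).AddEvent("Over the limit", trace.WithAttributes(
		attribute.String("key", key),
	))
}

func main() {
	recordOverLimit(context.Background(), "account:123")
}
```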
1 change: 1 addition & 0 deletions cmd/healthcheck/main.go
@@ -35,6 +35,7 @@ func main() {
if err != nil {
panic(err)
}
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
if err != nil {
10 changes: 9 additions & 1 deletion config.go
@@ -53,13 +53,17 @@ type BehaviorConfig struct {
BatchWait time.Duration
// The max number of requests we can batch into a single peer request
BatchLimit int
// DisableBatching disables batching behavior for all ratelimits.
DisableBatching bool

// How long a non-owning peer should wait before syncing hits to the owning peer
GlobalSyncWait time.Duration
// How long we should wait for global sync responses from peers
GlobalTimeout time.Duration
// The max number of global updates we can batch into a single peer request
GlobalBatchLimit int
// ForceGlobal forces global mode on all rate limit checks.
ForceGlobal bool
}

// Config for a gubernator instance
@@ -120,12 +124,14 @@ func (c *Config) SetDefaults() error {

setter.SetDefault(&c.Behaviors.GlobalTimeout, time.Millisecond*500)
setter.SetDefault(&c.Behaviors.GlobalBatchLimit, maxBatchSize)
setter.SetDefault(&c.Behaviors.GlobalSyncWait, time.Microsecond*500)
setter.SetDefault(&c.Behaviors.GlobalSyncWait, time.Millisecond*500)

setter.SetDefault(&c.LocalPicker, NewReplicatedConsistentHash(nil, defaultReplicas))
setter.SetDefault(&c.RegionPicker, NewRegionPicker(nil))

setter.SetDefault(&c.CacheSize, 50_000)
setter.SetDefault(&c.Workers, runtime.NumCPU())
setter.SetDefault(&c.Logger, logrus.New().WithField("category", "gubernator"))

if c.CacheFactory == nil {
c.CacheFactory = func(maxSize int) Cache {
@@ -333,10 +339,12 @@ func SetupDaemonConfig(logger *logrus.Logger, configFile string) (DaemonConfig,
setter.SetDefault(&conf.Behaviors.BatchTimeout, getEnvDuration(log, "GUBER_BATCH_TIMEOUT"))
setter.SetDefault(&conf.Behaviors.BatchLimit, getEnvInteger(log, "GUBER_BATCH_LIMIT"))
setter.SetDefault(&conf.Behaviors.BatchWait, getEnvDuration(log, "GUBER_BATCH_WAIT"))
setter.SetDefault(&conf.Behaviors.DisableBatching, getEnvBool(log, "GUBER_DISABLE_BATCHING"))

setter.SetDefault(&conf.Behaviors.GlobalTimeout, getEnvDuration(log, "GUBER_GLOBAL_TIMEOUT"))
setter.SetDefault(&conf.Behaviors.GlobalBatchLimit, getEnvInteger(log, "GUBER_GLOBAL_BATCH_LIMIT"))
setter.SetDefault(&conf.Behaviors.GlobalSyncWait, getEnvDuration(log, "GUBER_GLOBAL_SYNC_WAIT"))
setter.SetDefault(&conf.Behaviors.ForceGlobal, getEnvBool(log, "GUBER_FORCE_GLOBAL"))

// TLS Config
if anyHasPrefix("GUBER_TLS_", os.Environ()) {
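
To summarize the two new knobs added in this file: `DisableBatching` turns off peer request batching for all rate limits, and `ForceGlobal` applies global behavior to every rate limit check; both can also be set through the `GUBER_DISABLE_BATCHING` and `GUBER_FORCE_GLOBAL` environment variables read by SetupDaemonConfig. A hedged sketch of the two forms, using a trimmed stand-in struct rather than the full gubernator BehaviorConfig:

```go
package main

import (
	"fmt"
	"os"
)

// behaviorConfig is a trimmed stand-in that mirrors only the two new fields
// of gubernator's BehaviorConfig discussed above.
type behaviorConfig struct {
	DisableBatching bool // disable batching behavior for all rate limits
	ForceGlobal     bool // force global mode on all rate limit checks
}

func main() {
	// Programmatic form.
	conf := behaviorConfig{DisableBatching: true, ForceGlobal: true}
	fmt.Printf("behaviors: %+v\n", conf)

	// Equivalent environment-variable form, picked up by SetupDaemonConfig.
	os.Setenv("GUBER_DISABLE_BATCHING", "true")
	os.Setenv("GUBER_FORCE_GLOBAL", "true")
}
```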
31 changes: 20 additions & 11 deletions docs/prometheus.md
@@ -16,19 +16,28 @@ Finally, configure a Prometheus job to scrape the server's `/metrics` URI.

| Metric | Type | Description |
| -------------------------------------- | ------- | ----------- |
| `gubernator_async_durations` | Summary | The timings of GLOBAL async sends in seconds. |
| `gubernator_asyncrequest_retries` | Counter | The count of retries occurred in asyncRequests() forwarding a request to another peer. |
| `gubernator_batch_send_duration` | Summary | The timings of batch send operations to a remote peer. |
| `gubernator_broadcast_durations` | Summary | The timings of GLOBAL broadcasts to peers in seconds. |
| `gubernator_cache_access_count` | Counter | The count of LRUCache accesses during rate checks. |
| `gubernator_cache_size` | Gauge | The number of items in LRU Cache which holds the rate limits. |
| `gubernator_check_counter` | Counter | The number of rate limits checked. |
| `gubernator_check_error_counter` | Counter | The number of errors while checking rate limits. |
| `gubernator_concurrent_checks_counter` | Summary | 99th quantile of concurrent rate checks. This includes rate checks processed locally and forwarded to other peers. |
| `gubernator_func_duration` | Summary | The 99th quantile of key function timings in seconds. |
| `gubernator_getratelimit_counter` | Counter | The count of getRateLimit() calls. Label \"calltype\" may be \"local\" for calls handled by the same peer, \"forward\" for calls forwarded to another peer, or \"global\" for global rate limits. |
| `gubernator_command_counter` | Counter | The count of commands processed by each worker in WorkerPool. |
| `gubernator_concurrent_checks_counter` | Gauge | The number of concurrent GetRateLimits API calls. |
| `gubernator_func_duration` | Summary | The timings of key functions in Gubernator in seconds. |
| `gubernator_getratelimit_counter` | Counter | The count of getLocalRateLimit() calls. Label \"calltype\" may be \"local\" for calls handled by the same peer, \"forward\" for calls forwarded to another peer, or \"global\" for global rate limits. |
| `gubernator_grpc_request_counts` | Counter | The count of gRPC requests. |
| `gubernator_grpc_request_duration` | Summary | The 99th quantile timings of gRPC requests in seconds. |
| `gubernator_grpc_request_duration` | Summary | The timings of gRPC requests in seconds. |
| `gubernator_over_limit_counter` | Counter | The number of rate limit checks that are over the limit. |
| `gubernator_pool_queue_length` | Summary | The 99th quantile of rate check requests queued up in GubernatorPool. The is the work queue for local rate checks. |
| `gubernator_queue_length` | Summary | The 99th quantile of rate check requests queued up for batching to other peers by getPeerRateLimitsBatch(). This is the work queue for remote rate checks. Label "peerAddr" indicates queued requests to that peer. |
| `gubernator_worker_queue_length` | Gauge | The count of requests queued up in WorkerPool. |

### Global Behavior
| Metric | Type | Description |
| -------------------------------------- | ------- | ----------- |
| `gubernator_broadcast_counter` | Counter | The count of broadcasts. |
| `gubernator_broadcast_duration` | Summary | The timings of GLOBAL broadcasts to peers in seconds. |
| `gubernator_global_queue_length` | Gauge | The count of requests queued up for global broadcast. This is only used for GetRateLimit requests using global behavior. |

### Batch Behavior
| Metric | Type | Description |
| -------------------------------------- | ------- | ----------- |
| `gubernator_batch_queue_length` | Gauge | The getRateLimitsBatch() queue length in PeerClient. This represents rate checks queued for batching to a remote peer. |
| `gubernator_batch_send_duration` | Summary | The timings of batch send operations to a remote peer. |
| `gubernator_batch_send_retries` | Counter | The count of retries that occurred in asyncRequests() while forwarding a request to another peer. |
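
For orientation, a hedged sketch of how metrics of the kinds listed above (a labeled Summary and a Gauge) are typically declared and exposed with `client_golang`; the declarations and label set below are illustrative and are not taken from gubernator's source.

```go
package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

var (
	// A labeled Summary, in the spirit of gubernator_func_duration.
	funcDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
		Name:       "gubernator_func_duration",
		Help:       "The timings of key functions in Gubernator in seconds.",
		Objectives: map[float64]float64{0.99: 0.001},
	}, []string{"name"})

	// A Gauge, in the spirit of gubernator_worker_queue_length.
	workerQueueLength = prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "gubernator_worker_queue_length",
		Help: "The count of requests queued up in WorkerPool.",
	})
)

func main() {
	prometheus.MustRegister(funcDuration, workerQueueLength)

	// Record a sample timing and queue length.
	timer := prometheus.NewTimer(funcDuration.WithLabelValues("tokenBucket"))
	workerQueueLength.Set(3)
	timer.ObserveDuration()

	// Serve the /metrics endpoint that the Prometheus job scrapes.
	http.Handle("/metrics", promhttp.Handler())
	_ = http.ListenAndServe(":2112", nil)
}
```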