Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
Added trace context propagation and improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
thrawn01 committed Aug 4, 2023
1 parent 0c1daa9 commit bcb8447
Show file tree
Hide file tree
Showing 24 changed files with 893 additions and 1,333 deletions.
81 changes: 44 additions & 37 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,48 +225,43 @@ POST /v1/GetRateLimits
Example Payload
```json
{
"requests":[
{
"name": "requests_per_sec",
"unique_key": "account.id=1234",
"hits": 1,
"duration": 60000,
"limit": 10
}
]
"requests": [
{
"name": "requests_per_sec",
"uniqueKey": "account:12345",
"hits": "1",
"limit": "10",
"duration": "1000"
}
]
}
```

Example response:

```json
{
"responses":[
"responses": [
{
"status": 0,
"status": "UNDER_LIMIT",
"limit": "10",
"remaining": "7",
"reset_time": "1551309219226"
"remaining": "9",
"reset_time": "1690855128786",
"error": "",
"metadata": {
"owner": "gubernator:81"
}
}
]
}
```

### Deployment
NOTE: Gubernator uses `etcd` or Kubernetes or round-robin DNS to discover peers and
NOTE: Gubernator uses `etcd`, Kubernetes or round-robin DNS to discover peers and
establish a cluster. If you don't have either, the docker-compose method is the
simplest way to try gubernator out.

##### Docker with existing etcd cluster
```bash
$ docker run -p 8081:81 -p 9080:80 -e GUBER_ETCD_ENDPOINTS=etcd1:2379,etcd2:2379 \
ghcr.io/mailgun/gubernator:latest

# Hit the HTTP API at localhost:9080
$ curl http://localhost:9080/v1/HealthCheck
```

##### Docker compose
##### Docker compose cluster
The docker compose file uses member-list for peer discovery
```bash
# Download the docker-compose file
Expand All @@ -280,6 +275,29 @@ $ docker-compose up -d

# Hit the HTTP API at localhost:9080 (GRPC is at 9081)
$ curl http://localhost:9080/v1/HealthCheck

# Make a rate limit request
$ curl http://localhost:9080/v1/GetRateLimits \
--header 'Content-Type: application/json' \
--data '{
"requests": [
{
"name": "requests_per_sec",
"uniqueKey": "account:12345",
"hits": "1",
"limit": "10",
"duration": "1000"
}
]
}'
```
##### Docker with existing etcd cluster
```bash
$ docker run -p 8081:81 -p 9080:80 -e GUBER_ETCD_ENDPOINTS=etcd1:2379,etcd2:2379 \
ghcr.io/mailgun/gubernator:latest

# Hit the HTTP API at localhost:9080
$ curl http://localhost:9080/v1/HealthCheck
```

##### Kubernetes
Expand All @@ -299,16 +317,6 @@ If your DNS service supports auto-registration, for example AWS Route53 service
you can use same fully-qualified domain name to both let your business logic containers or
instances to find `gubernator` and for `gubernator` containers/instances to find each other.

###### Local Testing

1. terminal, start the environment: `docker-compose -f docker-compose-dns.yml up --build`
2. terminal, drop into test container: `docker exec -it gubernator_shell_1 /bin/sh`
3. in the test container: `curl -v gubernator/v1/GetRateLimits --data '{"requests":[{"name":"requests_per_sec","unique_key":"account.id=1234","hits":1,"duration":60000,"limit":10}]}'

###### AWS

TBD

##### TLS
Gubernator supports TLS for both HTTP and GRPC connections. You can see an example with
self signed certs by running `docker-compose-tls.yaml`
Expand All @@ -334,6 +342,5 @@ workings of gubernator.
Gubernator publishes Prometheus metrics for realtime monitoring. See
[prometheus.md](prometheus.md) for details.

## Jaeger Tracing
Gubernator supports tracing using Jaeger Tracing tools. See
[jaegertracing.md](jaegertracing.md) for details.
## OpenTelemetry Tracing (OTEL)
Gubernator supports OpenTelemetry. See [tracing.md](tracing.md) for details.
24 changes: 8 additions & 16 deletions algorithms.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@ import (

// Implements token bucket algorithm for rate limiting. https://en.wikipedia.org/wiki/Token_bucket
func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
ctx = tracing.StartScopeDebug(ctx)
defer func() {
tracing.EndScope(ctx, err)
}()
ctx = tracing.StartNamedScopeDebug(ctx, "tokenBucket")
defer tracing.EndScope(ctx, err)
span := trace.SpanFromContext(ctx)

tokenBucketTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("tokenBucket"))
Expand Down Expand Up @@ -201,10 +199,8 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *

// Called by tokenBucket() when adding a new item in the store.
func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
ctx = tracing.StartScopeDebug(ctx)
defer func() {
tracing.EndScope(ctx, err)
}()
ctx = tracing.StartNamedScopeDebug(ctx, "tokenBucketNewItem")
defer tracing.EndScope(ctx, err)
span := trace.SpanFromContext(ctx)

now := MillisecondNow()
Expand Down Expand Up @@ -259,10 +255,8 @@ func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq)

// Implements leaky bucket algorithm for rate limiting https://en.wikipedia.org/wiki/Leaky_bucket
func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
ctx = tracing.StartScopeDebug(ctx)
defer func() {
tracing.EndScope(ctx, err)
}()
ctx = tracing.StartNamedScopeDebug(ctx, "leakyBucket")
defer tracing.EndScope(ctx, err)
span := trace.SpanFromContext(ctx)

leakyBucketTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.getRateLimit_leakyBucket"))
Expand Down Expand Up @@ -431,10 +425,8 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *

// Called by leakyBucket() when adding a new item in the store.
func leakyBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
ctx = tracing.StartScopeDebug(ctx)
defer func() {
tracing.EndScope(ctx, err)
}()
ctx = tracing.StartNamedScopeDebug(ctx, "leakyBucketNewItem")
defer tracing.EndScope(ctx, err)

now := MillisecondNow()
duration := r.Duration
Expand Down
17 changes: 11 additions & 6 deletions cmd/gubernator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"github.com/mailgun/holster/v4/ctxutil"
"github.com/mailgun/holster/v4/tracing"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"k8s.io/klog"
)

Expand All @@ -53,27 +55,30 @@ func main() {
klog.InitFlags(nil)
flag.Set("logtostderr", "true")

// Initialize tracing.
res, err := tracing.NewResource("gubernator", Version)
res, err := tracing.NewResource("gubernator", Version, resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceInstanceID(gubernator.GetInstanceID()),
))
if err != nil {
log.WithError(err).Fatal("Error in tracing.NewResource")
log.WithError(err).Fatal("during tracing.NewResource()")
}

// Initialize tracing.
ctx := context.Background()
err = tracing.InitTracing(ctx,
"github.com/mailgun/gubernator/v2",
tracing.WithLevel(gubernator.GetTracingLevel()),
tracing.WithResource(res),
)
if err != nil {
log.WithError(err).Warn("Error in tracing.InitTracing")
log.WithError(err).Fatal("during tracing.InitTracing()")
}

// Read our config from the environment or optional environment config file
conf, err := gubernator.SetupDaemonConfig(logrus.StandardLogger(), configFile)
checkErr(err, "while getting config")

ctx, cancel := ctxutil.WithTimeout(ctx, clock.Second*10)
defer cancel()

// Start the daemon
daemon, err := gubernator.SpawnDaemon(ctx, conf)
Expand All @@ -86,7 +91,7 @@ func main() {
for range c {
log.Info("caught signal; shutting down")
daemon.Close()
tracing.CloseTracing(context.Background())
_ = tracing.CloseTracing(context.Background())
exit(0)
}
}
Expand Down
89 changes: 88 additions & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ limitations under the License.
package gubernator

import (
"bufio"
"crypto/rand"
"crypto/tls"
"crypto/x509"
"encoding/hex"
"fmt"
"io/ioutil"
"net"
Expand All @@ -32,11 +35,13 @@ import (
"github.com/mailgun/holster/v4/clock"
"github.com/mailgun/holster/v4/setter"
"github.com/mailgun/holster/v4/slice"
"github.com/mailgun/holster/v4/tracing"
"github.com/pkg/errors"
"github.com/segmentio/fasthash/fnv1"
"github.com/segmentio/fasthash/fnv1a"
"github.com/sirupsen/logrus"
etcd "go.etcd.io/etcd/client/v3"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -230,8 +235,15 @@ type DaemonConfig struct {
// attempt to build a complete TLS config if one is not provided.
TLS *TLSConfig

// (Optional) Metrics Flags which enable or disable collection of some types of metrics
// (Optional) Metrics Flags which enable or disable a collection of some metric types
MetricFlags MetricFlags

// (Optional) Instance ID which is a unique id that identifies this instance of gubernator
InstanceID string

// (Optional) TraceLevel sets the tracing level, this controls the number of spans included in a single trace.
// Valid options are (tracing.InfoLevel, tracing.DebugLevel) Defaults to tracing.InfoLevel
TraceLevel tracing.Level
}

func (d *DaemonConfig) ClientTLS() *tls.Config {
Expand Down Expand Up @@ -297,6 +309,7 @@ func SetupDaemonConfig(logger *logrus.Logger, configFile string) (DaemonConfig,
fmt.Sprintf("%s:81", LocalHost()))
setter.SetDefault(&conf.HTTPListenAddress, os.Getenv("GUBER_HTTP_ADDRESS"),
fmt.Sprintf("%s:80", LocalHost()))
setter.SetDefault(&conf.InstanceID, GetInstanceID())
setter.SetDefault(&conf.HTTPStatusListenAddress, os.Getenv("GUBER_STATUS_HTTP_ADDRESS"), "")
setter.SetDefault(&conf.GRPCMaxConnectionAgeSeconds, getEnvInteger(log, "GUBER_GRPC_MAX_CONN_AGE_SEC"), 0)
setter.SetDefault(&conf.CacheSize, getEnvInteger(log, "GUBER_CACHE_SIZE"), 50_000)
Expand Down Expand Up @@ -457,6 +470,8 @@ func SetupDaemonConfig(logger *logrus.Logger, configFile string) (DaemonConfig,
log.Debug(spew.Sdump(conf))
}

setter.SetDefault(&conf.TraceLevel, GetTracingLevel())

return conf, nil
}

Expand Down Expand Up @@ -659,3 +674,75 @@ func validHash64Keys(m map[string]HashString64) string {
}
return strings.Join(rs, ",")
}

// GetInstanceID attempts to source a unique id from the environment,
// if none is found, then it will generate a random instance id.
func GetInstanceID() string {
var id string

// Use in order, if the result is "" (empty string) then the next option will be used
// 1. The environment variable `GUBER_INSTANCE_ID`
// 2. The id of the docker container we are running in
// 3. Generate a random id
setter.SetDefault(&id, os.Getenv("GUBER_INSTANCE_ID"), getDockerCID(), generateID())

return id
}

func generateID() string {
token := make([]byte, 5)
_, _ = rand.Read(token)
return hex.EncodeToString(token)
}

func getDockerCID() string {
f, err := os.Open("/proc/self/cgroup")
if err != nil {
return ""
}
defer f.Close()

scanner := bufio.NewScanner(f)
for scanner.Scan() {
line := scanner.Text()
parts := strings.Split(line, "/docker/")
if len(parts) != 2 {
continue
}

fullDockerCID := parts[1]
return fullDockerCID[:12]
}
return ""
}

func GetTracingLevel() tracing.Level {
s := os.Getenv("GUBER_TRACING_LEVEL")
lvl, ok := map[string]tracing.Level{
"ERROR": tracing.ErrorLevel,
"INFO": tracing.InfoLevel,
"DEBUG": tracing.DebugLevel,
}[s]
if ok {
return lvl
}
return tracing.InfoLevel
}

var TraceLevelInfoFilter = otelgrpc.Filter(func(info *otelgrpc.InterceptorInfo) bool {
if info.UnaryServerInfo != nil {
if info.UnaryServerInfo.FullMethod == "/pb.gubernator.PeersV1/GetPeerRateLimits" {
return false
}
if info.UnaryServerInfo.FullMethod == "/pb.gubernator.V1/HealthCheck" {
return false
}
}
if info.Method == "/pb.gubernator.PeersV1/GetPeerRateLimits" {
return false
}
if info.Method == "/pb.gubernator.V1/HealthCheck" {
return false
}
return true
})
Loading

0 comments on commit bcb8447

Please sign in to comment.