Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
fixed registration race on startup
Browse files Browse the repository at this point in the history
  • Loading branch information
thrawn01 committed Feb 6, 2019
1 parent 50e8c08 commit 4e56e94
Show file tree
Hide file tree
Showing 9 changed files with 56 additions and 39 deletions.
5 changes: 4 additions & 1 deletion benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import (
)

func BenchmarkServer_GetRateLimitByKey(b *testing.B) {
client := gubernator.NewPeerClient(gubernator.RandomPeer(peers))
client, err := gubernator.NewPeerClient(gubernator.RandomPeer(peers))
if err != nil {
b.Errorf("NewPeerClient err: %s", err)
}

b.Run("GetPeerRateLimits", func(b *testing.B) {
for n := 0; n < b.N; n++ {
Expand Down
3 changes: 2 additions & 1 deletion cache/lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,11 @@ func (c *LRUCache) inspectAndResize() {
ele = ele.Prev()
}

c.log.Debugf("Inspected cache [Size: %d, Expired: %d, Inspected: %d]", c.cacheSize, expired, inspectSize)
c.log.Debugf("Inspected cache [Size: %d, Cap: %d, Expired: %d, Inspected: %d]", c.Size(), c.cacheSize, expired, inspectSize)

// If all the elements expired, we can shrink the cache size
if expired == inspectSize {
// TODO: Will never be called, since this code doesn't execute unless the cache is at capacity
// Shrink the cache size by 30%
newSize := c.cacheSize - int(float32(c.cacheSize)*0.30)
// Don't shrink beyond the initial cache size
Expand Down
28 changes: 18 additions & 10 deletions cmd/gubernator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/davecgh/go-spew/spew"
"github.com/mailgun/gubernator"
"github.com/mailgun/holster"
)

func checkErr(err error) {
Expand All @@ -29,24 +30,31 @@ func main() {
// Generate a selection of rate limits with random limits
var rateLimits []*gubernator.Request

for i := 0; i < 10000; i++ {
for i := 0; i < 2000; i++ {
rateLimits = append(rateLimits, &gubernator.Request{
Namespace: fmt.Sprintf("ID-%d", i),
UniqueKey: gubernator.RandomString(10),
Hits: 1,
Limit: randInt(10, 300),
Duration: time.Duration(randInt(500, 6000)),
Limit: randInt(1, 10),
Duration: time.Duration(randInt(int(time.Millisecond*500), int(time.Second*6))),
Algorithm: gubernator.TokenBucket,
})
}

for _, rateLimit := range rateLimits {
// Now hit our cluster with the rate limits
resp, err := client.GetRateLimit(context.Background(), rateLimit)
checkErr(err)

if resp.Status == gubernator.OverLimit {
spew.Dump(resp)
fan := holster.NewFanOut(10)
for {
for _, rateLimit := range rateLimits {
fan.Run(func(obj interface{}) error {
r := obj.(*gubernator.Request)
// Now hit our cluster with the rate limits
resp, err := client.GetRateLimit(context.Background(), r)
checkErr(err)

if resp.Status == gubernator.OverLimit {
spew.Dump(resp)
}
return nil
}, rateLimit)
}
}
}
4 changes: 2 additions & 2 deletions etc/server-1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ statsd:
# Gubernator config
ListenAddress: localhost:9040
LRUCache:
MaxCacheSize: 5000
InitialCacheSize: 100
MaxCacheSize: 1000
InitialCacheSize: 50
4 changes: 2 additions & 2 deletions etc/server-2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ statsd:
# Gubernator config
ListenAddress: localhost:9041
LRUCache:
MaxCacheSize: 5000
InitialCacheSize: 100
MaxCacheSize: 1000
InitialCacheSize: 50
2 changes: 1 addition & 1 deletion etc/server-3.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ statsd:
# Gubernator config
ListenAddress: localhost:9042
LRUCache:
MaxCacheSize: 100
MaxCacheSize: 1000
InitialCacheSize: 50
8 changes: 4 additions & 4 deletions metrics/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func NewStatsdMetrics(client StatsdClient) *StatsdMetrics {

func (sd *StatsdMetrics) Start() error {
sd.reqChan = make(chan *RequestStats, 10000)
methods := make(map[string]RequestStats)
methods := make(map[string]*RequestStats)

tick := time.NewTicker(time.Second)
sd.wg.Until(func(done chan struct{}) bool {
Expand All @@ -60,7 +60,7 @@ func (sd *StatsdMetrics) Start() error {
return true
}
stat.Called = 1
methods[stat.Method] = *stat
methods[stat.Method] = stat
case <-tick.C:
// Emit stats about GRPC method calls
for k, v := range methods {
Expand All @@ -70,12 +70,12 @@ func (sd *StatsdMetrics) Start() error {
sd.client.Inc(fmt.Sprintf("api.%s.failed", method), v.Failed)
}
// Clear the current method stats
methods = make(map[string]RequestStats, len(methods))
methods = make(map[string]*RequestStats, len(methods))

// Emit stats about our cache
if sd.cacheStats != nil {
stats := sd.cacheStats.Stats(true)
sd.client.Inc("cache.size", stats.Size)
sd.client.Gauge("cache.size", stats.Size)
sd.client.Inc("cache.hit", stats.Hit)
sd.client.Inc("cache.miss", stats.Miss)
}
Expand Down
17 changes: 5 additions & 12 deletions peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package gubernator

import (
"context"
"fmt"
"github.com/mailgun/gubernator/pb"
"github.com/pkg/errors"
"google.golang.org/grpc"
"sync"
)

type PeerPicker interface {
Expand All @@ -22,25 +22,21 @@ type PeerClient struct {
conn *grpc.ClientConn
host string
isOwner bool // true if this peer refers to this server instance
mutex sync.Mutex
}

func NewPeerClient(host string) *PeerClient {
return &PeerClient{
// NewPeerClient constructs a PeerClient for the given host and immediately
// dials the peer's GRPC endpoint. The dial result is returned as the error;
// note the client value is returned even when the dial attempt fails.
func NewPeerClient(host string) (*PeerClient, error) {
	client := &PeerClient{host: host}
	err := client.dialPeer()
	return client, err
}

// TODO: Eventually this will take multiple requests made concurrently and batch them into a single request
// TODO: This will reduce the propagation of the thundering herd effect to peers in our cluster.
// TODO: When implementing batching, allow upstream context cancels to abort processing for a request, not the entire
// TODO: batch request
func (c *PeerClient) GetPeerRateLimits(ctx context.Context, r *pb.RateLimitRequest) (*pb.RateLimitResponse, error) {
if c.conn == nil {
if err := c.dialPeer(); err != nil {
return nil, err
}
}

resp, err := c.client.GetPeerRateLimits(ctx, &pb.PeerRateLimitRequest{
RateLimits: []*pb.RateLimitRequest{r},
})
Expand All @@ -58,9 +54,6 @@ func (c *PeerClient) GetPeerRateLimits(ctx context.Context, r *pb.RateLimitReque

// Dial to a peer and initialize the GRPC client
func (c *PeerClient) dialPeer() error {
// TODO: Allow TLS connections
fmt.Printf("Dial: %s\n", c.host)

var err error
c.conn, err = grpc.Dial(c.host, grpc.WithInsecure())
if err != nil {
Expand Down
24 changes: 18 additions & 6 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,6 @@ func (s *Server) Start() error {
return errors.Wrap(err, "failed to start metrics collector")
}

if err := s.conf.PeerSyncer.Start(s.conf.AdvertiseAddress); err != nil {
return errors.Wrap(err, "failed to sync configs with other peers")
}

// Start the GRPC server
errs := make(chan error)
go func() {
Expand All @@ -98,7 +94,18 @@ func (s *Server) Start() error {
})
}()

return <-errs
// Wait until the server starts or errors
err := <-errs
if err != nil {
return err
}

// Now that our service is up, register our server
if err := s.conf.PeerSyncer.Start(s.conf.AdvertiseAddress); err != nil {
return errors.Wrap(err, "failed to sync configs with other peers")
}

return nil
}

func (s *Server) Stop() {
Expand Down Expand Up @@ -209,7 +216,12 @@ func (s *Server) updatePeers(conf *PeerConfig) {
picker := s.conf.Picker.New()

for _, peer := range conf.Peers {
peerInfo := NewPeerClient(peer)
peerInfo, err := NewPeerClient(peer)
if err != nil {
// TODO: Notify someone that we are unhealthy
s.log.Errorf("Unable to connect to peer '%s'; skip add to consistent hash", peer)
continue
}

if info := s.conf.Picker.GetPeer(peer); info != nil {
peerInfo = info
Expand Down

0 comments on commit 4e56e94

Please sign in to comment.