From 50e8c0803f346d2e8b9e0b0bfeae95d4aa3f4190 Mon Sep 17 00:00:00 2001 From: "Derrick J. Wippler" Date: Wed, 6 Feb 2019 16:07:41 -0600 Subject: [PATCH] Fixed race condition and peer client reference issue --- cache/lru.go | 2 ++ hash.go | 8 +++----- peers.go | 4 +++- server.go | 5 ++--- 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/cache/lru.go b/cache/lru.go index f1103f4f..6390198d 100644 --- a/cache/lru.go +++ b/cache/lru.go @@ -254,6 +254,8 @@ func (c *LRUCache) Size() int { // Returns stats about the current state of the cache func (c *LRUCache) Stats(clear bool) Stats { + c.mutex.Lock() + defer c.mutex.Unlock() if clear { defer func() { c.stats = Stats{} diff --git a/hash.go b/hash.go index 72788527..bcc48ec8 100644 --- a/hash.go +++ b/hash.go @@ -61,9 +61,9 @@ func (ch *ConsistantHash) GetPeer(host string) *PeerClient { } // Given a key, return the peer that key is assigned too -func (ch *ConsistantHash) Get(key string, peerInfo *PeerClient) error { +func (ch *ConsistantHash) Get(key string) (*PeerClient, error) { if ch.Size() == 0 { - return errors.New("unable to pick a peer; pool is empty") + return nil, errors.New("unable to pick a peer; pool is empty") } hash := int(ch.hashFunc([]byte(key))) @@ -76,7 +76,5 @@ func (ch *ConsistantHash) Get(key string, peerInfo *PeerClient) error { idx = 0 } - item := ch.peerMap[ch.peerKeys[idx]] - *peerInfo = *item - return nil + return ch.peerMap[ch.peerKeys[idx]], nil } diff --git a/peers.go b/peers.go index 50ad1cfe..823969da 100644 --- a/peers.go +++ b/peers.go @@ -2,6 +2,7 @@ package gubernator import ( "context" + "fmt" "github.com/mailgun/gubernator/pb" "github.com/pkg/errors" "google.golang.org/grpc" @@ -10,7 +11,7 @@ import ( type PeerPicker interface { GetPeer(host string) *PeerClient Peers() []*PeerClient - Get(string, *PeerClient) error + Get(string) (*PeerClient, error) New() PeerPicker Add(*PeerClient) Size() int @@ -58,6 +59,7 @@ func (c *PeerClient) GetPeerRateLimits(ctx context.Context, r *pb.RateLimitReque // Dial to a peer and initialize the GRPC client func (c *PeerClient) dialPeer() error { // TODO: Allow TLS connections + fmt.Printf("Dial: %s\n", c.host) var err error c.conn, err = grpc.Dial(c.host, grpc.WithInsecure()) diff --git a/server.go b/server.go index 98605c03..e1fe8a21 100644 --- a/server.go +++ b/server.go @@ -133,15 +133,14 @@ func (s *Server) GetRateLimits(ctx context.Context, reqs *pb.RateLimitRequestLis globalKey := req.Namespace + "_" + req.UniqueKey s.peerMutex.RLock() - var peer PeerClient - if err := s.conf.Picker.Get(globalKey, &peer); err != nil { + peer, err := s.conf.Picker.Get(globalKey) + if err != nil { s.peerMutex.RUnlock() return nil, errors.Wrapf(err, "while finding peer that owns key '%s'", globalKey) } s.peerMutex.RUnlock() var resp *pb.RateLimitResponse - var err error // If our server instance is the owner of this rate limit if peer.isOwner {