diff --git a/Makefile b/Makefile index 0b1c82a6..fe9f2da1 100644 --- a/Makefile +++ b/Makefile @@ -5,6 +5,9 @@ VERSION=$(shell cat version) LDFLAGS="-X main.Version=$(VERSION)" +test: + go test ./... -race -count=1 + docker: docker build --build-arg VERSION=$(VERSION) -t thrawn01/gubernator:$(VERSION) . docker tag thrawn01/gubernator:$(VERSION) thrawn01/gubernator:latest diff --git a/global.go b/global.go index c7fcd289..f07bb7dd 100644 --- a/global.go +++ b/global.go @@ -195,13 +195,15 @@ func (gm *globalManager) updatePeers(updates map[string]*RateLimitReq) { var req UpdatePeerGlobalsReq start := time.Now() - for _, rl := range updates { + for _, r := range updates { + // Copy the original since we are removing the GLOBAL behavior + rl := *r // We are only sending the status of the rate limit so // we clear the behavior flag so we don't get queued for update again. SetBehavior(&rl.Behavior, Behavior_GLOBAL, false) rl.Hits = 0 - status, err := gm.instance.getRateLimit(rl) + status, err := gm.instance.getRateLimit(&rl) if err != nil { gm.log.WithError(err).Errorf("while sending global updates to peers for: '%s'", rl.HashKey()) continue diff --git a/interval.go b/interval.go index 97121fd3..c6985ad3 100644 --- a/interval.go +++ b/interval.go @@ -38,7 +38,7 @@ func NewInterval(d time.Duration) *Interval { C: make(chan struct{}, 1), in: make(chan struct{}), } - go i.run(d) + i.run(d) return &i } diff --git a/peers.go b/peers.go index 92ff3cba..4cd1ea98 100644 --- a/peers.go +++ b/peers.go @@ -99,11 +99,15 @@ func (c *PeerClient) GetPeerRateLimits(ctx context.Context, r *GetPeerRateLimits c.mutex.RUnlock() return nil, ErrClosing } - c.mutex.RUnlock() + // NOTE: This must be done within the RLock since calling Wait() in Shutdown() causes + // a race condition if called within a separate goroutine: if the internal wg is `0` + // when Wait() is called, then Add(1) is called concurrently.
c.wg.Add(1) defer c.wg.Done() + c.mutex.RUnlock() + resp, err := c.client.GetPeerRateLimits(ctx, r) if err != nil { return nil, err @@ -123,11 +127,13 @@ func (c *PeerClient) UpdatePeerGlobals(ctx context.Context, r *UpdatePeerGlobals c.mutex.RUnlock() return nil, ErrClosing } - c.mutex.RUnlock() + // See NOTE above about RLock and wg.Add(1) c.wg.Add(1) defer c.wg.Done() + c.mutex.RUnlock() + return c.client.UpdatePeerGlobals(ctx, r) } @@ -143,12 +149,13 @@ func (c *PeerClient) getPeerRateLimitsBatch(ctx context.Context, r *RateLimitReq // Enqueue the request to be sent c.queue <- &req - // Unlock to prevent the chan from being closed - c.mutex.RUnlock() - + // See NOTE above about RLock and wg.Add(1) c.wg.Add(1) defer c.wg.Done() + // Unlock to prevent the chan from being closed + c.mutex.RUnlock() + // Wait for a response or context cancel select { case resp := <-req.resp: @@ -260,13 +267,12 @@ func (c *PeerClient) Shutdown(ctx context.Context) error { c.mutex.Unlock() return nil } + defer c.mutex.Unlock() c.isClosing = true // We need to close the chan here to prevent a possible race close(c.queue) - c.mutex.Unlock() - defer func() { if c.conn != nil { c.conn.Close()