Skip to content
This repository was archived by the owner on Apr 19, 2024. It is now read-only.

Commit 63fd9ff

Browse files
authored
fix: mutex deadlocks in PeerClient (#223)
* fix race issues in PeerClient * undo changes in comments * more fixes * final fixes * do not create 1 connection every time * add comment * remove comment * fix lint * add ctx back to shutdown * move conn.Close out of goroutine * fix lint * oops * make it blocking * put cap of 1 second to NewPeerClient connect * undo put cap of 1 second to NewPeerClient connect * fix another leak * on peer shutdown, clear all errors * add time.sleep to end of TestHealthCheck * use testutil.UntilPass * check health of every instance * use cluster.GetAllPeers
1 parent f740f2b commit 63fd9ff

8 files changed

+134
-203
lines changed

benchmark_test.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,13 @@ func BenchmarkServer(b *testing.B) {
3333
require.NoError(b, err, "Error in conf.SetDefaults")
3434

3535
b.Run("GetPeerRateLimit() with no batching", func(b *testing.B) {
36-
client := guber.NewPeerClient(guber.PeerConfig{
36+
client, err := guber.NewPeerClient(guber.PeerConfig{
3737
Info: cluster.GetRandomPeer(cluster.DataCenterNone),
3838
Behavior: conf.Behaviors,
3939
})
40+
if err != nil {
41+
b.Fatalf("Error building client: %s", err)
42+
}
4043

4144
b.ResetTimer()
4245

functional_test.go

+12-2
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ import (
3636
"github.com/prometheus/common/model"
3737
"github.com/stretchr/testify/assert"
3838
"github.com/stretchr/testify/require"
39-
4039
"google.golang.org/grpc"
4140
"google.golang.org/grpc/credentials/insecure"
4241
json "google.golang.org/protobuf/encoding/protojson"
@@ -1618,6 +1617,16 @@ func TestHealthCheck(t *testing.T) {
16181617
ctx, cancel := context.WithTimeout(context.Background(), clock.Second*15)
16191618
defer cancel()
16201619
require.NoError(t, cluster.Restart(ctx))
1620+
1621+
// wait for every peer instance to come back online
1622+
for _, peer := range cluster.GetPeers() {
1623+
peerClient, err := guber.DialV1Server(peer.GRPCAddress, nil)
1624+
require.NoError(t, err)
1625+
testutil.UntilPass(t, 10, clock.Millisecond*300, func(t testutil.TestingT) {
1626+
healthResp, err = peerClient.HealthCheck(context.Background(), &guber.HealthCheckReq{})
1627+
assert.Equal(t, "healthy", healthResp.GetStatus())
1628+
})
1629+
}
16211630
}
16221631

16231632
func TestLeakyBucketDivBug(t *testing.T) {
@@ -1723,9 +1732,10 @@ func TestGRPCGateway(t *testing.T) {
17231732

17241733
func TestGetPeerRateLimits(t *testing.T) {
17251734
ctx := context.Background()
1726-
peerClient := guber.NewPeerClient(guber.PeerConfig{
1735+
peerClient, err := guber.NewPeerClient(guber.PeerConfig{
17271736
Info: cluster.GetRandomPeer(cluster.DataCenterNone),
17281737
})
1738+
require.NoError(t, err)
17291739

17301740
t.Run("Stable rate check request order", func(t *testing.T) {
17311741
// Ensure response order matches rate check request order.

global.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121

2222
"github.com/mailgun/holster/v4/syncutil"
23+
"github.com/pkg/errors"
2324
"github.com/prometheus/client_golang/prometheus"
2425
)
2526

@@ -31,7 +32,7 @@ type globalManager struct {
3132
wg syncutil.WaitGroup
3233
conf BehaviorConfig
3334
log FieldLogger
34-
instance *V1Instance // todo circular import? V1Instance also holds a reference to globalManager
35+
instance *V1Instance // TODO circular import? V1Instance also holds a reference to globalManager
3536
metricGlobalSendDuration prometheus.Summary
3637
metricBroadcastDuration prometheus.Summary
3738
metricBroadcastCounter *prometheus.CounterVec
@@ -249,8 +250,8 @@ func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string]
249250
cancel()
250251

251252
if err != nil {
252-
// Skip peers that are not in a ready state
253-
if !IsNotReady(err) {
253+
// Only log if it's an unknown error
254+
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
254255
gm.log.WithError(err).Errorf("while broadcasting global updates to '%s'", peer.Info().GRPCAddress)
255256
}
256257
}
@@ -260,6 +261,10 @@ func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string]
260261
fan.Wait()
261262
}
262263

264+
// Close stops all goroutines and shuts down all the peers.
263265
func (gm *globalManager) Close() {
264266
gm.wg.Stop()
267+
for _, peer := range gm.instance.GetPeerList() {
268+
_ = peer.Shutdown(context.Background())
269+
}
265270
}

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ require (
2424
go.opentelemetry.io/otel/trace v1.21.0
2525
go.uber.org/goleak v1.3.0
2626
golang.org/x/net v0.18.0
27+
golang.org/x/sync v0.3.0
2728
golang.org/x/time v0.3.0
2829
google.golang.org/genproto/googleapis/api v0.0.0-20231016165738-49dd2c1f3d0b
2930
google.golang.org/grpc v1.59.0

go.sum

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

gubernator.go

+17-6
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ func (s *V1Instance) asyncRequest(ctx context.Context, req *AsyncReq) {
343343
// Make an RPC call to the peer that owns this rate limit
344344
r, err := req.Peer.GetPeerRateLimit(ctx, req.Req)
345345
if err != nil {
346-
if IsNotReady(err) {
346+
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
347347
attempts++
348348
metricBatchSendRetries.WithLabelValues(req.Req.Name).Inc()
349349
req.Peer, err = s.GetPeer(ctx, req.Key)
@@ -528,7 +528,7 @@ func (s *V1Instance) HealthCheck(ctx context.Context, r *HealthCheckReq) (health
528528
localPeers := s.conf.LocalPicker.Peers()
529529
for _, peer := range localPeers {
530530
for _, errMsg := range peer.GetLastErr() {
531-
err := fmt.Errorf("Error returned from local peer.GetLastErr: %s", errMsg)
531+
err := fmt.Errorf("error returned from local peer.GetLastErr: %s", errMsg)
532532
span.RecordError(err)
533533
errs = append(errs, err.Error())
534534
}
@@ -538,7 +538,7 @@ func (s *V1Instance) HealthCheck(ctx context.Context, r *HealthCheckReq) (health
538538
regionPeers := s.conf.RegionPicker.Peers()
539539
for _, peer := range regionPeers {
540540
for _, errMsg := range peer.GetLastErr() {
541-
err := fmt.Errorf("Error returned from region peer.GetLastErr: %s", errMsg)
541+
err := fmt.Errorf("error returned from region peer.GetLastErr: %s", errMsg)
542542
span.RecordError(err)
543543
errs = append(errs, err.Error())
544544
}
@@ -586,7 +586,8 @@ func (s *V1Instance) getLocalRateLimit(ctx context.Context, r *RateLimitReq) (_
586586
return resp, nil
587587
}
588588

589-
// SetPeers is called by the implementor to indicate the pool of peers has changed
589+
// SetPeers replaces the peers and shuts down all the previous peers.
590+
// TODO this should return an error if we failed to connect to any of the new peers
590591
func (s *V1Instance) SetPeers(peerInfo []PeerInfo) {
591592
localPicker := s.conf.LocalPicker.New()
592593
regionPicker := s.conf.RegionPicker.New()
@@ -597,27 +598,37 @@ func (s *V1Instance) SetPeers(peerInfo []PeerInfo) {
597598
peer := s.conf.RegionPicker.GetByPeerInfo(info)
598599
// If we don't have an existing PeerClient create a new one
599600
if peer == nil {
600-
peer = NewPeerClient(PeerConfig{
601+
var err error
602+
peer, err = NewPeerClient(PeerConfig{
601603
TraceGRPC: s.conf.PeerTraceGRPC,
602604
Behavior: s.conf.Behaviors,
603605
TLS: s.conf.PeerTLS,
604606
Log: s.log,
605607
Info: info,
606608
})
609+
if err != nil {
610+
s.log.Errorf("error connecting to peer %s: %s", info.GRPCAddress, err)
611+
return
612+
}
607613
}
608614
regionPicker.Add(peer)
609615
continue
610616
}
611617
// If we don't have an existing PeerClient create a new one
612618
peer := s.conf.LocalPicker.GetByPeerInfo(info)
613619
if peer == nil {
614-
peer = NewPeerClient(PeerConfig{
620+
var err error
621+
peer, err = NewPeerClient(PeerConfig{
615622
TraceGRPC: s.conf.PeerTraceGRPC,
616623
Behavior: s.conf.Behaviors,
617624
TLS: s.conf.PeerTLS,
618625
Log: s.log,
619626
Info: info,
620627
})
628+
if err != nil {
629+
s.log.Errorf("error connecting to peer %s: %s", info.GRPCAddress, err)
630+
return
631+
}
621632
}
622633
localPicker.Add(peer)
623634
}

0 commit comments

Comments
 (0)