Skip to content

Commit 042bba3

Browse files
committed
fix: flaky test by skipping connectivity checks
By skipping connectivity checks we reduce the chances of simultaneously opening a stream that will block connection establishment. Context: libp2p/go-libp2p#2589
1 parent 509eee4 commit 042bba3

File tree

5 files changed

+47
-11
lines changed

5 files changed

+47
-11
lines changed

v2/config.go

+10-5
Original file line numberDiff line numberDiff line change
@@ -395,16 +395,21 @@ type QueryConfig struct {
395395
// operation. A DefaultQuorum of 0 means that we search the network until
396396
// we have exhausted the keyspace.
397397
DefaultQuorum int
398+
399+
// SkipConnectivityCheck defines whether we do a connectivity check before
400+
// we add peers to the routing table.
401+
SkipConnectivityCheck bool
398402
}
399403

400404
// DefaultQueryConfig returns the default query configuration options for a DHT.
401405
func DefaultQueryConfig() *QueryConfig {
402406
return &QueryConfig{
403-
Concurrency: 3, // MAGIC
404-
Timeout: 5 * time.Minute, // MAGIC
405-
RequestConcurrency: 3, // MAGIC
406-
RequestTimeout: time.Minute, // MAGIC
407-
DefaultQuorum: 0, // MAGIC
407+
Concurrency: 3, // MAGIC
408+
Timeout: 5 * time.Minute, // MAGIC
409+
RequestConcurrency: 3, // MAGIC
410+
RequestTimeout: time.Minute, // MAGIC
411+
DefaultQuorum: 0, // MAGIC
412+
SkipConnectivityCheck: false,
408413
}
409414
}
410415

v2/dht.go

+1
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ func New(h host.Host, cfg *Config) (*DHT, error) {
128128
coordCfg.Routing.Logger = cfg.Logger.With("behaviour", "routing")
129129
coordCfg.Routing.Tracer = cfg.TracerProvider.Tracer(tele.TracerName)
130130
coordCfg.Routing.Meter = cfg.MeterProvider.Meter(tele.MeterName)
131+
coordCfg.Routing.IncludeSkipCheck = cfg.Query.SkipConnectivityCheck
131132

132133
rtr := &router{
133134
host: h,

v2/internal/coord/routing.go

+21
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ type RoutingConfig struct {
6262
// ProbeCheckInterval is the time interval the behaviour should use between connectivity checks for the same node in the routing table.
6363
ProbeCheckInterval time.Duration
6464

65+
// IncludeSkipCheck indicates whether we perform connectivity checks before we add a peer to the routing table.
66+
IncludeSkipCheck bool
67+
6568
// IncludeQueueCapacity is the maximum number of nodes the behaviour should keep queued as candidates for inclusion in the routing table.
6669
IncludeQueueCapacity int
6770

@@ -268,6 +271,7 @@ func DefaultRoutingConfig() *RoutingConfig {
268271
ProbeRequestConcurrency: 3, // MAGIC
269272
ProbeCheckInterval: 6 * time.Hour, // MAGIC
270273

274+
IncludeSkipCheck: false,
271275
IncludeRequestConcurrency: 3, // MAGIC
272276
IncludeQueueCapacity: 128, // MAGIC
273277

@@ -307,6 +311,22 @@ type RoutingBehaviour struct {
307311
ready chan struct{}
308312
}
309313

314+
type Recording2SM[E any, S any] struct {
315+
State S
316+
Received E
317+
}
318+
319+
func NewRecording2SM[E any, S any](response S) *Recording2SM[E, S] {
320+
return &Recording2SM[E, S]{
321+
State: response,
322+
}
323+
}
324+
325+
func (r *Recording2SM[E, S]) Advance(ctx context.Context, e E) S {
326+
r.Received = e
327+
return r.State
328+
}
329+
310330
func NewRoutingBehaviour(self kadt.PeerID, rt routing.RoutingTableCpl[kadt.Key, kadt.PeerID], cfg *RoutingConfig) (*RoutingBehaviour, error) {
311331
if cfg == nil {
312332
cfg = DefaultRoutingConfig()
@@ -331,6 +351,7 @@ func NewRoutingBehaviour(self kadt.PeerID, rt routing.RoutingTableCpl[kadt.Key,
331351
includeCfg.Clock = cfg.Clock
332352
includeCfg.Tracer = cfg.Tracer
333353
includeCfg.Meter = cfg.Meter
354+
includeCfg.SkipCheck = cfg.IncludeSkipCheck
334355
includeCfg.Timeout = cfg.ConnectivityCheckTimeout
335356
includeCfg.QueueCapacity = cfg.IncludeQueueCapacity
336357
includeCfg.Concurrency = cfg.IncludeRequestConcurrency

v2/internal/coord/routing/include.go

+13-6
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,9 @@ type IncludeConfig struct {
6161
Concurrency int // the maximum number of include checks that may be in progress at any one time
6262
Timeout time.Duration // the time to wait before terminating a check that is not making progress
6363
Clock clock.Clock // a clock that may replaced by a mock when testing
64-
65-
// Tracer is the tracer that should be used to trace execution.
66-
Tracer trace.Tracer
67-
68-
// Meter is the meter that should be used to record metrics.
69-
Meter metric.Meter
64+
SkipCheck bool // whether to skip connectivity checks and add any node passed to this state machine
65+
Tracer trace.Tracer // Tracer is the tracer that should be used to trace execution.
66+
Meter metric.Meter // Meter is the meter that should be used to record metrics.
7067
}
7168

7269
// Validate checks the configuration options and returns an error if any have invalid values.
@@ -124,6 +121,7 @@ func DefaultIncludeConfig() *IncludeConfig {
124121
Tracer: tele.NoopTracer(),
125122
Meter: tele.NoopMeter(),
126123

124+
SkipCheck: false,
127125
Concurrency: 3,
128126
Timeout: time.Minute,
129127
QueueCapacity: 128,
@@ -209,6 +207,15 @@ func (in *Include[K, N]) Advance(ctx context.Context, ev IncludeEvent) (out Incl
209207

210208
switch tev := ev.(type) {
211209
case *EventIncludeAddCandidate[K, N]:
210+
211+
if in.cfg.SkipCheck {
212+
if in.rt.AddNode(tev.NodeID) {
213+
return &StateIncludeRoutingUpdated[K, N]{NodeID: tev.NodeID}
214+
} else {
215+
return &StateIncludeIdle{}
216+
}
217+
}
218+
212219
// Ignore if already running a check
213220
_, checking := in.checks[key.HexString(tev.NodeID.Key())]
214221
if checking {

v2/routing_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -682,6 +682,8 @@ func (suite *SearchValueQuorumTestSuite) SetupTest() {
682682

683683
cfg := DefaultConfig()
684684
cfg.Clock = clk
685+
cfg.Query.SkipConnectivityCheck = true
686+
685687
top := NewTopology(t)
686688

687689
// init privileged DHT server

0 commit comments

Comments
 (0)