Commit af0161e

Stebalien, guillaumemichel, sukunrt, and MarcoPolo authored May 8, 2024
Add a "Limited" network connectivity state (#2696)
Rename the Transient state on connections to Limited. This is more appropriate and also doesn't conflict with the transient resource manager scope.

Adds a Limited connectedness state for peers connected to us over limited connections. This allows users to ignore such peers if they are only interested in peers connected to us over unlimited connections.

For some peers who disconnect before we send a Connectedness event, we will now only send a Disconnected event.

Co-authored-by: guillaumemichel <guillaume@michel.id>
Co-authored-by: sukun <sukunrt@gmail.com>
Co-authored-by: Marco Munizaga <git@marcopolo.io>
1 parent 0385ec9 commit af0161e
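To illustrate the new state from a consumer's perspective, here is a minimal sketch (not part of this commit) that subscribes to connectedness events and treats Limited peers separately. watchConnectedness is a hypothetical helper; error handling is kept minimal.

package main

import (
	"fmt"

	"github.com/libp2p/go-libp2p/core/event"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/network"
)

// watchConnectedness logs transitions, distinguishing peers reachable only
// over limited (relayed) connections from fully connected peers.
func watchConnectedness(h host.Host) error {
	sub, err := h.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged))
	if err != nil {
		return err
	}
	defer sub.Close()
	for e := range sub.Out() {
		evt := e.(event.EvtPeerConnectednessChanged)
		switch evt.Connectedness {
		case network.Connected:
			fmt.Println("connected:", evt.Peer) // at least one unlimited connection
		case network.Limited:
			fmt.Println("limited:", evt.Peer) // only limited (e.g. relayed) connections
		case network.NotConnected:
			fmt.Println("disconnected:", evt.Peer)
		}
	}
	return nil
}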

20 files changed: +520 −107
 

‎core/network/context.go

+24-4
@@ -13,12 +13,12 @@ var DialPeerTimeout = 60 * time.Second

 type noDialCtxKey struct{}
 type dialPeerTimeoutCtxKey struct{}
 type forceDirectDialCtxKey struct{}
-type useTransientCtxKey struct{}
+type allowLimitedConnCtxKey struct{}
 type simConnectCtxKey struct{ isClient bool }

 var noDial = noDialCtxKey{}
 var forceDirectDial = forceDirectDialCtxKey{}
-var useTransient = useTransientCtxKey{}
+var allowLimitedConn = allowLimitedConnCtxKey{}
 var simConnectIsServer = simConnectCtxKey{}
 var simConnectIsClient = simConnectCtxKey{isClient: true}

@@ -94,15 +94,35 @@ func WithDialPeerTimeout(ctx context.Context, timeout time.Duration) context.Context
 	return context.WithValue(ctx, dialPeerTimeoutCtxKey{}, timeout)
 }

+// WithAllowLimitedConn constructs a new context with an option that instructs
+// the network that it is acceptable to use a limited connection when opening a
+// new stream.
+func WithAllowLimitedConn(ctx context.Context, reason string) context.Context {
+	return context.WithValue(ctx, allowLimitedConn, reason)
+}
+
 // WithUseTransient constructs a new context with an option that instructs the network
 // that it is acceptable to use a transient connection when opening a new stream.
+//
+// Deprecated: Use WithAllowLimitedConn instead.
 func WithUseTransient(ctx context.Context, reason string) context.Context {
-	return context.WithValue(ctx, useTransient, reason)
+	return context.WithValue(ctx, allowLimitedConn, reason)
+}
+
+// GetAllowLimitedConn returns true if the allow limited conn option is set in the context.
+func GetAllowLimitedConn(ctx context.Context) (usetransient bool, reason string) {
+	v := ctx.Value(allowLimitedConn)
+	if v != nil {
+		return true, v.(string)
+	}
+	return false, ""
 }

 // GetUseTransient returns true if the use transient option is set in the context.
+//
+// Deprecated: Use GetAllowLimitedConn instead.
 func GetUseTransient(ctx context.Context) (usetransient bool, reason string) {
-	v := ctx.Value(useTransient)
+	v := ctx.Value(allowLimitedConn)
 	if v != nil {
 		return true, v.(string)
 	}
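A hedged usage sketch of the new option (not part of the commit); the function name and protocol ID are placeholders:

package main

import (
	"context"

	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/core/protocol"
)

// openOverLimited opts in to using a limited (relayed) connection when opening
// a stream; without the option, the swarm waits for a direct connection or
// fails with network.ErrLimitedConn.
func openOverLimited(ctx context.Context, h host.Host, p peer.ID, proto protocol.ID) (network.Stream, error) {
	ctx = network.WithAllowLimitedConn(ctx, "my-app")
	return h.NewStream(ctx, p, proto)
}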
‎core/network/errors.go

+7-1
@@ -22,7 +22,13 @@ var ErrNoConn = errors.New("no usable connection to peer")

 // ErrTransientConn is returned when attempting to open a stream to a peer with only a transient
 // connection, without specifying the UseTransient option.
-var ErrTransientConn = errors.New("transient connection to peer")
+//
+// Deprecated: Use ErrLimitedConn instead.
+var ErrTransientConn = ErrLimitedConn
+
+// ErrLimitedConn is returned when attempting to open a stream to a peer with only a limited
+// connection, without specifying the AllowLimitedConn option.
+var ErrLimitedConn = errors.New("limited connection to peer")

 // ErrResourceLimitExceeded is returned when attempting to perform an operation that would
 // exceed system resource limits.
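Since ErrTransientConn now aliases ErrLimitedConn, existing errors.Is checks keep working. A small sketch (not part of the commit; newStream is a hypothetical callback standing in for any stream-opening call such as Conn.NewStream):

package main

import (
	"context"
	"errors"

	"github.com/libp2p/go-libp2p/core/network"
)

// retryWithLimited retries a failed stream open after opting in to limited
// connections. errors.Is matches the old and new sentinels equally because
// they are the same value.
func retryWithLimited(ctx context.Context, newStream func(context.Context) (network.Stream, error)) (network.Stream, error) {
	s, err := newStream(ctx)
	if errors.Is(err, network.ErrLimitedConn) {
		return newStream(network.WithAllowLimitedConn(ctx, "retry"))
	}
	return s, err
}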

‎core/network/network.go

+12-3
@@ -55,16 +55,23 @@ const (
 	// Connected means has an open, live connection to peer
 	Connected

+	// Deprecated: CanConnect is deprecated and will be removed in a future release.
+	//
 	// CanConnect means recently connected to peer, terminated gracefully
 	CanConnect

+	// Deprecated: CannotConnect is deprecated and will be removed in a future release.
+	//
 	// CannotConnect means recently attempted connecting but failed to connect.
 	// (should signal "made effort, failed")
 	CannotConnect
+
+	// Limited means we have a transient connection to the peer, but aren't fully connected.
+	Limited
 )

 func (c Connectedness) String() string {
-	str := [...]string{"NotConnected", "Connected", "CanConnect", "CannotConnect"}
+	str := [...]string{"NotConnected", "Connected", "CanConnect", "CannotConnect", "Limited"}
 	if c < 0 || int(c) >= len(str) {
 		return unrecognized
 	}
@@ -111,8 +118,10 @@ type Stats struct {
 	Direction Direction
 	// Opened is the timestamp when this connection was opened.
 	Opened time.Time
-	// Transient indicates that this connection is transient and may be closed soon.
-	Transient bool
+	// Limited indicates that this connection is limited. It may be limited by
+	// bytes or time. In practice, this is a connection formed over a circuit v2
+	// relay.
+	Limited bool
 	// Extra stores additional metadata about this connection.
 	Extra map[interface{}]interface{}
 }
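A short sketch of reading the renamed stats field (not part of the commit; the StatLimitDuration/StatLimitData keys are set by the circuit v2 client, as shown further down):

package main

import (
	"fmt"

	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client"
)

// printLimits lists limited connections to p together with the relay-imposed
// duration and data limits the circuit v2 client stores in Stats.Extra.
func printLimits(h host.Host, p peer.ID) {
	for _, c := range h.Network().ConnsToPeer(p) {
		st := c.Stat()
		if !st.Limited {
			continue
		}
		fmt.Println("limited conn via", c.RemoteMultiaddr(),
			"duration limit:", st.Extra[client.StatLimitDuration],
			"data limit:", st.Extra[client.StatLimitData])
	}
}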

‎p2p/host/basic/basic_host.go

+3-1
@@ -724,8 +724,10 @@ func (h *BasicHost) Connect(ctx context.Context, pi peer.AddrInfo) error {
 	h.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.TempAddrTTL)

 	forceDirect, _ := network.GetForceDirectDial(ctx)
+	canUseLimitedConn, _ := network.GetAllowLimitedConn(ctx)
 	if !forceDirect {
-		if h.Network().Connectedness(pi.ID) == network.Connected {
+		connectedness := h.Network().Connectedness(pi.ID)
+		if connectedness == network.Connected || (canUseLimitedConn && connectedness == network.Limited) {
 			return nil
 		}
 	}

‎p2p/host/blank/blank.go

+4-3
@@ -63,9 +63,10 @@ func NewBlankHost(n network.Network, options ...Option) *BlankHost {
 	}

 	bh := &BlankHost{
-		n:    n,
-		cmgr: cfg.cmgr,
-		mux:  mstream.NewMultistreamMuxer[protocol.ID](),
+		n:        n,
+		cmgr:     cfg.cmgr,
+		mux:      mstream.NewMultistreamMuxer[protocol.ID](),
+		eventbus: cfg.eventBus,
 	}
 	if bh.eventbus == nil {
 		bh.eventbus = eventbus.NewBus(eventbus.WithMetricsTracer(eventbus.NewMetricsTracer()))
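With the event bus now wired through, a shared bus lets subscribers on the host see connectedness events emitted by the swarm. A sketch under that assumption (not part of the commit; it mirrors the getNetHosts change in relay_test.go below):

package main

import (
	"github.com/libp2p/go-libp2p/core/event"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/core/peerstore"
	bhost "github.com/libp2p/go-libp2p/p2p/host/blank"
	"github.com/libp2p/go-libp2p/p2p/host/eventbus"
	"github.com/libp2p/go-libp2p/p2p/net/swarm"
)

// newHostWithSharedBus passes one event bus to both the swarm and the blank
// host, so EvtPeerConnectednessChanged events are visible on h.EventBus().
func newHostWithSharedBus(p peer.ID, ps peerstore.Peerstore) (host.Host, event.Bus, error) {
	bus := eventbus.NewBus()
	netw, err := swarm.NewSwarm(p, ps, bus)
	if err != nil {
		return nil, nil, err
	}
	h := bhost.NewBlankHost(netw, bhost.WithEventBus(bus))
	return h, bus, nil
}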

‎p2p/host/pstoremanager/pstoremanager.go

+7-6
@@ -103,15 +103,16 @@ func (m *PeerstoreManager) background(ctx context.Context, sub event.Subscription) {
 			ev := e.(event.EvtPeerConnectednessChanged)
 			p := ev.Peer
 			switch ev.Connectedness {
-			case network.NotConnected:
+			case network.Connected, network.Limited:
+				// If we reconnect to the peer before we've cleared the information,
+				// keep it. This is an optimization to keep the disconnected map
+				// small. We still need to check that a peer is actually
+				// disconnected before removing it from the peer store.
+				delete(disconnected, p)
+			default:
 				if _, ok := disconnected[p]; !ok {
 					disconnected[p] = time.Now()
 				}
-			case network.Connected:
-				// If we reconnect to the peer before we've cleared the information, keep it.
-				// This is an optimization to keep the disconnected map small.
-				// We still need to check that a peer is actually disconnected before removing it from the peer store.
-				delete(disconnected, p)
 			}
 		case <-ticker.C:
 			now := time.Now()

‎p2p/host/routed/routed.go

+3-1
@@ -48,8 +48,10 @@ func Wrap(h host.Host, r Routing) *RoutedHost {
 func (rh *RoutedHost) Connect(ctx context.Context, pi peer.AddrInfo) error {
 	// first, check if we're already connected unless force direct dial.
 	forceDirect, _ := network.GetForceDirectDial(ctx)
+	canUseLimitedConn, _ := network.GetAllowLimitedConn(ctx)
 	if !forceDirect {
-		if rh.Network().Connectedness(pi.ID) == network.Connected {
+		connectedness := rh.Network().Connectedness(pi.ID)
+		if connectedness == network.Connected || (canUseLimitedConn && connectedness == network.Limited) {
 			return nil
 		}
 	}
p2p/net/swarm/connectedness_event_emitter.go

+143-0

@@ -0,0 +1,143 @@
+package swarm
+
+import (
+	"context"
+	"sync"
+
+	"github.com/libp2p/go-libp2p/core/event"
+	"github.com/libp2p/go-libp2p/core/network"
+	"github.com/libp2p/go-libp2p/core/peer"
+)
+
+// connectednessEventEmitter emits PeerConnectednessChanged events.
+// We ensure that for any peer we connected to we always send at least one NotConnected event after
+// the peer disconnects. This is because peers can observe a connection before they are notified
+// of the connection by a peer connectedness changed event.
+type connectednessEventEmitter struct {
+	mx sync.RWMutex
+	// newConns is the channel that holds the peerIDs we recently connected to
+	newConns      chan peer.ID
+	removeConnsMx sync.Mutex
+	// removeConns is a slice of peerIDs we have recently closed connections to
+	removeConns []peer.ID
+	// lastEvent is the last connectedness event sent for a particular peer.
+	lastEvent map[peer.ID]network.Connectedness
+	// connectedness is the function that gives the peer's current connectedness state
+	connectedness func(peer.ID) network.Connectedness
+	// emitter is the PeerConnectednessChanged event emitter
+	emitter         event.Emitter
+	wg              sync.WaitGroup
+	removeConnNotif chan struct{}
+	ctx             context.Context
+	cancel          context.CancelFunc
+}
+
+func newConnectednessEventEmitter(connectedness func(peer.ID) network.Connectedness, emitter event.Emitter) *connectednessEventEmitter {
+	ctx, cancel := context.WithCancel(context.Background())
+	c := &connectednessEventEmitter{
+		newConns:        make(chan peer.ID, 32),
+		lastEvent:       make(map[peer.ID]network.Connectedness),
+		removeConnNotif: make(chan struct{}, 1),
+		connectedness:   connectedness,
+		emitter:         emitter,
+		ctx:             ctx,
+		cancel:          cancel,
+	}
+	c.wg.Add(1)
+	go c.runEmitter()
+	return c
+}
+
+func (c *connectednessEventEmitter) AddConn(p peer.ID) {
+	c.mx.RLock()
+	defer c.mx.RUnlock()
+	if c.ctx.Err() != nil {
+		return
+	}
+
+	c.newConns <- p
+}
+
+func (c *connectednessEventEmitter) RemoveConn(p peer.ID) {
+	c.mx.RLock()
+	defer c.mx.RUnlock()
+	if c.ctx.Err() != nil {
+		return
+	}
+
+	c.removeConnsMx.Lock()
+	// This queue is roughly bounded by the total number of added connections we
+	// have. If consumers of connectedness events are slow, we apply
+	// backpressure to AddConn operations.
+	//
+	// We purposefully don't block/backpressure here to avoid deadlocks, since it's
+	// reasonable for a consumer of the event to want to remove a connection.
+	c.removeConns = append(c.removeConns, p)
+
+	c.removeConnsMx.Unlock()
+
+	select {
+	case c.removeConnNotif <- struct{}{}:
+	default:
+	}
+}
+
+func (c *connectednessEventEmitter) Close() {
+	c.cancel()
+	c.wg.Wait()
+}
+
+func (c *connectednessEventEmitter) runEmitter() {
+	defer c.wg.Done()
+	for {
+		select {
+		case p := <-c.newConns:
+			c.notifyPeer(p, true)
+		case <-c.removeConnNotif:
+			c.sendConnRemovedNotifications()
+		case <-c.ctx.Done():
+			c.mx.Lock() // Wait for all pending AddConn & RemoveConn operations to complete
+			defer c.mx.Unlock()
+			for {
+				select {
+				case p := <-c.newConns:
+					c.notifyPeer(p, true)
+				case <-c.removeConnNotif:
+					c.sendConnRemovedNotifications()
+				default:
+					return
+				}
+			}
+		}
+	}
+}
+
+// notifyPeer sends the peer connectedness event using the emitter.
+// Use forceNotConnectedEvent = true to send a NotConnected event even if
+// no Connected event was sent for this peer.
+// In case a peer is disconnected before we sent the Connected event, we still
+// send the Disconnected event because a connection to the peer can be observed
+// in such cases.
+func (c *connectednessEventEmitter) notifyPeer(p peer.ID, forceNotConnectedEvent bool) {
+	oldState := c.lastEvent[p]
+	c.lastEvent[p] = c.connectedness(p)
+	if c.lastEvent[p] == network.NotConnected {
+		delete(c.lastEvent, p)
+	}
+	if (forceNotConnectedEvent && c.lastEvent[p] == network.NotConnected) || c.lastEvent[p] != oldState {
+		c.emitter.Emit(event.EvtPeerConnectednessChanged{
+			Peer:          p,
+			Connectedness: c.lastEvent[p],
+		})
+	}
+}
+
+func (c *connectednessEventEmitter) sendConnRemovedNotifications() {
+	c.removeConnsMx.Lock()
+	removeConns := c.removeConns
+	c.removeConns = nil
+	c.removeConnsMx.Unlock()
+	for _, p := range removeConns {
+		c.notifyPeer(p, false)
+	}
+}

‎p2p/net/swarm/swarm.go

+56-54
@@ -203,9 +203,10 @@ type Swarm struct {

 	dialRanker network.DialRanker

-	udpBlackHoleConfig  blackHoleConfig
-	ipv6BlackHoleConfig blackHoleConfig
-	bhd                 *blackHoleDetector
+	udpBlackHoleConfig        blackHoleConfig
+	ipv6BlackHoleConfig       blackHoleConfig
+	bhd                       *blackHoleDetector
+	connectednessEventEmitter *connectednessEventEmitter
 }

 // NewSwarm constructs a Swarm.
@@ -238,6 +239,7 @@ func NewSwarm(local peer.ID, peers peerstore.Peerstore, eventBus event.Bus, opts ...Option) (*Swarm, error) {
 	s.transports.m = make(map[int]transport.Transport)
 	s.notifs.m = make(map[network.Notifiee]struct{})
 	s.directConnNotifs.m = make(map[peer.ID][]chan struct{})
+	s.connectednessEventEmitter = newConnectednessEventEmitter(s.Connectedness, emitter)

 	for _, opt := range opts {
 		if err := opt(s); err != nil {
@@ -254,7 +256,6 @@ func NewSwarm(local peer.ID, peers peerstore.Peerstore, eventBus event.Bus, opts ...Option) (*Swarm, error) {
 	s.backf.init(s.ctx)

 	s.bhd = newBlackHoleDetector(s.udpBlackHoleConfig, s.ipv6BlackHoleConfig, s.metricsTracer)
-
 	return s, nil
 }

@@ -271,8 +272,6 @@ func (s *Swarm) Done() <-chan struct{} {
 func (s *Swarm) close() {
 	s.ctxCancel()

-	s.emitter.Close()
-
 	// Prevents new connections and/or listeners from being added to the swarm.
 	s.listeners.Lock()
 	listeners := s.listeners.m
@@ -308,6 +307,8 @@ func (s *Swarm) close() {

 	// Wait for everything to finish.
 	s.refs.Wait()
+	s.connectednessEventEmitter.Close()
+	s.emitter.Close()

 	// Now close out any transports (if necessary). Do this after closing
 	// all connections/listeners.
@@ -350,6 +351,7 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, error) {
 	}
 	stat.Direction = dir
 	stat.Opened = time.Now()
+	isLimited := stat.Limited

 	// Wrap and register the connection.
 	c := &Conn{
@@ -390,21 +392,24 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, error) {
 	}

 	c.streams.m = make(map[*Stream]struct{})
-	isFirstConnection := len(s.conns.m[p]) == 0
 	s.conns.m[p] = append(s.conns.m[p], c)
-
 	// Add two swarm refs:
 	// * One will be decremented after the close notifications fire in Conn.doClose
 	// * The other will be decremented when Conn.start exits.
 	s.refs.Add(2)
-
 	// Take the notification lock before releasing the conns lock to block
 	// Disconnect notifications until after the Connect notifications done.
+	// This lock also ensures that swarm.refs.Wait() exits after we have
+	// enqueued the peer connectedness changed notification.
+	// TODO: Fix this fragility by taking a swarm ref for dial worker loop
 	c.notifyLk.Lock()
 	s.conns.Unlock()

-	// Notify goroutines waiting for a direct connection
-	if !c.Stat().Transient {
+	s.connectednessEventEmitter.AddConn(p)
+
+	if !isLimited {
+		// Notify goroutines waiting for a direct connection
+		//
 		// Go routines interested in waiting for direct connection first acquire this lock
 		// and then acquire s.conns.RLock. Do not acquire this lock before conns.Unlock to
 		// prevent deadlock.
@@ -415,16 +420,6 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, error) {
 		delete(s.directConnNotifs.m, p)
 		s.directConnNotifs.Unlock()
 	}
-
-	// Emit event after releasing `s.conns` lock so that a consumer can still
-	// use swarm methods that need the `s.conns` lock.
-	if isFirstConnection {
-		s.emitter.Emit(event.EvtPeerConnectednessChanged{
-			Peer:          p,
-			Connectedness: network.Connected,
-		})
-	}
-
 	s.notifyAll(func(f network.Notifiee) {
 		f.Connected(s, c)
 	})
@@ -455,14 +450,14 @@ func (s *Swarm) StreamHandler() network.StreamHandler {

 // NewStream creates a new stream on any available connection to peer, dialing
 // if necessary.
-// Use network.WithUseTransient to open a stream over a transient(relayed)
+// Use network.WithAllowLimitedConn to open a stream over a limited(relayed)
 // connection.
 func (s *Swarm) NewStream(ctx context.Context, p peer.ID) (network.Stream, error) {
 	log.Debugf("[%s] opening stream to peer [%s]", s.local, p)

 	// Algorithm:
 	// 1. Find the best connection, otherwise, dial.
-	// 2. If the best connection is transient, wait for a direct conn via conn
+	// 2. If the best connection is limited, wait for a direct conn via conn
 	// reversal or hole punching.
 	// 3. Try opening a stream.
 	// 4. If the underlying connection is, in fact, closed, close the outer
@@ -491,8 +486,8 @@ func (s *Swarm) NewStream(ctx context.Context, p peer.ID) (network.Stream, error) {
 		}
 	}

-	useTransient, _ := network.GetUseTransient(ctx)
-	if !useTransient && c.Stat().Transient {
+	limitedAllowed, _ := network.GetAllowLimitedConn(ctx)
+	if !limitedAllowed && c.Stat().Limited {
 		var err error
 		c, err = s.waitForDirectConn(ctx, p)
 		if err != nil {
@@ -518,12 +513,12 @@ func (s *Swarm) waitForDirectConn(ctx context.Context, p peer.ID) (*Conn, error) {
 	if c == nil {
 		s.directConnNotifs.Unlock()
 		return nil, network.ErrNoConn
-	} else if !c.Stat().Transient {
+	} else if !c.Stat().Limited {
 		s.directConnNotifs.Unlock()
 		return c, nil
 	}

-	// Wait for transient connection to upgrade to a direct connection either by
+	// Wait for limited connection to upgrade to a direct connection either by
 	// connection reversal or hole punching.
 	ch := make(chan struct{})
 	s.directConnNotifs.m[p] = append(s.directConnNotifs.m[p], ch)
@@ -555,8 +550,8 @@ func (s *Swarm) waitForDirectConn(ctx context.Context, p peer.ID) (*Conn, error) {
 	if c == nil {
 		return nil, network.ErrNoConn
 	}
-	if c.Stat().Transient {
-		return nil, network.ErrTransientConn
+	if c.Stat().Limited {
+		return nil, network.ErrLimitedConn
 	}
 	return c, nil
 }
@@ -577,11 +572,11 @@ func (s *Swarm) ConnsToPeer(p peer.ID) []network.Conn {
 }

 func isBetterConn(a, b *Conn) bool {
-	// If one is transient and not the other, prefer the non-transient connection.
-	aTransient := a.Stat().Transient
-	bTransient := b.Stat().Transient
-	if aTransient != bTransient {
-		return !aTransient
+	// If one is limited and not the other, prefer the unlimited connection.
+	aLimited := a.Stat().Limited
+	bLimited := b.Stat().Limited
+	if aLimited != bLimited {
+		return !aLimited
 	}

 	// If one is direct and not the other, prefer the direct connection.
@@ -632,7 +627,7 @@ func (s *Swarm) bestConnToPeer(p peer.ID) *Conn {

 // bestAcceptableConnToPeer returns the best acceptable connection, considering the passed in ctx.
 // If network.WithForceDirectDial is used, it only returns a direct connection, ignoring
-// any transient (relayed) connections to the peer.
+// any limited (relayed) connections to the peer.
 func (s *Swarm) bestAcceptableConnToPeer(ctx context.Context, p peer.ID) *Conn {
 	conn := s.bestConnToPeer(p)

@@ -652,8 +647,28 @@ func isDirectConn(c *Conn) bool {
 // To check if we have an open connection, use `s.Connectedness(p) ==
 // network.Connected`.
 func (s *Swarm) Connectedness(p peer.ID) network.Connectedness {
-	if s.bestConnToPeer(p) != nil {
-		return network.Connected
+	s.conns.RLock()
+	defer s.conns.RUnlock()
+
+	return s.connectednessUnlocked(p)
+}
+
+// connectednessUnlocked returns the connectedness of a peer.
+func (s *Swarm) connectednessUnlocked(p peer.ID) network.Connectedness {
+	var haveLimited bool
+	for _, c := range s.conns.m[p] {
+		if c.IsClosed() {
+			// These will be garbage collected soon
+			continue
+		}
+		if c.Stat().Limited {
+			haveLimited = true
+		} else {
+			return network.Connected
+		}
+	}
+	if haveLimited {
+		return network.Limited
 	}
 	return network.NotConnected
 }
@@ -751,24 +766,7 @@ func (s *Swarm) removeConn(c *Conn) {
 	p := c.RemotePeer()

 	s.conns.Lock()
-
 	cs := s.conns.m[p]
-
-	if len(cs) == 1 {
-		delete(s.conns.m, p)
-		s.conns.Unlock()
-
-		// Emit event after releasing `s.conns` lock so that a consumer can still
-		// use swarm methods that need the `s.conns` lock.
-		s.emitter.Emit(event.EvtPeerConnectednessChanged{
-			Peer:          p,
-			Connectedness: network.NotConnected,
-		})
-		return
-	}
-
-	defer s.conns.Unlock()
-
 	for i, ci := range cs {
 		if ci == c {
 			// NOTE: We're intentionally preserving order.
@@ -780,6 +778,10 @@ func (s *Swarm) removeConn(c *Conn) {
 			break
 		}
 	}
+	if len(s.conns.m[p]) == 0 {
+		delete(s.conns.m, p)
+	}
+	s.conns.Unlock()
 }

 // String returns a string representation of Network.
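The derived state can be summarized outside the swarm as well. A simplified sketch of the same rule using only the public API (not part of the commit; nw is any network.Network, and the swarm's closed-connection filtering is elided):

package main

import (
	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/core/peer"
)

// classify mirrors connectednessUnlocked: any unlimited connection wins and
// yields Connected; otherwise a limited connection yields Limited.
func classify(nw network.Network, p peer.ID) network.Connectedness {
	haveLimited := false
	for _, c := range nw.ConnsToPeer(p) {
		if c.Stat().Limited {
			haveLimited = true
			continue
		}
		return network.Connected
	}
	if haveLimited {
		return network.Limited
	}
	return network.NotConnected
}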

‎p2p/net/swarm/swarm_conn.go

+10-5
@@ -73,6 +73,11 @@ func (c *Conn) doClose() {

 	c.err = c.conn.Close()

+	// Send the connectedness event after closing the connection.
+	// This ensures that both remote connection close and local connection
+	// close events are sent after the underlying transport connection is closed.
+	c.swarm.connectednessEventEmitter.RemoveConn(c.RemotePeer())
+
 	// This is just for cleaning up state. The connection has already been closed.
 	// We *could* optimize this but it really isn't worth it.
 	for s := range streams {
@@ -85,10 +90,11 @@ func (c *Conn) doClose() {
 		c.notifyLk.Lock()
 		defer c.notifyLk.Unlock()

+		// Only notify for disconnection if we notified for connection
 		c.swarm.notifyAll(func(f network.Notifiee) {
 			f.Disconnected(c.swarm, c)
 		})
-		c.swarm.refs.Done() // taken in Swarm.addConn
+		c.swarm.refs.Done()
 	}()
 }

@@ -108,7 +114,6 @@ func (c *Conn) start() {
 	go func() {
 		defer c.swarm.refs.Done()
 		defer c.Close()
-
 		for {
 			ts, err := c.conn.AcceptStream()
 			if err != nil {
@@ -193,9 +198,9 @@ func (c *Conn) Stat() network.ConnStats {

 // NewStream returns a new Stream from this connection
 func (c *Conn) NewStream(ctx context.Context) (network.Stream, error) {
-	if c.Stat().Transient {
-		if useTransient, _ := network.GetUseTransient(ctx); !useTransient {
-			return nil, network.ErrTransientConn
+	if c.Stat().Limited {
+		if useLimited, _ := network.GetAllowLimitedConn(ctx); !useLimited {
+			return nil, network.ErrLimitedConn
 		}
 	}


‎p2p/net/swarm/swarm_event_test.go

+199-4
@@ -2,6 +2,7 @@ package swarm_test

 import (
 	"context"
+	"sync"
 	"testing"
 	"time"

@@ -12,6 +13,7 @@ import (
 	swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"

 	ma "github.com/multiformats/go-multiaddr"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )

@@ -66,6 +68,10 @@ func TestConnectednessEventsSingleConn(t *testing.T) {
 }

 func TestNoDeadlockWhenConsumingConnectednessEvents(t *testing.T) {
+	ctx := context.Background()
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
 	dialerEventBus := eventbus.NewBus()
 	dialer := swarmt.GenSwarm(t, swarmt.OptDialOnly, swarmt.EventBus(dialerEventBus))
 	defer dialer.Close()
@@ -85,10 +91,6 @@ func TestNoDeadlockWhenConsumingConnectednessEvents(t *testing.T) {
 	sub, err := dialerEventBus.Subscribe(new(event.EvtPeerConnectednessChanged))
 	require.NoError(t, err)

-	ctx := context.Background()
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
-
 	// A slow consumer
 	go func() {
 		for {
@@ -113,3 +115,196 @@ func TestNoDeadlockWhenConsumingConnectednessEvents(t *testing.T) {

 	// The test should finish without deadlocking
 }
+
+func TestConnectednessEvents(t *testing.T) {
+	s1, sub1 := newSwarmWithSubscription(t)
+	const N = 100
+	peers := make([]*Swarm, N)
+	for i := 0; i < N; i++ {
+		peers[i] = swarmt.GenSwarm(t)
+	}
+
+	// First check all connected events
+	done := make(chan struct{})
+	go func() {
+		defer close(done)
+		for i := 0; i < N; i++ {
+			e := <-sub1.Out()
+			evt, ok := e.(event.EvtPeerConnectednessChanged)
+			if !ok {
+				t.Error("invalid event received", e)
+				return
+			}
+			if evt.Connectedness != network.Connected {
+				t.Errorf("invalid event received: expected: Connected, got: %s", evt)
+				return
+			}
+		}
+	}()
+	for i := 0; i < N; i++ {
+		s1.Peerstore().AddAddrs(peers[i].LocalPeer(), []ma.Multiaddr{peers[i].ListenAddresses()[0]}, time.Hour)
+		_, err := s1.DialPeer(context.Background(), peers[i].LocalPeer())
+		require.NoError(t, err)
+	}
+	select {
+	case <-done:
+	case <-time.After(10 * time.Second):
+		t.Fatal("expected all connectedness events to be completed")
+	}
+
+	// Disconnect some peers
+	done = make(chan struct{})
+	go func() {
+		defer close(done)
+		for i := 0; i < N/2; i++ {
+			e := <-sub1.Out()
+			evt, ok := e.(event.EvtPeerConnectednessChanged)
+			if !ok {
+				t.Error("invalid event received", e)
+				return
+			}
+			if evt.Connectedness != network.NotConnected {
+				t.Errorf("invalid event received: expected: NotConnected, got: %s", evt)
+				return
+			}
+		}
+	}()
+	for i := 0; i < N/2; i++ {
+		err := s1.ClosePeer(peers[i].LocalPeer())
+		require.NoError(t, err)
+	}
+	select {
+	case <-done:
+	case <-time.After(10 * time.Second):
+		t.Fatal("expected all disconnected events to be completed")
+	}
+
+	// Check for disconnected events on swarm close
+	done = make(chan struct{})
+	go func() {
+		defer close(done)
+		for i := N / 2; i < N; i++ {
+			e := <-sub1.Out()
+			evt, ok := e.(event.EvtPeerConnectednessChanged)
+			if !ok {
+				t.Error("invalid event received", e)
+				return
+			}
+			if evt.Connectedness != network.NotConnected {
+				t.Errorf("invalid event received: expected: NotConnected, got: %s", evt)
+				return
+			}
+		}
+	}()
+	s1.Close()
+	select {
+	case <-done:
+	case <-time.After(10 * time.Second):
+		t.Fatal("expected all disconnected events after swarm close to be completed")
+	}
+}
+
+func TestConnectednessEventDeadlock(t *testing.T) {
+	s1, sub1 := newSwarmWithSubscription(t)
+	const N = 100
+	peers := make([]*Swarm, N)
+	for i := 0; i < N; i++ {
+		peers[i] = swarmt.GenSwarm(t)
+	}
+
+	// First check all connected events
+	done := make(chan struct{})
+	go func() {
+		defer close(done)
+		count := 0
+		for count < N {
+			e := <-sub1.Out()
+			// sleep to simulate a slow consumer
+			evt, ok := e.(event.EvtPeerConnectednessChanged)
+			if !ok {
+				t.Error("invalid event received", e)
+				return
+			}
+			if evt.Connectedness != network.Connected {
+				continue
+			}
+			count++
+			s1.ClosePeer(evt.Peer)
+		}
+	}()
+	for i := 0; i < N; i++ {
+		s1.Peerstore().AddAddrs(peers[i].LocalPeer(), []ma.Multiaddr{peers[i].ListenAddresses()[0]}, time.Hour)
+		go func(i int) {
+			_, err := s1.DialPeer(context.Background(), peers[i].LocalPeer())
+			assert.NoError(t, err)
+		}(i)
+	}
+	select {
+	case <-done:
+	case <-time.After(100 * time.Second):
+		t.Fatal("expected all connectedness events to be completed")
+	}
+}
+
+func TestConnectednessEventDeadlockWithDial(t *testing.T) {
+	s1, sub1 := newSwarmWithSubscription(t)
+	const N = 200
+	peers := make([]*Swarm, N)
+	for i := 0; i < N; i++ {
+		peers[i] = swarmt.GenSwarm(t)
+	}
+	peers2 := make([]*Swarm, N)
+	for i := 0; i < N; i++ {
+		peers2[i] = swarmt.GenSwarm(t)
+	}
+
+	// First check all connected events
+	done := make(chan struct{})
+	var subWG sync.WaitGroup
+	subWG.Add(1)
+	go func() {
+		defer subWG.Done()
+		count := 0
+		for {
+			var e interface{}
+			select {
+			case e = <-sub1.Out():
+			case <-done:
+				return
+			}
+			// sleep to simulate a slow consumer
+			evt, ok := e.(event.EvtPeerConnectednessChanged)
+			if !ok {
+				t.Error("invalid event received", e)
+				return
+			}
+			if evt.Connectedness != network.Connected {
+				continue
+			}
+			if count < N {
+				time.Sleep(10 * time.Millisecond)
+				ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
+				s1.Peerstore().AddAddrs(peers2[count].LocalPeer(), []ma.Multiaddr{peers2[count].ListenAddresses()[0]}, time.Hour)
+				s1.DialPeer(ctx, peers2[count].LocalPeer())
+				count++
+				cancel()
+			}
+		}
+	}()
+	var wg sync.WaitGroup
+	wg.Add(N)
+	for i := 0; i < N; i++ {
+		s1.Peerstore().AddAddrs(peers[i].LocalPeer(), []ma.Multiaddr{peers[i].ListenAddresses()[0]}, time.Hour)
+		go func(i int) {
+			ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+			s1.DialPeer(ctx, peers[i].LocalPeer())
+			cancel()
+			wg.Done()
+		}(i)
+	}
+	wg.Wait()
+	s1.Close()
+
+	close(done)
+	subWG.Wait()
+}

‎p2p/protocol/circuitv2/client/dial.go

+1-1
@@ -179,7 +179,7 @@ func (c *Client) connect(s network.Stream, dest peer.AddrInfo) (*Conn, error) {
 	// relay connection and we mark the connection as transient.
 	var stat network.ConnStats
 	if limit := msg.GetLimit(); limit != nil {
-		stat.Transient = true
+		stat.Limited = true
 		stat.Extra = make(map[interface{}]interface{})
 		stat.Extra[StatLimitDuration] = time.Duration(limit.GetDuration()) * time.Second
 		stat.Extra[StatLimitData] = limit.GetData()

‎p2p/protocol/circuitv2/client/handlers.go

+1-1
@@ -67,7 +67,7 @@ func (c *Client) handleStreamV2(s network.Stream) {
 	// relay connection and we mark the connection as transient.
 	var stat network.ConnStats
 	if limit := msg.GetLimit(); limit != nil {
-		stat.Transient = true
+		stat.Limited = true
 		stat.Extra = make(map[interface{}]interface{})
 		stat.Extra[StatLimitDuration] = time.Duration(limit.GetDuration()) * time.Second
 		stat.Extra[StatLimitData] = limit.GetData()

‎p2p/protocol/circuitv2/relay/relay_test.go

+32-8
@@ -10,6 +10,7 @@ import (
 	"time"

 	"github.com/libp2p/go-libp2p/core/crypto"
+	"github.com/libp2p/go-libp2p/core/event"
 	"github.com/libp2p/go-libp2p/core/host"
 	"github.com/libp2p/go-libp2p/core/metrics"
 	"github.com/libp2p/go-libp2p/core/network"
@@ -23,6 +24,7 @@ import (
 	"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client"
 	"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
 	"github.com/libp2p/go-libp2p/p2p/transport/tcp"
+	"github.com/stretchr/testify/require"

 	ma "github.com/multiformats/go-multiaddr"
 )
@@ -49,7 +51,8 @@ func getNetHosts(t *testing.T, ctx context.Context, n int) (hosts []host.Host, upgraders []transport.Upgrader) {
 	}

 	bwr := metrics.NewBandwidthCounter()
-	netw, err := swarm.NewSwarm(p, ps, eventbus.NewBus(), swarm.WithMetrics(bwr))
+	bus := eventbus.NewBus()
+	netw, err := swarm.NewSwarm(p, ps, bus, swarm.WithMetrics(bwr))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -70,7 +73,7 @@ func getNetHosts(t *testing.T, ctx context.Context, n int) (hosts []host.Host, upgraders []transport.Upgrader) {
 			t.Fatal(err)
 		}

-		h := bhost.NewBlankHost(netw)
+		h := bhost.NewBlankHost(netw, bhost.WithEventBus(bus))

 		hosts = append(hosts, h)
 	}
@@ -145,20 +148,41 @@ func TestBasicRelay(t *testing.T) {
 		t.Fatal(err)
 	}

+	sub, err := hosts[2].EventBus().Subscribe(new(event.EvtPeerConnectednessChanged))
+	require.NoError(t, err)
+
 	err = hosts[2].Connect(ctx, peer.AddrInfo{ID: hosts[0].ID(), Addrs: []ma.Multiaddr{raddr}})
 	if err != nil {
 		t.Fatal(err)
 	}
+	for {
+		var e interface{}
+		select {
+		case e = <-sub.Out():
+		case <-time.After(2 * time.Second):
+			t.Fatal("expected limited connectivity event")
+		}
+		evt, ok := e.(event.EvtPeerConnectednessChanged)
+		if !ok {
+			t.Fatalf("invalid event: %s", e)
+		}
+		if evt.Peer == hosts[0].ID() {
+			if evt.Connectedness != network.Limited {
+				t.Fatalf("expected limited connectivity %s", evt.Connectedness)
+			}
+			break
+		}
+	}

 	conns := hosts[2].Network().ConnsToPeer(hosts[0].ID())
 	if len(conns) != 1 {
 		t.Fatalf("expected 1 connection, but got %d", len(conns))
 	}
-	if !conns[0].Stat().Transient {
+	if !conns[0].Stat().Limited {
 		t.Fatal("expected transient connection")
 	}

-	s, err := hosts[2].NewStream(network.WithUseTransient(ctx, "test"), hosts[0].ID(), "test")
+	s, err := hosts[2].NewStream(network.WithAllowLimitedConn(ctx, "test"), hosts[0].ID(), "test")
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -229,11 +253,11 @@ func TestRelayLimitTime(t *testing.T) {
 	if len(conns) != 1 {
 		t.Fatalf("expected 1 connection, but got %d", len(conns))
 	}
-	if !conns[0].Stat().Transient {
+	if !conns[0].Stat().Limited {
 		t.Fatal("expected transient connection")
 	}

-	s, err := hosts[2].NewStream(network.WithUseTransient(ctx, "test"), hosts[0].ID(), "test")
+	s, err := hosts[2].NewStream(network.WithAllowLimitedConn(ctx, "test"), hosts[0].ID(), "test")
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -315,11 +339,11 @@ func TestRelayLimitData(t *testing.T) {
 	if len(conns) != 1 {
 		t.Fatalf("expected 1 connection, but got %d", len(conns))
 	}
-	if !conns[0].Stat().Transient {
+	if !conns[0].Stat().Limited {
 		t.Fatal("expected transient connection")
 	}

-	s, err := hosts[2].NewStream(network.WithUseTransient(ctx, "test"), hosts[0].ID(), "test")
+	s, err := hosts[2].NewStream(network.WithAllowLimitedConn(ctx, "test"), hosts[0].ID(), "test")
 	if err != nil {
 		t.Fatal(err)
 	}

‎p2p/protocol/holepunch/holepunch_test.go

+1-1
@@ -339,7 +339,7 @@ func TestFailuresOnResponder(t *testing.T) {
 	defer h2.Close()
 	defer relay.Close()

-	s, err := h2.NewStream(network.WithUseTransient(context.Background(), "holepunch"), h1.ID(), holepunch.Protocol)
+	s, err := h2.NewStream(network.WithAllowLimitedConn(context.Background(), "holepunch"), h1.ID(), holepunch.Protocol)
 	require.NoError(t, err)

 	go tc.initiator(s)

‎p2p/protocol/holepunch/holepuncher.go

+1-1
@@ -174,7 +174,7 @@ func (hp *holePuncher) directConnect(rp peer.ID) error {
 // initiateHolePunch opens a new hole punching coordination stream,
 // exchanges the addresses and measures the RTT.
 func (hp *holePuncher) initiateHolePunch(rp peer.ID) ([]ma.Multiaddr, []ma.Multiaddr, time.Duration, error) {
-	hpCtx := network.WithUseTransient(hp.ctx, "hole-punch")
+	hpCtx := network.WithAllowLimitedConn(hp.ctx, "hole-punch")
 	sCtx := network.WithNoDial(hpCtx, "hole-punch")

 	str, err := hp.host.NewStream(sCtx, rp, Protocol)

‎p2p/protocol/identify/id.go

+11-8
@@ -408,7 +408,7 @@ func (ids *idService) IdentifyWait(c network.Conn) <-chan struct{} {
 func (ids *idService) identifyConn(c network.Conn) error {
 	ctx, cancel := context.WithTimeout(context.Background(), Timeout)
 	defer cancel()
-	s, err := c.NewStream(network.WithUseTransient(ctx, "identify"))
+	s, err := c.NewStream(network.WithAllowLimitedConn(ctx, "identify"))
 	if err != nil {
 		log.Debugw("error opening identify stream", "peer", c.RemotePeer(), "error", err)
 		return err
@@ -752,7 +752,8 @@ func (ids *idService) consumeMessage(mes *pb.Identify, c network.Conn, isPush bool) error {
 	// Taking the lock ensures that we don't concurrently process a disconnect.
 	ids.addrMu.Lock()
 	ttl := peerstore.RecentlyConnectedAddrTTL
-	if ids.Host.Network().Connectedness(p) == network.Connected {
+	switch ids.Host.Network().Connectedness(p) {
+	case network.Limited, network.Connected:
 		ttl = peerstore.ConnectedAddrTTL
 	}

@@ -980,13 +981,15 @@ func (nn *netNotifiee) Disconnected(_ network.Network, c network.Conn) {
 	delete(ids.conns, c)
 	ids.connsMu.Unlock()

-	if ids.Host.Network().Connectedness(c.RemotePeer()) != network.Connected {
-		// Last disconnect.
-		// Undo the setting of addresses to peer.ConnectedAddrTTL we did
-		ids.addrMu.Lock()
-		defer ids.addrMu.Unlock()
-		ids.Host.Peerstore().UpdateAddrs(c.RemotePeer(), peerstore.ConnectedAddrTTL, peerstore.RecentlyConnectedAddrTTL)
+	switch ids.Host.Network().Connectedness(c.RemotePeer()) {
+	case network.Connected, network.Limited:
+		return
 	}
+	// Last disconnect.
+	// Undo the setting of addresses to peer.ConnectedAddrTTL we did
+	ids.addrMu.Lock()
+	defer ids.addrMu.Unlock()
+	ids.Host.Peerstore().UpdateAddrs(c.RemotePeer(), peerstore.ConnectedAddrTTL, peerstore.RecentlyConnectedAddrTTL)
 }

 func (nn *netNotifiee) Listen(n network.Network, a ma.Multiaddr) {}

‎p2p/protocol/ping/ping.go

+1-1
@@ -111,7 +111,7 @@ func pingError(err error) chan Result {
 // Ping pings the remote peer until the context is canceled, returning a stream
 // of RTTs or errors.
 func Ping(ctx context.Context, h host.Host, p peer.ID) <-chan Result {
-	s, err := h.NewStream(network.WithUseTransient(ctx, "ping"), p, ID)
+	s, err := h.NewStream(network.WithAllowLimitedConn(ctx, "ping"), p, ID)
 	if err != nil {
 		return pingError(err)
 	}

‎p2p/test/basichost/basic_host_test.go

+1-1
@@ -77,7 +77,7 @@ func TestNoStreamOverTransientConnection(t *testing.T) {

 	require.Error(t, err)

-	_, err = h1.NewStream(network.WithUseTransient(context.Background(), "test"), h2.ID(), "/testprotocol")
+	_, err = h1.NewStream(network.WithAllowLimitedConn(context.Background(), "test"), h2.ID(), "/testprotocol")
 	require.NoError(t, err)
 }

‎p2p/test/swarm/swarm_test.go

+3-3
@@ -110,16 +110,16 @@ func TestNewStreamTransientConnection(t *testing.T) {

 	h1.Peerstore().AddAddr(h2.ID(), relayaddr, peerstore.TempAddrTTL)

-	// WithUseTransient should succeed
+	// WithAllowLimitedConn should succeed
 	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
 	defer cancel()
-	ctx = network.WithUseTransient(ctx, "test")
+	ctx = network.WithAllowLimitedConn(ctx, "test")
 	s, err := h1.Network().NewStream(ctx, h2.ID())
 	require.NoError(t, err)
 	require.NotNil(t, s)
 	defer s.Close()

-	// Without WithUseTransient should fail with context deadline exceeded
+	// Without WithAllowLimitedConn should fail with context deadline exceeded
 	ctx, cancel = context.WithTimeout(context.Background(), 200*time.Millisecond)
 	defer cancel()
 	s, err = h1.Network().NewStream(ctx, h2.ID())
