Skip to content

Commit 11515f0

Browse files
committed
Add a "transient" network connectivity state
Previously, we'd consider "transiently" connected peers to be connected. This meant: 1. We wouldn't fire a second event when transitioning to "really connected". The only option for users was to listen on the old-style per-connection notifications. 2. "Connectedness" checks would be a little too eager to treat a peer as connected. For 99% of users, "transient" peers should be treated as disconnected. So while it's technically a breaking change to split-out "transient" connectivity into a separate state, I expect it's more likely to fix bugs than anything. Unfortunately, this change _did_ require several changes to go-libp2p itself because go-libp2p _does_ care about transient connections: 1. We want to keep peerstore information for transient peers. 2. We may sometimes want to treat peers as "connected" in the host. 3. Identify still needs to run over transient connections. fixes #2692
1 parent 6aa701a commit 11515f0

File tree

6 files changed

+61
-24
lines changed

6 files changed

+61
-24
lines changed

core/network/network.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,13 @@ const (
6161
// CannotConnect means recently attempted connecting but failed to connect.
6262
// (should signal "made effort, failed")
6363
CannotConnect
64+
65+
// Transient means we have a transient connection to the peer, but aren't fully connected.
66+
Transient
6467
)
6568

6669
func (c Connectedness) String() string {
67-
str := [...]string{"NotConnected", "Connected", "CanConnect", "CannotConnect"}
70+
str := [...]string{"NotConnected", "Connected", "CanConnect", "CannotConnect", "Transient"}
6871
if c < 0 || int(c) >= len(str) {
6972
return unrecognized
7073
}

p2p/host/basic/basic_host.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -723,8 +723,10 @@ func (h *BasicHost) Connect(ctx context.Context, pi peer.AddrInfo) error {
723723
h.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.TempAddrTTL)
724724

725725
forceDirect, _ := network.GetForceDirectDial(ctx)
726+
canUseTransient, _ := network.GetUseTransient(ctx)
726727
if !forceDirect {
727-
if h.Network().Connectedness(pi.ID) == network.Connected {
728+
connectedness := rh.Network().Connectedness(pi.ID)
729+
if connectedness == network.Connected || (canUseTransient && connectedness == network.Transient) {
728730
return nil
729731
}
730732
}

p2p/host/pstoremanager/pstoremanager.go

+7-6
Original file line numberDiff line numberDiff line change
@@ -103,15 +103,16 @@ func (m *PeerstoreManager) background(ctx context.Context, sub event.Subscriptio
103103
ev := e.(event.EvtPeerConnectednessChanged)
104104
p := ev.Peer
105105
switch ev.Connectedness {
106-
case network.NotConnected:
106+
case network.Connected, network.Transient:
107+
// If we reconnect to the peer before we've cleared the information,
108+
// keep it. This is an optimization to keep the disconnected map
109+
// small. We still need to check that a peer is actually
110+
// disconnected before removing it from the peer store.
111+
delete(disconnected, p)
112+
default:
107113
if _, ok := disconnected[p]; !ok {
108114
disconnected[p] = time.Now()
109115
}
110-
case network.Connected:
111-
// If we reconnect to the peer before we've cleared the information, keep it.
112-
// This is an optimization to keep the disconnected map small.
113-
// We still need to check that a peer is actually disconnected before removing it from the peer store.
114-
delete(disconnected, p)
115116
}
116117
case <-ticker.C:
117118
now := time.Now()

p2p/host/routed/routed.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,10 @@ func Wrap(h host.Host, r Routing) *RoutedHost {
4848
func (rh *RoutedHost) Connect(ctx context.Context, pi peer.AddrInfo) error {
4949
// first, check if we're already connected unless force direct dial.
5050
forceDirect, _ := network.GetForceDirectDial(ctx)
51+
canUseTransient, _ := network.GetUseTransient(ctx)
5152
if !forceDirect {
52-
if rh.Network().Connectedness(pi.ID) == network.Connected {
53+
connectedness := rh.Network().Connectedness(pi.ID)
54+
if connectedness == network.Connected || (canUseTransient && connectedness == network.Transient) {
5355
return nil
5456
}
5557
}

p2p/net/swarm/swarm.go

+34-8
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,7 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
344344
}
345345
stat.Direction = dir
346346
stat.Opened = time.Now()
347+
isTransient := stat.Transient
347348

348349
// Wrap and register the connection.
349350
c := &Conn{
@@ -383,8 +384,9 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
383384
return nil, ErrSwarmClosed
384385
}
385386

387+
oldState := s.connectednessUnlocked(p)
388+
386389
c.streams.m = make(map[*Stream]struct{})
387-
isFirstConnection := len(s.conns.m[p]) == 0
388390
s.conns.m[p] = append(s.conns.m[p], c)
389391

390392
// Add two swarm refs:
@@ -397,8 +399,12 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
397399
c.notifyLk.Lock()
398400
s.conns.Unlock()
399401

400-
// Notify goroutines waiting for a direct connection
401-
if !c.Stat().Transient {
402+
newState := network.Transient
403+
if !isTransient {
404+
newState = network.Connected
405+
406+
// Notify goroutines waiting for a direct connection
407+
//
402408
// Go routines interested in waiting for direct connection first acquire this lock
403409
// and then acquire s.conns.RLock. Do not acquire this lock before conns.Unlock to
404410
// prevent deadlock.
@@ -412,10 +418,10 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
412418

413419
// Emit event after releasing `s.conns` lock so that a consumer can still
414420
// use swarm methods that need the `s.conns` lock.
415-
if isFirstConnection {
421+
if oldState != newState {
416422
s.emitter.Emit(event.EvtPeerConnectednessChanged{
417423
Peer: p,
418-
Connectedness: network.Connected,
424+
Connectedness: newState,
419425
})
420426
}
421427

@@ -646,10 +652,30 @@ func isDirectConn(c *Conn) bool {
646652
// To check if we have an open connection, use `s.Connectedness(p) ==
647653
// network.Connected`.
648654
func (s *Swarm) Connectedness(p peer.ID) network.Connectedness {
649-
if s.bestConnToPeer(p) != nil {
650-
return network.Connected
655+
s.conns.RLock()
656+
defer s.conns.RUnlock()
657+
658+
s.connectednessUnlocked(p)
659+
}
660+
661+
func (s *Swarm) connectednessUnlocked(p peer.ID) network.Connectedness {
662+
var haveTransient bool
663+
for _, c := range s.conns.m[p] {
664+
if c.conn.IsClosed() {
665+
// We *will* garbage collect this soon anyways.
666+
continue
667+
}
668+
if c.Stat().Transient {
669+
haveTransient = true
670+
} else {
671+
return network.Connected
672+
}
673+
}
674+
if haveTransient {
675+
return network.Transient
676+
} else {
677+
return network.NotConnected
651678
}
652-
return network.NotConnected
653679
}
654680

655681
// Conns returns a slice of all connections.

p2p/protocol/identify/id.go

+10-7
Original file line numberDiff line numberDiff line change
@@ -746,7 +746,8 @@ func (ids *idService) consumeMessage(mes *pb.Identify, c network.Conn, isPush bo
746746
// Taking the lock ensures that we don't concurrently process a disconnect.
747747
ids.addrMu.Lock()
748748
ttl := peerstore.RecentlyConnectedAddrTTL
749-
if ids.Host.Network().Connectedness(p) == network.Connected {
749+
switch ids.Host.Network().Connectedness(p) {
750+
case network.Transient, network.Connected:
750751
ttl = peerstore.ConnectedAddrTTL
751752
}
752753

@@ -975,13 +976,15 @@ func (nn *netNotifiee) Disconnected(_ network.Network, c network.Conn) {
975976
delete(ids.conns, c)
976977
ids.connsMu.Unlock()
977978

978-
if ids.Host.Network().Connectedness(c.RemotePeer()) != network.Connected {
979-
// Last disconnect.
980-
// Undo the setting of addresses to peer.ConnectedAddrTTL we did
981-
ids.addrMu.Lock()
982-
defer ids.addrMu.Unlock()
983-
ids.Host.Peerstore().UpdateAddrs(c.RemotePeer(), peerstore.ConnectedAddrTTL, peerstore.RecentlyConnectedAddrTTL)
979+
switch ids.Host.Network().Connectedness(c.RemotePeer()) {
980+
case network.Connected, network.Transient:
981+
return
984982
}
983+
// Last disconnect.
984+
// Undo the setting of addresses to peer.ConnectedAddrTTL we did
985+
ids.addrMu.Lock()
986+
defer ids.addrMu.Unlock()
987+
ids.Host.Peerstore().UpdateAddrs(c.RemotePeer(), peerstore.ConnectedAddrTTL, peerstore.RecentlyConnectedAddrTTL)
985988
}
986989

987990
func (nn *netNotifiee) Listen(n network.Network, a ma.Multiaddr) {}

0 commit comments

Comments
 (0)