Skip to content

Commit a16b08d

Browse files
committed
Always send 1 event for a connection
1 parent c68a035 commit a16b08d

6 files changed

+253
-79
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package swarm
2+
3+
import (
4+
"context"
5+
"sync"
6+
7+
"github.com/libp2p/go-libp2p/core/event"
8+
"github.com/libp2p/go-libp2p/core/network"
9+
"github.com/libp2p/go-libp2p/core/peer"
10+
)
11+
12+
// connectednessEventEmitter emits PeerConnectednessChanged events.
// We ensure that for any peer we connected to we always send at least 1
// NotConnected event after the peer disconnects. This is because peers can
// observe a connection before they are notified of the connection by a peer
// connectedness changed event.
type connectednessEventEmitter struct {
	// mx coordinates shutdown: AddConn/RemoveConn hold it for reading while
	// enqueueing, and runEmitter's shutdown path takes it for writing to wait
	// for in-flight enqueues before the final drain.
	mx sync.RWMutex
	// newConns is the channel that holds the peerIDs we recently connected to
	newConns chan peer.ID
	// removeConnsMx guards removeConns.
	removeConnsMx sync.Mutex
	// removeConns is a slice of peerIDs we have recently closed connections to
	removeConns []peer.ID
	// lastEvent is the last connectedness event sent for a particular peer.
	lastEvent map[peer.ID]network.Connectedness
	// connectedness is the function that gives the peers current connectedness state
	connectedness func(peer.ID) network.Connectedness
	// emitter is the PeerConnectednessChanged event emitter
	emitter event.Emitter
	// wg tracks the runEmitter goroutine; Close waits on it.
	wg sync.WaitGroup
	// removeConnNotif (capacity 1) nudges runEmitter that removeConns is
	// non-empty; a single pending signal is sufficient.
	removeConnNotif chan struct{}
	// ctx/cancel stop the emitter: once ctx is done, AddConn/RemoveConn
	// become no-ops and runEmitter drains and exits.
	ctx    context.Context
	cancel context.CancelFunc
}
34+
35+
func newConnectednessEventEmitter(connectedness func(peer.ID) network.Connectedness, emitter event.Emitter) *connectednessEventEmitter {
36+
ctx, cancel := context.WithCancel(context.Background())
37+
c := &connectednessEventEmitter{
38+
newConns: make(chan peer.ID, 32),
39+
lastEvent: make(map[peer.ID]network.Connectedness),
40+
removeConnNotif: make(chan struct{}, 1),
41+
connectedness: connectedness,
42+
emitter: emitter,
43+
ctx: ctx,
44+
cancel: cancel,
45+
}
46+
c.wg.Add(1)
47+
go c.runEmitter()
48+
return c
49+
}
50+
51+
// AddConn enqueues a connectedness notification for a peer we just connected
// to. The send on newConns blocks when its buffer (32) is full, which
// back-pressures connection adds to the rate the event subscriber can consume
// (see the comment in RemoveConn). Holding mx for reading lets the shutdown
// path in runEmitter (which takes mx for writing) wait for in-flight calls.
func (c *connectednessEventEmitter) AddConn(p peer.ID) {
	c.mx.RLock()
	defer c.mx.RUnlock()
	// After Close cancels the context nobody drains newConns anymore, so
	// drop the notification instead of blocking forever.
	if c.ctx.Err() != nil {
		return
	}

	// NOTE(review): if this send blocks (buffer full) while runEmitter has
	// already entered its ctx.Done branch and is waiting on mx.Lock, neither
	// side can progress — verify Close cannot race with a full newConns here.
	c.newConns <- p
}
60+
61+
// RemoveConn enqueues a connectedness notification for a peer whose
// connection we just closed. Unlike AddConn it never blocks on the emitter
// loop: the peer ID is appended to a slice and the loop is nudged with a
// non-blocking signal, so it is safe to call from a notifier or subscriber
// goroutine without risk of deadlock.
func (c *connectednessEventEmitter) RemoveConn(p peer.ID) {
	c.mx.RLock()
	defer c.mx.RUnlock()
	// After Close nobody processes removeConns; drop the notification.
	if c.ctx.Err() != nil {
		return
	}

	c.removeConnsMx.Lock()
	// This queue is not unbounded since we block in the AddConn method
	// So we are adding connections to the swarm only at a rate
	// the subscriber for our peer connectedness changed events can consume them.
	// If a lot of open connections are closed at once, increasing the disconnected
	// event notification rate, the rate of adding connections to the swarm would
	// proportionately reduce, which would eventually reduce the length of this slice.
	c.removeConns = append(c.removeConns, p)
	c.removeConnsMx.Unlock()

	// Wake the emitter loop. removeConnNotif has capacity 1: if a signal is
	// already pending the default case drops this one, which is fine.
	select {
	case c.removeConnNotif <- struct{}{}:
	default:
	}
}
83+
84+
// Close stops the emitter: it cancels the context (turning AddConn and
// RemoveConn into no-ops) and then waits for runEmitter to drain any pending
// notifications and exit.
func (c *connectednessEventEmitter) Close() {
	c.cancel()
	c.wg.Wait()
}
88+
89+
// runEmitter is the background loop that turns queued connect/disconnect
// notifications into PeerConnectednessChanged events. On shutdown it takes
// mx for writing to wait out in-flight AddConn/RemoveConn calls, then drains
// both queues so every peer that observed a connection still receives its
// final event before the goroutine exits.
func (c *connectednessEventEmitter) runEmitter() {
	defer c.wg.Done()
	for {
		select {
		case p := <-c.newConns:
			// Force a NotConnected event if the peer is already gone by the
			// time we process this: the peer may have observed the connection
			// even though we never announced it as connected.
			c.notifyPeer(p, true)
		case <-c.removeConnNotif:
			c.sendConnRemovedNotifications()
		case <-c.ctx.Done():
			c.mx.Lock() // Wait for all pending AddConn & RemoveConn operations to complete
			defer c.mx.Unlock()
			// Final non-blocking drain of both queues; the default case fires
			// only once everything enqueued before shutdown has been handled.
			for {
				select {
				case p := <-c.newConns:
					c.notifyPeer(p, true)
				case <-c.removeConnNotif:
					c.sendConnRemovedNotifications()
				default:
					return
				}
			}
		}
	}
}
113+
114+
func (c *connectednessEventEmitter) notifyPeer(p peer.ID, forceNotConnectedEvent bool) {
115+
oldState := c.lastEvent[p]
116+
c.lastEvent[p] = c.connectedness(p)
117+
if c.lastEvent[p] == network.NotConnected {
118+
delete(c.lastEvent, p)
119+
}
120+
if (forceNotConnectedEvent && c.lastEvent[p] == network.NotConnected) || c.lastEvent[p] != oldState {
121+
c.emitter.Emit(event.EvtPeerConnectednessChanged{
122+
Peer: p,
123+
Connectedness: c.lastEvent[p],
124+
})
125+
}
126+
}
127+
128+
func (c *connectednessEventEmitter) sendConnRemovedNotifications() {
129+
c.removeConnsMx.Lock()
130+
defer c.removeConnsMx.Unlock()
131+
for {
132+
if len(c.removeConns) == 0 {
133+
return
134+
}
135+
p := c.removeConns[0]
136+
c.removeConns[0] = ""
137+
c.removeConns = c.removeConns[1:]
138+
139+
c.removeConnsMx.Unlock()
140+
c.notifyPeer(p, false)
141+
c.removeConnsMx.Lock()
142+
}
143+
}

p2p/net/swarm/dial_worker.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ loop:
340340
ad.expectedTCPUpgradeTime = time.Time{}
341341
if res.Conn != nil {
342342
// we got a connection, add it to the swarm
343-
conn, err := w.s.addConn(res.Conn, network.DirOutbound)
343+
conn, err := w.s.addConn(ad.ctx, res.Conn, network.DirOutbound)
344344
if err != nil {
345345
// oops no, we failed to add it to the swarm
346346
res.Conn.Close()

p2p/net/swarm/swarm.go

+34-75
Original file line numberDiff line numberDiff line change
@@ -144,9 +144,7 @@ type Swarm struct {
144144
// down before continuing.
145145
refs sync.WaitGroup
146146

147-
emitter event.Emitter
148-
connectednessEventCh chan struct{}
149-
connectednessEmitterDone chan struct{}
147+
emitter event.Emitter
150148

151149
rcmgr network.ResourceManager
152150

@@ -158,8 +156,7 @@ type Swarm struct {
158156

159157
conns struct {
160158
sync.RWMutex
161-
m map[peer.ID][]*Conn
162-
connectednessEvents chan peer.ID
159+
m map[peer.ID][]*Conn
163160
}
164161

165162
listeners struct {
@@ -206,9 +203,10 @@ type Swarm struct {
206203

207204
dialRanker network.DialRanker
208205

209-
udpBlackHoleConfig blackHoleConfig
210-
ipv6BlackHoleConfig blackHoleConfig
211-
bhd *blackHoleDetector
206+
udpBlackHoleConfig blackHoleConfig
207+
ipv6BlackHoleConfig blackHoleConfig
208+
bhd *blackHoleDetector
209+
connectednessEventEmitter *connectednessEventEmitter
212210
}
213211

214212
// NewSwarm constructs a Swarm.
@@ -219,17 +217,15 @@ func NewSwarm(local peer.ID, peers peerstore.Peerstore, eventBus event.Bus, opts
219217
}
220218
ctx, cancel := context.WithCancel(context.Background())
221219
s := &Swarm{
222-
local: local,
223-
peers: peers,
224-
emitter: emitter,
225-
connectednessEventCh: make(chan struct{}, 1),
226-
connectednessEmitterDone: make(chan struct{}),
227-
ctx: ctx,
228-
ctxCancel: cancel,
229-
dialTimeout: defaultDialTimeout,
230-
dialTimeoutLocal: defaultDialTimeoutLocal,
231-
maResolver: madns.DefaultResolver,
232-
dialRanker: DefaultDialRanker,
220+
local: local,
221+
peers: peers,
222+
emitter: emitter,
223+
ctx: ctx,
224+
ctxCancel: cancel,
225+
dialTimeout: defaultDialTimeout,
226+
dialTimeoutLocal: defaultDialTimeoutLocal,
227+
maResolver: madns.DefaultResolver,
228+
dialRanker: DefaultDialRanker,
233229

234230
// A black hole is a binary property. On a network if UDP dials are blocked or there is
235231
// no IPv6 connectivity, all dials will fail. So a low success rate of 5 out 100 dials
@@ -239,11 +235,11 @@ func NewSwarm(local peer.ID, peers peerstore.Peerstore, eventBus event.Bus, opts
239235
}
240236

241237
s.conns.m = make(map[peer.ID][]*Conn)
242-
s.conns.connectednessEvents = make(chan peer.ID, 32)
243238
s.listeners.m = make(map[transport.Listener]struct{})
244239
s.transports.m = make(map[int]transport.Transport)
245240
s.notifs.m = make(map[network.Notifiee]struct{})
246241
s.directConnNotifs.m = make(map[peer.ID][]chan struct{})
242+
s.connectednessEventEmitter = newConnectednessEventEmitter(s.Connectedness, emitter)
247243

248244
for _, opt := range opts {
249245
if err := opt(s); err != nil {
@@ -260,7 +256,6 @@ func NewSwarm(local peer.ID, peers peerstore.Peerstore, eventBus event.Bus, opts
260256
s.backf.init(s.ctx)
261257

262258
s.bhd = newBlackHoleDetector(s.udpBlackHoleConfig, s.ipv6BlackHoleConfig, s.metricsTracer)
263-
go s.connectednessEventEmitter()
264259
return s, nil
265260
}
266261

@@ -306,8 +301,7 @@ func (s *Swarm) close() {
306301

307302
// Wait for everything to finish.
308303
s.refs.Wait()
309-
close(s.conns.connectednessEvents)
310-
<-s.connectednessEmitterDone
304+
s.connectednessEventEmitter.Close()
311305
s.emitter.Close()
312306

313307
// Now close out any transports (if necessary). Do this after closing
@@ -338,7 +332,7 @@ func (s *Swarm) close() {
338332
wg.Wait()
339333
}
340334

341-
func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, error) {
335+
func (s *Swarm) addConn(ctx context.Context, tc transport.CapableConn, dir network.Direction) (*Conn, error) {
342336
var (
343337
p = tc.RemotePeer()
344338
addr = tc.RemoteMultiaddr()
@@ -397,18 +391,15 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
397391
// * One will be decremented after the close notifications fire in Conn.doClose
398392
// * The other will be decremented when Conn.start exits.
399393
s.refs.Add(2)
400-
401394
// Take the notification lock before releasing the conns lock to block
402395
// Disconnect notifications until after the Connect notifications done.
396+
// This lock also ensures that swarm.refs.Wait() exits after we have
397+
// enqueued the peer connectedness changed notification.
398+
// TODO: Fix this fragility by taking a swarm ref for dial worker loop
403399
c.notifyLk.Lock()
404400
s.conns.Unlock()
405401

406-
// Block this goroutine till this request is enqueued.
407-
// This ensures that there are only a finite number of goroutines that are waiting to send
408-
// the connectedness event on the disconnection side in swarm.removeConn.
409-
// This is so because the goroutine to enqueue disconnection event can only be started
410-
// from either a subscriber or a notifier or after calling c.start
411-
s.conns.connectednessEvents <- p
402+
s.connectednessEventEmitter.AddConn(p)
412403

413404
if !isLimited {
414405
// Notify goroutines waiting for a direct connection
@@ -423,7 +414,6 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
423414
delete(s.directConnNotifs.m, p)
424415
s.directConnNotifs.Unlock()
425416
}
426-
427417
s.notifyAll(func(f network.Notifiee) {
428418
f.Connected(s, c)
429419
})
@@ -771,52 +761,21 @@ func (s *Swarm) removeConn(c *Conn) {
771761

772762
s.conns.Lock()
773763
cs := s.conns.m[p]
774-
if len(cs) == 1 {
775-
delete(s.conns.m, p)
776-
} else {
777-
for i, ci := range cs {
778-
if ci == c {
779-
// NOTE: We're intentionally preserving order.
780-
// This way, connections to a peer are always
781-
// sorted oldest to newest.
782-
copy(cs[i:], cs[i+1:])
783-
cs[len(cs)-1] = nil
784-
s.conns.m[p] = cs[:len(cs)-1]
785-
break
786-
}
764+
for i, ci := range cs {
765+
if ci == c {
766+
// NOTE: We're intentionally preserving order.
767+
// This way, connections to a peer are always
768+
// sorted oldest to newest.
769+
copy(cs[i:], cs[i+1:])
770+
cs[len(cs)-1] = nil
771+
s.conns.m[p] = cs[:len(cs)-1]
772+
break
787773
}
788774
}
789-
s.conns.Unlock()
790-
// Do this in a separate go routine to not block the caller.
791-
// This ensures that if a event subscriber closes the connection from the subscription goroutine
792-
// this doesn't deadlock
793-
s.refs.Add(1)
794-
go func() {
795-
defer s.refs.Done()
796-
s.conns.connectednessEvents <- p
797-
}()
798-
}
799-
800-
func (s *Swarm) connectednessEventEmitter() {
801-
defer close(s.connectednessEmitterDone)
802-
lastConnectednessEvents := make(map[peer.ID]network.Connectedness)
803-
for p := range s.conns.connectednessEvents {
804-
s.conns.Lock()
805-
oldState := lastConnectednessEvents[p]
806-
newState := s.connectednessUnlocked(p)
807-
if newState != network.NotConnected {
808-
lastConnectednessEvents[p] = newState
809-
} else {
810-
delete(lastConnectednessEvents, p)
811-
}
812-
s.conns.Unlock()
813-
if newState != oldState {
814-
s.emitter.Emit(event.EvtPeerConnectednessChanged{
815-
Peer: p,
816-
Connectedness: newState,
817-
})
818-
}
775+
if len(s.conns.m[p]) == 0 {
776+
delete(s.conns.m, p)
819777
}
778+
s.conns.Unlock()
820779
}
821780

822781
// String returns a string representation of Network.

p2p/net/swarm/swarm_conn.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ func (c *Conn) doClose() {
7373

7474
c.err = c.conn.Close()
7575

76+
// Send the connectedness event after closing the connection.
77+
// This ensures that both remote connection close and local connection
78+
// close events are sent after the underlying transport connection is closed.
79+
c.swarm.connectednessEventEmitter.RemoveConn(c.RemotePeer())
80+
7681
// This is just for cleaning up state. The connection has already been closed.
7782
// We *could* optimize this but it really isn't worth it.
7883
for s := range streams {
@@ -85,10 +90,11 @@ func (c *Conn) doClose() {
8590
c.notifyLk.Lock()
8691
defer c.notifyLk.Unlock()
8792

93+
// Only notify for disconnection if we notified for connection
8894
c.swarm.notifyAll(func(f network.Notifiee) {
8995
f.Disconnected(c.swarm, c)
9096
})
91-
c.swarm.refs.Done() // taken in Swarm.addConn
97+
c.swarm.refs.Done()
9298
}()
9399
}
94100

@@ -108,7 +114,6 @@ func (c *Conn) start() {
108114
go func() {
109115
defer c.swarm.refs.Done()
110116
defer c.Close()
111-
112117
for {
113118
ts, err := c.conn.AcceptStream()
114119
if err != nil {

0 commit comments

Comments
 (0)