
Commit 02a74d8

Always send 1 event for a connection
1 parent c68a035 commit 02a74d8

File tree

p2p/net/swarm/connectedness_event_emitter.go (new)
p2p/net/swarm/dial_worker.go
p2p/net/swarm/swarm.go
p2p/net/swarm/swarm_conn.go
p2p/net/swarm/swarm_listen.go

5 files changed: +197 -66 lines changed
p2p/net/swarm/connectedness_event_emitter.go (new file, +151 -0)

@@ -0,0 +1,151 @@
package swarm

import (
	"context"
	"errors"
	"sync"

	"github.com/libp2p/go-libp2p/core/event"
	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/core/peer"
)

// connectednessEventEmitter emits PeerConnectednessChanged events.
// We ensure that for any peer we connected to, we always send at least one
// NotConnected event after the peer disconnects. This is because peers can
// observe a connection before they are notified of it by a
// PeerConnectednessChanged event.
type connectednessEventEmitter struct {
	mx sync.RWMutex
	// newConns is the channel that holds the peerIDs we recently connected to
	newConns      chan peer.ID
	removeConnsMx sync.Mutex
	// removeConns is a slice of peerIDs we have recently closed connections to
	removeConns []peer.ID
	// lastEvent is the last connectedness event sent for a particular peer.
	lastEvent map[peer.ID]network.Connectedness
	// connectedness is the function that gives a peer's current connectedness state
	connectedness func(peer.ID) network.Connectedness
	// emitter is the PeerConnectednessChanged event emitter
	emitter         event.Emitter
	wg              sync.WaitGroup
	removeConnNotif chan struct{}
	ctx             context.Context
	cancel          context.CancelFunc
}

func newConnectednessEventEmitter(connectedness func(peer.ID) network.Connectedness, emitter event.Emitter) *connectednessEventEmitter {
	ctx, cancel := context.WithCancel(context.Background())
	c := &connectednessEventEmitter{
		newConns:        make(chan peer.ID, 32),
		lastEvent:       make(map[peer.ID]network.Connectedness),
		removeConnNotif: make(chan struct{}, 1),
		connectedness:   connectedness,
		emitter:         emitter,
		ctx:             ctx,
		cancel:          cancel,
	}
	c.wg.Add(1)
	go c.runEmitter()
	return c
}

func (c *connectednessEventEmitter) AddConn(ctx context.Context, p peer.ID) error {
	c.mx.RLock()
	defer c.mx.RUnlock()
	if c.ctx.Err() != nil {
		return errors.New("emitter closed")
	}

	select {
	case c.newConns <- p:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	case <-c.ctx.Done():
		return errors.New("emitter closed")
	}
}

func (c *connectednessEventEmitter) RemoveConn(p peer.ID) {
	c.mx.RLock()
	defer c.mx.RUnlock()
	if c.ctx.Err() != nil {
		return
	}

	c.removeConnsMx.Lock()
	// This queue is not unbounded since we block in the AddConn method:
	// we add connections to the swarm only at the rate at which the subscriber
	// to our PeerConnectednessChanged events can consume them. If many open
	// connections are closed at once, increasing the rate of disconnected
	// event notifications, the rate of adding connections to the swarm drops
	// proportionately, which eventually shrinks this slice.
	c.removeConns = append(c.removeConns, p)
	c.removeConnsMx.Unlock()

	select {
	case c.removeConnNotif <- struct{}{}:
	default:
	}
}

func (c *connectednessEventEmitter) Close() {
	c.cancel()
	c.wg.Wait()
}

func (c *connectednessEventEmitter) runEmitter() {
	defer c.wg.Done()
	for {
		select {
		case p := <-c.newConns:
			c.notifyPeer(p, true)
		case <-c.removeConnNotif:
			c.sendConnRemovedNotifications()
		case <-c.ctx.Done():
			c.mx.Lock() // Wait for all pending AddConn & RemoveConn operations to complete
			defer c.mx.Unlock()
			for {
				// Drain whatever is still queued, then exit.
				select {
				case p := <-c.newConns:
					c.notifyPeer(p, true)
				case <-c.removeConnNotif:
					c.sendConnRemovedNotifications()
				default:
					return
				}
			}
		}
	}
}

func (c *connectednessEventEmitter) notifyPeer(p peer.ID, forceNotConnectedEvent bool) {
	oldState := c.lastEvent[p]
	c.lastEvent[p] = c.connectedness(p)
	if c.lastEvent[p] == network.NotConnected {
		delete(c.lastEvent, p)
	}
	// After the delete above, c.lastEvent[p] reads as the zero value,
	// network.NotConnected, so the checks below still see the new state.
	if (forceNotConnectedEvent && c.lastEvent[p] == network.NotConnected) || c.lastEvent[p] != oldState {
		c.emitter.Emit(event.EvtPeerConnectednessChanged{
			Peer:          p,
			Connectedness: c.lastEvent[p],
		})
	}
}

func (c *connectednessEventEmitter) sendConnRemovedNotifications() {
	c.removeConnsMx.Lock()
	defer c.removeConnsMx.Unlock()
	for {
		if len(c.removeConns) == 0 {
			return
		}
		p := c.removeConns[0]
		c.removeConns[0] = "" // zero the slot so the peer.ID can be garbage collected
		c.removeConns = c.removeConns[1:]

		// Drop the lock while notifying so that a slow subscriber
		// never blocks callers of RemoveConn.
		c.removeConnsMx.Unlock()
		c.notifyPeer(p, false)
		c.removeConnsMx.Lock()
	}
}
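The "at least one NotConnected event" guarantee is easiest to appreciate from a subscriber's point of view. A minimal sketch of a consumer (illustrative only, not part of this commit; it assumes a bus of type event.Bus, e.g. obtained from host.EventBus()) that keys per-peer cleanup off the terminal event:

// Sketch: per-peer state cleanup driven by connectedness events.
sub, err := bus.Subscribe(new(event.EvtPeerConnectednessChanged))
if err != nil {
	panic(err)
}
defer sub.Close()

tracked := make(map[peer.ID]struct{})
for e := range sub.Out() {
	evt := e.(event.EvtPeerConnectednessChanged)
	if evt.Connectedness == network.NotConnected {
		// The emitter guarantees this arrives for every peer we were
		// connected to, so this cleanup is never skipped.
		delete(tracked, evt.Peer)
	} else {
		tracked[evt.Peer] = struct{}{}
	}
}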

p2p/net/swarm/dial_worker.go (+1 -1)

@@ -340,7 +340,7 @@ loop:
 			ad.expectedTCPUpgradeTime = time.Time{}
 			if res.Conn != nil {
 				// we got a connection, add it to the swarm
-				conn, err := w.s.addConn(res.Conn, network.DirOutbound)
+				conn, err := w.s.addConn(ad.ctx, res.Conn, network.DirOutbound)
 				if err != nil {
 					// oops no, we failed to add it to the swarm
 					res.Conn.Close()
p2p/net/swarm/swarm.go (+30 -60)

@@ -144,9 +144,7 @@ type Swarm struct {
 	// down before continuing.
 	refs sync.WaitGroup
 
-	emitter                  event.Emitter
-	connectednessEventCh     chan struct{}
-	connectednessEmitterDone chan struct{}
+	emitter event.Emitter
 
 	rcmgr network.ResourceManager
 
@@ -158,8 +156,7 @@ type Swarm struct {
 
 	conns struct {
 		sync.RWMutex
-		m                   map[peer.ID][]*Conn
-		connectednessEvents chan peer.ID
+		m map[peer.ID][]*Conn
 	}
 
 	listeners struct {
@@ -206,9 +203,10 @@ type Swarm struct {
 
 	dialRanker network.DialRanker
 
-	udpBlackHoleConfig  blackHoleConfig
-	ipv6BlackHoleConfig blackHoleConfig
-	bhd                 *blackHoleDetector
+	udpBlackHoleConfig        blackHoleConfig
+	ipv6BlackHoleConfig       blackHoleConfig
+	bhd                       *blackHoleDetector
+	connectednessEventEmitter *connectednessEventEmitter
 }
 
 // NewSwarm constructs a Swarm.
@@ -219,17 +217,15 @@ func NewSwarm(local peer.ID, peers peerstore.Peerstore, eventBus event.Bus, opts
 	}
 	ctx, cancel := context.WithCancel(context.Background())
 	s := &Swarm{
-		local:                    local,
-		peers:                    peers,
-		emitter:                  emitter,
-		connectednessEventCh:     make(chan struct{}, 1),
-		connectednessEmitterDone: make(chan struct{}),
-		ctx:                      ctx,
-		ctxCancel:                cancel,
-		dialTimeout:              defaultDialTimeout,
-		dialTimeoutLocal:         defaultDialTimeoutLocal,
-		maResolver:               madns.DefaultResolver,
-		dialRanker:               DefaultDialRanker,
+		local:            local,
+		peers:            peers,
+		emitter:          emitter,
+		ctx:              ctx,
+		ctxCancel:        cancel,
+		dialTimeout:      defaultDialTimeout,
+		dialTimeoutLocal: defaultDialTimeoutLocal,
+		maResolver:       madns.DefaultResolver,
+		dialRanker:       DefaultDialRanker,
 
 		// A black hole is a binary property. On a network if UDP dials are blocked or there is
 		// no IPv6 connectivity, all dials will fail. So a low success rate of 5 out 100 dials
@@ -239,11 +235,11 @@ func NewSwarm(local peer.ID, peers peerstore.Peerstore, eventBus event.Bus, opts
 	}
 
 	s.conns.m = make(map[peer.ID][]*Conn)
-	s.conns.connectednessEvents = make(chan peer.ID, 32)
 	s.listeners.m = make(map[transport.Listener]struct{})
 	s.transports.m = make(map[int]transport.Transport)
 	s.notifs.m = make(map[network.Notifiee]struct{})
 	s.directConnNotifs.m = make(map[peer.ID][]chan struct{})
+	s.connectednessEventEmitter = newConnectednessEventEmitter(s.Connectedness, emitter)
 
 	for _, opt := range opts {
 		if err := opt(s); err != nil {
@@ -260,7 +256,6 @@
 	s.backf.init(s.ctx)
 
 	s.bhd = newBlackHoleDetector(s.udpBlackHoleConfig, s.ipv6BlackHoleConfig, s.metricsTracer)
-	go s.connectednessEventEmitter()
 	return s, nil
 }
 
@@ -306,8 +301,7 @@ func (s *Swarm) close() {
 
 	// Wait for everything to finish.
 	s.refs.Wait()
-	close(s.conns.connectednessEvents)
-	<-s.connectednessEmitterDone
+	s.connectednessEventEmitter.Close()
 	s.emitter.Close()
 
 	// Now close out any transports (if necessary). Do this after closing
@@ -338,7 +332,7 @@ func (s *Swarm) close() {
 	wg.Wait()
 }
 
-func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, error) {
+func (s *Swarm) addConn(ctx context.Context, tc transport.CapableConn, dir network.Direction) (*Conn, error) {
 	var (
 		p    = tc.RemotePeer()
 		addr = tc.RemoteMultiaddr()
@@ -403,12 +397,17 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
 	c.notifyLk.Lock()
 	s.conns.Unlock()
 
-	// Block this goroutine till this request is enqueued.
-	// This ensures that there are only a finite number of goroutines that are waiting to send
-	// the connectedness event on the disconnection side in swarm.removeConn.
-	// This is so because the goroutine to enqueue the disconnection event can only be started
-	// from either a subscriber or a notifier, or after calling c.start
-	s.conns.connectednessEvents <- p
+	err := s.connectednessEventEmitter.AddConn(ctx, p)
+	if err != nil {
+		// Either the subscriber is busy or the swarm is closed
+		c.Close()
+		s.removeConn(c)
+		c.notifyLk.Unlock()
+		// Remove both references: we aren't going to send any notifications
+		// for this connection or start it
+		s.refs.Add(-2)
+		return nil, fmt.Errorf("failed to send connectedness event: %w", err)
+	}
 
 	if !isLimited {
 		// Notify goroutines waiting for a direct connection
@@ -427,6 +426,7 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
 	s.notifyAll(func(f network.Notifiee) {
 		f.Connected(s, c)
 	})
+	c.notified = true
 	c.notifyLk.Unlock()
 
 	c.start()
@@ -787,36 +787,6 @@ func (s *Swarm) removeConn(c *Conn) {
 		}
 	}
 	s.conns.Unlock()
-	// Do this in a separate goroutine so as not to block the caller.
-	// This ensures that if an event subscriber closes the connection from the
-	// subscription goroutine, this doesn't deadlock.
-	s.refs.Add(1)
-	go func() {
-		defer s.refs.Done()
-		s.conns.connectednessEvents <- p
-	}()
-}
-
-func (s *Swarm) connectednessEventEmitter() {
-	defer close(s.connectednessEmitterDone)
-	lastConnectednessEvents := make(map[peer.ID]network.Connectedness)
-	for p := range s.conns.connectednessEvents {
-		s.conns.Lock()
-		oldState := lastConnectednessEvents[p]
-		newState := s.connectednessUnlocked(p)
-		if newState != network.NotConnected {
-			lastConnectednessEvents[p] = newState
-		} else {
-			delete(lastConnectednessEvents, p)
-		}
-		s.conns.Unlock()
-		if newState != oldState {
-			s.emitter.Emit(event.EvtPeerConnectednessChanged{
-				Peer:          p,
-				Connectedness: newState,
-			})
-		}
-	}
 }
 
 // String returns a string representation of Network.
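The deleted per-disconnect goroutines existed so that a subscriber closing a connection from its own goroutine could not deadlock the swarm; the replacement bounds everything with one channel instead. A standalone sketch of that backpressure pattern (illustrative only, not code from this commit):

package main

import "fmt"

// Producers block on a bounded channel, so work is admitted only as fast
// as the single consumer drains it. AddConn couples the rate of new
// connections to the event subscriber in the same way.
func main() {
	events := make(chan int, 2) // small buffer; the swarm's newConns uses cap 32
	done := make(chan struct{})

	go func() { // single consumer, analogous to runEmitter
		for e := range events {
			fmt.Println("emit", e)
		}
		close(done)
	}()

	for i := 0; i < 5; i++ {
		events <- i // blocks once the buffer is full: natural backpressure
	}
	close(events)
	<-done
}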

p2p/net/swarm/swarm_conn.go (+13 -4)

@@ -31,6 +31,7 @@ type Conn struct {
 	err error
 
 	notifyLk sync.Mutex
+	notified bool
 
 	streams struct {
 		sync.Mutex
@@ -73,6 +74,11 @@ func (c *Conn) doClose() {
 
 	c.err = c.conn.Close()
 
+	// Send the connectedness event after closing the connection.
+	// This ensures that both remote connection close and local connection
+	// close events are sent after the underlying transport connection is closed.
+	c.swarm.connectednessEventEmitter.RemoveConn(c.RemotePeer())
+
 	// This is just for cleaning up state. The connection has already been closed.
 	// We *could* optimize this but it really isn't worth it.
 	for s := range streams {
@@ -85,10 +91,13 @@
 		c.notifyLk.Lock()
 		defer c.notifyLk.Unlock()
 
-		c.swarm.notifyAll(func(f network.Notifiee) {
-			f.Disconnected(c.swarm, c)
-		})
-		c.swarm.refs.Done() // taken in Swarm.addConn
+		defer c.swarm.refs.Done() // taken in Swarm.addConn
+		if c.notified {
+			// Only notify for disconnection if we notified for connection
+			c.swarm.notifyAll(func(f network.Notifiee) {
+				f.Disconnected(c.swarm, c)
+			})
+		}
 	}()
 }
 
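The new notified flag gives Notifiees a pairing guarantee: Disconnected fires only for connections that previously fired Connected. A hypothetical Notifiee (illustrative, not part of this commit) that can now maintain an exact connection count:

// connCounter relies on Connected/Disconnected being strictly paired.
type connCounter struct {
	network.NoopNotifiee
	mu    sync.Mutex
	count int
}

func (n *connCounter) Connected(network.Network, network.Conn) {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.count++
}

func (n *connCounter) Disconnected(network.Network, network.Conn) {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.count-- // never goes negative: Disconnected implies a prior Connected
}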

p2p/net/swarm/swarm_listen.go (+2 -1)

@@ -1,6 +1,7 @@
 package swarm
 
 import (
+	"context"
 	"errors"
 	"fmt"
 	"time"
@@ -142,7 +143,7 @@ func (s *Swarm) AddListenAddr(a ma.Multiaddr) error {
 			s.refs.Add(1)
 			go func() {
 				defer s.refs.Done()
-				_, err := s.addConn(c, network.DirInbound)
+				_, err := s.addConn(context.Background(), c, network.DirInbound)
 				switch err {
 				case nil:
 				case ErrSwarmClosed:
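Inbound connections have no dial context, so context.Background() is passed here and only swarm shutdown can unblock an AddConn stalled on a busy subscriber. Were one to bound the wait for inbound connections as well, a timeout context would be one option (hypothetical alternative, not what this commit does):

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
_, err := s.addConn(ctx, c, network.DirInbound)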
