Skip to content

Commit b542b15

Browse files
committed
review comments
1 parent 25ccbf2 commit b542b15

5 files changed

+20
-23
lines changed

p2p/net/swarm/connectedness_event_emitter.go

+17-17
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,14 @@ func (c *connectednessEventEmitter) RemoveConn(p peer.ID) {
6666
}
6767

6868
c.removeConnsMx.Lock()
69-
// This queue is not unbounded since we block in the AddConn method
70-
// So we are adding connections to the swarm only at a rate
71-
// the subscriber for our peer connectedness changed events can consume them.
72-
// If a lot of open connections are closed at once, increasing the disconnected
73-
// event notification rate, the rate of adding connections to the swarm would
74-
// proportionately reduce, which would eventually reduce the length of this slice.
69+
// This queue is roughly bounded by the total number of added connections we
70+
// have. If consumers of connectedness events are slow, we apply
71+
// backpressure to AddConn operations.
72+
//
73+
// We purposefully don't block/backpressure here to avoid deadlocks, since it's
74+
// reasonable for a consumer of the event to want to remove a connection.
7575
c.removeConns = append(c.removeConns, p)
76+
7677
c.removeConnsMx.Unlock()
7778

7879
select {
@@ -111,6 +112,12 @@ func (c *connectednessEventEmitter) runEmitter() {
111112
}
112113
}
113114

115+
// notifyPeer sends the peer connectedness event using the emitter.
116+
// Use forceNotConnectedEvent = true to send a NotConnected event even if
117+
// no Connected event was sent for this peer.
118+
// In case a peer is disconnected before we sent the Connected event, we still
119+
// send the Disconnected event because a connection to the peer can be observed
120+
// in such cases.
114121
func (c *connectednessEventEmitter) notifyPeer(p peer.ID, forceNotConnectedEvent bool) {
115122
oldState := c.lastEvent[p]
116123
c.lastEvent[p] = c.connectedness(p)
@@ -127,17 +134,10 @@ func (c *connectednessEventEmitter) notifyPeer(p peer.ID, forceNotConnectedEvent
127134

128135
func (c *connectednessEventEmitter) sendConnRemovedNotifications() {
129136
c.removeConnsMx.Lock()
130-
defer c.removeConnsMx.Unlock()
131-
for {
132-
if len(c.removeConns) == 0 {
133-
return
134-
}
135-
p := c.removeConns[0]
136-
c.removeConns[0] = ""
137-
c.removeConns = c.removeConns[1:]
138-
139-
c.removeConnsMx.Unlock()
137+
removeConns := c.removeConns
138+
c.removeConns = nil
139+
c.removeConnsMx.Unlock()
140+
for _, p := range removeConns {
140141
c.notifyPeer(p, false)
141-
c.removeConnsMx.Lock()
142142
}
143143
}

p2p/net/swarm/dial_worker.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ loop:
340340
ad.expectedTCPUpgradeTime = time.Time{}
341341
if res.Conn != nil {
342342
// we got a connection, add it to the swarm
343-
conn, err := w.s.addConn(ad.ctx, res.Conn, network.DirOutbound)
343+
conn, err := w.s.addConn(res.Conn, network.DirOutbound)
344344
if err != nil {
345345
// oops no, we failed to add it to the swarm
346346
res.Conn.Close()

p2p/net/swarm/swarm.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ func (s *Swarm) close() {
338338
wg.Wait()
339339
}
340340

341-
func (s *Swarm) addConn(ctx context.Context, tc transport.CapableConn, dir network.Direction) (*Conn, error) {
341+
func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, error) {
342342
var (
343343
p = tc.RemotePeer()
344344
addr = tc.RemoteMultiaddr()

p2p/net/swarm/swarm_event_test.go

-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package swarm_test
22

33
import (
44
"context"
5-
"fmt"
65
"sync"
76
"testing"
87
"time"
@@ -308,5 +307,4 @@ func TestConnectednessEventDeadlockWithDial(t *testing.T) {
308307

309308
close(done)
310309
subWG.Wait()
311-
fmt.Println("swarm closed")
312310
}

p2p/net/swarm/swarm_listen.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package swarm
22

33
import (
4-
"context"
54
"errors"
65
"fmt"
76
"time"
@@ -143,7 +142,7 @@ func (s *Swarm) AddListenAddr(a ma.Multiaddr) error {
143142
s.refs.Add(1)
144143
go func() {
145144
defer s.refs.Done()
146-
_, err := s.addConn(context.Background(), c, network.DirInbound)
145+
_, err := s.addConn(c, network.DirInbound)
147146
switch err {
148147
case nil:
149148
case ErrSwarmClosed:

0 commit comments

Comments
 (0)