Skip to content

Commit 12d5826

Browse files
committed
simplify connectedness events
1 parent 2681035 commit 12d5826

File tree

1 file changed

+31
-71
lines changed

1 file changed

+31
-71
lines changed

p2p/net/swarm/swarm.go

+31-71
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,8 @@ type Swarm struct {
158158

159159
conns struct {
160160
sync.RWMutex
161-
m map[peer.ID][]*Conn
162-
connectednessEventQueue map[peer.ID][]network.Connectedness
163-
lastConnectednessEvent map[peer.ID]network.Connectedness
161+
m map[peer.ID][]*Conn
162+
connectednessEvents chan peer.ID
164163
}
165164

166165
listeners struct {
@@ -240,8 +239,7 @@ func NewSwarm(local peer.ID, peers peerstore.Peerstore, eventBus event.Bus, opts
240239
}
241240

242241
s.conns.m = make(map[peer.ID][]*Conn)
243-
s.conns.connectednessEventQueue = make(map[peer.ID][]network.Connectedness)
244-
s.conns.lastConnectednessEvent = make(map[peer.ID]network.Connectedness)
242+
s.conns.connectednessEvents = make(chan peer.ID, 32)
245243
s.listeners.m = make(map[transport.Listener]struct{})
246244
s.transports.m = make(map[int]transport.Transport)
247245
s.notifs.m = make(map[network.Notifiee]struct{})
@@ -308,17 +306,10 @@ func (s *Swarm) close() {
308306

309307
// Wait for everything to finish.
310308
s.refs.Wait()
311-
close(s.connectednessEventCh)
309+
close(s.conns.connectednessEvents)
312310
<-s.connectednessEmitterDone
313311
s.emitter.Close()
314312

315-
// Remove the connectedness map only after we have closed the connection and sent all the disconnection
316-
// events
317-
s.conns.Lock()
318-
s.conns.connectednessEventQueue = nil
319-
s.conns.lastConnectednessEvent = nil
320-
s.conns.Unlock()
321-
322313
// Now close out any transports (if necessary). Do this after closing
323314
// all connections/listeners.
324315
s.transports.Lock()
@@ -402,8 +393,6 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
402393

403394
c.streams.m = make(map[*Stream]struct{})
404395
s.conns.m[p] = append(s.conns.m[p], c)
405-
s.maybeEnqueueConnectednessUnlocked(p)
406-
407396
// Add two swarm refs:
408397
// * One will be decremented after the close notifications fire in Conn.doClose
409398
// * The other will be decremented when Conn.start exits.
@@ -414,6 +403,13 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
414403
c.notifyLk.Lock()
415404
s.conns.Unlock()
416405

406+
// Block this goroutine till this request is enqueued.
407+
// This ensures that there are only a finite number of goroutines that are waiting to send
408+
// the connectedness event on the disconnection side in swarm.removeConn.
409+
// This is so because the goroutine to enqueue disconnection event can only be started
410+
// from either a subscriber or a notifier or after calling c.start
411+
s.conns.connectednessEvents <- p
412+
417413
if !isLimited {
418414
// Notify goroutines waiting for a direct connection
419415
//
@@ -790,68 +786,32 @@ func (s *Swarm) removeConn(c *Conn) {
790786
}
791787
}
792788
}
793-
s.maybeEnqueueConnectednessUnlocked(p)
794789
s.conns.Unlock()
795-
}
796-
797-
func (s *Swarm) lastConnectednessEventUnlocked(p peer.ID) network.Connectedness {
798-
events := s.conns.connectednessEventQueue[p]
799-
if len(events) > 0 {
800-
return events[len(events)-1]
801-
}
802-
return s.conns.lastConnectednessEvent[p]
803-
}
804-
805-
func (s *Swarm) maybeEnqueueConnectednessUnlocked(p peer.ID) {
806-
oldState := s.lastConnectednessEventUnlocked(p)
807-
newState := s.connectednessUnlocked(p)
808-
if oldState != newState {
809-
if s.conns.connectednessEventQueue != nil {
810-
s.conns.connectednessEventQueue[p] = append(s.conns.connectednessEventQueue[p], newState)
811-
select {
812-
case s.connectednessEventCh <- struct{}{}:
813-
default:
814-
}
815-
} else {
816-
log.Errorf("SWARM BUG: nil connectedness map")
817-
}
818-
}
790+
// Do this in a separate go routine to not block the caller.
791+
// This ensures that if a event subscriber closes the connection from the subscription goroutine
792+
// this doesn't deadlock
793+
go func() {
794+
s.conns.connectednessEvents <- p
795+
}()
819796
}
820797

821798
func (s *Swarm) connectednessEventEmitter() {
822799
defer close(s.connectednessEmitterDone)
823-
for range s.connectednessEventCh {
824-
for {
825-
var c network.Connectedness
826-
var peer peer.ID
827-
s.conns.Lock()
828-
for p, v := range s.conns.connectednessEventQueue {
829-
if len(v) == 0 {
830-
// this shouldn't happen
831-
delete(s.conns.connectednessEventQueue, p)
832-
log.Errorf("SWARM BUG: empty connectedness event slice %v %v", p, v)
833-
continue
834-
}
835-
c = v[0]
836-
peer = p
837-
s.conns.connectednessEventQueue[p] = v[1:]
838-
if len(s.conns.connectednessEventQueue[p]) == 0 {
839-
delete(s.conns.connectednessEventQueue, p)
840-
}
841-
if c == network.NotConnected {
842-
delete(s.conns.lastConnectednessEvent, p)
843-
} else {
844-
s.conns.lastConnectednessEvent[p] = c
845-
}
846-
break
847-
}
848-
s.conns.Unlock()
849-
if peer == "" {
850-
break
851-
}
800+
lastConnectednessEvents := make(map[peer.ID]network.Connectedness)
801+
for p := range s.conns.connectednessEvents {
802+
s.conns.Lock()
803+
oldState := lastConnectednessEvents[p]
804+
newState := s.connectednessUnlocked(p)
805+
if newState != network.NotConnected {
806+
lastConnectednessEvents[p] = newState
807+
} else {
808+
delete(lastConnectednessEvents, p)
809+
}
810+
s.conns.Unlock()
811+
if newState != oldState {
852812
s.emitter.Emit(event.EvtPeerConnectednessChanged{
853-
Peer: peer,
854-
Connectedness: c,
813+
Peer: p,
814+
Connectedness: newState,
855815
})
856816
}
857817
}

0 commit comments

Comments
 (0)