Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Oct 1, 2023
1 parent db8eba8 commit 7d09432
Showing 1 changed file with 18 additions and 16 deletions.
34 changes: 18 additions & 16 deletions p2p/net/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,17 +398,17 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
s.conns.Unlock()

// Notify goroutines waiting for a direct connection

// Go routines interested in waiting for direct connection first acquire this lock and then
// acquire conns.RLock. Do not acquire this lock before conns.Unlock to prevent deadlock.
s.directConnNotifs.Lock()
if !c.Stat().Transient {
// Go routines interested in waiting for direct connection first acquire this lock
// and then acquire s.conns.RLock. Do not acquire this lock before conns.Unlock to
// prevent deadlock.
s.directConnNotifs.Lock()
for _, ch := range s.directConnNotifs.m[p] {
close(ch)
}
delete(s.directConnNotifs.m, p)
s.directConnNotifs.Unlock()
}
s.directConnNotifs.Unlock()

// Emit event after releasing `s.conns` lock so that a consumer can still
// use swarm methods that need the `s.conns` lock.
Expand Down Expand Up @@ -466,15 +466,15 @@ func (s *Swarm) NewStream(ctx context.Context, p peer.ID) (network.Stream, error
//
// TODO: Try all connections even if we get an error opening a stream on
// a non-closed connection.
dialed := false
for i := 0; i < 1; i++ {
numDials := 0
for {
c := s.bestConnToPeer(p)
if c == nil {
if nodial, _ := network.GetNoDial(ctx); !nodial {
if dialed {
numDials++
if numDials > DialAttempts {
return nil, errors.New("max dial attempts exceeded")
}
dialed = true
var err error
c, err = s.dialPeer(ctx, p)
if err != nil {
Expand Down Expand Up @@ -503,7 +503,6 @@ func (s *Swarm) NewStream(ctx context.Context, p peer.ID) (network.Stream, error
}
return str, nil
}
return nil, network.ErrNoConn
}

// waitForDirectConn waits for a direct connection established through hole punching or connection reversal.
Expand All @@ -524,30 +523,33 @@ func (s *Swarm) waitForDirectConn(ctx context.Context, p peer.ID) (*Conn, error)
s.directConnNotifs.m[p] = append(s.directConnNotifs.m[p], ch)
s.directConnNotifs.Unlock()

// Wait for notification.
// There's no point waiting for more than a minute here.
ctx, cancel := context.WithTimeout(ctx, time.Minute)
// apply the DialPeer timeout
ctx, cancel := context.WithTimeout(ctx, network.GetDialPeerTimeout(ctx))
defer cancel()

// Wait for notification.
select {
case <-ctx.Done():
// Remove ourselves from the notification list
s.directConnNotifs.Lock()
defer s.directConnNotifs.Unlock()

s.directConnNotifs.m[p] = slices.DeleteFunc(
s.directConnNotifs.m[p],
func(c chan struct{}) bool { return c == ch },
)
if len(s.directConnNotifs.m[p]) == 0 {
delete(s.directConnNotifs.m, p)
}
s.directConnNotifs.Unlock()
return nil, ctx.Err()
case <-ch:
// We do not need to remove ourselves from the list here as the notifier
// clears the map
// clears the map entry
c := s.bestConnToPeer(p)
if c == nil {
return nil, network.ErrNoConn
} else if c.Stat().Transient {
}
if c.Stat().Transient {
return nil, network.ErrTransientConn
}
return c, nil
Expand Down

0 comments on commit 7d09432

Please sign in to comment.