Skip to content

Commit

Permalink
improve handling of dead peers (#508)
Browse files Browse the repository at this point in the history
  • Loading branch information
Wondertan authored Nov 20, 2022
1 parent aed7fc4 commit 7612414
Showing 1 changed file with 7 additions and 11 deletions.
18 changes: 7 additions & 11 deletions comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-msgio"
"github.com/libp2p/go-msgio/protoio"

pb "github.com/libp2p/go-libp2p-pubsub/pb"
)
Expand Down Expand Up @@ -126,7 +125,7 @@ func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan
}

go p.handleSendingMessages(ctx, s, outgoing)
go p.handlePeerEOF(ctx, s)
go p.handlePeerDead(s)
select {
case p.newPeerStream <- s:
case <-ctx.Done():
Expand All @@ -142,19 +141,16 @@ func (p *PubSub) handleNewPeerWithBackoff(ctx context.Context, pid peer.ID, back
}
}

func (p *PubSub) handlePeerEOF(ctx context.Context, s network.Stream) {
func (p *PubSub) handlePeerDead(s network.Stream) {
pid := s.Conn().RemotePeer()
r := protoio.NewDelimitedReader(s, p.maxMessageSize)
rpc := new(RPC)
for {
err := r.ReadMsg(&rpc.RPC)
if err != nil {
p.notifyPeerDead(pid)
return
}

_, err := s.Read([]byte{0})
if err == nil {
log.Debugf("unexpected message from %s", pid)
}

s.Reset()
p.notifyPeerDead(pid)
}

func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, outgoing <-chan *RPC) {
Expand Down

0 comments on commit 7612414

Please sign in to comment.