diff --git a/comm.go b/comm.go index 86380a74..b313943e 100644 --- a/comm.go +++ b/comm.go @@ -13,7 +13,6 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-msgio" - "github.com/libp2p/go-msgio/protoio" pb "github.com/libp2p/go-libp2p-pubsub/pb" ) @@ -126,7 +125,7 @@ func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan } go p.handleSendingMessages(ctx, s, outgoing) - go p.handlePeerEOF(ctx, s) + go p.handlePeerDead(s) select { case p.newPeerStream <- s: case <-ctx.Done(): @@ -142,19 +141,16 @@ func (p *PubSub) handleNewPeerWithBackoff(ctx context.Context, pid peer.ID, back } } -func (p *PubSub) handlePeerEOF(ctx context.Context, s network.Stream) { +func (p *PubSub) handlePeerDead(s network.Stream) { pid := s.Conn().RemotePeer() - r := protoio.NewDelimitedReader(s, p.maxMessageSize) - rpc := new(RPC) - for { - err := r.ReadMsg(&rpc.RPC) - if err != nil { - p.notifyPeerDead(pid) - return - } + _, err := s.Read([]byte{0}) + if err == nil { log.Debugf("unexpected message from %s", pid) } + + s.Reset() + p.notifyPeerDead(pid) } func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, outgoing <-chan *RPC) {