Skip to content

Commit

Permalink
Allow cancelling IWANT using IDONTWANT
Browse files Browse the repository at this point in the history
As specified in the Gossipsub v1.2 spec, we should allow cancelling
IWANT by IDONTWANT.

That is if IDONTWANT already arrived, we should not process IWANT.

However due to the code structure, we can cancel IWANT only in
handleIWant.
  • Loading branch information
ppopth committed Dec 29, 2024
1 parent 0936035 commit a450ff7
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 0 deletions.
5 changes: 5 additions & 0 deletions gossipsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,11 @@ func (gs *GossipSubRouter) handleIWant(p peer.ID, ctl *pb.ControlMessage) []*pb.
ihave := make(map[string]*pb.Message)
for _, iwant := range ctl.GetIwant() {
for _, mid := range iwant.GetMessageIDs() {
// Check if that peer has sent IDONTWANT before, if so don't send them the message
if _, ok := gs.unwanted[p][computeChecksum(mid)]; ok {
continue
}

msg, count, ok := gs.mcache.GetForPeer(mid, p)
if !ok {
continue
Expand Down
104 changes: 104 additions & 0 deletions gossipsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3079,6 +3079,110 @@ func TestGossipsubIdontwantSmallMessage(t *testing.T) {
<-ctx.Done()
}

// Test that IWANT will have no effect after IDONTWANT is sent
func TestGossipsubIdontwantBeforeIwant(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getDefaultHosts(t, 3)

msgID := func(pmsg *pb.Message) string {
// silly content-based test message-ID: just use the data as whole
return base64.URLEncoding.EncodeToString(pmsg.Data)
}

psubs := make([]*PubSub, 2)
psubs[0] = getGossipsub(ctx, hosts[0], WithMessageIdFn(msgID))
psubs[1] = getGossipsub(ctx, hosts[1], WithMessageIdFn(msgID))

topic := "foobar"
for _, ps := range psubs {
_, err := ps.Subscribe(topic)
if err != nil {
t.Fatal(err)
}
}

// Wait a bit after the last message before checking the result
msgWaitMax := 2 * time.Second
msgTimer := time.NewTimer(msgWaitMax)

// Checks we received right messages
msgReceived := false
ihaveReceived := false
checkMsgs := func() {
if msgReceived {
t.Fatalf("Expected no messages received after IDONWANT")
}
if !ihaveReceived {
t.Fatalf("Expected IHAVE received")
}
}

// Wait for the timer to expire
go func() {
select {
case <-msgTimer.C:
checkMsgs()
cancel()
return
case <-ctx.Done():
checkMsgs()
}
}()

newMockGS(ctx, t, hosts[2], func(writeMsg func(*pb.RPC), irpc *pb.RPC) {
// Check if it receives any message
if len(irpc.GetPublish()) > 0 {
msgReceived = true
}
// The middle peer is supposed to send IHAVE
for _, ihave := range irpc.GetControl().GetIhave() {
ihaveReceived = true
mids := ihave.GetMessageIDs()

writeMsg(&pb.RPC{
Control: &pb.ControlMessage{Idontwant: []*pb.ControlIDontWant{{MessageIDs: mids}}},
})
// Wait for the middle peer to process IDONTWANT
time.Sleep(100 * time.Millisecond)
writeMsg(&pb.RPC{
Control: &pb.ControlMessage{Iwant: []*pb.ControlIWant{{MessageIDs: mids}}},
})
}
// When the middle peer connects it will send us its subscriptions
for _, sub := range irpc.GetSubscriptions() {
if sub.GetSubscribe() {
// Reply by subcribing to the topic and pruning to the middle peer to make sure
// that it's not in the mesh
writeMsg(&pb.RPC{
Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}},
Control: &pb.ControlMessage{Prune: []*pb.ControlPrune{{TopicID: sub.Topicid}}},
})

go func() {
// Wait for an interval to make sure the middle peer
// received and processed the subscribe
time.Sleep(100 * time.Millisecond)

data := make([]byte, 16)
crand.Read(data)

// Publish the message from the first peer
if err := psubs[0].Publish(topic, data); err != nil {
t.Error(err)
return // cannot call t.Fatal in a non-test goroutine
}
}()
}
}
})

connect(t, hosts[0], hosts[1])
connect(t, hosts[1], hosts[2])

<-ctx.Done()
}

// Test that IDONTWANT will cleared when it's old enough
func TestGossipsubIdontwantClear(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
Expand Down

0 comments on commit a450ff7

Please sign in to comment.