From a450ff7826cab5fb80c82fedf0d72329f5cd161e Mon Sep 17 00:00:00 2001 From: Pop Chunhapanya Date: Mon, 30 Dec 2024 01:53:16 +0700 Subject: [PATCH] Allow cancelling IWANT using IDONTWANT As specified in the Gossipsub v1.2 spec, we should allow cancelling IWANT by IDONTWANT. That is if IDONTWANT already arrived, we should not process IWANT. However due to the code structure, we can cancel IWANT only in handleIWant. --- gossipsub.go | 5 +++ gossipsub_test.go | 104 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 109 insertions(+) diff --git a/gossipsub.go b/gossipsub.go index d6041624..56b6886c 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -839,6 +839,11 @@ func (gs *GossipSubRouter) handleIWant(p peer.ID, ctl *pb.ControlMessage) []*pb. ihave := make(map[string]*pb.Message) for _, iwant := range ctl.GetIwant() { for _, mid := range iwant.GetMessageIDs() { + // Check if that peer has sent IDONTWANT before, if so don't send them the message + if _, ok := gs.unwanted[p][computeChecksum(mid)]; ok { + continue + } + msg, count, ok := gs.mcache.GetForPeer(mid, p) if !ok { continue diff --git a/gossipsub_test.go b/gossipsub_test.go index 93edeeca..675d164c 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -3079,6 +3079,110 @@ func TestGossipsubIdontwantSmallMessage(t *testing.T) { <-ctx.Done() } +// Test that IWANT will have no effect after IDONTWANT is sent +func TestGossipsubIdontwantBeforeIwant(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hosts := getDefaultHosts(t, 3) + + msgID := func(pmsg *pb.Message) string { + // silly content-based test message-ID: just use the data as whole + return base64.URLEncoding.EncodeToString(pmsg.Data) + } + + psubs := make([]*PubSub, 2) + psubs[0] = getGossipsub(ctx, hosts[0], WithMessageIdFn(msgID)) + psubs[1] = getGossipsub(ctx, hosts[1], WithMessageIdFn(msgID)) + + topic := "foobar" + for _, ps := range psubs { + _, err := ps.Subscribe(topic) + if err != nil { + t.Fatal(err) + } + } + + // Wait a bit after the last message before checking the result + msgWaitMax := 2 * time.Second + msgTimer := time.NewTimer(msgWaitMax) + + // Checks we received right messages + msgReceived := false + ihaveReceived := false + checkMsgs := func() { + if msgReceived { + t.Fatalf("Expected no messages received after IDONWANT") + } + if !ihaveReceived { + t.Fatalf("Expected IHAVE received") + } + } + + // Wait for the timer to expire + go func() { + select { + case <-msgTimer.C: + checkMsgs() + cancel() + return + case <-ctx.Done(): + checkMsgs() + } + }() + + newMockGS(ctx, t, hosts[2], func(writeMsg func(*pb.RPC), irpc *pb.RPC) { + // Check if it receives any message + if len(irpc.GetPublish()) > 0 { + msgReceived = true + } + // The middle peer is supposed to send IHAVE + for _, ihave := range irpc.GetControl().GetIhave() { + ihaveReceived = true + mids := ihave.GetMessageIDs() + + writeMsg(&pb.RPC{ + Control: &pb.ControlMessage{Idontwant: []*pb.ControlIDontWant{{MessageIDs: mids}}}, + }) + // Wait for the middle peer to process IDONTWANT + time.Sleep(100 * time.Millisecond) + writeMsg(&pb.RPC{ + Control: &pb.ControlMessage{Iwant: []*pb.ControlIWant{{MessageIDs: mids}}}, + }) + } + // When the middle peer connects it will send us its subscriptions + for _, sub := range irpc.GetSubscriptions() { + if sub.GetSubscribe() { + // Reply by subcribing to the topic and pruning to the middle peer to make sure + // that it's not in the mesh + writeMsg(&pb.RPC{ + Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}}, + Control: &pb.ControlMessage{Prune: []*pb.ControlPrune{{TopicID: sub.Topicid}}}, + }) + + go func() { + // Wait for an interval to make sure the middle peer + // received and processed the subscribe + time.Sleep(100 * time.Millisecond) + + data := make([]byte, 16) + crand.Read(data) + + // Publish the message from the first peer + if err := psubs[0].Publish(topic, data); err != nil { + t.Error(err) + return // cannot call t.Fatal in a non-test goroutine + } + }() + } + } + }) + + connect(t, hosts[0], hosts[1]) + connect(t, hosts[1], hosts[2]) + + <-ctx.Done() +} + // Test that IDONTWANT will cleared when it's old enough func TestGossipsubIdontwantClear(t *testing.T) { ctx, cancel := context.WithCancel(context.Background())