Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow cancelling IWANT using IDONTWANT #591

Merged
merged 1 commit into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions gossipsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,11 @@ func (gs *GossipSubRouter) handleIWant(p peer.ID, ctl *pb.ControlMessage) []*pb.
ihave := make(map[string]*pb.Message)
for _, iwant := range ctl.GetIwant() {
for _, mid := range iwant.GetMessageIDs() {
// Check if that peer has sent IDONTWANT before, if so don't send them the message
if _, ok := gs.unwanted[p][computeChecksum(mid)]; ok {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how expensive is this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You meant computeChecksum? no, it's just one hash function.

continue
}

msg, count, ok := gs.mcache.GetForPeer(mid, p)
if !ok {
continue
Expand Down
104 changes: 104 additions & 0 deletions gossipsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3079,6 +3079,110 @@ func TestGossipsubIdontwantSmallMessage(t *testing.T) {
<-ctx.Done()
}

// Test that IWANT will have no effect after IDONTWANT is sent
func TestGossipsubIdontwantBeforeIwant(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getDefaultHosts(t, 3)

msgID := func(pmsg *pb.Message) string {
// silly content-based test message-ID: just use the data as whole
return base64.URLEncoding.EncodeToString(pmsg.Data)
}

psubs := make([]*PubSub, 2)
psubs[0] = getGossipsub(ctx, hosts[0], WithMessageIdFn(msgID))
psubs[1] = getGossipsub(ctx, hosts[1], WithMessageIdFn(msgID))

topic := "foobar"
for _, ps := range psubs {
_, err := ps.Subscribe(topic)
if err != nil {
t.Fatal(err)
}
}

// Wait a bit after the last message before checking the result
msgWaitMax := 2 * time.Second
msgTimer := time.NewTimer(msgWaitMax)

// Checks we received right messages
msgReceived := false
ihaveReceived := false
checkMsgs := func() {
if msgReceived {
t.Fatalf("Expected no messages received after IDONWANT")
}
if !ihaveReceived {
t.Fatalf("Expected IHAVE received")
}
}

// Wait for the timer to expire
go func() {
select {
case <-msgTimer.C:
checkMsgs()
cancel()
return
case <-ctx.Done():
checkMsgs()
}
}()

newMockGS(ctx, t, hosts[2], func(writeMsg func(*pb.RPC), irpc *pb.RPC) {
// Check if it receives any message
if len(irpc.GetPublish()) > 0 {
msgReceived = true
}
// The middle peer is supposed to send IHAVE
for _, ihave := range irpc.GetControl().GetIhave() {
ihaveReceived = true
mids := ihave.GetMessageIDs()

writeMsg(&pb.RPC{
Control: &pb.ControlMessage{Idontwant: []*pb.ControlIDontWant{{MessageIDs: mids}}},
})
// Wait for the middle peer to process IDONTWANT
time.Sleep(100 * time.Millisecond)
writeMsg(&pb.RPC{
Control: &pb.ControlMessage{Iwant: []*pb.ControlIWant{{MessageIDs: mids}}},
})
}
// When the middle peer connects it will send us its subscriptions
for _, sub := range irpc.GetSubscriptions() {
if sub.GetSubscribe() {
// Reply by subcribing to the topic and pruning to the middle peer to make sure
// that it's not in the mesh
writeMsg(&pb.RPC{
Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}},
Control: &pb.ControlMessage{Prune: []*pb.ControlPrune{{TopicID: sub.Topicid}}},
})

go func() {
// Wait for an interval to make sure the middle peer
// received and processed the subscribe
time.Sleep(100 * time.Millisecond)

data := make([]byte, 16)
crand.Read(data)

// Publish the message from the first peer
if err := psubs[0].Publish(topic, data); err != nil {
t.Error(err)
return // cannot call t.Fatal in a non-test goroutine
}
}()
}
}
})

connect(t, hosts[0], hosts[1])
connect(t, hosts[1], hosts[2])

<-ctx.Done()
}

// Test that IDONTWANT will cleared when it's old enough
func TestGossipsubIdontwantClear(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
Expand Down
Loading