Skip to content

Commit

Permalink
Merge branch 'master' into cancel-iwant
Browse files Browse the repository at this point in the history
  • Loading branch information
ppopth authored Dec 29, 2024
2 parents bb11f5c + 0936035 commit 3873331
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 1 deletion.
17 changes: 16 additions & 1 deletion gossipsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ var (
GossipSubGraftFloodThreshold = 10 * time.Second
GossipSubMaxIHaveLength = 5000
GossipSubMaxIHaveMessages = 10
GossipSubMaxIDontWantLength = 10
GossipSubMaxIDontWantMessages = 1000
GossipSubIWantFollowupTime = 3 * time.Second
GossipSubIDontWantMessageThreshold = 1024 // 1KB
Expand Down Expand Up @@ -218,6 +219,10 @@ type GossipSubParams struct {
// MaxIHaveMessages is the maximum number of IHAVE messages to accept from a peer within a heartbeat.
MaxIHaveMessages int

// MaxIDontWantLength is the maximum number of messages to include in an IDONTWANT message. Also controls
// the maximum number of IDONTWANT ids we will accept to protect against IDONTWANT floods. This value
// should be adjusted if your system anticipates a larger amount than specified per heartbeat.
MaxIDontWantLength int
// MaxIDontWantMessages is the maximum number of IDONTWANT messages to accept from a peer within a heartbeat.
MaxIDontWantMessages int

Expand Down Expand Up @@ -303,6 +308,7 @@ func DefaultGossipSubParams() GossipSubParams {
GraftFloodThreshold: GossipSubGraftFloodThreshold,
MaxIHaveLength: GossipSubMaxIHaveLength,
MaxIHaveMessages: GossipSubMaxIHaveMessages,
MaxIDontWantLength: GossipSubMaxIDontWantLength,
MaxIDontWantMessages: GossipSubMaxIDontWantMessages,
IWantFollowupTime: GossipSubIWantFollowupTime,
IDontWantMessageThreshold: GossipSubIDontWantMessageThreshold,
Expand Down Expand Up @@ -1014,9 +1020,18 @@ func (gs *GossipSubRouter) handleIDontWant(p peer.ID, ctl *pb.ControlMessage) {
}
gs.peerdontwant[p]++

totalUnwantedIds := 0
// Remember all the unwanted message ids
mainIDWLoop:
for _, idontwant := range ctl.GetIdontwant() {
for _, mid := range idontwant.GetMessageIDs() {
// IDONTWANT flood protection
if totalUnwantedIds >= gs.params.MaxIDontWantLength {
log.Debugf("IDONWANT: peer %s has advertised too many ids (%d) within this message; ignoring", p, totalUnwantedIds)
break mainIDWLoop
}

totalUnwantedIds++
gs.unwanted[p][computeChecksum(mid)] = gs.params.IDontWantMessageTTL
}
}
Expand Down Expand Up @@ -1610,7 +1625,7 @@ func (gs *GossipSubRouter) heartbeat() {
}

// do we have too many peers?
if len(peers) > gs.params.Dhi {
if len(peers) >= gs.params.Dhi {
plst := peerMapToList(peers)

// sort by score (but shuffle first for the case we don't use the score)
Expand Down
48 changes: 48 additions & 0 deletions gossipsub_spam_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/rand"
"encoding/base64"
"fmt"
"strconv"
"sync"
"testing"
Expand Down Expand Up @@ -891,6 +892,53 @@ func TestGossipsubAttackSpamIDONTWANT(t *testing.T) {
<-ctx.Done()
}

func TestGossipsubHandleIDontwantSpam(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getDefaultHosts(t, 2)

msgID := func(pmsg *pb.Message) string {
// silly content-based test message-ID: just use the data as whole
return base64.URLEncoding.EncodeToString(pmsg.Data)
}

psubs := make([]*PubSub, 2)
psubs[0] = getGossipsub(ctx, hosts[0], WithMessageIdFn(msgID))
psubs[1] = getGossipsub(ctx, hosts[1], WithMessageIdFn(msgID))

connect(t, hosts[0], hosts[1])

topic := "foobar"
for _, ps := range psubs {
_, err := ps.Subscribe(topic)
if err != nil {
t.Fatal(err)
}
}
exceededIDWLength := GossipSubMaxIDontWantLength + 1
var idwIds []string
for i := 0; i < exceededIDWLength; i++ {
idwIds = append(idwIds, fmt.Sprintf("idontwant-%d", i))
}
rPid := hosts[1].ID()
ctrlMessage := &pb.ControlMessage{Idontwant: []*pb.ControlIDontWant{{MessageIDs: idwIds}}}
grt := psubs[0].rt.(*GossipSubRouter)
grt.handleIDontWant(rPid, ctrlMessage)

if grt.peerdontwant[rPid] != 1 {
t.Errorf("Wanted message count of %d but received %d", 1, grt.peerdontwant[rPid])
}
mid := fmt.Sprintf("idontwant-%d", GossipSubMaxIDontWantLength-1)
if _, ok := grt.unwanted[rPid][computeChecksum(mid)]; !ok {
t.Errorf("Desired message id was not stored in the unwanted map: %s", mid)
}

mid = fmt.Sprintf("idontwant-%d", GossipSubMaxIDontWantLength)
if _, ok := grt.unwanted[rPid][computeChecksum(mid)]; ok {
t.Errorf("Unwanted message id was stored in the unwanted map: %s", mid)
}
}

type mockGSOnRead func(writeMsg func(*pb.RPC), irpc *pb.RPC)

func newMockGS(ctx context.Context, t *testing.T, attacker host.Host, onReadMsg mockGSOnRead) {
Expand Down
47 changes: 47 additions & 0 deletions gossipsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3280,6 +3280,53 @@ func TestGossipsubIdontwantClear(t *testing.T) {
<-ctx.Done()
}

func TestGossipsubPruneMeshCorrectly(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getDefaultHosts(t, 9)

msgID := func(pmsg *pb.Message) string {
// silly content-based test message-ID: just use the data as whole
return base64.URLEncoding.EncodeToString(pmsg.Data)
}

params := DefaultGossipSubParams()
params.Dhi = 8

psubs := make([]*PubSub, 9)
for i := 0; i < 9; i++ {
psubs[i] = getGossipsub(ctx, hosts[i],
WithGossipSubParams(params),
WithMessageIdFn(msgID))
}

topic := "foobar"
for _, ps := range psubs {
_, err := ps.Subscribe(topic)
if err != nil {
t.Fatal(err)
}
}

// Connect first peer with the rest of the 8 other
// peers.
for i := 1; i < 9; i++ {
connect(t, hosts[0], hosts[i])
}

// Wait for 2 heartbeats to be able to prune excess peers back down to D.
totalTimeToWait := params.HeartbeatInitialDelay + 2*params.HeartbeatInterval
time.Sleep(totalTimeToWait)

meshPeers, ok := psubs[0].rt.(*GossipSubRouter).mesh[topic]
if !ok {
t.Fatal("mesh does not exist for topic")
}
if len(meshPeers) != params.D {
t.Fatalf("mesh does not have the correct number of peers. Wanted %d but got %d", params.D, len(meshPeers))
}
}

func BenchmarkAllocDoDropRPC(b *testing.B) {
gs := GossipSubRouter{tracer: &pubsubTracer{}}

Expand Down

0 comments on commit 3873331

Please sign in to comment.