diff --git a/comm.go b/comm.go index af0ba207..a403f9fc 100644 --- a/comm.go +++ b/comm.go @@ -33,7 +33,7 @@ func (p *PubSub) getHelloPacket() *RPC { for t := range subscriptions { as := &pb.RPC_SubOpts{ - Topicid: proto.String(t), + TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: t}, Subscribe: proto.Bool(true), } rpc.Subscriptions = append(rpc.Subscriptions, as) diff --git a/compat_test.go b/compat_test.go index 110c34a2..6914d644 100644 --- a/compat_test.go +++ b/compat_test.go @@ -15,7 +15,7 @@ func TestMultitopicMessageCompatibility(t *testing.T) { From: []byte("A"), Data: []byte("blah"), Seqno: []byte("123"), - Topic: &topic1, + TopicRef: &pb.Message_Topic{Topic: topic1}, Signature: []byte("a-signature"), Key: []byte("a-key"), } diff --git a/extensions.go b/extensions.go index 3a2c3f11..bfdfd609 100644 --- a/extensions.go +++ b/extensions.go @@ -6,13 +6,18 @@ import ( ) type PeerExtensions struct { - TestExtension bool + TestExtension bool + TopicTableExtension *pubsub_pb.ExtTopicTable } type TestExtensionConfig struct { OnReceiveTestExtension func(from peer.ID) } +type TopicTableExtensionConfig struct { + TopicBundles [][]string +} + func WithTestExtension(c TestExtensionConfig) Option { return func(ps *PubSub) error { if rt, ok := ps.rt.(*GossipSubRouter); ok { @@ -26,6 +31,21 @@ func WithTestExtension(c TestExtensionConfig) Option { } } +func WithTopicTableExtension(c TopicTableExtensionConfig) Option { + return func(ps *PubSub) error { + if rt, ok := ps.rt.(*GossipSubRouter); ok { + e, err := newTopicTableExtension(c.TopicBundles) + if err != nil { + return err + } + + rt.extensions.myExtensions.TopicTableExtension = e.GetControlExtension() + rt.extensions.topicTableExtension = e + } + return nil + } +} + func hasPeerExtensions(rpc *RPC) bool { if rpc != nil && rpc.Control != nil && rpc.Control.Extensions != nil { return true @@ -37,18 +57,29 @@ func peerExtensionsFromRPC(rpc *RPC) PeerExtensions { out := PeerExtensions{} if hasPeerExtensions(rpc) { out.TestExtension = rpc.Control.Extensions.GetTestExtension() + out.TopicTableExtension = rpc.Control.Extensions.GetTopicTableExtension() } return out } -func (pe *PeerExtensions) ExtendRPC(rpc *RPC) *RPC { +func (pe *PeerExtensions) ExtendHelloRPC(rpc *RPC) *RPC { if pe.TestExtension { if rpc.Control == nil { rpc.Control = &pubsub_pb.ControlMessage{} } - rpc.Control.Extensions = &pubsub_pb.ControlExtensions{ - TestExtension: &pe.TestExtension, + if rpc.Control.Extensions == nil { + rpc.Control.Extensions = &pubsub_pb.ControlExtensions{} + } + rpc.Control.Extensions.TestExtension = &pe.TestExtension + } + if pe.TopicTableExtension != nil { + if rpc.Control == nil { + rpc.Control = &pubsub_pb.ControlMessage{} + } + if rpc.Control.Extensions == nil { + rpc.Control.Extensions = &pubsub_pb.ControlExtensions{} } + rpc.Control.Extensions.TopicTableExtension = pe.TopicTableExtension } return rpc } @@ -60,7 +91,8 @@ type extensionsState struct { reportMisbehavior func(peer.ID) sendRPC func(p peer.ID, r *RPC, urgent bool) - testExtension *testExtension + testExtension *testExtension + topicTableExtension *topicTableExtension } func newExtensionsState(myExtensions PeerExtensions, reportMisbehavior func(peer.ID), sendRPC func(peer.ID, *RPC, bool)) *extensionsState { @@ -70,7 +102,9 @@ func newExtensionsState(myExtensions PeerExtensions, reportMisbehavior func(peer sentExtensions: make(map[peer.ID]struct{}), reportMisbehavior: reportMisbehavior, sendRPC: sendRPC, - testExtension: nil, + + testExtension: nil, + topicTableExtension: nil, } } @@ -96,9 +130,16 @@ func (es *extensionsState) HandleRPC(rpc *RPC) { es.extensionsHandleRPC(rpc) } +func (es *extensionsState) InterceptRPC(rpc *RPC) *RPC { + if es.myExtensions.TopicTableExtension != nil && es.peerExtensions[rpc.from].TopicTableExtension != nil { + rpc = es.topicTableExtension.InterceptRPC(rpc) + } + return rpc +} + func (es *extensionsState) AddPeer(id peer.ID, helloPacket *RPC) *RPC { // Send our extensions as the first message. - helloPacket = es.myExtensions.ExtendRPC(helloPacket) + helloPacket = es.myExtensions.ExtendHelloRPC(helloPacket) es.sentExtensions[id] = struct{}{} if _, ok := es.peerExtensions[id]; ok { @@ -126,12 +167,34 @@ func (es *extensionsState) RemovePeer(id peer.ID) { } } +func (es *extensionsState) ExtendRPC(id peer.ID, rpc *RPC) *RPC { + if es.myExtensions.TopicTableExtension != nil && es.peerExtensions[id].TopicTableExtension != nil { + rpc = es.topicTableExtension.ExtendRPC(id, rpc) + } + return rpc +} + // extensionsAddPeer is only called once we've both sent and received the // extensions control message. func (es *extensionsState) extensionsAddPeer(id peer.ID) { if es.myExtensions.TestExtension && es.peerExtensions[id].TestExtension { es.testExtension.AddPeer(id) } + if es.myExtensions.TopicTableExtension != nil && es.peerExtensions[id].TopicTableExtension != nil { + hashSlices := es.peerExtensions[id].TopicTableExtension.GetTopicBundleHashes() + // Parsing the slices of bytes, to get a slice of TopicBundleHash + bundleHashes := make([]TopicBundleHash, len(hashSlices)) + for _, buf := range hashSlices { + hash, err := newTopicBundleHash(buf) + if err != nil { + // If there is an error parsing the hash, just quietly return + return + } + bundleHashes = append(bundleHashes, *hash) + } + // If there is an error adding a peer, just quietly skip it + _ = es.topicTableExtension.AddPeer(id, bundleHashes) + } } // extensionsRemovePeer is always called after extensionsAddPeer. diff --git a/floodsub.go b/floodsub.go index e8616b1c..051acae5 100644 --- a/floodsub.go +++ b/floodsub.go @@ -46,6 +46,10 @@ func (fs *FloodSubRouter) AddPeer(p peer.ID, proto protocol.ID, hello *RPC) *RPC return hello } +func (fs *FloodSubRouter) InterceptRPC(rpc *RPC) *RPC { + return rpc +} + func (fs *FloodSubRouter) RemovePeer(p peer.ID) { fs.tracer.RemovePeer(p) } diff --git a/fuzz_helpers_test.go b/fuzz_helpers_test.go index 0831deb6..184f3c6c 100644 --- a/fuzz_helpers_test.go +++ b/fuzz_helpers_test.go @@ -36,7 +36,7 @@ func generateSub(data []byte, limit int) *pb.RPC_SubOpts { subscribe := generateBool(&data) str := string(make([]byte, topicIDSize)) - return &pb.RPC_SubOpts{Subscribe: &subscribe, Topicid: &str} + return &pb.RPC_SubOpts{Subscribe: &subscribe, TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: str}} } func generateControl(data []byte, limit int) *pb.ControlMessage { @@ -65,7 +65,7 @@ func generateControl(data []byte, limit int) *pb.ControlMessage { msgCount := int(generateU16(&data)) % limit topicSize := int(generateU16(&data)) % limit topic := string(make([]byte, topicSize)) - ctl.Ihave = append(ctl.Ihave, &pb.ControlIHave{TopicID: &topic}) + ctl.Ihave = append(ctl.Ihave, &pb.ControlIHave{TopicRef: &pb.ControlIHave_TopicID{TopicID: topic}}) ctl.Ihave[i].MessageIDs = make([]string, 0, msgCount) for j := 0; j < msgCount; j++ { @@ -81,7 +81,7 @@ func generateControl(data []byte, limit int) *pb.ControlMessage { for i := 0; i < numGraft; i++ { topicSize := int(generateU16(&data)) % limit topic := string(make([]byte, topicSize)) - ctl.Graft = append(ctl.Graft, &pb.ControlGraft{TopicID: &topic}) + ctl.Graft = append(ctl.Graft, &pb.ControlGraft{TopicRef: &pb.ControlGraft_TopicID{TopicID: topic}}) } if ctl.Size() > limit { return &pb.ControlMessage{} @@ -92,7 +92,7 @@ func generateControl(data []byte, limit int) *pb.ControlMessage { for i := 0; i < numPrune; i++ { topicSize := int(generateU16(&data)) % limit topic := string(make([]byte, topicSize)) - ctl.Prune = append(ctl.Prune, &pb.ControlPrune{TopicID: &topic}) + ctl.Prune = append(ctl.Prune, &pb.ControlPrune{TopicRef: &pb.ControlPrune_TopicID{TopicID: topic}}) } if ctl.Size() > limit { return &pb.ControlMessage{} diff --git a/gossipsub.go b/gossipsub.go index c492ded9..c25c5a48 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -780,6 +780,13 @@ loop: return helloPacket } +func (gs *GossipSubRouter) InterceptRPC(rpc *RPC) *RPC { + if gs.feature(GossipSubFeatureExtensions, gs.peers[rpc.from]) { + return gs.extensions.InterceptRPC(rpc) + } + return rpc +} + func (gs *GossipSubRouter) RemovePeer(p peer.ID) { gs.logger.Debug("PEERDOWN: Remove disconnected peer", "peer", p) gs.tracer.RemovePeer(p) @@ -1463,7 +1470,7 @@ func (gs *GossipSubRouter) Leave(topic string) { } func (gs *GossipSubRouter) sendGraft(p peer.ID, topic string) { - graft := []*pb.ControlGraft{{TopicID: &topic}} + graft := []*pb.ControlGraft{{TopicRef: &pb.ControlGraft_TopicID{TopicID: topic}}} out := rpcWithControl(nil, nil, nil, graft, nil, nil) gs.sendRPC(p, out, false) } @@ -1532,6 +1539,10 @@ func (gs *GossipSubRouter) doDropRPC(rpc *RPC, p peer.ID, reason string) { func (gs *GossipSubRouter) doSendRPC(rpc *RPC, p peer.ID, q *rpcQueue, urgent bool) { var err error + // let extensions modify the rpc before sending if the other peer supports extensions + if gs.feature(GossipSubFeatureExtensions, gs.peers[p]) { + rpc = gs.extensions.ExtendRPC(p, rpc) + } if urgent { err = q.UrgentPush(rpc, false) } else { @@ -1925,7 +1936,7 @@ func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string, // topic here changes with every // iteration of the slice. copiedID := topic - graft = append(graft, &pb.ControlGraft{TopicID: &copiedID}) + graft = append(graft, &pb.ControlGraft{TopicRef: &pb.ControlGraft_TopicID{TopicID: copiedID}}) } var prune []*pb.ControlPrune @@ -2007,7 +2018,7 @@ func (gs *GossipSubRouter) emitGossip(topic string, exclude map[peer.ID]struct{} shuffleStrings(mids) copy(peerMids, mids) } - gs.enqueueGossip(p, &pb.ControlIHave{TopicID: &topic, MessageIDs: peerMids}) + gs.enqueueGossip(p, &pb.ControlIHave{TopicRef: &pb.ControlIHave_TopicID{TopicID: topic}, MessageIDs: peerMids}) } } @@ -2104,7 +2115,7 @@ func (gs *GossipSubRouter) piggybackControl(p peer.ID, out *RPC, ctl *pb.Control func (gs *GossipSubRouter) makePrune(p peer.ID, topic string, doPX bool, isUnsubscribe bool) *pb.ControlPrune { if !gs.feature(GossipSubFeaturePX, gs.peers[p]) { // GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway - return &pb.ControlPrune{TopicID: &topic} + return &pb.ControlPrune{TopicRef: &pb.ControlPrune_TopicID{TopicID: topic}} } backoff := uint64(gs.params.PruneBackoff / time.Second) @@ -2130,7 +2141,7 @@ func (gs *GossipSubRouter) makePrune(p peer.ID, topic string, doPX bool, isUnsub } } - return &pb.ControlPrune{TopicID: &topic, Peers: px, Backoff: &backoff} + return &pb.ControlPrune{TopicRef: &pb.ControlPrune_TopicID{TopicID: topic}, Peers: px, Backoff: &backoff} } func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID) bool) []peer.ID { diff --git a/gossipsub_spam_test.go b/gossipsub_spam_test.go index e42b8859..64aad80f 100644 --- a/gossipsub_spam_test.go +++ b/gossipsub_spam_test.go @@ -90,8 +90,8 @@ func TestGossipsubAttackSpamIWANT(t *testing.T) { if sub.GetSubscribe() { // Reply by subcribing to the topic and grafting to the peer writeMsg(&pb.RPC{ - Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}}, - Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicID: sub.Topicid}}}, + Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: sub.GetTopicid()}}}, + Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicRef: &pb.ControlGraft_TopicID{TopicID: sub.GetTopicid()}}}}, }) go func() { @@ -194,8 +194,8 @@ func TestGossipsubAttackSpamIHAVE(t *testing.T) { if sub.GetSubscribe() { // Reply by subcribing to the topic and grafting to the peer writeMsg(&pb.RPC{ - Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}}, - Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicID: sub.Topicid}}}, + Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: sub.GetTopicid()}}}, + Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicRef: &pb.ControlGraft_TopicID{TopicID: sub.GetTopicid()}}}}, }) sub := sub @@ -209,7 +209,7 @@ func TestGossipsubAttackSpamIHAVE(t *testing.T) { // Send a bunch of IHAVEs for i := 0; i < 3*GossipSubMaxIHaveLength; i++ { ihavelst := []string{"someid" + strconv.Itoa(i)} - ihave := []*pb.ControlIHave{{TopicID: sub.Topicid, MessageIDs: ihavelst}} + ihave := []*pb.ControlIHave{{TopicRef: &pb.ControlIHave_TopicID{TopicID: sub.GetTopicid()}, MessageIDs: ihavelst}} orpc := rpcWithControl(nil, ihave, nil, nil, nil, nil) writeMsg(&orpc.RPC) } @@ -239,7 +239,7 @@ func TestGossipsubAttackSpamIHAVE(t *testing.T) { // Send a bunch of IHAVEs for i := 0; i < 3*GossipSubMaxIHaveLength; i++ { ihavelst := []string{"someid" + strconv.Itoa(i+100)} - ihave := []*pb.ControlIHave{{TopicID: sub.Topicid, MessageIDs: ihavelst}} + ihave := []*pb.ControlIHave{{TopicRef: &pb.ControlIHave_TopicID{TopicID: sub.GetTopicid()}, MessageIDs: ihavelst}} orpc := rpcWithControl(nil, ihave, nil, nil, nil, nil) writeMsg(&orpc.RPC) } @@ -329,14 +329,14 @@ func TestGossipsubAttackGRAFTNonExistentTopic(t *testing.T) { if sub.GetSubscribe() { // Reply by subcribing to the topic and grafting to the peer writeMsg(&pb.RPC{ - Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}}, - Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicID: sub.Topicid}}}, + Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: sub.GetTopicid()}}}, + Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicRef: &pb.ControlGraft_TopicID{TopicID: sub.GetTopicid()}}}}, }) // Graft to the peer on a non-existent topic nonExistentTopic := "non-existent" writeMsg(&pb.RPC{ - Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicID: &nonExistentTopic}}}, + Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicRef: &pb.ControlGraft_TopicID{TopicID: nonExistentTopic}}}}, }) go func() { @@ -428,9 +428,9 @@ func TestGossipsubAttackGRAFTDuringBackoff(t *testing.T) { for _, sub := range irpc.GetSubscriptions() { if sub.GetSubscribe() { // Reply by subcribing to the topic and grafting to the peer - graft := []*pb.ControlGraft{{TopicID: sub.Topicid}} + graft := []*pb.ControlGraft{{TopicRef: &pb.ControlGraft_TopicID{TopicID: sub.GetTopicid()}}} writeMsg(&pb.RPC{ - Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}}, + Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: sub.GetTopicid()}}}, Control: &pb.ControlMessage{Graft: graft}, }) @@ -452,7 +452,7 @@ func TestGossipsubAttackGRAFTDuringBackoff(t *testing.T) { // Send a PRUNE to remove the attacker node from the legit // host's mesh var prune []*pb.ControlPrune - prune = append(prune, &pb.ControlPrune{TopicID: sub.Topicid}) + prune = append(prune, &pb.ControlPrune{TopicRef: &pb.ControlPrune_TopicID{TopicID: sub.GetTopicid()}}) writeMsg(&pb.RPC{ Control: &pb.ControlMessage{Prune: prune}, }) @@ -703,8 +703,8 @@ func TestGossipsubAttackInvalidMessageSpam(t *testing.T) { if sub.GetSubscribe() { // Reply by subcribing to the topic and grafting to the peer writeMsg(&pb.RPC{ - Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}}, - Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicID: sub.Topicid}}}, + Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: sub.GetTopicid()}}}, + Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicRef: &pb.ControlGraft_TopicID{TopicID: sub.GetTopicid()}}}}, }) go func() { @@ -720,10 +720,10 @@ func TestGossipsubAttackInvalidMessageSpam(t *testing.T) { // fail validation and reduce the attacker's score) for i := 0; i < 100; i++ { msg := &pb.Message{ - Data: []byte("some data" + strconv.Itoa(i)), - Topic: &mytopic, - From: []byte(attacker.ID()), - Seqno: []byte{byte(i + 1)}, + Data: []byte("some data" + strconv.Itoa(i)), + TopicRef: &pb.Message_Topic{Topic: mytopic}, + From: []byte(attacker.ID()), + Seqno: []byte{byte(i + 1)}, } writeMsg(&pb.RPC{ Publish: []*pb.Message{msg}, @@ -836,8 +836,8 @@ func TestGossipsubAttackSpamIDONTWANT(t *testing.T) { if sub.GetSubscribe() { // Reply by subcribing to the topic and grafting to the middle peer writeMsg(&pb.RPC{ - Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}}, - Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicID: sub.Topicid}}}, + Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: sub.GetTopicid()}}}, + Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicRef: &pb.ControlGraft_TopicID{TopicID: sub.GetTopicid()}}}}, }) go func() { diff --git a/gossipsub_test.go b/gossipsub_test.go index 0fca9d34..b5eeb744 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -1772,8 +1772,8 @@ func TestGossipsubPiggybackControl(t *testing.T) { rpc := &RPC{RPC: pb.RPC{}} gs.piggybackControl(blah, rpc, &pb.ControlMessage{ - Graft: []*pb.ControlGraft{{TopicID: &test1}, {TopicID: &test2}, {TopicID: &test3}}, - Prune: []*pb.ControlPrune{{TopicID: &test1}, {TopicID: &test2}, {TopicID: &test3}}, + Graft: []*pb.ControlGraft{{TopicRef: &pb.ControlGraft_TopicID{TopicID: test1}}, {TopicRef: &pb.ControlGraft_TopicID{TopicID: test2}}, {TopicRef: &pb.ControlGraft_TopicID{TopicID: test3}}}, + Prune: []*pb.ControlPrune{{TopicRef: &pb.ControlPrune_TopicID{TopicID: test1}}, {TopicRef: &pb.ControlPrune_TopicID{TopicID: test2}}, {TopicRef: &pb.ControlPrune_TopicID{TopicID: test3}}}, }) res <- rpc } @@ -2132,7 +2132,7 @@ func (sq *sybilSquatter) handleStream(s network.Stream) { w := protoio.NewDelimitedWriter(os) truth := true topic := "test" - err = w.WriteMsg(&pb.RPC{Subscriptions: []*pb.RPC_SubOpts{{Subscribe: &truth, Topicid: &topic}}}) + err = w.WriteMsg(&pb.RPC{Subscriptions: []*pb.RPC_SubOpts{{Subscribe: &truth, TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: topic}}}}) if err != nil { if !sq.ignoreErrors { panic(err) @@ -2365,7 +2365,7 @@ func (iwe *iwantEverything) handleStream(s network.Stream) { w := protoio.NewDelimitedWriter(os) truth := true topic := "test" - err = w.WriteMsg(&pb.RPC{Subscriptions: []*pb.RPC_SubOpts{{Subscribe: &truth, Topicid: &topic}}}) + err = w.WriteMsg(&pb.RPC{Subscriptions: []*pb.RPC_SubOpts{{Subscribe: &truth, TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: topic}}}}) if err != nil { panic(err) } @@ -2398,7 +2398,7 @@ func (iwe *iwantEverything) handleStream(s network.Stream) { // send a PRUNE for all grafts, so we don't get direct message deliveries var prunes []*pb.ControlPrune for _, graft := range rpc.Control.Graft { - prunes = append(prunes, &pb.ControlPrune{TopicID: graft.TopicID}) + prunes = append(prunes, &pb.ControlPrune{TopicRef: &pb.ControlPrune_TopicID{TopicID: graft.GetTopicID()}}) } var iwants []*pb.ControlIWant @@ -2485,7 +2485,7 @@ func TestFragmentRPCFunction(t *testing.T) { rpc.Subscriptions = []*pb.RPC_SubOpts{ { Subscribe: &truth, - Topicid: &topic, + TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: topic}, }, } rpc.Publish = make([]*pb.Message, nMessages) @@ -2519,8 +2519,8 @@ func TestFragmentRPCFunction(t *testing.T) { // the control messages should be in a separate RPC at the end // reuse RPC from prev test, but add a control message rpc.Control = &pb.ControlMessage{ - Graft: []*pb.ControlGraft{{TopicID: &topic}}, - Prune: []*pb.ControlPrune{{TopicID: &topic}}, + Graft: []*pb.ControlGraft{{TopicRef: &pb.ControlGraft_TopicID{TopicID: topic}}}, + Prune: []*pb.ControlPrune{{TopicRef: &pb.ControlPrune_TopicID{TopicID: topic}}}, Ihave: []*pb.ControlIHave{{MessageIDs: []string{"foo"}}}, Iwant: []*pb.ControlIWant{{MessageIDs: []string{"bar"}}}, } @@ -2707,8 +2707,8 @@ func (c *compressedRPC) append(rpc *pb.RPC) { c.iwant = slices.DeleteFunc(c.iwant, func(e string) bool { return len(e) == 0 }) } for _, ihave := range ctrl.Ihave { - c.ihave[*ihave.TopicID] = append(c.ihave[*ihave.TopicID], ihave.MessageIDs...) - c.ihave[*ihave.TopicID] = slices.DeleteFunc(c.ihave[*ihave.TopicID], func(e string) bool { return len(e) == 0 }) + c.ihave[ihave.GetTopicID()] = append(c.ihave[ihave.GetTopicID()], ihave.MessageIDs...) + c.ihave[ihave.GetTopicID()] = slices.DeleteFunc(c.ihave[ihave.GetTopicID()], func(e string) bool { return len(e) == 0 }) } for _, idontwant := range ctrl.Idontwant { c.idontwant = append(c.idontwant, idontwant.MessageIDs...) @@ -2722,7 +2722,7 @@ func (c *compressedRPC) append(rpc *pb.RPC) { c.prune = append(c.prune, d) } for _, graft := range ctrl.Graft { - c.graft = append(c.graft, *graft.TopicID) + c.graft = append(c.graft, graft.GetTopicID()) c.graft = slices.DeleteFunc(c.graft, func(e string) bool { return len(e) == 0 }) } } @@ -2860,8 +2860,8 @@ func TestGossipsubIdontwantSend(t *testing.T) { if sub.GetSubscribe() { // Reply by subcribing to the topic and grafting to the middle peer writeMsg(&pb.RPC{ - Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}}, - Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicID: sub.Topicid}}}, + Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: sub.GetTopicid()}}}, + Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicRef: &pb.ControlGraft_TopicID{TopicID: sub.GetTopicid()}}}}, }) go func() { @@ -2954,8 +2954,8 @@ func TestGossipsubIdontwantReceive(t *testing.T) { if sub.GetSubscribe() { // Reply by subcribing to the topic and grafting to the middle peer writeMsg(&pb.RPC{ - Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}}, - Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicID: sub.Topicid}}}, + Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: sub.GetTopicid()}}}, + Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicRef: &pb.ControlGraft_TopicID{TopicID: sub.GetTopicid()}}}}, }) go func() { @@ -3183,8 +3183,8 @@ func TestGossipsubIdontwantNonMesh(t *testing.T) { // Reply by subcribing to the topic and pruning to the middle peer to make sure // that it's not in the mesh writeMsg(&pb.RPC{ - Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}}, - Control: &pb.ControlMessage{Prune: []*pb.ControlPrune{{TopicID: sub.Topicid}}}, + Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: sub.GetTopicid()}}}, + Control: &pb.ControlMessage{Prune: []*pb.ControlPrune{{TopicRef: &pb.ControlPrune_TopicID{TopicID: sub.GetTopicid()}}}}, }) go func() { @@ -3271,8 +3271,8 @@ func TestGossipsubIdontwantIncompat(t *testing.T) { if sub.GetSubscribe() { // Reply by subcribing to the topic and grafting to the middle peer writeMsg(&pb.RPC{ - Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}}, - Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicID: sub.Topicid}}}, + Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: sub.GetTopicid()}}}, + Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicRef: &pb.ControlGraft_TopicID{TopicID: sub.GetTopicid()}}}}, }) go func() { @@ -3359,8 +3359,8 @@ func TestGossipsubIdontwantSmallMessage(t *testing.T) { // Reply by subcribing to the topic and pruning to the middle peer to make sure // that it's not in the mesh writeMsg(&pb.RPC{ - Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}}, - Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicID: sub.Topicid}}}, + Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: sub.GetTopicid()}}}, + Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicRef: &pb.ControlGraft_TopicID{TopicID: sub.GetTopicid()}}}}, }) go func() { @@ -3464,8 +3464,8 @@ func TestGossipsubIdontwantBeforeIwant(t *testing.T) { // Reply by subcribing to the topic and pruning to the middle peer to make sure // that it's not in the mesh writeMsg(&pb.RPC{ - Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}}, - Control: &pb.ControlMessage{Prune: []*pb.ControlPrune{{TopicID: sub.Topicid}}}, + Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: sub.GetTopicid()}}}, + Control: &pb.ControlMessage{Prune: []*pb.ControlPrune{{TopicRef: &pb.ControlPrune_TopicID{TopicID: sub.GetTopicid()}}}}, }) go func() { @@ -3549,8 +3549,8 @@ func TestGossipsubIdontwantClear(t *testing.T) { if sub.GetSubscribe() { // Reply by subcribing to the topic and grafting to the middle peer writeMsg(&pb.RPC{ - Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}}, - Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicID: sub.Topicid}}}, + Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: sub.GetTopicid()}}}, + Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicRef: &pb.ControlGraft_TopicID{TopicID: sub.GetTopicid()}}}}, }) go func() { @@ -4394,3 +4394,118 @@ func TestTestExtension(t *testing.T) { t.Fatal("TestExtension not received") } } + +func TestTopicTableExtension(t *testing.T) { + hosts := getDefaultHosts(t, 2) + topicName := "foobar" + c := TopicTableExtensionConfig{ + TopicBundles: [][]string{{topicName}}, + } + psub := getGossipsub(context.Background(), hosts[0], WithTopicTableExtension(c)) + topic, err := psub.Join(topicName) + if err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + psub1 := newSkeletonGossipsub(ctx, hosts[1]) + e1, err := newTopicTableExtension(c.TopicBundles) + if err != nil { + t.Fatal(err) + } + + connect(t, hosts[0], hosts[1]) + + const timeout = 3 * time.Second + select { + case <-time.After(timeout): + t.Fatal("Timeout") + case r := <-psub1.outRPC: + if r.GetControl().GetExtensions().GetTopicTableExtension() == nil { + t.Fatal("Unexpected RPC. First RPC should contain the Topic Table Extension Control Message") + } + } + + // hello packet + truth := true + psub1.inRPC <- &pb.RPC{ + Subscriptions: []*pb.RPC_SubOpts{{Subscribe: &truth, TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: topicName}}}, + Control: &pb.ControlMessage{ + Extensions: &pb.ControlExtensions{ + TopicTableExtension: e1.GetControlExtension(), + }, + }, + } + + // wait for the first host to build the topic table + time.Sleep(time.Second * 1) + + // publish a message + testMsg := []byte("test-message") + topic.Publish(ctx, testMsg) + + // make sure that the topic ref in the message is an inex + select { + case <-time.After(timeout): + t.Fatal("Timeout") + case r := <-psub1.outRPC: + msgs := r.GetPublish() + if len(msgs) != 1 { + t.Fatal("one published message is expected") + } + msg := msgs[0] + + if _, ok := msg.GetTopicRef().(*pb.Message_TopicIndex); !ok { + t.Fatal("the topic index is expected") + } + if !bytes.Equal(msg.GetData(), testMsg) { + t.Fatal("got wrong data") + } + } +} + +func TestTopicTableExtensionDense(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hosts := getDefaultHosts(t, 20) + topicName := "foobar" + + c := TopicTableExtensionConfig{ + TopicBundles: [][]string{{topicName}}, + } + psubs := getGossipsubs(ctx, hosts, WithTopicTableExtension(c)) + + var msgs []*Subscription + for _, ps := range psubs { + subch, err := ps.Subscribe(topicName) + if err != nil { + t.Fatal(err) + } + + msgs = append(msgs, subch) + } + + denseConnect(t, hosts) + + // wait for heartbeats to build mesh + time.Sleep(time.Second * 2) + + for i := 0; i < 100; i++ { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := mrand.Intn(len(psubs)) + + psubs[owner].Publish("foobar", msg) + + for _, sub := range msgs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + } +} diff --git a/mcache_test.go b/mcache_test.go index 93bcfdc6..6326cb3f 100644 --- a/mcache_test.go +++ b/mcache_test.go @@ -159,9 +159,9 @@ func makeTestMessage(n int) *pb.Message { data := []byte(fmt.Sprintf("%d", n)) topic := "test" return &pb.Message{ - Data: data, - Topic: &topic, - From: []byte("test"), - Seqno: seqno, + Data: data, + TopicRef: &pb.Message_Topic{Topic: topic}, + From: []byte("test"), + Seqno: seqno, } } diff --git a/pb/rpc.pb.go b/pb/rpc.pb.go index cd05d8eb..b4d06dc2 100644 --- a/pb/rpc.pb.go +++ b/pb/rpc.pb.go @@ -97,11 +97,15 @@ func (m *RPC) GetTestExtension() *TestExtension { } type RPC_SubOpts struct { - Subscribe *bool `protobuf:"varint,1,opt,name=subscribe" json:"subscribe,omitempty"` - Topicid *string `protobuf:"bytes,2,opt,name=topicid" json:"topicid,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + Subscribe *bool `protobuf:"varint,1,opt,name=subscribe" json:"subscribe,omitempty"` + // Types that are valid to be assigned to TopicRef: + // + // *RPC_SubOpts_Topicid + // *RPC_SubOpts_TopicIndex + TopicRef isRPC_SubOpts_TopicRef `protobuf_oneof:"topicRef"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *RPC_SubOpts) Reset() { *m = RPC_SubOpts{} } @@ -137,6 +141,29 @@ func (m *RPC_SubOpts) XXX_DiscardUnknown() { var xxx_messageInfo_RPC_SubOpts proto.InternalMessageInfo +type isRPC_SubOpts_TopicRef interface { + isRPC_SubOpts_TopicRef() + MarshalTo([]byte) (int, error) + Size() int +} + +type RPC_SubOpts_Topicid struct { + Topicid string `protobuf:"bytes,2,opt,name=topicid,oneof" json:"topicid,omitempty"` +} +type RPC_SubOpts_TopicIndex struct { + TopicIndex uint32 `protobuf:"varint,3,opt,name=topicIndex,oneof" json:"topicIndex,omitempty"` +} + +func (*RPC_SubOpts_Topicid) isRPC_SubOpts_TopicRef() {} +func (*RPC_SubOpts_TopicIndex) isRPC_SubOpts_TopicRef() {} + +func (m *RPC_SubOpts) GetTopicRef() isRPC_SubOpts_TopicRef { + if m != nil { + return m.TopicRef + } + return nil +} + func (m *RPC_SubOpts) GetSubscribe() bool { if m != nil && m.Subscribe != nil { return *m.Subscribe @@ -145,22 +172,41 @@ func (m *RPC_SubOpts) GetSubscribe() bool { } func (m *RPC_SubOpts) GetTopicid() string { - if m != nil && m.Topicid != nil { - return *m.Topicid + if x, ok := m.GetTopicRef().(*RPC_SubOpts_Topicid); ok { + return x.Topicid } return "" } +func (m *RPC_SubOpts) GetTopicIndex() uint32 { + if x, ok := m.GetTopicRef().(*RPC_SubOpts_TopicIndex); ok { + return x.TopicIndex + } + return 0 +} + +// XXX_OneofWrappers is for the internal use of the proto package. +func (*RPC_SubOpts) XXX_OneofWrappers() []interface{} { + return []interface{}{ + (*RPC_SubOpts_Topicid)(nil), + (*RPC_SubOpts_TopicIndex)(nil), + } +} + type Message struct { - From []byte `protobuf:"bytes,1,opt,name=from" json:"from,omitempty"` - Data []byte `protobuf:"bytes,2,opt,name=data" json:"data,omitempty"` - Seqno []byte `protobuf:"bytes,3,opt,name=seqno" json:"seqno,omitempty"` - Topic *string `protobuf:"bytes,4,opt,name=topic" json:"topic,omitempty"` - Signature []byte `protobuf:"bytes,5,opt,name=signature" json:"signature,omitempty"` - Key []byte `protobuf:"bytes,6,opt,name=key" json:"key,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + From []byte `protobuf:"bytes,1,opt,name=from" json:"from,omitempty"` + Data []byte `protobuf:"bytes,2,opt,name=data" json:"data,omitempty"` + Seqno []byte `protobuf:"bytes,3,opt,name=seqno" json:"seqno,omitempty"` + // Types that are valid to be assigned to TopicRef: + // + // *Message_Topic + // *Message_TopicIndex + TopicRef isMessage_TopicRef `protobuf_oneof:"topicRef"` + Signature []byte `protobuf:"bytes,5,opt,name=signature" json:"signature,omitempty"` + Key []byte `protobuf:"bytes,6,opt,name=key" json:"key,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *Message) Reset() { *m = Message{} } @@ -196,6 +242,29 @@ func (m *Message) XXX_DiscardUnknown() { var xxx_messageInfo_Message proto.InternalMessageInfo +type isMessage_TopicRef interface { + isMessage_TopicRef() + MarshalTo([]byte) (int, error) + Size() int +} + +type Message_Topic struct { + Topic string `protobuf:"bytes,4,opt,name=topic,oneof" json:"topic,omitempty"` +} +type Message_TopicIndex struct { + TopicIndex uint32 `protobuf:"varint,7,opt,name=topicIndex,oneof" json:"topicIndex,omitempty"` +} + +func (*Message_Topic) isMessage_TopicRef() {} +func (*Message_TopicIndex) isMessage_TopicRef() {} + +func (m *Message) GetTopicRef() isMessage_TopicRef { + if m != nil { + return m.TopicRef + } + return nil +} + func (m *Message) GetFrom() []byte { if m != nil { return m.From @@ -218,12 +287,19 @@ func (m *Message) GetSeqno() []byte { } func (m *Message) GetTopic() string { - if m != nil && m.Topic != nil { - return *m.Topic + if x, ok := m.GetTopicRef().(*Message_Topic); ok { + return x.Topic } return "" } +func (m *Message) GetTopicIndex() uint32 { + if x, ok := m.GetTopicRef().(*Message_TopicIndex); ok { + return x.TopicIndex + } + return 0 +} + func (m *Message) GetSignature() []byte { if m != nil { return m.Signature @@ -238,6 +314,14 @@ func (m *Message) GetKey() []byte { return nil } +// XXX_OneofWrappers is for the internal use of the proto package. +func (*Message) XXX_OneofWrappers() []interface{} { + return []interface{}{ + (*Message_Topic)(nil), + (*Message_TopicIndex)(nil), + } +} + type ControlMessage struct { Ihave []*ControlIHave `protobuf:"bytes,1,rep,name=ihave" json:"ihave,omitempty"` Iwant []*ControlIWant `protobuf:"bytes,2,rep,name=iwant" json:"iwant,omitempty"` @@ -326,7 +410,11 @@ func (m *ControlMessage) GetExtensions() *ControlExtensions { } type ControlIHave struct { - TopicID *string `protobuf:"bytes,1,opt,name=topicID" json:"topicID,omitempty"` + // Types that are valid to be assigned to TopicRef: + // + // *ControlIHave_TopicID + // *ControlIHave_TopicIndex + TopicRef isControlIHave_TopicRef `protobuf_oneof:"topicRef"` // implementors from other languages should use bytes here - go protobuf emits invalid utf8 strings MessageIDs []string `protobuf:"bytes,2,rep,name=messageIDs" json:"messageIDs,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` @@ -367,13 +455,43 @@ func (m *ControlIHave) XXX_DiscardUnknown() { var xxx_messageInfo_ControlIHave proto.InternalMessageInfo +type isControlIHave_TopicRef interface { + isControlIHave_TopicRef() + MarshalTo([]byte) (int, error) + Size() int +} + +type ControlIHave_TopicID struct { + TopicID string `protobuf:"bytes,1,opt,name=topicID,oneof" json:"topicID,omitempty"` +} +type ControlIHave_TopicIndex struct { + TopicIndex uint32 `protobuf:"varint,3,opt,name=topicIndex,oneof" json:"topicIndex,omitempty"` +} + +func (*ControlIHave_TopicID) isControlIHave_TopicRef() {} +func (*ControlIHave_TopicIndex) isControlIHave_TopicRef() {} + +func (m *ControlIHave) GetTopicRef() isControlIHave_TopicRef { + if m != nil { + return m.TopicRef + } + return nil +} + func (m *ControlIHave) GetTopicID() string { - if m != nil && m.TopicID != nil { - return *m.TopicID + if x, ok := m.GetTopicRef().(*ControlIHave_TopicID); ok { + return x.TopicID } return "" } +func (m *ControlIHave) GetTopicIndex() uint32 { + if x, ok := m.GetTopicRef().(*ControlIHave_TopicIndex); ok { + return x.TopicIndex + } + return 0 +} + func (m *ControlIHave) GetMessageIDs() []string { if m != nil { return m.MessageIDs @@ -381,6 +499,14 @@ func (m *ControlIHave) GetMessageIDs() []string { return nil } +// XXX_OneofWrappers is for the internal use of the proto package. +func (*ControlIHave) XXX_OneofWrappers() []interface{} { + return []interface{}{ + (*ControlIHave_TopicID)(nil), + (*ControlIHave_TopicIndex)(nil), + } +} + type ControlIWant struct { // implementors from other languages should use bytes here - go protobuf emits invalid utf8 strings MessageIDs []string `protobuf:"bytes,1,rep,name=messageIDs" json:"messageIDs,omitempty"` @@ -430,10 +556,14 @@ func (m *ControlIWant) GetMessageIDs() []string { } type ControlGraft struct { - TopicID *string `protobuf:"bytes,1,opt,name=topicID" json:"topicID,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + // Types that are valid to be assigned to TopicRef: + // + // *ControlGraft_TopicID + // *ControlGraft_TopicIndex + TopicRef isControlGraft_TopicRef `protobuf_oneof:"topicRef"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *ControlGraft) Reset() { *m = ControlGraft{} } @@ -469,20 +599,62 @@ func (m *ControlGraft) XXX_DiscardUnknown() { var xxx_messageInfo_ControlGraft proto.InternalMessageInfo +type isControlGraft_TopicRef interface { + isControlGraft_TopicRef() + MarshalTo([]byte) (int, error) + Size() int +} + +type ControlGraft_TopicID struct { + TopicID string `protobuf:"bytes,1,opt,name=topicID,oneof" json:"topicID,omitempty"` +} +type ControlGraft_TopicIndex struct { + TopicIndex uint32 `protobuf:"varint,2,opt,name=topicIndex,oneof" json:"topicIndex,omitempty"` +} + +func (*ControlGraft_TopicID) isControlGraft_TopicRef() {} +func (*ControlGraft_TopicIndex) isControlGraft_TopicRef() {} + +func (m *ControlGraft) GetTopicRef() isControlGraft_TopicRef { + if m != nil { + return m.TopicRef + } + return nil +} + func (m *ControlGraft) GetTopicID() string { - if m != nil && m.TopicID != nil { - return *m.TopicID + if x, ok := m.GetTopicRef().(*ControlGraft_TopicID); ok { + return x.TopicID } return "" } +func (m *ControlGraft) GetTopicIndex() uint32 { + if x, ok := m.GetTopicRef().(*ControlGraft_TopicIndex); ok { + return x.TopicIndex + } + return 0 +} + +// XXX_OneofWrappers is for the internal use of the proto package. +func (*ControlGraft) XXX_OneofWrappers() []interface{} { + return []interface{}{ + (*ControlGraft_TopicID)(nil), + (*ControlGraft_TopicIndex)(nil), + } +} + type ControlPrune struct { - TopicID *string `protobuf:"bytes,1,opt,name=topicID" json:"topicID,omitempty"` - Peers []*PeerInfo `protobuf:"bytes,2,rep,name=peers" json:"peers,omitempty"` - Backoff *uint64 `protobuf:"varint,3,opt,name=backoff" json:"backoff,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + // Types that are valid to be assigned to TopicRef: + // + // *ControlPrune_TopicID + // *ControlPrune_TopicIndex + TopicRef isControlPrune_TopicRef `protobuf_oneof:"topicRef"` + Peers []*PeerInfo `protobuf:"bytes,2,rep,name=peers" json:"peers,omitempty"` + Backoff *uint64 `protobuf:"varint,3,opt,name=backoff" json:"backoff,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *ControlPrune) Reset() { *m = ControlPrune{} } @@ -518,13 +690,43 @@ func (m *ControlPrune) XXX_DiscardUnknown() { var xxx_messageInfo_ControlPrune proto.InternalMessageInfo +type isControlPrune_TopicRef interface { + isControlPrune_TopicRef() + MarshalTo([]byte) (int, error) + Size() int +} + +type ControlPrune_TopicID struct { + TopicID string `protobuf:"bytes,1,opt,name=topicID,oneof" json:"topicID,omitempty"` +} +type ControlPrune_TopicIndex struct { + TopicIndex uint32 `protobuf:"varint,4,opt,name=topicIndex,oneof" json:"topicIndex,omitempty"` +} + +func (*ControlPrune_TopicID) isControlPrune_TopicRef() {} +func (*ControlPrune_TopicIndex) isControlPrune_TopicRef() {} + +func (m *ControlPrune) GetTopicRef() isControlPrune_TopicRef { + if m != nil { + return m.TopicRef + } + return nil +} + func (m *ControlPrune) GetTopicID() string { - if m != nil && m.TopicID != nil { - return *m.TopicID + if x, ok := m.GetTopicRef().(*ControlPrune_TopicID); ok { + return x.TopicID } return "" } +func (m *ControlPrune) GetTopicIndex() uint32 { + if x, ok := m.GetTopicRef().(*ControlPrune_TopicIndex); ok { + return x.TopicIndex + } + return 0 +} + func (m *ControlPrune) GetPeers() []*PeerInfo { if m != nil { return m.Peers @@ -539,6 +741,14 @@ func (m *ControlPrune) GetBackoff() uint64 { return 0 } +// XXX_OneofWrappers is for the internal use of the proto package. +func (*ControlPrune) XXX_OneofWrappers() []interface{} { + return []interface{}{ + (*ControlPrune_TopicID)(nil), + (*ControlPrune_TopicIndex)(nil), + } +} + type ControlIDontWant struct { // implementors from other languages should use bytes here - go protobuf emits invalid utf8 strings MessageIDs []string `protobuf:"bytes,1,rep,name=messageIDs" json:"messageIDs,omitempty"` @@ -590,10 +800,11 @@ func (m *ControlIDontWant) GetMessageIDs() []string { type ControlExtensions struct { // Experimental extensions must use field numbers larger than 0x200000 to be // encoded with 4 bytes - TestExtension *bool `protobuf:"varint,6492434,opt,name=testExtension" json:"testExtension,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + TestExtension *bool `protobuf:"varint,6492434,opt,name=testExtension" json:"testExtension,omitempty"` + TopicTableExtension *ExtTopicTable `protobuf:"bytes,4820938,opt,name=topicTableExtension" json:"topicTableExtension,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *ControlExtensions) Reset() { *m = ControlExtensions{} } @@ -636,6 +847,60 @@ func (m *ControlExtensions) GetTestExtension() bool { return false } +func (m *ControlExtensions) GetTopicTableExtension() *ExtTopicTable { + if m != nil { + return m.TopicTableExtension + } + return nil +} + +type ExtTopicTable struct { + TopicBundleHashes [][]byte `protobuf:"bytes,1,rep,name=topicBundleHashes" json:"topicBundleHashes,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ExtTopicTable) Reset() { *m = ExtTopicTable{} } +func (m *ExtTopicTable) String() string { return proto.CompactTextString(m) } +func (*ExtTopicTable) ProtoMessage() {} +func (*ExtTopicTable) Descriptor() ([]byte, []int) { + return fileDescriptor_77a6da22d6a3feb1, []int{9} +} +func (m *ExtTopicTable) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ExtTopicTable) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ExtTopicTable.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *ExtTopicTable) XXX_Merge(src proto.Message) { + xxx_messageInfo_ExtTopicTable.Merge(m, src) +} +func (m *ExtTopicTable) XXX_Size() int { + return m.Size() +} +func (m *ExtTopicTable) XXX_DiscardUnknown() { + xxx_messageInfo_ExtTopicTable.DiscardUnknown(m) +} + +var xxx_messageInfo_ExtTopicTable proto.InternalMessageInfo + +func (m *ExtTopicTable) GetTopicBundleHashes() [][]byte { + if m != nil { + return m.TopicBundleHashes + } + return nil +} + type PeerInfo struct { PeerID []byte `protobuf:"bytes,1,opt,name=peerID" json:"peerID,omitempty"` SignedPeerRecord []byte `protobuf:"bytes,2,opt,name=signedPeerRecord" json:"signedPeerRecord,omitempty"` @@ -648,7 +913,7 @@ func (m *PeerInfo) Reset() { *m = PeerInfo{} } func (m *PeerInfo) String() string { return proto.CompactTextString(m) } func (*PeerInfo) ProtoMessage() {} func (*PeerInfo) Descriptor() ([]byte, []int) { - return fileDescriptor_77a6da22d6a3feb1, []int{9} + return fileDescriptor_77a6da22d6a3feb1, []int{10} } func (m *PeerInfo) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -701,7 +966,7 @@ func (m *TestExtension) Reset() { *m = TestExtension{} } func (m *TestExtension) String() string { return proto.CompactTextString(m) } func (*TestExtension) ProtoMessage() {} func (*TestExtension) Descriptor() ([]byte, []int) { - return fileDescriptor_77a6da22d6a3feb1, []int{10} + return fileDescriptor_77a6da22d6a3feb1, []int{11} } func (m *TestExtension) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -741,6 +1006,7 @@ func init() { proto.RegisterType((*ControlPrune)(nil), "pubsub.pb.ControlPrune") proto.RegisterType((*ControlIDontWant)(nil), "pubsub.pb.ControlIDontWant") proto.RegisterType((*ControlExtensions)(nil), "pubsub.pb.ControlExtensions") + proto.RegisterType((*ExtTopicTable)(nil), "pubsub.pb.ExtTopicTable") proto.RegisterType((*PeerInfo)(nil), "pubsub.pb.PeerInfo") proto.RegisterType((*TestExtension)(nil), "pubsub.pb.TestExtension") } @@ -748,44 +1014,51 @@ func init() { func init() { proto.RegisterFile("rpc.proto", fileDescriptor_77a6da22d6a3feb1) } var fileDescriptor_77a6da22d6a3feb1 = []byte{ - // 583 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x94, 0xcd, 0x8e, 0xd3, 0x3e, - 0x14, 0xc5, 0x95, 0x7e, 0x4c, 0x9b, 0xdb, 0xf4, 0xff, 0x2f, 0x06, 0x0d, 0x06, 0x46, 0x55, 0x95, - 0x0d, 0x05, 0x41, 0x16, 0x65, 0x85, 0xd4, 0xcd, 0xd0, 0x22, 0xa6, 0x0b, 0xa0, 0x32, 0x48, 0xac, - 0x93, 0xd4, 0xed, 0x44, 0x33, 0xb5, 0x83, 0xed, 0x0c, 0xf0, 0x0e, 0xb0, 0xe1, 0x11, 0x58, 0xf3, - 0x1a, 0x48, 0x2c, 0x79, 0x04, 0xd4, 0x27, 0x41, 0x76, 0x3e, 0x9a, 0x36, 0x53, 0xd8, 0xd9, 0xd7, - 0xbf, 0xe3, 0x1c, 0x9f, 0x6b, 0x07, 0x6c, 0x11, 0x87, 0x5e, 0x2c, 0xb8, 0xe2, 0xc8, 0x8e, 0x93, - 0x40, 0x26, 0x81, 0x17, 0x07, 0xee, 0xf7, 0x1a, 0xd4, 0xc9, 0x7c, 0x82, 0xc6, 0xd0, 0x95, 0x49, - 0x20, 0x43, 0x11, 0xc5, 0x2a, 0xe2, 0x4c, 0x62, 0x6b, 0x50, 0x1f, 0x76, 0x46, 0xc7, 0x5e, 0x81, - 0x7a, 0x64, 0x3e, 0xf1, 0xde, 0x24, 0xc1, 0xeb, 0x58, 0x49, 0xb2, 0x0b, 0xa3, 0x47, 0xd0, 0x8a, - 0x93, 0xe0, 0x32, 0x92, 0xe7, 0xb8, 0x66, 0x74, 0xa8, 0xa4, 0x7b, 0x49, 0xa5, 0xf4, 0x57, 0x94, - 0xe4, 0x08, 0x7a, 0x02, 0xad, 0x90, 0x33, 0x25, 0xf8, 0x25, 0xae, 0x0f, 0xac, 0x61, 0x67, 0x74, - 0xa7, 0x44, 0x4f, 0xd2, 0x95, 0x42, 0x94, 0x91, 0xe8, 0x14, 0xba, 0x8a, 0x4a, 0xf5, 0xfc, 0xa3, - 0xa2, 0x4c, 0x46, 0x9c, 0xe1, 0xaf, 0xdf, 0x3e, 0xa7, 0x6a, 0x5c, 0x52, 0xbf, 0x2d, 0x23, 0x64, - 0x57, 0x71, 0xf7, 0x14, 0x5a, 0x99, 0x7f, 0x74, 0x02, 0x76, 0x76, 0x82, 0x80, 0x62, 0x6b, 0x60, - 0x0d, 0xdb, 0x64, 0x5b, 0x40, 0x18, 0x5a, 0x8a, 0xc7, 0x51, 0x18, 0x2d, 0x70, 0x6d, 0x60, 0x0d, - 0x6d, 0x92, 0x4f, 0xdd, 0x2f, 0x16, 0xb4, 0x32, 0x6b, 0x08, 0x41, 0x63, 0x29, 0xf8, 0xda, 0xc8, - 0x1d, 0x62, 0xc6, 0xba, 0xb6, 0xf0, 0x95, 0x6f, 0x64, 0x0e, 0x31, 0x63, 0x74, 0x0b, 0x9a, 0x92, - 0xbe, 0x67, 0xdc, 0x1c, 0xd6, 0x21, 0xe9, 0x44, 0x57, 0xcd, 0xa6, 0xb8, 0x61, 0xbe, 0x90, 0x4e, - 0x8c, 0xaf, 0x68, 0xc5, 0x7c, 0x95, 0x08, 0x8a, 0x9b, 0x86, 0xdf, 0x16, 0x50, 0x0f, 0xea, 0x17, - 0xf4, 0x13, 0x3e, 0x32, 0x75, 0x3d, 0x74, 0x7f, 0xd4, 0xe0, 0xbf, 0xdd, 0xc4, 0xd0, 0x63, 0x68, - 0x46, 0xe7, 0xfe, 0x15, 0xcd, 0x3a, 0x78, 0xbb, 0x9a, 0xed, 0xec, 0xcc, 0xbf, 0xa2, 0x24, 0xa5, - 0x0c, 0xfe, 0xc1, 0x67, 0x2a, 0x6b, 0xdc, 0x75, 0xf8, 0x3b, 0x9f, 0x29, 0x92, 0x52, 0x1a, 0x5f, - 0x09, 0x7f, 0xa9, 0x70, 0xfd, 0x10, 0xfe, 0x42, 0x2f, 0x93, 0x94, 0xd2, 0x78, 0x2c, 0x12, 0x46, - 0x71, 0xe3, 0x10, 0x3e, 0xd7, 0xcb, 0x24, 0xa5, 0xd0, 0x53, 0xb0, 0xa3, 0x05, 0x67, 0xca, 0x18, - 0x6a, 0x1a, 0xc9, 0xbd, 0x6b, 0x0c, 0x4d, 0x39, 0x53, 0xc6, 0xd4, 0x96, 0x46, 0x63, 0x00, 0x9a, - 0x77, 0x5a, 0x9a, 0x88, 0x3a, 0xa3, 0x93, 0xaa, 0xb6, 0xb8, 0x0d, 0x92, 0x94, 0x78, 0xf7, 0x0c, - 0x9c, 0x72, 0x38, 0xc5, 0x0d, 0x98, 0x4d, 0x4d, 0x7b, 0xf3, 0x1b, 0x30, 0x9b, 0xa2, 0x3e, 0xc0, - 0x3a, 0x4d, 0x7a, 0x36, 0x95, 0x26, 0x34, 0x9b, 0x94, 0x2a, 0xae, 0xb7, 0xdd, 0x49, 0x5b, 0xdc, - 0xe3, 0xad, 0x0a, 0x3f, 0x2c, 0x78, 0x13, 0xdc, 0xe1, 0x2f, 0xbb, 0xeb, 0x82, 0x34, 0x99, 0xfd, - 0xc5, 0xe3, 0x03, 0x68, 0xc6, 0x94, 0x0a, 0x99, 0xf5, 0xf4, 0x66, 0x29, 0x86, 0x39, 0xa5, 0x62, - 0xc6, 0x96, 0x9c, 0xa4, 0x84, 0xde, 0x24, 0xf0, 0xc3, 0x0b, 0xbe, 0x5c, 0x9a, 0xeb, 0xd9, 0x20, - 0xf9, 0xd4, 0x1d, 0x41, 0x6f, 0x3f, 0xef, 0x7f, 0x1e, 0x66, 0x0c, 0x37, 0x2a, 0x39, 0xa3, 0xfb, - 0x07, 0x5e, 0x6e, 0x7b, 0xef, 0x7d, 0xba, 0xaf, 0xa0, 0x9d, 0xdb, 0x43, 0xc7, 0x70, 0xa4, 0x0d, - 0x66, 0x67, 0x73, 0x48, 0x36, 0x43, 0x0f, 0xa1, 0xa7, 0xdf, 0x03, 0x5d, 0x68, 0x92, 0xd0, 0x90, - 0x8b, 0x45, 0xf6, 0xd8, 0x2a, 0x75, 0xf7, 0x7f, 0xe8, 0xee, 0xfc, 0x0f, 0x9e, 0x39, 0x3f, 0x37, - 0x7d, 0xeb, 0xd7, 0xa6, 0x6f, 0xfd, 0xde, 0xf4, 0xad, 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, 0xa2, - 0x64, 0xfc, 0x1b, 0x11, 0x05, 0x00, 0x00, + // 693 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x94, 0xcf, 0x6e, 0xd3, 0x4e, + 0x10, 0xc7, 0xeb, 0xfc, 0xa9, 0x93, 0xa9, 0xf3, 0xfb, 0xb5, 0x5b, 0x54, 0x4c, 0xa9, 0xa2, 0xc8, + 0x17, 0x02, 0x2a, 0x39, 0x84, 0x13, 0x52, 0x39, 0xd0, 0xa6, 0x22, 0x39, 0x14, 0xa2, 0xa5, 0x12, + 0x5c, 0xed, 0x78, 0xd3, 0x5a, 0x4d, 0x77, 0x5d, 0xef, 0xba, 0x84, 0x67, 0x80, 0x13, 0x77, 0x84, + 0xc4, 0x81, 0xb7, 0xe0, 0x88, 0x84, 0x38, 0xf1, 0x00, 0x1c, 0x50, 0x9f, 0x04, 0x79, 0xec, 0x38, + 0x76, 0x9c, 0xaa, 0xea, 0xcd, 0x3b, 0xfb, 0xf9, 0xee, 0x7e, 0x77, 0x66, 0x3c, 0x50, 0x0f, 0xfc, + 0x51, 0xc7, 0x0f, 0x84, 0x12, 0xa4, 0xee, 0x87, 0x8e, 0x0c, 0x9d, 0x8e, 0xef, 0x58, 0x7f, 0x4a, + 0x50, 0xa6, 0xc3, 0x03, 0xb2, 0x07, 0x0d, 0x19, 0x3a, 0x72, 0x14, 0x78, 0xbe, 0xf2, 0x04, 0x97, + 0xa6, 0xd6, 0x2a, 0xb7, 0xd7, 0xba, 0x5b, 0x9d, 0x14, 0xed, 0xd0, 0xe1, 0x41, 0xe7, 0x75, 0xe8, + 0xbc, 0xf2, 0x95, 0xa4, 0x79, 0x98, 0xec, 0x82, 0xee, 0x87, 0xce, 0xc4, 0x93, 0xa7, 0x66, 0x09, + 0x75, 0x24, 0xa3, 0x3b, 0x62, 0x52, 0xda, 0x27, 0x8c, 0xce, 0x10, 0xf2, 0x04, 0xf4, 0x91, 0xe0, + 0x2a, 0x10, 0x13, 0xb3, 0xdc, 0xd2, 0xda, 0x6b, 0xdd, 0x7b, 0x19, 0xfa, 0x20, 0xde, 0x49, 0x45, + 0x09, 0x49, 0x9e, 0x43, 0x43, 0x31, 0xa9, 0x0e, 0xa7, 0x8a, 0x71, 0xe9, 0x09, 0x6e, 0x7e, 0xfa, + 0xfa, 0x31, 0x56, 0x9b, 0x19, 0xf5, 0x71, 0x16, 0xa1, 0x79, 0xc5, 0xf6, 0x05, 0xe8, 0x89, 0x7f, + 0xb2, 0x03, 0xf5, 0xe4, 0x05, 0x0e, 0x33, 0xb5, 0x96, 0xd6, 0xae, 0xd1, 0x79, 0x80, 0x6c, 0x83, + 0xae, 0x84, 0xef, 0x8d, 0x3c, 0xd7, 0x2c, 0xb5, 0xb4, 0x76, 0xbd, 0xbf, 0x42, 0x67, 0x01, 0xd2, + 0x02, 0xc0, 0xcf, 0x01, 0x77, 0xd9, 0x14, 0xfd, 0x37, 0xfa, 0x2b, 0x34, 0x13, 0xdb, 0x07, 0xa8, + 0xe1, 0x8a, 0xb2, 0xb1, 0xf5, 0x5d, 0x03, 0x3d, 0x79, 0x0a, 0x21, 0x50, 0x19, 0x07, 0xe2, 0x1c, + 0xaf, 0x33, 0x28, 0x7e, 0x47, 0x31, 0xd7, 0x56, 0x36, 0x5e, 0x63, 0x50, 0xfc, 0x26, 0x77, 0xa0, + 0x2a, 0xd9, 0x05, 0x17, 0x78, 0xb8, 0x41, 0xe3, 0x05, 0xd9, 0x82, 0x2a, 0x9e, 0x6a, 0x56, 0x12, + 0x47, 0xf1, 0x72, 0xc1, 0x8f, 0x5e, 0xf4, 0x83, 0x6f, 0xf5, 0x4e, 0xb8, 0xad, 0xc2, 0x80, 0x99, + 0x55, 0x3c, 0x73, 0x1e, 0x20, 0xeb, 0x50, 0x3e, 0x63, 0xef, 0xcd, 0x55, 0x8c, 0x47, 0x9f, 0x39, + 0xff, 0x3f, 0x4a, 0xf0, 0x5f, 0xbe, 0x22, 0xe4, 0x31, 0x54, 0xbd, 0x53, 0xfb, 0x92, 0x25, 0x1d, + 0x72, 0xb7, 0x58, 0xbb, 0x41, 0xdf, 0xbe, 0x64, 0x34, 0xa6, 0x10, 0x7f, 0x67, 0x73, 0x95, 0x34, + 0xc6, 0x32, 0xfc, 0x8d, 0xcd, 0x15, 0x8d, 0xa9, 0x08, 0x3f, 0x09, 0xec, 0xb1, 0x32, 0xcb, 0xd7, + 0xe1, 0x2f, 0xa2, 0x6d, 0x1a, 0x53, 0x11, 0xee, 0x07, 0x21, 0x67, 0x66, 0xe5, 0x3a, 0x7c, 0x18, + 0x6d, 0xd3, 0x98, 0x22, 0x4f, 0xa1, 0xee, 0xb9, 0x82, 0x2b, 0x34, 0x54, 0x45, 0xc9, 0xfd, 0x25, + 0x86, 0x7a, 0x82, 0x2b, 0x34, 0x35, 0xa7, 0xc9, 0x1e, 0x00, 0x9b, 0x75, 0x92, 0xc4, 0x74, 0xad, + 0x75, 0x77, 0x8a, 0xda, 0xb4, 0xdb, 0x24, 0xcd, 0xf0, 0xd6, 0x14, 0x8c, 0x6c, 0x72, 0xd2, 0x0e, + 0x1b, 0xf4, 0xb0, 0x1d, 0xe6, 0x1d, 0x36, 0xe8, 0xdd, 0xdc, 0x61, 0xa4, 0x09, 0x70, 0x1e, 0x57, + 0x63, 0xd0, 0x93, 0x98, 0xd8, 0x3a, 0xcd, 0x44, 0x72, 0x15, 0xec, 0xcc, 0x6f, 0x8e, 0x9e, 0xb4, + 0xa0, 0xd5, 0x16, 0xb5, 0xd6, 0xdb, 0x94, 0xc7, 0x44, 0xdf, 0xc2, 0x69, 0xe9, 0x86, 0x7f, 0xe1, + 0xb3, 0x96, 0x1e, 0x8d, 0x45, 0xb9, 0xc5, 0xd1, 0x95, 0x25, 0x49, 0x78, 0x08, 0x55, 0x9f, 0xb1, + 0x40, 0x26, 0x8d, 0xb5, 0x99, 0xa9, 0xc5, 0x90, 0xb1, 0x60, 0xc0, 0xc7, 0x82, 0xc6, 0x04, 0x31, + 0x41, 0x77, 0xec, 0xd1, 0x99, 0x18, 0x8f, 0x31, 0x9d, 0x15, 0x3a, 0x5b, 0xe6, 0xfc, 0x75, 0x61, + 0x7d, 0xb1, 0x01, 0x6e, 0xcc, 0xd6, 0x07, 0x0d, 0x36, 0x0a, 0x95, 0x27, 0x0f, 0xae, 0x99, 0x55, + 0xb5, 0x85, 0x89, 0x44, 0x8e, 0x60, 0x13, 0xaf, 0x3f, 0xb6, 0x9d, 0x09, 0x9b, 0xe3, 0xbf, 0xbe, + 0x7c, 0x2b, 0x15, 0x46, 0xdb, 0xe1, 0x54, 0x1d, 0xa7, 0x2c, 0x5d, 0xa6, 0xb3, 0x9e, 0x41, 0x23, + 0x47, 0x91, 0x5d, 0xd8, 0x40, 0x6e, 0x3f, 0xe4, 0xee, 0x84, 0xf5, 0x6d, 0x79, 0xca, 0xe2, 0x57, + 0x18, 0xb4, 0xb8, 0x61, 0xbd, 0x84, 0xda, 0x2c, 0x73, 0x64, 0x0b, 0x56, 0xa3, 0xdc, 0x25, 0xa5, + 0x31, 0x68, 0xb2, 0x22, 0x8f, 0x60, 0x3d, 0x9a, 0x1d, 0xcc, 0x8d, 0x48, 0xca, 0x46, 0x22, 0x70, + 0x93, 0xe1, 0x55, 0x88, 0x5b, 0xff, 0x43, 0x23, 0x37, 0x8f, 0xf7, 0x8d, 0x9f, 0x57, 0x4d, 0xed, + 0xf7, 0x55, 0x53, 0xfb, 0x7b, 0xd5, 0xd4, 0xfe, 0x05, 0x00, 0x00, 0xff, 0xff, 0x54, 0x04, 0x21, + 0xbb, 0x91, 0x06, 0x00, 0x00, } func (m *RPC) Marshal() (dAtA []byte, err error) { @@ -897,12 +1170,14 @@ func (m *RPC_SubOpts) MarshalToSizedBuffer(dAtA []byte) (int, error) { i -= len(m.XXX_unrecognized) copy(dAtA[i:], m.XXX_unrecognized) } - if m.Topicid != nil { - i -= len(*m.Topicid) - copy(dAtA[i:], *m.Topicid) - i = encodeVarintRpc(dAtA, i, uint64(len(*m.Topicid))) - i-- - dAtA[i] = 0x12 + if m.TopicRef != nil { + { + size := m.TopicRef.Size() + i -= size + if _, err := m.TopicRef.MarshalTo(dAtA[i:]); err != nil { + return 0, err + } + } } if m.Subscribe != nil { i-- @@ -917,6 +1192,32 @@ func (m *RPC_SubOpts) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *RPC_SubOpts_Topicid) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *RPC_SubOpts_Topicid) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + i -= len(m.Topicid) + copy(dAtA[i:], m.Topicid) + i = encodeVarintRpc(dAtA, i, uint64(len(m.Topicid))) + i-- + dAtA[i] = 0x12 + return len(dAtA) - i, nil +} +func (m *RPC_SubOpts_TopicIndex) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *RPC_SubOpts_TopicIndex) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + i = encodeVarintRpc(dAtA, i, uint64(m.TopicIndex)) + i-- + dAtA[i] = 0x18 + return len(dAtA) - i, nil +} func (m *Message) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -941,6 +1242,15 @@ func (m *Message) MarshalToSizedBuffer(dAtA []byte) (int, error) { i -= len(m.XXX_unrecognized) copy(dAtA[i:], m.XXX_unrecognized) } + if m.TopicRef != nil { + { + size := m.TopicRef.Size() + i -= size + if _, err := m.TopicRef.MarshalTo(dAtA[i:]); err != nil { + return 0, err + } + } + } if m.Key != nil { i -= len(m.Key) copy(dAtA[i:], m.Key) @@ -955,13 +1265,6 @@ func (m *Message) MarshalToSizedBuffer(dAtA []byte) (int, error) { i-- dAtA[i] = 0x2a } - if m.Topic != nil { - i -= len(*m.Topic) - copy(dAtA[i:], *m.Topic) - i = encodeVarintRpc(dAtA, i, uint64(len(*m.Topic))) - i-- - dAtA[i] = 0x22 - } if m.Seqno != nil { i -= len(m.Seqno) copy(dAtA[i:], m.Seqno) @@ -986,6 +1289,32 @@ func (m *Message) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *Message_Topic) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Message_Topic) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + i -= len(m.Topic) + copy(dAtA[i:], m.Topic) + i = encodeVarintRpc(dAtA, i, uint64(len(m.Topic))) + i-- + dAtA[i] = 0x22 + return len(dAtA) - i, nil +} +func (m *Message_TopicIndex) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Message_TopicIndex) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + i = encodeVarintRpc(dAtA, i, uint64(m.TopicIndex)) + i-- + dAtA[i] = 0x38 + return len(dAtA) - i, nil +} func (m *ControlMessage) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -1119,6 +1448,15 @@ func (m *ControlIHave) MarshalToSizedBuffer(dAtA []byte) (int, error) { i -= len(m.XXX_unrecognized) copy(dAtA[i:], m.XXX_unrecognized) } + if m.TopicRef != nil { + { + size := m.TopicRef.Size() + i -= size + if _, err := m.TopicRef.MarshalTo(dAtA[i:]); err != nil { + return 0, err + } + } + } if len(m.MessageIDs) > 0 { for iNdEx := len(m.MessageIDs) - 1; iNdEx >= 0; iNdEx-- { i -= len(m.MessageIDs[iNdEx]) @@ -1128,16 +1466,35 @@ func (m *ControlIHave) MarshalToSizedBuffer(dAtA []byte) (int, error) { dAtA[i] = 0x12 } } - if m.TopicID != nil { - i -= len(*m.TopicID) - copy(dAtA[i:], *m.TopicID) - i = encodeVarintRpc(dAtA, i, uint64(len(*m.TopicID))) - i-- - dAtA[i] = 0xa - } return len(dAtA) - i, nil } +func (m *ControlIHave_TopicID) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *ControlIHave_TopicID) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + i -= len(m.TopicID) + copy(dAtA[i:], m.TopicID) + i = encodeVarintRpc(dAtA, i, uint64(len(m.TopicID))) + i-- + dAtA[i] = 0xa + return len(dAtA) - i, nil +} +func (m *ControlIHave_TopicIndex) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *ControlIHave_TopicIndex) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + i = encodeVarintRpc(dAtA, i, uint64(m.TopicIndex)) + i-- + dAtA[i] = 0x18 + return len(dAtA) - i, nil +} func (m *ControlIWant) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -1198,16 +1555,44 @@ func (m *ControlGraft) MarshalToSizedBuffer(dAtA []byte) (int, error) { i -= len(m.XXX_unrecognized) copy(dAtA[i:], m.XXX_unrecognized) } - if m.TopicID != nil { - i -= len(*m.TopicID) - copy(dAtA[i:], *m.TopicID) - i = encodeVarintRpc(dAtA, i, uint64(len(*m.TopicID))) - i-- - dAtA[i] = 0xa + if m.TopicRef != nil { + { + size := m.TopicRef.Size() + i -= size + if _, err := m.TopicRef.MarshalTo(dAtA[i:]); err != nil { + return 0, err + } + } } return len(dAtA) - i, nil } +func (m *ControlGraft_TopicID) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *ControlGraft_TopicID) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + i -= len(m.TopicID) + copy(dAtA[i:], m.TopicID) + i = encodeVarintRpc(dAtA, i, uint64(len(m.TopicID))) + i-- + dAtA[i] = 0xa + return len(dAtA) - i, nil +} +func (m *ControlGraft_TopicIndex) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *ControlGraft_TopicIndex) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + i = encodeVarintRpc(dAtA, i, uint64(m.TopicIndex)) + i-- + dAtA[i] = 0x10 + return len(dAtA) - i, nil +} func (m *ControlPrune) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -1232,6 +1617,15 @@ func (m *ControlPrune) MarshalToSizedBuffer(dAtA []byte) (int, error) { i -= len(m.XXX_unrecognized) copy(dAtA[i:], m.XXX_unrecognized) } + if m.TopicRef != nil { + { + size := m.TopicRef.Size() + i -= size + if _, err := m.TopicRef.MarshalTo(dAtA[i:]); err != nil { + return 0, err + } + } + } if m.Backoff != nil { i = encodeVarintRpc(dAtA, i, uint64(*m.Backoff)) i-- @@ -1251,16 +1645,35 @@ func (m *ControlPrune) MarshalToSizedBuffer(dAtA []byte) (int, error) { dAtA[i] = 0x12 } } - if m.TopicID != nil { - i -= len(*m.TopicID) - copy(dAtA[i:], *m.TopicID) - i = encodeVarintRpc(dAtA, i, uint64(len(*m.TopicID))) - i-- - dAtA[i] = 0xa - } return len(dAtA) - i, nil } +func (m *ControlPrune_TopicID) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *ControlPrune_TopicID) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + i -= len(m.TopicID) + copy(dAtA[i:], m.TopicID) + i = encodeVarintRpc(dAtA, i, uint64(len(m.TopicID))) + i-- + dAtA[i] = 0xa + return len(dAtA) - i, nil +} +func (m *ControlPrune_TopicIndex) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *ControlPrune_TopicIndex) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + i = encodeVarintRpc(dAtA, i, uint64(m.TopicIndex)) + i-- + dAtA[i] = 0x20 + return len(dAtA) - i, nil +} func (m *ControlIDontWant) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -1337,6 +1750,60 @@ func (m *ControlExtensions) MarshalToSizedBuffer(dAtA []byte) (int, error) { i-- dAtA[i] = 0x90 } + if m.TopicTableExtension != nil { + { + size, err := m.TopicTableExtension.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + i-- + dAtA[i] = 0xb1 + i-- + dAtA[i] = 0xfc + i-- + dAtA[i] = 0xd2 + } + return len(dAtA) - i, nil +} + +func (m *ExtTopicTable) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ExtTopicTable) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *ExtTopicTable) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if len(m.TopicBundleHashes) > 0 { + for iNdEx := len(m.TopicBundleHashes) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.TopicBundleHashes[iNdEx]) + copy(dAtA[i:], m.TopicBundleHashes[iNdEx]) + i = encodeVarintRpc(dAtA, i, uint64(len(m.TopicBundleHashes[iNdEx]))) + i-- + dAtA[i] = 0xa + } + } return len(dAtA) - i, nil } @@ -1460,9 +1927,8 @@ func (m *RPC_SubOpts) Size() (n int) { if m.Subscribe != nil { n += 2 } - if m.Topicid != nil { - l = len(*m.Topicid) - n += 1 + l + sovRpc(uint64(l)) + if m.TopicRef != nil { + n += m.TopicRef.Size() } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) @@ -1470,6 +1936,25 @@ func (m *RPC_SubOpts) Size() (n int) { return n } +func (m *RPC_SubOpts_Topicid) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Topicid) + n += 1 + l + sovRpc(uint64(l)) + return n +} +func (m *RPC_SubOpts_TopicIndex) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + n += 1 + sovRpc(uint64(m.TopicIndex)) + return n +} func (m *Message) Size() (n int) { if m == nil { return 0 @@ -1488,9 +1973,8 @@ func (m *Message) Size() (n int) { l = len(m.Seqno) n += 1 + l + sovRpc(uint64(l)) } - if m.Topic != nil { - l = len(*m.Topic) - n += 1 + l + sovRpc(uint64(l)) + if m.TopicRef != nil { + n += m.TopicRef.Size() } if m.Signature != nil { l = len(m.Signature) @@ -1505,7 +1989,26 @@ func (m *Message) Size() (n int) { } return n } - + +func (m *Message_Topic) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Topic) + n += 1 + l + sovRpc(uint64(l)) + return n +} +func (m *Message_TopicIndex) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + n += 1 + sovRpc(uint64(m.TopicIndex)) + return n +} func (m *ControlMessage) Size() (n int) { if m == nil { return 0 @@ -1558,9 +2061,8 @@ func (m *ControlIHave) Size() (n int) { } var l int _ = l - if m.TopicID != nil { - l = len(*m.TopicID) - n += 1 + l + sovRpc(uint64(l)) + if m.TopicRef != nil { + n += m.TopicRef.Size() } if len(m.MessageIDs) > 0 { for _, s := range m.MessageIDs { @@ -1574,6 +2076,25 @@ func (m *ControlIHave) Size() (n int) { return n } +func (m *ControlIHave_TopicID) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.TopicID) + n += 1 + l + sovRpc(uint64(l)) + return n +} +func (m *ControlIHave_TopicIndex) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + n += 1 + sovRpc(uint64(m.TopicIndex)) + return n +} func (m *ControlIWant) Size() (n int) { if m == nil { return 0 @@ -1598,9 +2119,8 @@ func (m *ControlGraft) Size() (n int) { } var l int _ = l - if m.TopicID != nil { - l = len(*m.TopicID) - n += 1 + l + sovRpc(uint64(l)) + if m.TopicRef != nil { + n += m.TopicRef.Size() } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) @@ -1608,15 +2128,33 @@ func (m *ControlGraft) Size() (n int) { return n } +func (m *ControlGraft_TopicID) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.TopicID) + n += 1 + l + sovRpc(uint64(l)) + return n +} +func (m *ControlGraft_TopicIndex) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + n += 1 + sovRpc(uint64(m.TopicIndex)) + return n +} func (m *ControlPrune) Size() (n int) { if m == nil { return 0 } var l int _ = l - if m.TopicID != nil { - l = len(*m.TopicID) - n += 1 + l + sovRpc(uint64(l)) + if m.TopicRef != nil { + n += m.TopicRef.Size() } if len(m.Peers) > 0 { for _, e := range m.Peers { @@ -1633,6 +2171,25 @@ func (m *ControlPrune) Size() (n int) { return n } +func (m *ControlPrune_TopicID) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.TopicID) + n += 1 + l + sovRpc(uint64(l)) + return n +} +func (m *ControlPrune_TopicIndex) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + n += 1 + sovRpc(uint64(m.TopicIndex)) + return n +} func (m *ControlIDontWant) Size() (n int) { if m == nil { return 0 @@ -1657,6 +2214,10 @@ func (m *ControlExtensions) Size() (n int) { } var l int _ = l + if m.TopicTableExtension != nil { + l = m.TopicTableExtension.Size() + n += 4 + l + sovRpc(uint64(l)) + } if m.TestExtension != nil { n += 5 } @@ -1666,6 +2227,24 @@ func (m *ControlExtensions) Size() (n int) { return n } +func (m *ExtTopicTable) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.TopicBundleHashes) > 0 { + for _, b := range m.TopicBundleHashes { + l = len(b) + n += 1 + l + sovRpc(uint64(l)) + } + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + func (m *PeerInfo) Size() (n int) { if m == nil { return 0 @@ -1975,9 +2554,28 @@ func (m *RPC_SubOpts) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - s := string(dAtA[iNdEx:postIndex]) - m.Topicid = &s + m.TopicRef = &RPC_SubOpts_Topicid{string(dAtA[iNdEx:postIndex])} iNdEx = postIndex + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field TopicIndex", wireType) + } + var v uint32 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.TopicRef = &RPC_SubOpts_TopicIndex{v} default: iNdEx = preIndex skippy, err := skipRpc(dAtA[iNdEx:]) @@ -2161,8 +2759,7 @@ func (m *Message) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - s := string(dAtA[iNdEx:postIndex]) - m.Topic = &s + m.TopicRef = &Message_Topic{string(dAtA[iNdEx:postIndex])} iNdEx = postIndex case 5: if wireType != 2 { @@ -2232,6 +2829,26 @@ func (m *Message) Unmarshal(dAtA []byte) error { m.Key = []byte{} } iNdEx = postIndex + case 7: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field TopicIndex", wireType) + } + var v uint32 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.TopicRef = &Message_TopicIndex{v} default: iNdEx = preIndex skippy, err := skipRpc(dAtA[iNdEx:]) @@ -2570,8 +3187,7 @@ func (m *ControlIHave) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - s := string(dAtA[iNdEx:postIndex]) - m.TopicID = &s + m.TopicRef = &ControlIHave_TopicID{string(dAtA[iNdEx:postIndex])} iNdEx = postIndex case 2: if wireType != 2 { @@ -2605,6 +3221,26 @@ func (m *ControlIHave) Unmarshal(dAtA []byte) error { } m.MessageIDs = append(m.MessageIDs, string(dAtA[iNdEx:postIndex])) iNdEx = postIndex + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field TopicIndex", wireType) + } + var v uint32 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.TopicRef = &ControlIHave_TopicIndex{v} default: iNdEx = preIndex skippy, err := skipRpc(dAtA[iNdEx:]) @@ -2769,9 +3405,28 @@ func (m *ControlGraft) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - s := string(dAtA[iNdEx:postIndex]) - m.TopicID = &s + m.TopicRef = &ControlGraft_TopicID{string(dAtA[iNdEx:postIndex])} iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field TopicIndex", wireType) + } + var v uint32 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.TopicRef = &ControlGraft_TopicIndex{v} default: iNdEx = preIndex skippy, err := skipRpc(dAtA[iNdEx:]) @@ -2853,8 +3508,7 @@ func (m *ControlPrune) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - s := string(dAtA[iNdEx:postIndex]) - m.TopicID = &s + m.TopicRef = &ControlPrune_TopicID{string(dAtA[iNdEx:postIndex])} iNdEx = postIndex case 2: if wireType != 2 { @@ -2910,6 +3564,26 @@ func (m *ControlPrune) Unmarshal(dAtA []byte) error { } } m.Backoff = &v + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field TopicIndex", wireType) + } + var v uint32 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.TopicRef = &ControlPrune_TopicIndex{v} default: iNdEx = preIndex skippy, err := skipRpc(dAtA[iNdEx:]) @@ -3044,6 +3718,42 @@ func (m *ControlExtensions) Unmarshal(dAtA []byte) error { return fmt.Errorf("proto: ControlExtensions: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { + case 4820938: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field TopicTableExtension", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.TopicTableExtension == nil { + m.TopicTableExtension = &ExtTopicTable{} + } + if err := m.TopicTableExtension.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex case 6492434: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field TestExtension", wireType) @@ -3087,6 +3797,89 @@ func (m *ControlExtensions) Unmarshal(dAtA []byte) error { } return nil } +func (m *ExtTopicTable) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ExtTopicTable: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ExtTopicTable: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field TopicBundleHashes", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.TopicBundleHashes = append(m.TopicBundleHashes, make([]byte, postIndex-iNdEx)) + copy(m.TopicBundleHashes[len(m.TopicBundleHashes)-1], dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *PeerInfo) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 diff --git a/pb/rpc.proto b/pb/rpc.proto index 4f3833c1..fdb5131d 100644 --- a/pb/rpc.proto +++ b/pb/rpc.proto @@ -8,7 +8,10 @@ message RPC { message SubOpts { optional bool subscribe = 1; // subscribe or unsubcribe - optional string topicid = 2; + oneof topicRef { + string topicid = 2; + uint32 topicIndex = 3; + } } optional ControlMessage control = 3; @@ -25,7 +28,10 @@ message Message { optional bytes from = 1; optional bytes data = 2; optional bytes seqno = 3; - optional string topic = 4; + oneof topicRef { + string topic = 4; + uint32 topicIndex = 7; + } optional bytes signature = 5; optional bytes key = 6; } @@ -40,7 +46,10 @@ message ControlMessage { } message ControlIHave { - optional string topicID = 1; + oneof topicRef { + string topicID = 1; + uint32 topicIndex = 3; + } // implementors from other languages should use bytes here - go protobuf emits invalid utf8 strings repeated string messageIDs = 2; } @@ -51,11 +60,17 @@ message ControlIWant { } message ControlGraft { - optional string topicID = 1; + oneof topicRef { + string topicID = 1; + uint32 topicIndex = 2; + } } message ControlPrune { - optional string topicID = 1; + oneof topicRef { + string topicID = 1; + uint32 topicIndex = 4; + } repeated PeerInfo peers = 2; optional uint64 backoff = 3; } @@ -72,6 +87,11 @@ message ControlExtensions { // Experimental extensions must use field numbers larger than 0x200000 to be // encoded with 4 bytes optional bool testExtension = 6492434; + optional ExtTopicTable topicTableExtension = 4820938; +} + +message ExtTopicTable { + repeated bytes topicBundleHashes = 1; } @@ -80,4 +100,4 @@ message PeerInfo { optional bytes signedPeerRecord = 2; } -message TestExtension {} \ No newline at end of file +message TestExtension {} diff --git a/pubsub.go b/pubsub.go index 03ada8ef..80d70703 100644 --- a/pubsub.go +++ b/pubsub.go @@ -217,6 +217,8 @@ type PubSubRouter interface { // HandleRPC is invoked to process control messages in the RPC envelope. // It is invoked after subscriptions and payload messages have been processed. HandleRPC(*RPC) + // InterceptRPC intercepts any incoming RPC before doing anything. Routers may also modify the RPC. + InterceptRPC(*RPC) *RPC // Publish is invoked to forward a new message that has been validated. Publish(*Message) // Join notifies the router that we want to receive and forward messages in a topic. @@ -402,9 +404,9 @@ func (rpc *RPC) split(limit int) iter.Seq[RPC] { for _, ihave := range ctl.GetIhave() { if len(nextRPC.Control.Ihave) == 0 || - nextRPC.Control.Ihave[len(nextRPC.Control.Ihave)-1].TopicID != ihave.TopicID { + nextRPC.Control.Ihave[len(nextRPC.Control.Ihave)-1].GetTopicID() != ihave.GetTopicID() { // Start a new IHAVE if we are referencing a new topic ID - newIhave := &pb.ControlIHave{TopicID: ihave.TopicID} + newIhave := &pb.ControlIHave{TopicRef: ihave.TopicRef} if nextRPC.Control.Ihave = append(nextRPC.Control.Ihave, newIhave); nextRPC.Size() > limit { nextRPC.Control.Ihave = nextRPC.Control.Ihave[:len(nextRPC.Control.Ihave)-1] if !yield(nextRPC) { @@ -423,7 +425,7 @@ func (rpc *RPC) split(limit int) iter.Seq[RPC] { return } nextRPC = RPC{RPC: pb.RPC{Control: &pb.ControlMessage{ - Ihave: []*pb.ControlIHave{{TopicID: ihave.TopicID, MessageIDs: []string{msgID}}}, + Ihave: []*pb.ControlIHave{{TopicRef: ihave.TopicRef, MessageIDs: []string{msgID}}}, }}, from: rpc.from} } } @@ -1149,7 +1151,7 @@ func (p *PubSub) handleRemoveRelay(topic string) { // Only called from processLoop. func (p *PubSub) announce(topic string, sub bool) { subopt := &pb.RPC_SubOpts{ - Topicid: &topic, + TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: topic}, Subscribe: &sub, } @@ -1193,7 +1195,7 @@ func (p *PubSub) doAnnounceRetry(pid peer.ID, topic string, sub bool) { } subopt := &pb.RPC_SubOpts{ - Topicid: &topic, + TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: topic}, Subscribe: &sub, } @@ -1267,6 +1269,9 @@ func (p *PubSub) notifyLeave(topic string, pid peer.ID) { } func (p *PubSub) handleIncomingRPC(rpc *RPC) { + // intercept and possibly modify the RPC by the router + rpc = p.rt.InterceptRPC(rpc) + // pass the rpc through app specific validation (if any available). if p.appSpecificRpcInspector != nil { // check if the RPC is allowed by the external inspector diff --git a/randomsub.go b/randomsub.go index a2d2b4dc..efdb01fb 100644 --- a/randomsub.go +++ b/randomsub.go @@ -51,6 +51,10 @@ func (rs *RandomSubRouter) AddPeer(p peer.ID, proto protocol.ID, hello *RPC) *RP return hello } +func (rs *RandomSubRouter) InterceptRPC(rpc *RPC) *RPC { + return rpc +} + func (rs *RandomSubRouter) RemovePeer(p peer.ID) { rs.tracer.RemovePeer(p) delete(rs.peers, p) diff --git a/score_test.go b/score_test.go index 93369a9e..15b2bc78 100644 --- a/score_test.go +++ b/score_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + pb "github.com/libp2p/go-libp2p-pubsub/pb" "github.com/libp2p/go-libp2p/core/peer" ) @@ -110,7 +111,7 @@ func TestScoreFirstMessageDeliveries(t *testing.T) { nMessages := 100 for i := 0; i < nMessages; i++ { pbMsg := makeTestMessage(i) - pbMsg.Topic = &mytopic + pbMsg.TopicRef = &pb.Message_Topic{Topic: mytopic} msg := Message{ReceivedFrom: peerA, Message: pbMsg} ps.ValidateMessage(&msg) ps.DeliverMessage(&msg) @@ -150,7 +151,7 @@ func TestScoreFirstMessageDeliveriesCap(t *testing.T) { nMessages := 100 for i := 0; i < nMessages; i++ { pbMsg := makeTestMessage(i) - pbMsg.Topic = &mytopic + pbMsg.TopicRef = &pb.Message_Topic{Topic: mytopic} msg := Message{ReceivedFrom: peerA, Message: pbMsg} ps.ValidateMessage(&msg) ps.DeliverMessage(&msg) @@ -190,7 +191,7 @@ func TestScoreFirstMessageDeliveriesDecay(t *testing.T) { nMessages := 100 for i := 0; i < nMessages; i++ { pbMsg := makeTestMessage(i) - pbMsg.Topic = &mytopic + pbMsg.TopicRef = &pb.Message_Topic{Topic: mytopic} msg := Message{ReceivedFrom: peerA, Message: pbMsg} ps.ValidateMessage(&msg) ps.DeliverMessage(&msg) @@ -270,7 +271,7 @@ func TestScoreMeshMessageDeliveries(t *testing.T) { wg := sync.WaitGroup{} for i := 0; i < nMessages; i++ { pbMsg := makeTestMessage(i) - pbMsg.Topic = &mytopic + pbMsg.TopicRef = &pb.Message_Topic{Topic: mytopic} msg := Message{ReceivedFrom: peerA, Message: pbMsg} ps.ValidateMessage(&msg) ps.DeliverMessage(&msg) @@ -340,7 +341,7 @@ func TestScoreMeshMessageDeliveriesDecay(t *testing.T) { nMessages := 40 for i := 0; i < nMessages; i++ { pbMsg := makeTestMessage(i) - pbMsg.Topic = &mytopic + pbMsg.TopicRef = &pb.Message_Topic{Topic: mytopic} msg := Message{ReceivedFrom: peerA, Message: pbMsg} ps.ValidateMessage(&msg) ps.DeliverMessage(&msg) @@ -414,7 +415,7 @@ func TestScoreMeshFailurePenalty(t *testing.T) { nMessages := 100 for i := 0; i < nMessages; i++ { pbMsg := makeTestMessage(i) - pbMsg.Topic = &mytopic + pbMsg.TopicRef = &pb.Message_Topic{Topic: mytopic} msg := Message{ReceivedFrom: peerA, Message: pbMsg} ps.ValidateMessage(&msg) ps.DeliverMessage(&msg) @@ -474,7 +475,7 @@ func TestScoreInvalidMessageDeliveries(t *testing.T) { nMessages := 100 for i := 0; i < nMessages; i++ { pbMsg := makeTestMessage(i) - pbMsg.Topic = &mytopic + pbMsg.TopicRef = &pb.Message_Topic{Topic: mytopic} msg := Message{ReceivedFrom: peerA, Message: pbMsg} ps.RejectMessage(&msg, RejectInvalidSignature) } @@ -511,7 +512,7 @@ func TestScoreInvalidMessageDeliveriesDecay(t *testing.T) { nMessages := 100 for i := 0; i < nMessages; i++ { pbMsg := makeTestMessage(i) - pbMsg.Topic = &mytopic + pbMsg.TopicRef = &pb.Message_Topic{Topic: mytopic} msg := Message{ReceivedFrom: peerA, Message: pbMsg} ps.RejectMessage(&msg, RejectInvalidSignature) } @@ -557,7 +558,7 @@ func TestScoreRejectMessageDeliveries(t *testing.T) { ps.AddPeer(peerB, "myproto") pbMsg := makeTestMessage(0) - pbMsg.Topic = &mytopic + pbMsg.TopicRef = &pb.Message_Topic{Topic: mytopic} msg := Message{ReceivedFrom: peerA, Message: pbMsg} msg2 := Message{ReceivedFrom: peerB, Message: pbMsg} @@ -948,7 +949,7 @@ func TestScoreRecapTopicParams(t *testing.T) { nMessages := 100 for i := 0; i < nMessages; i++ { pbMsg := makeTestMessage(i) - pbMsg.Topic = &mytopic + pbMsg.TopicRef = &pb.Message_Topic{Topic: mytopic} msg := Message{ReceivedFrom: peerA, Message: pbMsg} ps.ValidateMessage(&msg) ps.DeliverMessage(&msg) @@ -1030,7 +1031,7 @@ func TestScoreResetTopicParams(t *testing.T) { nMessages := 100 for i := 0; i < nMessages; i++ { pbMsg := makeTestMessage(i) - pbMsg.Topic = &mytopic + pbMsg.TopicRef = &pb.Message_Topic{Topic: mytopic} msg := Message{ReceivedFrom: peerA, Message: pbMsg} ps.ValidateMessage(&msg) ps.RejectMessage(&msg, RejectValidationFailed) diff --git a/sign_test.go b/sign_test.go index 48149346..4b42e2c0 100644 --- a/sign_test.go +++ b/sign_test.go @@ -30,10 +30,10 @@ func testSignVerify(t *testing.T, privk crypto.PrivKey) { } topic := "foo" m := pb.Message{ - Data: []byte("abc"), - Topic: &topic, - From: []byte(id), - Seqno: []byte("123"), + Data: []byte("abc"), + TopicRef: &pb.Message_Topic{Topic: topic}, + From: []byte(id), + Seqno: []byte("123"), } signMessage(id, privk, &m) err = verifyMessageSignature(&m) diff --git a/subscription_filter_test.go b/subscription_filter_test.go index 0057cdcf..f0dae654 100644 --- a/subscription_filter_test.go +++ b/subscription_filter_test.go @@ -20,15 +20,15 @@ func TestBasicSubscriptionFilter(t *testing.T) { yes := true subs := []*pb.RPC_SubOpts{ { - Topicid: &topic1, + TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: topic1}, Subscribe: &yes, }, { - Topicid: &topic2, + TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: topic2}, Subscribe: &yes, }, { - Topicid: &topic3, + TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: topic3}, Subscribe: &yes, }, } @@ -109,24 +109,24 @@ func TestSubscriptionFilterDeduplication(t *testing.T) { no := false subs := []*pb.RPC_SubOpts{ { - Topicid: &topic1, + TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: topic1}, Subscribe: &yes, }, { - Topicid: &topic1, + TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: topic1}, Subscribe: &yes, }, { - Topicid: &topic2, + TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: topic2}, Subscribe: &yes, }, { - Topicid: &topic2, + TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: topic2}, Subscribe: &no, }, { - Topicid: &topic3, + TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: topic3}, Subscribe: &yes, }, } diff --git a/tag_tracer_test.go b/tag_tracer_test.go index f96a8e1e..ceea5aeb 100644 --- a/tag_tracer_test.go +++ b/tag_tracer_test.go @@ -105,9 +105,9 @@ func TestTagTracerDeliveryTags(t *testing.T) { msg := &Message{ ReceivedFrom: p, Message: &pb.Message{ - From: []byte(p), - Data: []byte("hello"), - Topic: topic, + From: []byte(p), + Data: []byte("hello"), + TopicRef: &pb.Message_Topic{Topic: *topic}, }, } tt.DeliverMessage(msg) @@ -191,10 +191,10 @@ func TestTagTracerDeliveryTagsNearFirst(t *testing.T) { msg := &Message{ ReceivedFrom: p, Message: &pb.Message{ - From: []byte(p), - Data: []byte(fmt.Sprintf("msg-%d", i)), - Topic: &topic, - Seqno: []byte(fmt.Sprintf("%d", i)), + From: []byte(p), + Data: []byte(fmt.Sprintf("msg-%d", i)), + TopicRef: &pb.Message_Topic{Topic: topic}, + Seqno: []byte(fmt.Sprintf("%d", i)), }, } diff --git a/testextension.go b/test_extension.go similarity index 100% rename from testextension.go rename to test_extension.go diff --git a/topic.go b/topic.go index dd094eae..42a7055b 100644 --- a/topic.go +++ b/topic.go @@ -290,10 +290,10 @@ func (t *Topic) validate(ctx context.Context, data []byte, opts ...PubOpt) (*Mes } m := &pb.Message{ - Data: data, - Topic: &t.topic, - From: nil, - Seqno: nil, + Data: data, + TopicRef: &pb.Message_Topic{Topic: t.topic}, + From: nil, + Seqno: nil, } if pid != "" { m.From = []byte(pid) diff --git a/topictable_extension.go b/topictable_extension.go new file mode 100644 index 00000000..88cb6b35 --- /dev/null +++ b/topictable_extension.go @@ -0,0 +1,268 @@ +package pubsub + +import ( + "bytes" + "crypto/sha256" + "fmt" + pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb" + "github.com/libp2p/go-libp2p/core/peer" + "sort" + "strings" +) + +type TopicBundleHash [4]byte + +func newTopicBundleHash(bytes []byte) (*TopicBundleHash, error) { + if len(bytes) != 4 { + return nil, fmt.Errorf("expected 4 bytes for TopicBundleHash found: %d", len(bytes)) + } + var hash TopicBundleHash + copy(hash[:], bytes) + + return &hash, nil +} + +type topicTableExtension struct { + bundleHashes []TopicBundleHash + intersectedHashes map[peer.ID][]TopicBundleHash + + indexToName map[TopicBundleHash][]string // bundle hash -> list of topics + nameToIndex map[TopicBundleHash]map[string]int // bundle hash -> topic -> 0-based index in bundle +} + +func newTopicTableExtension(myBundles [][]string) (*topicTableExtension, error) { + bundleHashes := make([]TopicBundleHash, 0, len(myBundles)) + + indexToName := make(map[TopicBundleHash][]string) + nameToIndex := make(map[TopicBundleHash]map[string]int) + + for _, topics := range myBundles { + sort.Strings(topics) + + hash := computeTopicBundleHash(topics) + bundleHashes = append(bundleHashes, hash) + + indexToName[hash] = topics + nameToIndex[hash] = make(map[string]int) + for idx, topic := range topics { + nameToIndex[hash][topic] = idx + } + } + if err := validateBundles(bundleHashes); err != nil { + return nil, err + } + e := &topicTableExtension{ + bundleHashes: bundleHashes, + intersectedHashes: make(map[peer.ID][]TopicBundleHash), + indexToName: indexToName, + nameToIndex: nameToIndex, + } + return e, nil +} + +func (e *topicTableExtension) GetControlExtension() *pubsub_pb.ExtTopicTable { + hashSlices := make([][]byte, 0, len(e.bundleHashes)) + + for _, hash := range e.bundleHashes { + hashSlices = append(hashSlices, hash[:]) + } + return &pubsub_pb.ExtTopicTable{ + TopicBundleHashes: hashSlices, + } +} + +func (e *topicTableExtension) InterceptRPC(rpc *RPC) *RPC { + // Replace all topic indices with topic names for this peer + id := rpc.from + + // Replace in IHAVE messages + for _, ihave := range rpc.GetControl().GetIhave() { + if topicIndex, ok := ihave.GetTopicRef().(*pubsub_pb.ControlIHave_TopicIndex); ok { + if topicName, err := e.GetTopicName(id, int(topicIndex.TopicIndex)); err == nil { + ihave.TopicRef = &pubsub_pb.ControlIHave_TopicID{TopicID: topicName} + } + } + } + // Replace in GRAFT messages + for _, graft := range rpc.GetControl().GetGraft() { + if topicIndex, ok := graft.GetTopicRef().(*pubsub_pb.ControlGraft_TopicIndex); ok { + if topicName, err := e.GetTopicName(id, int(topicIndex.TopicIndex)); err == nil { + graft.TopicRef = &pubsub_pb.ControlGraft_TopicID{TopicID: topicName} + } + } + } + // Replace in PRUNE messages + for _, prune := range rpc.GetControl().GetPrune() { + if topicIndex, ok := prune.GetTopicRef().(*pubsub_pb.ControlPrune_TopicIndex); ok { + if topicName, err := e.GetTopicName(id, int(topicIndex.TopicIndex)); err == nil { + prune.TopicRef = &pubsub_pb.ControlPrune_TopicID{TopicID: topicName} + } + } + } + // Replace in published messages + for _, msg := range rpc.GetPublish() { + if topicIndex, ok := msg.GetTopicRef().(*pubsub_pb.Message_TopicIndex); ok { + if topicName, err := e.GetTopicName(id, int(topicIndex.TopicIndex)); err == nil { + msg.TopicRef = &pubsub_pb.Message_Topic{Topic: topicName} + } + } + } + // Replace in subscriptions + for _, sub := range rpc.GetSubscriptions() { + if topicIndex, ok := sub.GetTopicRef().(*pubsub_pb.RPC_SubOpts_TopicIndex); ok { + if topicName, err := e.GetTopicName(id, int(topicIndex.TopicIndex)); err == nil { + sub.TopicRef = &pubsub_pb.RPC_SubOpts_Topicid{Topicid: topicName} + } + } + } + return rpc +} + +func (e *topicTableExtension) AddPeer(id peer.ID, bundles []TopicBundleHash) error { + if err := validateBundles(bundles); err != nil { + return err + } + e.intersectedHashes[id] = computeBundleIntersection(e.bundleHashes, bundles) + return nil +} + +func (e *topicTableExtension) ExtendRPC(id peer.ID, rpc *RPC) *RPC { + // If it's a hello packet, don't replace the topic names with topic indices + if rpc.GetControl().GetExtensions().GetTopicTableExtension() != nil { + return rpc + } + + // Replace all topic names with topic indices for this peer + + // Replace in IHAVE messages + for _, ihave := range rpc.GetControl().GetIhave() { + if topicID, ok := ihave.GetTopicRef().(*pubsub_pb.ControlIHave_TopicID); ok { + if idx, err := e.GetTopicIndex(id, topicID.TopicID); err == nil { + ihave.TopicRef = &pubsub_pb.ControlIHave_TopicIndex{TopicIndex: uint32(idx)} + } + } + } + // Replace in GRAFT messages + for _, graft := range rpc.GetControl().GetGraft() { + if topicID, ok := graft.GetTopicRef().(*pubsub_pb.ControlGraft_TopicID); ok { + if idx, err := e.GetTopicIndex(id, topicID.TopicID); err == nil { + graft.TopicRef = &pubsub_pb.ControlGraft_TopicIndex{TopicIndex: uint32(idx)} + } + } + } + // Replace in PRUNE messages + for _, prune := range rpc.GetControl().GetPrune() { + if topicID, ok := prune.GetTopicRef().(*pubsub_pb.ControlPrune_TopicID); ok { + if idx, err := e.GetTopicIndex(id, topicID.TopicID); err == nil { + prune.TopicRef = &pubsub_pb.ControlPrune_TopicIndex{TopicIndex: uint32(idx)} + } + } + } + // Replace in published messages + for _, msg := range rpc.GetPublish() { + if topic, ok := msg.GetTopicRef().(*pubsub_pb.Message_Topic); ok { + if idx, err := e.GetTopicIndex(id, topic.Topic); err == nil { + msg.TopicRef = &pubsub_pb.Message_TopicIndex{TopicIndex: uint32(idx)} + } + } + } + // Replace in subscriptions + for _, sub := range rpc.GetSubscriptions() { + if topicid, ok := sub.GetTopicRef().(*pubsub_pb.RPC_SubOpts_Topicid); ok { + if idx, err := e.GetTopicIndex(id, topicid.Topicid); err == nil { + sub.TopicRef = &pubsub_pb.RPC_SubOpts_TopicIndex{TopicIndex: uint32(idx)} + } + } + } + return rpc +} + +// Note that topicIndex is 1-based +func (e *topicTableExtension) GetTopicName(id peer.ID, topicIndex int) (string, error) { + if topicIndex < 1 { + return "", fmt.Errorf("invalid topic index: %d", topicIndex) + } + + // Turn the index to 0-based + idx := topicIndex - 1 + + for _, hash := range e.intersectedHashes[id] { + if idx < len(e.indexToName[hash]) { + return e.indexToName[hash][idx], nil + } else { + idx -= len(e.indexToName[hash]) + } + } + return "", fmt.Errorf("invalid topic index: %d", topicIndex) +} + +// It returns a 1-based index +func (e *topicTableExtension) GetTopicIndex(id peer.ID, topicName string) (int, error) { + topicIndex := 0 + + for _, hash := range e.intersectedHashes[id] { + if idx, ok := e.nameToIndex[hash][topicName]; ok { + topicIndex += idx + // Turn the index to 1-based + topicIndex += 1 + return topicIndex, nil + } else { + topicIndex += len(e.nameToIndex[hash]) + } + } + return 0, fmt.Errorf("the topic not found: %s", topicName) +} + +func validateBundles(bundles []TopicBundleHash) error { + seen := make(map[TopicBundleHash]struct{}, len(bundles)) + for _, bundle := range bundles { + if _, ok := seen[bundle]; ok { + return fmt.Errorf("duplicates found") + } + seen[bundle] = struct{}{} + } + return nil +} + +// Assume that the topics have been sorted +func computeTopicBundleHash(sortedTopics []string) TopicBundleHash { + concatenated := strings.Join(sortedTopics, "") + hash := sha256.Sum256([]byte(concatenated)) + + var result TopicBundleHash + copy(result[:], hash[len(hash)-4:]) + return result +} + +func computeBundleIntersection(first, second []TopicBundleHash) []TopicBundleHash { + var result []TopicBundleHash + + // Find common prefix where elements at each index are equal in both slices. + for i := 0; i < min(len(first), len(second)) && bytes.Equal(first[i][:], second[i][:]); i++ { + result = append(result, first[i]) + } + + // Store the length of the matching prefix. This is our marker. + prefixLen := len(result) + + // Build a set of the remaining elements in the first slice after the prefix. + // For each remaining element in the second slice, if it exists in the set, + // add it to the result. (Duplicates possible if not validated up front.) + seen := make(map[TopicBundleHash]struct{}) + for _, v := range first[prefixLen:] { + seen[v] = struct{}{} + } + for _, v := range second[prefixLen:] { + if _, ok := seen[v]; ok { + result = append(result, v) + } + } + + // Sort the unordered tail lexicographically. + unordered := result[prefixLen:] + sort.Slice(unordered, func(i, j int) bool { + return bytes.Compare(unordered[i][:], unordered[j][:]) < 0 + }) + + return result +} diff --git a/topictable_extension_test.go b/topictable_extension_test.go new file mode 100644 index 00000000..8a5f8265 --- /dev/null +++ b/topictable_extension_test.go @@ -0,0 +1,304 @@ +package pubsub + +import ( + "testing" + + "github.com/libp2p/go-libp2p/core/peer" +) + +func TestGetTopicName(t *testing.T) { + // Create a topic table extension with some bundles + myBundles := [][]string{ + {"topic-c", "topic-a", "topic-b"}, // Will be sorted to: topic-a, topic-b, topic-c + {"topic-z", "topic-x", "topic-y"}, // Will be sorted to: topic-x, topic-y, topic-z + } + + ext, err := newTopicTableExtension(myBundles) + if err != nil { + t.Fatalf("failed to create topic table extension: %v", err) + } + + // Create peer bundles that intersect with our bundles + peerID := peer.ID("test-peer") + peerBundles := []TopicBundleHash{ + computeTopicBundleHash([]string{"topic-a", "topic-b", "topic-c"}), + computeTopicBundleHash([]string{"topic-x", "topic-y", "topic-z"}), + } + + err = ext.AddPeer(peerID, peerBundles) + if err != nil { + t.Fatalf("failed to add peer: %v", err) + } + + tests := []struct { + name string + topicIndex int + want string + wantErr bool + }{ + { + name: "first topic in first bundle", + topicIndex: 1, + want: "topic-a", + wantErr: false, + }, + { + name: "second topic in first bundle", + topicIndex: 2, + want: "topic-b", + wantErr: false, + }, + { + name: "third topic in first bundle", + topicIndex: 3, + want: "topic-c", + wantErr: false, + }, + { + name: "first topic in second bundle", + topicIndex: 4, + want: "topic-x", + wantErr: false, + }, + { + name: "second topic in second bundle", + topicIndex: 5, + want: "topic-y", + wantErr: false, + }, + { + name: "third topic in second bundle", + topicIndex: 6, + want: "topic-z", + wantErr: false, + }, + { + name: "invalid index: zero", + topicIndex: 0, + want: "", + wantErr: true, + }, + { + name: "invalid index: negative", + topicIndex: -1, + want: "", + wantErr: true, + }, + { + name: "invalid index: out of bounds", + topicIndex: 7, + want: "", + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := ext.GetTopicName(peerID, tt.topicIndex) + if (err != nil) != tt.wantErr { + t.Errorf("GetTopicName() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("GetTopicName() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestGetTopicIndex(t *testing.T) { + // Create a topic table extension with some bundles + myBundles := [][]string{ + {"topic-c", "topic-a", "topic-b"}, // Will be sorted to: topic-a, topic-b, topic-c + {"topic-z", "topic-x", "topic-y"}, // Will be sorted to: topic-x, topic-y, topic-z + } + + ext, err := newTopicTableExtension(myBundles) + if err != nil { + t.Fatalf("failed to create topic table extension: %v", err) + } + + // Create peer bundles that intersect with our bundles + peerID := peer.ID("test-peer") + peerBundles := []TopicBundleHash{ + computeTopicBundleHash([]string{"topic-a", "topic-b", "topic-c"}), + computeTopicBundleHash([]string{"topic-x", "topic-y", "topic-z"}), + } + + err = ext.AddPeer(peerID, peerBundles) + if err != nil { + t.Fatalf("failed to add peer: %v", err) + } + + tests := []struct { + name string + topicName string + want int + wantErr bool + }{ + { + name: "first topic in first bundle", + topicName: "topic-a", + want: 1, + wantErr: false, + }, + { + name: "second topic in first bundle", + topicName: "topic-b", + want: 2, + wantErr: false, + }, + { + name: "third topic in first bundle", + topicName: "topic-c", + want: 3, + wantErr: false, + }, + { + name: "first topic in second bundle", + topicName: "topic-x", + want: 4, + wantErr: false, + }, + { + name: "second topic in second bundle", + topicName: "topic-y", + want: 5, + wantErr: false, + }, + { + name: "third topic in second bundle", + topicName: "topic-z", + want: 6, + wantErr: false, + }, + { + name: "topic not found", + topicName: "topic-nonexistent", + want: 0, + wantErr: true, + }, + { + name: "empty topic name", + topicName: "", + want: 0, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := ext.GetTopicIndex(peerID, tt.topicName) + if (err != nil) != tt.wantErr { + t.Errorf("GetTopicIndex() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("GetTopicIndex() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestGetTopicNameAndIndexRoundTrip(t *testing.T) { + // Test that GetTopicName and GetTopicIndex are inverse operations + myBundles := [][]string{ + {"alpha", "beta", "gamma"}, + {"delta", "epsilon", "zeta"}, + {"eta", "theta"}, + } + + ext, err := newTopicTableExtension(myBundles) + if err != nil { + t.Fatalf("failed to create topic table extension: %v", err) + } + + peerID := peer.ID("test-peer") + peerBundles := []TopicBundleHash{ + computeTopicBundleHash([]string{"alpha", "beta", "gamma"}), + computeTopicBundleHash([]string{"delta", "epsilon", "zeta"}), + computeTopicBundleHash([]string{"eta", "theta"}), + } + + err = ext.AddPeer(peerID, peerBundles) + if err != nil { + t.Fatalf("failed to add peer: %v", err) + } + + // Test round trip: index -> name -> index + for i := 1; i <= 8; i++ { + name, err := ext.GetTopicName(peerID, i) + if err != nil { + t.Fatalf("GetTopicName(%d) failed: %v", i, err) + } + + index, err := ext.GetTopicIndex(peerID, name) + if err != nil { + t.Fatalf("GetTopicIndex(%s) failed: %v", name, err) + } + + if index != i { + t.Errorf("Round trip failed: started with index %d, got back %d (via name %s)", i, index, name) + } + } +} + +func TestGetTopicWithPartialIntersection(t *testing.T) { + // Test case where peer has only partial intersection with our bundles + myBundles := [][]string{ + {"topic-a", "topic-b", "topic-c"}, + {"topic-x", "topic-y", "topic-z"}, + {"topic-1", "topic-2", "topic-3"}, + } + + ext, err := newTopicTableExtension(myBundles) + if err != nil { + t.Fatalf("failed to create topic table extension: %v", err) + } + + peerID := peer.ID("test-peer") + // Peer only has the second bundle in common + peerBundles := []TopicBundleHash{ + computeTopicBundleHash([]string{"topic-x", "topic-y", "topic-z"}), + computeTopicBundleHash([]string{"topic-k", "topic-l", "topic-m"}), + } + + err = ext.AddPeer(peerID, peerBundles) + if err != nil { + t.Fatalf("failed to add peer: %v", err) + } + + // Should be able to access topics in the intersected bundle + name, err := ext.GetTopicName(peerID, 1) + if err != nil { + t.Fatalf("GetTopicName(1) failed: %v", err) + } + if name != "topic-x" { + t.Errorf("GetTopicName(1) = %v, want topic-x", name) + } + + // Should be able to get index for topic in intersected bundle + index, err := ext.GetTopicIndex(peerID, "topic-y") + if err != nil { + t.Fatalf("GetTopicIndex(topic-y) failed: %v", err) + } + if index != 2 { + t.Errorf("GetTopicIndex(topic-y) = %v, want 2", index) + } + + // Should not be able to access topics outside intersection + _, err = ext.GetTopicIndex(peerID, "topic-a") + if err == nil { + t.Error("GetTopicIndex(topic-a) should have failed for topic outside intersection") + } + _, err = ext.GetTopicIndex(peerID, "topic-k") + if err == nil { + t.Error("GetTopicIndex(topic-k) should have failed for topic outside intersection") + } + + // Should not be able to access index beyond intersection + _, err = ext.GetTopicName(peerID, 4) + if err == nil { + t.Error("GetTopicName(4) should have failed for index beyond intersection") + } +} diff --git a/trace.go b/trace.go index 7dbb5409..727401e0 100644 --- a/trace.go +++ b/trace.go @@ -77,13 +77,14 @@ func (t *pubsubTracer) PublishMessage(msg *Message) { } now := time.Now().UnixNano() + topic := msg.Message.GetTopic() evt := &pb.TraceEvent{ Type: pb.TraceEvent_PUBLISH_MESSAGE.Enum(), PeerID: []byte(t.pid), Timestamp: &now, PublishMessage: &pb.TraceEvent_PublishMessage{ MessageID: []byte(t.idGen.ID(msg)), - Topic: msg.Message.Topic, + Topic: &topic, }, } @@ -118,6 +119,7 @@ func (t *pubsubTracer) RejectMessage(msg *Message, reason string) { } now := time.Now().UnixNano() + topic := msg.GetTopic() evt := &pb.TraceEvent{ Type: pb.TraceEvent_REJECT_MESSAGE.Enum(), PeerID: []byte(t.pid), @@ -126,7 +128,7 @@ func (t *pubsubTracer) RejectMessage(msg *Message, reason string) { MessageID: []byte(t.idGen.ID(msg)), ReceivedFrom: []byte(msg.ReceivedFrom), Reason: &reason, - Topic: msg.Topic, + Topic: &topic, }, } @@ -149,6 +151,7 @@ func (t *pubsubTracer) DuplicateMessage(msg *Message) { } now := time.Now().UnixNano() + topic := msg.GetTopic() evt := &pb.TraceEvent{ Type: pb.TraceEvent_DUPLICATE_MESSAGE.Enum(), PeerID: []byte(t.pid), @@ -156,7 +159,7 @@ func (t *pubsubTracer) DuplicateMessage(msg *Message) { DuplicateMessage: &pb.TraceEvent_DuplicateMessage{ MessageID: []byte(t.idGen.ID(msg)), ReceivedFrom: []byte(msg.ReceivedFrom), - Topic: msg.Topic, + Topic: &topic, }, } @@ -179,13 +182,14 @@ func (t *pubsubTracer) DeliverMessage(msg *Message) { } now := time.Now().UnixNano() + topic := msg.GetTopic() evt := &pb.TraceEvent{ Type: pb.TraceEvent_DELIVER_MESSAGE.Enum(), PeerID: []byte(t.pid), Timestamp: &now, DeliverMessage: &pb.TraceEvent_DeliverMessage{ MessageID: []byte(t.idGen.ID(msg)), - Topic: msg.Topic, + Topic: &topic, ReceivedFrom: []byte(msg.ReceivedFrom), }, } @@ -343,18 +347,20 @@ func (t *pubsubTracer) traceRPCMeta(rpc *RPC) *pb.TraceEvent_RPCMeta { var msgs []*pb.TraceEvent_MessageMeta for _, m := range rpc.Publish { + topic := m.GetTopic() msgs = append(msgs, &pb.TraceEvent_MessageMeta{ MessageID: []byte(t.idGen.RawID(m)), - Topic: m.Topic, + Topic: &topic, }) } rpcMeta.Messages = msgs var subs []*pb.TraceEvent_SubMeta for _, sub := range rpc.Subscriptions { + topic := sub.GetTopicid() subs = append(subs, &pb.TraceEvent_SubMeta{ Subscribe: sub.Subscribe, - Topic: sub.Topicid, + Topic: &topic, }) } rpcMeta.Subscription = subs @@ -366,8 +372,9 @@ func (t *pubsubTracer) traceRPCMeta(rpc *RPC) *pb.TraceEvent_RPCMeta { for _, mid := range ctl.MessageIDs { mids = append(mids, []byte(mid)) } + topic := ctl.GetTopicID() ihave = append(ihave, &pb.TraceEvent_ControlIHaveMeta{ - Topic: ctl.TopicID, + Topic: &topic, MessageIDs: mids, }) } @@ -385,8 +392,9 @@ func (t *pubsubTracer) traceRPCMeta(rpc *RPC) *pb.TraceEvent_RPCMeta { var graft []*pb.TraceEvent_ControlGraftMeta for _, ctl := range rpc.Control.Graft { + topic := ctl.GetTopicID() graft = append(graft, &pb.TraceEvent_ControlGraftMeta{ - Topic: ctl.TopicID, + Topic: &topic, }) } @@ -396,8 +404,9 @@ func (t *pubsubTracer) traceRPCMeta(rpc *RPC) *pb.TraceEvent_RPCMeta { for _, pi := range ctl.Peers { peers = append(peers, pi.PeerID) } + topic := ctl.GetTopicID() prune = append(prune, &pb.TraceEvent_ControlPruneMeta{ - Topic: ctl.TopicID, + Topic: &topic, Peers: peers, }) }