Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscription filter #1210

Merged
merged 327 commits into from
Sep 25, 2021
Merged
Show file tree
Hide file tree
Changes from 250 commits
Commits
Show all changes
327 commits
Select commit Hold shift + click to select a range
01eef9e
update middleware
synzhu Aug 21, 2021
2f7b178
Address comments
synzhu Aug 21, 2021
112c27d
protocol state provider test
synzhu Aug 21, 2021
96cf07f
updating origin ID checks to use idtranslator
synzhu Aug 21, 2021
93a573a
new providers
synzhu Aug 21, 2021
31c46bf
Merge branch 'master' into smnzhu/identity-provider
synzhu Aug 21, 2021
9dd6d5e
fix test
synzhu Aug 21, 2021
c7b4ba6
remove comments
synzhu Aug 21, 2021
47c6ad8
Merge branch 'smnzhu/identity-provider' of https://github.com/onflow/…
synzhu Aug 21, 2021
7a994ce
Update go.mod
synzhu Aug 21, 2021
9c0c1c8
Update hierarchical_translator_test.go
synzhu Aug 21, 2021
24ec4a1
fix int
synzhu Aug 21, 2021
ca95f68
removing unstaked network
vishalchangrani Aug 23, 2021
9c6136a
wip
vishalchangrani Aug 23, 2021
416583a
Merge branch 'smnzhu/splitter-relay-engines' into vishal/remove_unsta…
vishalchangrani Aug 23, 2021
b432ad6
wip - integration tests for consensus follower
vishalchangrani Aug 23, 2021
dde1c4e
wip
vishalchangrani Aug 23, 2021
1c8bb50
wip
vishalchangrani Aug 24, 2021
867ee25
fixing unstaked_node_test integration test
vishalchangrani Aug 24, 2021
d674ac9
making the unstaked node test similar to the mvp test to allow block …
vishalchangrani Aug 24, 2021
80a4ad9
adding default size for cache
vishalchangrani Aug 24, 2021
6416191
moving builder.deriveBootstrapPeerIdentities to access_node_builder f…
vishalchangrani Aug 24, 2021
f53b07c
exposing libp2p port externally in integration test docker container …
vishalchangrani Aug 24, 2021
b9de82c
update node id logic
synzhu Aug 24, 2021
6622214
Add NetworkingIdentifierProider to staked node
synzhu Aug 24, 2021
dfcd821
Use hierarchical translator for unstaked node
synzhu Aug 24, 2021
03492e2
Add identity delta for staked and unstaked AN's
synzhu Aug 24, 2021
e2a315f
Implement proper message validators for unstaked network.
synzhu Aug 24, 2021
e979066
recover sync engine participants provider
synzhu Aug 24, 2021
f84d953
add sync provider to staked AN
synzhu Aug 24, 2021
1b56de4
implement reassignment of sync engine channel
synzhu Aug 24, 2021
823db49
[bootstrap] define & doc consts for magic numbers in decompression
huitseeker Aug 24, 2021
56bf8db
[bootstrap] Key Generation for Unstaked Access Nodes
huitseeker Aug 20, 2021
bef943c
using the unstaked networking key for the consesus follower
vishalchangrani Aug 24, 2021
ae3f743
chance consensus follower to use build function
synzhu Aug 24, 2021
c3f9992
Merge branch 'vishal/remove_unstaked_network' of https://github.com/o…
synzhu Aug 24, 2021
136066a
derive node id from key
synzhu Aug 24, 2021
05fe476
remove comment
synzhu Aug 24, 2021
46b0d35
remove topology
synzhu Aug 24, 2021
6cdbab3
fix node ID
synzhu Aug 24, 2021
44f8ae9
removing unneeded flags
vishalchangrani Aug 24, 2021
746be15
Merge branch 'vishal/remove_unstaked_network' of github.com:onflow/fl…
vishalchangrani Aug 24, 2021
84bd406
initializing middleware before initiliazing the sync follower for the…
vishalchangrani Aug 24, 2021
f8343bd
initial
synzhu Aug 12, 2021
a7ecbce
Update idTranslator.go
synzhu Aug 12, 2021
847e0f2
add dht peer lookup
synzhu Aug 17, 2021
9138a4f
Add dht lookup and default peerstore addresses
synzhu Aug 17, 2021
931b74f
undo topology changes
synzhu Aug 17, 2021
9d23c0b
add custom id translator
synzhu Aug 17, 2021
e82d2d2
validate ID translation
huitseeker Aug 18, 2021
c9f2e35
[network] make sure TestUnstakedTranslationRoundTrip runs 50 times
huitseeker Aug 18, 2021
4e76159
TODOs
synzhu Aug 18, 2021
3584488
Update peerManager_test.go
synzhu Aug 19, 2021
a2e7db0
Add new DHT test
synzhu Aug 20, 2021
f3c404a
fix test
synzhu Aug 20, 2021
2f6b4d2
Update testUtil.go
synzhu Aug 20, 2021
c391f1b
[network] Create a simple FixedTableIdentityTranslator, fix compilation
huitseeker Aug 20, 2021
1dc1970
Update libp2pNode_test.go
synzhu Aug 20, 2021
478342b
Last testUtil fixes
synzhu Aug 20, 2021
e19d348
add test for hierchical translator
synzhu Aug 20, 2021
a35e6ea
Update peerManager_test.go
synzhu Aug 20, 2021
cbc4be2
Address comments
synzhu Aug 21, 2021
1f0582f
protocol state provider test
synzhu Aug 21, 2021
c8bce78
updating origin ID checks to use idtranslator
synzhu Aug 21, 2021
b7f5a2a
new providers
synzhu Aug 21, 2021
082dfc2
add basic test for identity provider
huitseeker Aug 23, 2021
2a69f13
basic test for filtered identifier provider
huitseeker Aug 23, 2021
03f35df
add default identifier provider for sync engine
synzhu Aug 23, 2021
b22f4db
add comments
synzhu Aug 23, 2021
a7448d2
using a factory method for the SyncEngineParticipantsProvider to wait…
vishalchangrani Aug 24, 2021
574b4e0
define new subscription manager
synzhu Aug 24, 2021
37a648f
adding comment
vishalchangrani Aug 24, 2021
a5bdab4
Merge branch 'vishal/remove_unstaked_network' into smnzhu/staked-sync…
synzhu Aug 24, 2021
0efaf20
Update unstaked_access_node_builder.go
synzhu Aug 24, 2021
052cafa
Merge branch 'smnzhu/identity-provider' into vishal/remove_unstaked_n…
vishalchangrani Aug 24, 2021
08bf26b
wip
vishalchangrani Aug 24, 2021
589dfc2
reverting change to network/p2p/libp2pNode.go
vishalchangrani Aug 24, 2021
07b6705
added back conerteer network
synzhu Aug 25, 2021
9617911
define unsatked metrics
synzhu Aug 25, 2021
5d38cc4
[network] test for peerstore ID provider
huitseeker Aug 25, 2021
c8791ca
[network] Correct bug in ID validation
huitseeker Aug 25, 2021
3914d2c
Merge branch 'vishal/remove_unstaked_network' into smnzhu/staked-sync…
synzhu Aug 25, 2021
c0e1fda
add new network metrics
synzhu Aug 25, 2021
c51af41
Merge branch 'smnzhu/staked-sync-provider' into smnzhu/unstaked_metrics
synzhu Aug 25, 2021
305e462
update metrics
synzhu Aug 25, 2021
8cfccde
name namespaces and subsystems back to private
synzhu Aug 25, 2021
702b557
add idProvider argument to libp2pnodefactory
synzhu Aug 25, 2021
5024900
Merge pull request #1197 from onflow/smnzhu/unstaked_metrics
synzhu Aug 25, 2021
3416bf7
[network] Move UpdatableIDProvider to tests
huitseeker Aug 25, 2021
9208f00
add NotEjectedFilter to sync engine id provider
synzhu Aug 25, 2021
a5b836e
update assert.Subset to assert.ElementsMatch
synzhu Aug 25, 2021
ba13bc5
remove time.Sleep
synzhu Aug 25, 2021
2f0bd71
init
synzhu Aug 7, 2021
29cd93e
s
synzhu Aug 9, 2021
1306c29
Update unstaked_node_test.go
synzhu Aug 9, 2021
d5985ec
Add consensus follower to integration tests
synzhu Aug 9, 2021
bc16f5a
Update staked_access_node_builder.go
synzhu Aug 9, 2021
f19a1be
update to reflect new design
synzhu Aug 14, 2021
2acd775
Update staked_access_node_builder.go
synzhu Aug 14, 2021
bc7322c
Update consensus_follower.go
synzhu Aug 14, 2021
a2d490c
f
synzhu Aug 14, 2021
245526a
removing unstaked network
vishalchangrani Aug 23, 2021
a350740
wip
vishalchangrani Aug 23, 2021
bfb8bb2
wip - integration tests for consensus follower
vishalchangrani Aug 23, 2021
1e09384
wip
vishalchangrani Aug 23, 2021
318c2a4
wip
vishalchangrani Aug 24, 2021
76c2ccd
fixing unstaked_node_test integration test
vishalchangrani Aug 24, 2021
54520e2
making the unstaked node test similar to the mvp test to allow block …
vishalchangrani Aug 24, 2021
227bbb9
adding default size for cache
vishalchangrani Aug 24, 2021
2eaf3be
moving builder.deriveBootstrapPeerIdentities to access_node_builder f…
vishalchangrani Aug 24, 2021
e04819a
exposing libp2p port externally in integration test docker container …
vishalchangrani Aug 24, 2021
87bba78
update node id logic
synzhu Aug 24, 2021
77a3daf
Add NetworkingIdentifierProider to staked node
synzhu Aug 24, 2021
e5b2800
Use hierarchical translator for unstaked node
synzhu Aug 24, 2021
dad1ae9
Add identity delta for staked and unstaked AN's
synzhu Aug 24, 2021
923fe2d
Implement proper message validators for unstaked network.
synzhu Aug 24, 2021
2ac44b7
recover sync engine participants provider
synzhu Aug 24, 2021
f1cc3c4
using the unstaked networking key for the consesus follower
vishalchangrani Aug 24, 2021
290e0e8
chance consensus follower to use build function
synzhu Aug 24, 2021
15a20bb
derive node id from key
synzhu Aug 24, 2021
5c10daa
remove comment
synzhu Aug 24, 2021
1175e07
remove topology
synzhu Aug 24, 2021
5bef8e7
fix node ID
synzhu Aug 24, 2021
2464eb4
removing unneeded flags
vishalchangrani Aug 24, 2021
b906c04
initializing middleware before initiliazing the sync follower for the…
vishalchangrani Aug 24, 2021
d3f6845
using a factory method for the SyncEngineParticipantsProvider to wait…
vishalchangrani Aug 24, 2021
c2d6f7c
adding comment
vishalchangrani Aug 24, 2021
37300b2
wip
vishalchangrani Aug 24, 2021
45c2d3e
go mod tidy & lints
vishalchangrani Aug 24, 2021
26c56c2
fix integration tests compilation
huitseeker Aug 25, 2021
db95a99
fixing scaffold
vishalchangrani Aug 26, 2021
078346e
fix flakiness in peerstore provider test
synzhu Aug 26, 2021
8170978
Fix flaky middleware test
synzhu Aug 26, 2021
8955307
Merge branch 'vishal/remove_unstaked_network' of https://github.com/o…
synzhu Aug 26, 2021
5826a23
Revert "Merge branch 'vishal/remove_unstaked_network' of https://gith…
synzhu Aug 26, 2021
9c8b0de
Merge branch 'vishal/remove_unstaked_network' into smnzhu/staked-sync…
synzhu Aug 26, 2021
6a7160f
Disable queueing missing heights for unstaked sync provider
synzhu Aug 26, 2021
ce8642f
Update subscriptionFilter_test.go
synzhu Aug 26, 2021
3e39e10
Create subscription filter
synzhu Aug 26, 2021
ad94d71
Add subscription filter to pubsub options
synzhu Aug 26, 2021
8258159
initial
synzhu Aug 12, 2021
d0bb4a5
Update idTranslator.go
synzhu Aug 12, 2021
130a859
add dht peer lookup
synzhu Aug 17, 2021
e2529aa
Add dht lookup and default peerstore addresses
synzhu Aug 17, 2021
b81c627
undo topology changes
synzhu Aug 17, 2021
1647999
add custom id translator
synzhu Aug 17, 2021
098c071
validate ID translation
huitseeker Aug 18, 2021
066e188
[network] make sure TestUnstakedTranslationRoundTrip runs 50 times
huitseeker Aug 18, 2021
13507d7
TODOs
synzhu Aug 18, 2021
22eb78e
Update peerManager_test.go
synzhu Aug 19, 2021
95844b9
Add new DHT test
synzhu Aug 20, 2021
0b4a6af
fix test
synzhu Aug 20, 2021
f846241
Update testUtil.go
synzhu Aug 20, 2021
3640594
[network] Create a simple FixedTableIdentityTranslator, fix compilation
huitseeker Aug 20, 2021
65f2852
Update libp2pNode_test.go
synzhu Aug 20, 2021
92e13fc
Last testUtil fixes
synzhu Aug 20, 2021
b3ab250
add test for hierchical translator
synzhu Aug 20, 2021
48b303e
Update peerManager_test.go
synzhu Aug 20, 2021
02f7302
Address comments
synzhu Aug 21, 2021
959e6f9
protocol state provider test
synzhu Aug 21, 2021
d4b2e96
updating origin ID checks to use idtranslator
synzhu Aug 21, 2021
3535355
new providers
synzhu Aug 21, 2021
d35b2ba
add basic test for identity provider
huitseeker Aug 23, 2021
957dbbf
basic test for filtered identifier provider
huitseeker Aug 23, 2021
60f6591
add default identifier provider for sync engine
synzhu Aug 23, 2021
5b87c52
add comments
synzhu Aug 23, 2021
787fedb
[network] test for peerstore ID provider
huitseeker Aug 25, 2021
3d7b119
[network] Correct bug in ID validation
huitseeker Aug 25, 2021
98bb012
[network] Move UpdatableIDProvider to tests
huitseeker Aug 25, 2021
f0db946
add NotEjectedFilter to sync engine id provider
synzhu Aug 25, 2021
4915975
update assert.Subset to assert.ElementsMatch
synzhu Aug 25, 2021
c8c15a9
remove time.Sleep
synzhu Aug 25, 2021
a9c5340
fix flakiness in peerstore provider test
synzhu Aug 26, 2021
9c234dc
Fix flaky middleware test
synzhu Aug 26, 2021
f5622b9
init
synzhu Aug 7, 2021
2bcf3f9
Update unstaked_node_test.go
synzhu Aug 9, 2021
b314b96
Add consensus follower to integration tests
synzhu Aug 9, 2021
9e3e807
Update staked_access_node_builder.go
synzhu Aug 9, 2021
593fa50
update to reflect new design
synzhu Aug 14, 2021
8c90b8d
Update staked_access_node_builder.go
synzhu Aug 14, 2021
135faa0
Update consensus_follower.go
synzhu Aug 14, 2021
91ead41
removing unstaked network
vishalchangrani Aug 23, 2021
57393f6
fixing unstaked_node_test integration test
vishalchangrani Aug 24, 2021
74a59e8
making the unstaked node test similar to the mvp test to allow block …
vishalchangrani Aug 24, 2021
5adf91e
adding default size for cache
vishalchangrani Aug 24, 2021
4aa6892
moving builder.deriveBootstrapPeerIdentities to access_node_builder f…
vishalchangrani Aug 24, 2021
3f2ac35
exposing libp2p port externally in integration test docker container …
vishalchangrani Aug 24, 2021
0eec24f
update node id logic
synzhu Aug 24, 2021
c039b5b
Add NetworkingIdentifierProider to staked node
synzhu Aug 24, 2021
9b64426
Use hierarchical translator for unstaked node
synzhu Aug 24, 2021
68a906e
Add identity delta for staked and unstaked AN's
synzhu Aug 24, 2021
5e6db19
Implement proper message validators for unstaked network.
synzhu Aug 24, 2021
e893893
recover sync engine participants provider
synzhu Aug 24, 2021
5047a72
using the unstaked networking key for the consesus follower
vishalchangrani Aug 24, 2021
7ff21f9
chance consensus follower to use build function
synzhu Aug 24, 2021
358eb5c
derive node id from key
synzhu Aug 24, 2021
6930416
remove comment
synzhu Aug 24, 2021
f4bc94e
remove topology
synzhu Aug 24, 2021
ca47233
fix node ID
synzhu Aug 24, 2021
8a8e0a1
removing unneeded flags
vishalchangrani Aug 24, 2021
14b1909
initializing middleware before initiliazing the sync follower for the…
vishalchangrani Aug 24, 2021
d9f04e0
using a factory method for the SyncEngineParticipantsProvider to wait…
vishalchangrani Aug 24, 2021
0f7f6c8
adding comment
vishalchangrani Aug 24, 2021
7a25e93
go mod tidy & lints
vishalchangrani Aug 24, 2021
9689cbb
fix integration tests compilation
huitseeker Aug 25, 2021
a6c7043
fixing scaffold
vishalchangrani Aug 26, 2021
c3f87e4
adding assertions to the TestReceiveBlocks consumer follower test
vishalchangrani Aug 26, 2021
8d050bc
Add subscription filter test
synzhu Aug 26, 2021
322f1d3
Merge branch 'smnzhu/identity-provider' into smnzhu/subscription-filter
synzhu Aug 26, 2021
588c262
fix connectionmanager compilation errors
synzhu Aug 26, 2021
b9411ed
Merge branch 'smnzhu/staked-sync-provider' into smnzhu/subscription-f…
synzhu Aug 26, 2021
700d619
Merge branch 'vishal/subscription_validator' into smnzhu/staked-sync-…
synzhu Aug 26, 2021
0e2622d
Merge branch 'smnzhu/staked-sync-provider' into smnzhu/subscription-f…
synzhu Aug 26, 2021
3609042
Remove unneeded files
synzhu Aug 26, 2021
4fa43f4
Merge branch 'vishal/remove_unstaked_network' into smnzhu/staked-sync…
synzhu Aug 26, 2021
a1080e9
Merge branch 'smnzhu/staked-sync-provider' into smnzhu/subscription-f…
synzhu Aug 26, 2021
2c45578
Add todos, remove string
synzhu Aug 26, 2021
4b698ca
Update connmanager
synzhu Aug 26, 2021
9c84a7d
Merge branch 'smnzhu/staked-sync-provider' into smnzhu/subscription-f…
synzhu Aug 26, 2021
332979d
do things
synzhu Aug 31, 2021
514e17f
Merge branch 'master' into smnzhu/subscription-filter
synzhu Sep 10, 2021
d3d45e7
bleh
synzhu Sep 10, 2021
414d36c
Tests
synzhu Sep 10, 2021
647d753
Update subscription_filter_test.go
synzhu Sep 10, 2021
9d542c4
address comments
synzhu Sep 17, 2021
25968e0
Disable for ghost nodes
synzhu Sep 17, 2021
ac49207
address comment
synzhu Sep 17, 2021
2200ae1
Merge branch 'master' into smnzhu/subscription-filter
synzhu Sep 17, 2021
fac03b3
Update subscription_filter_test.go
synzhu Sep 17, 2021
4454c21
renamne
synzhu Sep 17, 2021
a1bf678
Update subscription_filter_test.go
synzhu Sep 17, 2021
a8e0f9e
Update staked_access_node_builder.go
synzhu Sep 17, 2021
bd78b01
Update unstaked_access_node_builder.go
synzhu Sep 17, 2021
b6ccebc
fix bug
synzhu Sep 17, 2021
92328e6
Update channels_test.go
synzhu Sep 17, 2021
8f3a0d9
Update channels_test.go
synzhu Sep 17, 2021
39a1712
Update channels.go
synzhu Sep 17, 2021
feb5c01
Address comments
synzhu Sep 20, 2021
9e81a81
update channels
synzhu Sep 20, 2021
9e2e5bd
fix bug
synzhu Sep 21, 2021
9ad681c
Update unique channels test
synzhu Sep 22, 2021
9da07b2
Merge branch 'master' into smnzhu/subscription-filter
synzhu Sep 22, 2021
2165ba5
Merge branch 'master' into smnzhu/subscription-filter
synzhu Sep 22, 2021
f93c508
address comments
synzhu Sep 24, 2021
131298e
Merge branch 'master' into smnzhu/subscription-filter
synzhu Sep 24, 2021
6c9813f
Refactor channels code and subscription filter
synzhu Sep 24, 2021
568a63e
fix build errors
synzhu Sep 24, 2021
d0eebc8
fix build error
synzhu Sep 24, 2021
e31a0c2
fix tests
synzhu Sep 24, 2021
3dab281
Update libp2pNode.go
synzhu Sep 24, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions cmd/access/node_builder/staked_access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"fmt"

"github.com/libp2p/go-libp2p-core/host"
dht "github.com/libp2p/go-libp2p-kad-dht"
pubsub "github.com/libp2p/go-libp2p-pubsub"

"github.com/onflow/flow-go/cmd"
"github.com/onflow/flow-go/crypto"
Expand Down Expand Up @@ -100,11 +102,11 @@ func (anb *StakedAccessNodeBuilder) Build() AccessNodeBuilder {

anb.
Component("unstaked sync request proxy", func(builder cmd.NodeBuilder, node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
proxyEngine = splitter.New(node.Logger, engine.UnstakedSyncCommittee)
proxyEngine = splitter.New(node.Logger, engine.PublicSyncCommittee)

// register the proxy engine with the unstaked network
var err error
unstakedNetworkConduit, err = node.Network.Register(engine.UnstakedSyncCommittee, proxyEngine)
unstakedNetworkConduit, err = node.Network.Register(engine.PublicSyncCommittee, proxyEngine)
if err != nil {
return nil, fmt.Errorf("could not register unstaked sync request proxy: %w", err)
}
Expand Down Expand Up @@ -214,13 +216,19 @@ func (builder *StakedAccessNodeBuilder) initLibP2PFactory(ctx context.Context,
}

return func() (*p2p.Node, error) {
psOpts := p2p.DefaultPubsubOptions(p2p.DefaultMaxPubSubMsgSize)
psOpts = append(psOpts, func(_ context.Context, h host.Host) (pubsub.Option, error) {
return pubsub.WithSubscriptionFilter(p2p.NewRoleBasedFilter(
h.ID(), builder.RootBlock.ID(), builder.RootChainID, builder.IdentityProvider,
)), nil
})
libp2pNode, err := p2p.NewDefaultLibP2PNodeBuilder(nodeID, myAddr, networkKey).
SetRootBlockID(builder.RootBlock.ID().String()).
SetRootBlockID(builder.RootBlock.ID()).
// no connection gater
SetConnectionManager(connManager).
// act as a DHT server
SetDHTOptions(dhtOptions...).
SetPubsubOptions(p2p.DefaultPubsubOptions(p2p.DefaultMaxPubSubMsgSize)...).
SetPubsubOptions(psOpts...).
SetLogger(builder.Logger).
SetResolver(resolver).
Build(ctx)
Expand All @@ -247,7 +255,7 @@ func (builder *StakedAccessNodeBuilder) initMiddleware(nodeID flow.Identifier,
factoryFunc,
nodeID,
networkMetrics,
builder.RootBlock.ID().String(),
builder.RootBlock.ID(),
p2p.DefaultUnicastTimeout,
false, // no connection gating to allow unstaked nodes to connect
builder.IDTranslator,
Expand Down
6 changes: 3 additions & 3 deletions cmd/access/node_builder/unstaked_access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (builder *UnstakedAccessNodeBuilder) initLibP2PFactory(ctx context.Context,

return func() (*p2p.Node, error) {
libp2pNode, err := p2p.NewDefaultLibP2PNodeBuilder(nodeID, builder.BaseConfig.BindAddr, networkKey).
SetRootBlockID(builder.RootBlock.ID().String()).
SetRootBlockID(builder.RootBlock.ID()).
SetConnectionManager(connManager).
// unlike the staked side of the network where currently all the node addresses are known upfront,
// for the unstaked side of the network, the nodes need to discover each other using DHT Discovery.
Expand Down Expand Up @@ -249,7 +249,7 @@ func (anb *UnstakedAccessNodeBuilder) enqueueUnstakedNetworkInit(ctx context.Con
return nil, err
}

anb.Network = converter.NewNetwork(network, engine.SyncCommittee, engine.UnstakedSyncCommittee)
anb.Network = converter.NewNetwork(network, engine.SyncCommittee, engine.PublicSyncCommittee)

anb.Logger.Info().Msgf("network will run on address: %s", anb.BindAddr)

Expand Down Expand Up @@ -284,7 +284,7 @@ func (anb *UnstakedAccessNodeBuilder) initMiddleware(nodeID flow.Identifier,
factoryFunc,
nodeID,
networkMetrics,
anb.RootBlock.ID().String(),
anb.RootBlock.ID(),
p2p.DefaultUnicastTimeout,
false, // no connection gating for the unstaked nodes
anb.IDTranslator,
Expand Down
6 changes: 4 additions & 2 deletions cmd/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,9 @@ func (fnb *FlowNodeBuilder) EnqueueNetworkInit(ctx context.Context) {
fnb.Me.NodeID(),
myAddr,
fnb.NetworkKey,
fnb.RootBlock.ID().String(),
fnb.RootBlock.ID(),
fnb.RootChainID,
fnb.IdentityProvider,
p2p.DefaultMaxPubSubMsgSize,
fnb.Metrics.Network,
pingProvider,
Expand All @@ -204,7 +206,7 @@ func (fnb *FlowNodeBuilder) EnqueueNetworkInit(ctx context.Context) {
libP2PNodeFactory,
fnb.Me.NodeID(),
fnb.Metrics.Network,
fnb.RootBlock.ID().String(),
fnb.RootBlock.ID(),
fnb.BaseConfig.UnicastMessageTimeout,
true,
fnb.IDTranslator,
Expand Down
49 changes: 26 additions & 23 deletions engine/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,22 @@ var channelRoleMap map[network.Channel]flow.RoleList

// RolesByChannel returns list of flow roles involved in the channel.
func RolesByChannel(channel network.Channel) (flow.RoleList, bool) {
if clusterChannel, isCluster := ClusterChannel(channel); isCluster {
// replaces channel with the stripped-off prefix
channel = clusterChannel
if _, isCluster := ClusterChannel(channel); isCluster {
return ClusterChannelRoles(), true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the general idea of defining these filter functions (e.g., ClusterChannelRoles), however, I would still recommend keeping the information-centric architecture, i.e., having channelRoleMap as the source-of-truth and then defining these filter functions on top of it, e.g.,

func ClusterChannelRoles() flow.RoleList {
	clusterRoles := flow.RoleList{}
	for channel, roles := range channelRoleMap{
		if _, ok := ClusterChannel(channel); ok{
			clusterRoles.Union(roles)
		}
	}
	
	return clusterRoles
}

By keeping the design information-centric around channelRoleMap instead of ad-hoc functions with hard-coded parameters, we preserve the safety of extensibility in a longer-term outlook.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, but channelRoleMap doesn't make sense for cluster-based channels imo because they are unique to the chain ID. ie, the value stored in channelRoleMap is not actually a "real" channel

Copy link
Contributor Author

@synzhu synzhu Sep 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the real solution is to remove ChannelSyncCluster() and ChannelConsensusCluster entirely, and simply redefine TopicFromChannel to take in a chain ID in addition to a RootBlockID instead. Or to go even further, we can define a closure or a struct ChannelToTopicConverter which does the conversions.

Then we'd be able to rename syncClusterPrefix to simply SyncCluster (similary for consensus) and add it to the channel role map.

}
roles, ok := channelRoleMap[channel]
return roles, ok
}

// Exists returns true if channel exists in channelRoleMap.
// At the current state, any developer-defined channel should be added
// to channelRoleMap as a constant channel type manually.
// Exists returns true if the channel exists.
func Exists(channel network.Channel) bool {
_, exists := RolesByChannel(channel)
return exists || UnstakedChannels().Contains(channel)
if _, ok := RolesByChannel(channel); !ok {
if _, isClusterChannel := ClusterChannel(channel); !isClusterChannel {
return PublicChannels().Contains(channel)
}
}

return true
synzhu marked this conversation as resolved.
Show resolved Hide resolved
}

// ChannelsByRole returns a list of all channels the role subscribes to.
Expand Down Expand Up @@ -63,10 +65,9 @@ func UniqueChannels(channels network.ChannelList) network.ChannelList {
// has already been added to uniques.
// We use identifier of RoleList to determine its uniqueness.
for _, channel := range channels {
id := channelRoleMap[channel].ID()

// non-cluster channel deduplicated based identifier of role list
if _, cluster := ClusterChannel(channel); !cluster {
id := channelRoleMap[channel].ID()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ID() method is quite resource-heavy, and that is why we kept it out of the loop there. I could not find any reason we need to recompute it over each iteration. If that is the case, please move it to its old place out of the loop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The old place is already inside the loop 🧐

I have only placed it inside the if, which is only saving computation because it will be computed less often.

if _, ok := added[id]; ok {
// a channel with same RoleList already added, hence skips
continue
Expand All @@ -90,10 +91,10 @@ func Channels() network.ChannelList {
return channels
}

// UnstakedChannels returns all channels that unstaked nodes can send messages on.
func UnstakedChannels() network.ChannelList {
// PublicChannels returns all channels that on the public network.
synzhu marked this conversation as resolved.
Show resolved Hide resolved
func PublicChannels() network.ChannelList {
return network.ChannelList{
UnstakedSyncCommittee,
PublicSyncCommittee,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to my earlier comment about keeping channelRoleMap as the source of truth, I would suggest not to hard code these in the body of functions. Instead, we can have a naming convention that all channels starting with public are public channels, and then do:

// all public channels should start with "public-" prefix.
const publicChannelPrefix = "public-"

// PublicChannels returns all channels that on the public network.
func PublicChannels() network.ChannelList {
	publicChannels := network.ChannelList{}
	for channel := range channelRoleMap {
		if strings.HasPrefix(channel.String(), publicChannelPrefix) {
			publicChannels = append(publicChannels, channel)
		}
	}
	return publicChannels
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Imo, channelRoleMap is only relevant to the channels where roles matter, ie private channels.

Public channels can be joined by anyone, regardless of role (and even unstaked nodes who don't have a role whatsoever), so I think it makes sense to treat them separately here.

}
}

Expand Down Expand Up @@ -141,8 +142,8 @@ const (
ProvideReceiptsByBlockID = RequestReceiptsByBlockID
ProvideApprovalsByChunk = RequestApprovalsByChunk

// Unstaked network channels
UnstakedSyncCommittee = network.Channel("unstaked-sync-committee")
// Public network channels
PublicSyncCommittee = network.Channel("public-sync-committee")
)

// initializeChannelRoleMap initializes an instance of channelRoleMap and populates it with the channels and their
Expand All @@ -161,7 +162,7 @@ func initializeChannelRoleMap() {
channelRoleMap[ConsensusCommittee] = flow.RoleList{flow.RoleConsensus}

// Channels for protocols actively synchronizing state across nodes
channelRoleMap[SyncCommittee] = flow.RoleList{flow.RoleConsensus}
channelRoleMap[SyncCommittee] = flow.Roles()
channelRoleMap[SyncExecution] = flow.RoleList{flow.RoleExecution}

// Channels for DKG communication
Expand All @@ -177,7 +178,7 @@ func initializeChannelRoleMap() {
channelRoleMap[PushApprovals] = flow.RoleList{flow.RoleConsensus, flow.RoleVerification}

// Channels for actively requesting missing entities
channelRoleMap[RequestCollections] = flow.RoleList{flow.RoleCollection, flow.RoleExecution}
channelRoleMap[RequestCollections] = flow.RoleList{flow.RoleCollection, flow.RoleExecution, flow.RoleAccess}
channelRoleMap[RequestChunks] = flow.RoleList{flow.RoleExecution, flow.RoleVerification}
channelRoleMap[RequestReceiptsByBlockID] = flow.RoleList{flow.RoleConsensus, flow.RoleExecution}
channelRoleMap[RequestApprovalsByChunk] = flow.RoleList{flow.RoleConsensus, flow.RoleVerification}
Expand All @@ -190,13 +191,15 @@ func initializeChannelRoleMap() {
flow.RoleAccess}
channelRoleMap[ReceiveApprovals] = flow.RoleList{flow.RoleConsensus, flow.RoleVerification}

channelRoleMap[ProvideCollections] = flow.RoleList{flow.RoleCollection, flow.RoleExecution}
channelRoleMap[ProvideCollections] = flow.RoleList{flow.RoleCollection, flow.RoleExecution, flow.RoleAccess}
channelRoleMap[ProvideChunks] = flow.RoleList{flow.RoleExecution, flow.RoleVerification}
channelRoleMap[ProvideReceiptsByBlockID] = flow.RoleList{flow.RoleConsensus, flow.RoleExecution}
channelRoleMap[ProvideApprovalsByChunk] = flow.RoleList{flow.RoleConsensus, flow.RoleVerification}
}

channelRoleMap[syncClusterPrefix] = flow.RoleList{flow.RoleCollection}
channelRoleMap[consensusClusterPrefix] = flow.RoleList{flow.RoleCollection}
synzhu marked this conversation as resolved.
Show resolved Hide resolved
// ClusterChannelRoles returns the list of roles that are involved in cluster-based channels.
func ClusterChannelRoles() flow.RoleList {
return flow.RoleList{flow.RoleCollection}
}

// ClusterChannel returns true if channel is cluster-based.
Expand All @@ -217,13 +220,13 @@ func ClusterChannel(channel network.Channel) (network.Channel, bool) {
// TopicFromChannel returns the unique LibP2P topic form the channel.
// The channel is made up of name string suffixed with root block id.
// The root block id is used to prevent cross talks between nodes on different sporks.
func TopicFromChannel(channel network.Channel, rootBlockID string) network.Topic {
func TopicFromChannel(channel network.Channel, rootBlockID flow.Identifier) network.Topic {
// skip root block suffix, if this is a cluster specific channel. A cluster specific channel is inherently
// unique for each epoch
if strings.HasPrefix(channel.String(), syncClusterPrefix.String()) || strings.HasPrefix(string(channel), consensusClusterPrefix.String()) {
if _, isClusterChannel := ClusterChannel(channel); isClusterChannel {
return network.Topic(channel)
}
return network.Topic(fmt.Sprintf("%s/%s", string(channel), rootBlockID))
return network.Topic(fmt.Sprintf("%s/%s", string(channel), rootBlockID.String()))
}

// ChannelConsensusCluster returns a dynamic cluster consensus channel based on
Expand Down
12 changes: 8 additions & 4 deletions engine/channels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,15 @@ func TestGetChannelByRole(t *testing.T) {
// - TestMetric
// the roles list should contain collection and consensus roles
topics := ChannelsByRole(flow.RoleVerification)
assert.Len(t, topics, 7)
assert.Len(t, topics, 8)
assert.Contains(t, topics, PushBlocks)
assert.Contains(t, topics, PushReceipts)
assert.Contains(t, topics, PushApprovals)
assert.Contains(t, topics, ProvideApprovalsByChunk)
assert.Contains(t, topics, RequestChunks)
assert.Contains(t, topics, TestMetrics)
assert.Contains(t, topics, TestNetwork)
assert.Contains(t, topics, SyncCommittee)
}

// TestIsClusterChannel verifies the correctness of ClusterChannel method
Expand Down Expand Up @@ -121,10 +122,13 @@ func TestUniqueChannels_Uniqueness(t *testing.T) {
// We use the identifier of RoleList to determine their uniqueness.
func TestUniqueChannels_ClusterChannels(t *testing.T) {
channels := ChannelsByRole(flow.RoleCollection)
consensusCluster := ChannelConsensusCluster(flow.Emulator)
syncCluster := ChannelSyncCluster(flow.Emulator)
channels = append(channels, consensusCluster, syncCluster)
uniques := UniqueChannels(channels)
// collection role has two cluster and one non-cluster channels all with the same RoleList.
// Hence all of them should be returned as unique channels.
require.Contains(t, uniques, syncClusterPrefix) // cluster channel
require.Contains(t, uniques, consensusClusterPrefix) // cluster channel
require.Contains(t, uniques, PushTransactions) // non-cluster channel
require.Contains(t, uniques, syncCluster) // cluster channel
require.Contains(t, uniques, consensusCluster) // cluster channel
require.Contains(t, uniques, PushTransactions) // non-cluster channel
}
2 changes: 1 addition & 1 deletion model/flow/role.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (r *Role) UnmarshalText(text []byte) error {
return err
}

func Roles() []Role {
func Roles() RoleList {
return []Role{RoleCollection, RoleConsensus, RoleExecution, RoleVerification, RoleAccess}
}

Expand Down
30 changes: 18 additions & 12 deletions network/p2p/libp2pNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
fcrypto "github.com/onflow/flow-go/crypto"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/id"
flownet "github.com/onflow/flow-go/network"
"github.com/onflow/flow-go/network/message"
"github.com/onflow/flow-go/network/p2p/dns"
Expand Down Expand Up @@ -65,7 +66,9 @@ func DefaultLibP2PNodeFactory(ctx context.Context,
me flow.Identifier,
address string,
flowKey fcrypto.PrivateKey,
rootBlockID string,
rootBlockID flow.Identifier,
chainID flow.ChainID,
idProvider id.IdentityProvider,
maxPubSubMsgSize int,
metrics module.NetworkMetrics,
pingInfoProvider PingInfoProvider,
Expand All @@ -80,12 +83,21 @@ func DefaultLibP2PNodeFactory(ctx context.Context,
return nil, fmt.Errorf("could not create dns resolver: %w", err)
}

psOpts := DefaultPubsubOptions(maxPubSubMsgSize)
if chainID == flow.Mainnet {
psOpts = append(psOpts, func(_ context.Context, h host.Host) (pubsub.Option, error) {
return pubsub.WithSubscriptionFilter(NewRoleBasedFilter(
h.ID(), rootBlockID, chainID, idProvider,
)), nil
})
}

return func() (*Node, error) {
return NewDefaultLibP2PNodeBuilder(me, address, flowKey).
SetRootBlockID(rootBlockID).
SetConnectionGater(connGater).
SetConnectionManager(connManager).
SetPubsubOptions(DefaultPubsubOptions(maxPubSubMsgSize)...).
SetPubsubOptions(psOpts...).
SetPingInfoProvider(pingInfoProvider).
SetLogger(log).
SetResolver(resolver).
Expand All @@ -94,7 +106,7 @@ func DefaultLibP2PNodeFactory(ctx context.Context,
}

type NodeBuilder interface {
SetRootBlockID(string) NodeBuilder
SetRootBlockID(flow.Identifier) NodeBuilder
SetConnectionManager(TagLessConnManager) NodeBuilder
SetConnectionGater(*ConnGater) NodeBuilder
SetPubsubOptions(...PubsubOption) NodeBuilder
Expand All @@ -108,7 +120,7 @@ type NodeBuilder interface {

type DefaultLibP2PNodeBuilder struct {
id flow.Identifier
rootBlockID string
rootBlockID flow.Identifier
logger zerolog.Logger
connGater *ConnGater
connMngr TagLessConnManager
Expand Down Expand Up @@ -144,7 +156,7 @@ func (builder *DefaultLibP2PNodeBuilder) SetTopicValidation(enabled bool) NodeBu
return builder
}

func (builder *DefaultLibP2PNodeBuilder) SetRootBlockID(rootBlockId string) NodeBuilder {
func (builder *DefaultLibP2PNodeBuilder) SetRootBlockID(rootBlockId flow.Identifier) NodeBuilder {
builder.rootBlockID = rootBlockId
return builder
}
Expand Down Expand Up @@ -196,7 +208,7 @@ func (builder *DefaultLibP2PNodeBuilder) Build(ctx context.Context) (*Node, erro
return nil, errors.New("unable to create libp2p pubsub: factory function not provided")
}

if builder.rootBlockID == "" {
if builder.rootBlockID == flow.ZeroID {
synzhu marked this conversation as resolved.
Show resolved Hide resolved
return nil, errors.New("root block ID must be provided")
}
node.flowLibP2PProtocolID = generateFlowProtocolID(builder.rootBlockID)
Expand All @@ -213,11 +225,6 @@ func (builder *DefaultLibP2PNodeBuilder) Build(ctx context.Context) (*Node, erro
node.connMgr = builder.connMngr
}

if builder.rootBlockID == "" {
return nil, errors.New("root block ID must be provided")
}
node.flowLibP2PProtocolID = generateFlowProtocolID(builder.rootBlockID)

if builder.pingInfoProvider != nil {
opts = append(opts, libp2p.Ping(true))
}
Expand Down Expand Up @@ -729,7 +736,6 @@ func DefaultPubsubOptions(maxPubSubMsgSize int) []PubsubOption {
pubSubOptionFunc(pubsub.WithStrictSignatureVerification(true)),
// set max message size limit for 1-k PubSub messaging
pubSubOptionFunc(pubsub.WithMaxMessageSize(maxPubSubMsgSize)),
// no discovery
synzhu marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
4 changes: 2 additions & 2 deletions network/p2p/libp2pNode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const tickForAssertEventually = 100 * time.Millisecond
// "0.0.0.0:<selected-port-by-os>
const defaultAddress = "0.0.0.0:0"

var rootBlockID = unittest.IdentifierFixture().String()
var rootBlockID = unittest.IdentifierFixture()

type LibP2PNodeTestSuite struct {
suite.Suite
Expand Down Expand Up @@ -685,7 +685,7 @@ func (suite *LibP2PNodeTestSuite) NodesFixture(count int, handler func(t *testin

// NodeFixture creates a single LibP2PNodes with the given key, root block id, and callback function for stream handling.
// It returns the nodes and their identities.
func NodeFixture(t *testing.T, log zerolog.Logger, key fcrypto.PrivateKey, rootID string, handler func(t *testing.T) network.StreamHandler, allowList bool, address string) (*Node, flow.Identity) {
func NodeFixture(t *testing.T, log zerolog.Logger, key fcrypto.PrivateKey, rootID flow.Identifier, handler func(t *testing.T) network.StreamHandler, allowList bool, address string) (*Node, flow.Identity) {

identity := unittest.IdentityFixture(unittest.WithNetworkingKey(key.PublicKey()), unittest.WithAddress(address))

Expand Down
8 changes: 4 additions & 4 deletions network/p2p/libp2pUtils.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,12 @@ func IPPortFromMultiAddress(addrs ...multiaddr.Multiaddr) (string, string, error
return "", "", fmt.Errorf("ip address or hostname not found")
}

func generateFlowProtocolID(rootBlockID string) protocol.ID {
return protocol.ID(FlowLibP2POneToOneProtocolIDPrefix + rootBlockID)
func generateFlowProtocolID(rootBlockID flow.Identifier) protocol.ID {
return protocol.ID(FlowLibP2POneToOneProtocolIDPrefix + rootBlockID.String())
}

func generatePingProtcolID(rootBlockID string) protocol.ID {
return protocol.ID(FlowLibP2PPingProtocolPrefix + rootBlockID)
func generatePingProtcolID(rootBlockID flow.Identifier) protocol.ID {
return protocol.ID(FlowLibP2PPingProtocolPrefix + rootBlockID.String())
}

// PeerAddressInfo generates the libp2p peer.AddrInfo for the given Flow.Identity.
Expand Down
Loading