-
Notifications
You must be signed in to change notification settings - Fork 180
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Subscription filter #1210
Subscription filter #1210
Changes from all commits
01eef9e
2f7b178
112c27d
96cf07f
93a573a
31c46bf
9dd6d5e
c7b4ba6
47c6ad8
7a994ce
9c0c1c8
24ec4a1
ca95f68
9c6136a
416583a
b432ad6
dde1c4e
1c8bb50
867ee25
d674ac9
80a4ad9
6416191
f53b07c
b9de82c
6622214
dfcd821
03492e2
e2a315f
e979066
f84d953
1b56de4
823db49
56bf8db
bef943c
ae3f743
c3f9992
136066a
05fe476
46b0d35
6cdbab3
44f8ae9
746be15
84bd406
f8343bd
a7ecbce
847e0f2
9138a4f
931b74f
9d23c0b
e82d2d2
c9f2e35
4e76159
3584488
a2e7db0
f3c404a
2f6b4d2
c391f1b
1dc1970
478342b
e19d348
a35e6ea
cbc4be2
1f0582f
c8bce78
b7f5a2a
082dfc2
2a69f13
03f35df
b22f4db
a7448d2
574b4e0
37a648f
a5bdab4
0efaf20
052cafa
08bf26b
589dfc2
07b6705
9617911
5d38cc4
c8791ca
3914d2c
c0e1fda
c51af41
305e462
8cfccde
702b557
5024900
3416bf7
9208f00
a5b836e
ba13bc5
2f0bd71
29cd93e
1306c29
d5985ec
bc16f5a
f19a1be
2acd775
bc7322c
a2d490c
245526a
a350740
bfb8bb2
1e09384
318c2a4
76c2ccd
54520e2
227bbb9
2eaf3be
e04819a
87bba78
77a3daf
e5b2800
dad1ae9
923fe2d
2ac44b7
f1cc3c4
290e0e8
15a20bb
5c10daa
1175e07
5bef8e7
2464eb4
b906c04
d3f6845
c2d6f7c
37300b2
45c2d3e
26c56c2
db95a99
078346e
8170978
8955307
5826a23
9c8b0de
6a7160f
ce8642f
3e39e10
ad94d71
8258159
d0bb4a5
130a859
e2529aa
b81c627
1647999
098c071
066e188
13507d7
22eb78e
95844b9
0b4a6af
f846241
3640594
65f2852
92e13fc
b3ab250
48b303e
02f7302
959e6f9
d4b2e96
3535355
d35b2ba
957dbbf
60f6591
5b87c52
787fedb
3d7b119
98bb012
f0db946
4915975
c8c15a9
a9c5340
9c234dc
f5622b9
2bcf3f9
b314b96
9e3e807
593fa50
8c90b8d
135faa0
91ead41
57393f6
74a59e8
5adf91e
4aa6892
3f2ac35
0eec24f
c039b5b
9b64426
68a906e
5e6db19
e893893
5047a72
7ff21f9
358eb5c
6930416
f4bc94e
ca47233
8a8e0a1
14b1909
d9f04e0
0f7f6c8
7a25e93
9689cbb
a6c7043
c3f87e4
8d050bc
322f1d3
588c262
b9411ed
700d619
0e2622d
3609042
4fa43f4
a1080e9
2c45578
4b698ca
9c84a7d
332979d
514e17f
d3d45e7
414d36c
647d753
9d542c4
25968e0
ac49207
2200ae1
fac03b3
4454c21
a1bf678
a8e0f9e
bd78b01
b6ccebc
92328e6
8f3a0d9
39a1712
feb5c01
9e81a81
9e2e5bd
9ad681c
9da07b2
2165ba5
f93c508
131298e
6c9813f
568a63e
d0eebc8
e31a0c2
3dab281
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,33 +11,41 @@ import ( | |
) | ||
|
||
// init is called first time this package is imported. | ||
// It creates and initializes the channelRoleMap map. | ||
// It creates and initializes channelRoleMap and clusterChannelPrefixRoleMap. | ||
func init() { | ||
initializeChannelRoleMap() | ||
} | ||
|
||
// channelRoleMap keeps a map between channels and the list of flow roles involved in them. | ||
var channelRoleMap map[network.Channel]flow.RoleList | ||
|
||
// clusterChannelPrefixRoleMap keeps a map between cluster channel prefixes and the list of flow roles involved in them. | ||
var clusterChannelPrefixRoleMap map[string]flow.RoleList | ||
|
||
// RolesByChannel returns list of flow roles involved in the channel. | ||
// If the given channel is a public channel, the returned list will | ||
// contain all roles. | ||
func RolesByChannel(channel network.Channel) (flow.RoleList, bool) { | ||
if clusterChannel, isCluster := ClusterChannel(channel); isCluster { | ||
// replaces channel with the stripped-off prefix | ||
channel = clusterChannel | ||
if IsClusterChannel(channel) { | ||
return ClusterChannelRoles(channel), true | ||
} | ||
if PublicChannels().Contains(channel) { | ||
return flow.Roles(), true | ||
} | ||
roles, ok := channelRoleMap[channel] | ||
return roles, ok | ||
} | ||
|
||
// Exists returns true if channel exists in channelRoleMap. | ||
// At the current state, any developer-defined channel should be added | ||
// to channelRoleMap as a constant channel type manually. | ||
// Exists returns true if the channel exists. | ||
func Exists(channel network.Channel) bool { | ||
_, exists := RolesByChannel(channel) | ||
return exists || UnstakedChannels().Contains(channel) | ||
if _, ok := RolesByChannel(channel); ok { | ||
return true | ||
} | ||
|
||
return false | ||
} | ||
|
||
// ChannelsByRole returns a list of all channels the role subscribes to. | ||
// ChannelsByRole returns a list of all channels the role subscribes to (except cluster-based channels and public channels). | ||
func ChannelsByRole(role flow.Role) network.ChannelList { | ||
channels := make(network.ChannelList, 0) | ||
for channel, roles := range channelRoleMap { | ||
|
@@ -63,10 +71,9 @@ func UniqueChannels(channels network.ChannelList) network.ChannelList { | |
// has already been added to uniques. | ||
// We use identifier of RoleList to determine its uniqueness. | ||
for _, channel := range channels { | ||
id := channelRoleMap[channel].ID() | ||
|
||
// non-cluster channel deduplicated based identifier of role list | ||
if _, cluster := ClusterChannel(channel); !cluster { | ||
if !IsClusterChannel(channel) { | ||
id := channelRoleMap[channel].ID() | ||
if _, ok := added[id]; ok { | ||
// a channel with same RoleList already added, hence skips | ||
continue | ||
|
@@ -80,20 +87,21 @@ func UniqueChannels(channels network.ChannelList) network.ChannelList { | |
return uniques | ||
} | ||
|
||
// Channels returns all channels that nodes of any role have subscribed to. | ||
// Channels returns all channels that nodes of any role have subscribed to (except cluster-based channels). | ||
func Channels() network.ChannelList { | ||
channels := make(network.ChannelList, 0) | ||
for channel := range channelRoleMap { | ||
channels = append(channels, channel) | ||
} | ||
channels = append(channels, PublicChannels()...) | ||
|
||
return channels | ||
} | ||
|
||
// UnstakedChannels returns all channels that unstaked nodes can send messages on. | ||
func UnstakedChannels() network.ChannelList { | ||
// PublicChannels returns all channels that are used on the public network. | ||
func PublicChannels() network.ChannelList { | ||
return network.ChannelList{ | ||
UnstakedSyncCommittee, | ||
PublicSyncCommittee, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to my earlier comment about keeping
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Imo, Public channels can be joined by anyone, regardless of role (and even unstaked nodes who don't have a role whatsoever), so I think it makes sense to treat them separately here. |
||
} | ||
} | ||
|
||
|
@@ -106,11 +114,11 @@ const ( | |
|
||
// Channels for consensus protocols | ||
ConsensusCommittee = network.Channel("consensus-committee") | ||
consensusClusterPrefix = network.Channel("consensus-cluster") // dynamic channel, use ChannelConsensusCluster function | ||
consensusClusterPrefix = "consensus-cluster" // dynamic channel, use ChannelConsensusCluster function | ||
|
||
// Channels for protocols actively synchronizing state across nodes | ||
SyncCommittee = network.Channel("sync-committee") | ||
syncClusterPrefix = network.Channel("sync-cluster") // dynamic channel, use ChannelSyncCluster function | ||
syncClusterPrefix = "sync-cluster" // dynamic channel, use ChannelSyncCluster function | ||
SyncExecution = network.Channel("sync-execution") | ||
|
||
// Channels for dkg communication | ||
|
@@ -141,8 +149,8 @@ const ( | |
ProvideReceiptsByBlockID = RequestReceiptsByBlockID | ||
ProvideApprovalsByChunk = RequestApprovalsByChunk | ||
|
||
// Unstaked network channels | ||
UnstakedSyncCommittee = network.Channel("unstaked-sync-committee") | ||
// Public network channels | ||
PublicSyncCommittee = network.Channel("public-sync-committee") | ||
) | ||
|
||
// initializeChannelRoleMap initializes an instance of channelRoleMap and populates it with the channels and their | ||
|
@@ -161,7 +169,7 @@ func initializeChannelRoleMap() { | |
channelRoleMap[ConsensusCommittee] = flow.RoleList{flow.RoleConsensus} | ||
|
||
// Channels for protocols actively synchronizing state across nodes | ||
channelRoleMap[SyncCommittee] = flow.RoleList{flow.RoleConsensus} | ||
channelRoleMap[SyncCommittee] = flow.Roles() | ||
channelRoleMap[SyncExecution] = flow.RoleList{flow.RoleExecution} | ||
|
||
// Channels for DKG communication | ||
|
@@ -177,7 +185,7 @@ func initializeChannelRoleMap() { | |
channelRoleMap[PushApprovals] = flow.RoleList{flow.RoleConsensus, flow.RoleVerification} | ||
|
||
// Channels for actively requesting missing entities | ||
channelRoleMap[RequestCollections] = flow.RoleList{flow.RoleCollection, flow.RoleExecution} | ||
channelRoleMap[RequestCollections] = flow.RoleList{flow.RoleCollection, flow.RoleExecution, flow.RoleAccess} | ||
channelRoleMap[RequestChunks] = flow.RoleList{flow.RoleExecution, flow.RoleVerification} | ||
channelRoleMap[RequestReceiptsByBlockID] = flow.RoleList{flow.RoleConsensus, flow.RoleExecution} | ||
channelRoleMap[RequestApprovalsByChunk] = flow.RoleList{flow.RoleConsensus, flow.RoleVerification} | ||
|
@@ -190,40 +198,65 @@ func initializeChannelRoleMap() { | |
flow.RoleAccess} | ||
channelRoleMap[ReceiveApprovals] = flow.RoleList{flow.RoleConsensus, flow.RoleVerification} | ||
|
||
channelRoleMap[ProvideCollections] = flow.RoleList{flow.RoleCollection, flow.RoleExecution} | ||
channelRoleMap[ProvideCollections] = flow.RoleList{flow.RoleCollection, flow.RoleExecution, flow.RoleAccess} | ||
channelRoleMap[ProvideChunks] = flow.RoleList{flow.RoleExecution, flow.RoleVerification} | ||
channelRoleMap[ProvideReceiptsByBlockID] = flow.RoleList{flow.RoleConsensus, flow.RoleExecution} | ||
channelRoleMap[ProvideApprovalsByChunk] = flow.RoleList{flow.RoleConsensus, flow.RoleVerification} | ||
|
||
channelRoleMap[syncClusterPrefix] = flow.RoleList{flow.RoleCollection} | ||
channelRoleMap[consensusClusterPrefix] = flow.RoleList{flow.RoleCollection} | ||
synzhu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
clusterChannelPrefixRoleMap = make(map[string]flow.RoleList) | ||
|
||
clusterChannelPrefixRoleMap[syncClusterPrefix] = flow.RoleList{flow.RoleCollection} | ||
clusterChannelPrefixRoleMap[consensusClusterPrefix] = flow.RoleList{flow.RoleCollection} | ||
} | ||
|
||
// ClusterChannel returns true if channel is cluster-based. | ||
// At the current implementation, only collection nodes are involved in a cluster-based channels. | ||
// If the channel is a cluster-based one, this method also strips off the channel prefix and returns it. | ||
func ClusterChannel(channel network.Channel) (network.Channel, bool) { | ||
if strings.HasPrefix(channel.String(), syncClusterPrefix.String()) { | ||
return syncClusterPrefix, true | ||
// ClusterChannelRoles returns the list of roles that are involved in the given cluster-based channel. | ||
func ClusterChannelRoles(clusterChannel network.Channel) flow.RoleList { | ||
if prefix, ok := clusterChannelPrefix(clusterChannel); ok { | ||
return clusterChannelPrefixRoleMap[prefix] | ||
} | ||
|
||
if strings.HasPrefix(channel.String(), consensusClusterPrefix.String()) { | ||
return consensusClusterPrefix, true | ||
return flow.RoleList{} | ||
} | ||
|
||
func clusterChannelPrefix(clusterChannel network.Channel) (string, bool) { | ||
for prefix := range clusterChannelPrefixRoleMap { | ||
if strings.HasPrefix(clusterChannel.String(), prefix) { | ||
return prefix, true | ||
} | ||
} | ||
|
||
return "", false | ||
} | ||
|
||
// IsClusterChannel returns true if channel is cluster-based. | ||
// Currently, only collection nodes are involved in a cluster-based channels. | ||
func IsClusterChannel(channel network.Channel) bool { | ||
_, ok := clusterChannelPrefix(channel) | ||
return ok | ||
} | ||
|
||
// TopicFromChannel returns the unique LibP2P topic form the channel. | ||
// The channel is made up of name string suffixed with root block id. | ||
// The root block id is used to prevent cross talks between nodes on different sporks. | ||
func TopicFromChannel(channel network.Channel, rootBlockID string) network.Topic { | ||
func TopicFromChannel(channel network.Channel, rootBlockID flow.Identifier) network.Topic { | ||
// skip root block suffix, if this is a cluster specific channel. A cluster specific channel is inherently | ||
// unique for each epoch | ||
if strings.HasPrefix(channel.String(), syncClusterPrefix.String()) || strings.HasPrefix(string(channel), consensusClusterPrefix.String()) { | ||
if IsClusterChannel(channel) { | ||
return network.Topic(channel) | ||
} | ||
return network.Topic(fmt.Sprintf("%s/%s", string(channel), rootBlockID)) | ||
return network.Topic(fmt.Sprintf("%s/%s", string(channel), rootBlockID.String())) | ||
} | ||
|
||
func ChannelFromTopic(topic network.Topic) (network.Channel, bool) { | ||
if IsClusterChannel(network.Channel(topic)) { | ||
return network.Channel(topic), true | ||
} | ||
|
||
if index := strings.LastIndex(topic.String(), "/"); index != -1 { | ||
return network.Channel(topic[:index]), true | ||
} | ||
|
||
return "", false | ||
} | ||
|
||
// ChannelConsensusCluster returns a dynamic cluster consensus channel based on | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ID()
method is quite resource-heavy, and that is why we kept it out of the loop there. I could not find any reason we need to recompute it over each iteration. If that is the case, please move it to its old place out of the loop.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The old place is already inside the loop 🧐
I have only placed it inside the
if
, which is only saving computation because it will be computed less often.