-
Notifications
You must be signed in to change notification settings - Fork 180
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Access] Add implementation for events data providers and account statuses data providers #6766
Merged
AndriiDiachuk
merged 69 commits into
onflow:master
from
The-K-R-O-K:AndriiDiachuk/6588-events-data-provider
Dec 26, 2024
Merged
Changes from 68 commits
Commits
Show all changes
69 commits
Select commit
Hold shift + click to select a range
fed835a
Added skeleton for events data provider
AndriiDiachuk ea73371
Merge branch 'UlyanaAndrukhiv/6585-block-data-provider' of github.com…
AndriiDiachuk 9c10d24
Added initializing of events filter, added missing data to factory
AndriiDiachuk 9547b5d
Fixed factory test
AndriiDiachuk b7f5ca7
Merge branch 'UlyanaAndrukhiv/6585-block-data-provider' of github.com…
AndriiDiachuk 0f34ae1
Added test skeleton for testing invalid arguments
AndriiDiachuk 411f9e5
Added test for messageIndex check
AndriiDiachuk 9a402de
Merged
AndriiDiachuk 636740a
Added check for a valid event types in parse function
AndriiDiachuk f70c8e1
Merge branch 'UlyanaAndrukhiv/6585-block-data-provider' of github.com…
AndriiDiachuk 588688e
Changed type of arguments for consistency
AndriiDiachuk b537a5f
Added test case for event provider in factory_test
AndriiDiachuk 4f63403
Added implementations for account provider functions
AndriiDiachuk 867fcf7
Merge branch 'UlyanaAndrukhiv/6585-block-data-provider' of github.com…
AndriiDiachuk dca9a25
Fixed remarks
AndriiDiachuk 867fc98
Merge branch 'AndriiDiachuk/6588-events-data-provider' of github.com:…
AndriiDiachuk 9624894
Added test for invalid arguments and for message index
AndriiDiachuk 309b148
Added check for starting index value
AndriiDiachuk 756c21d
Merge branch 'AndriiDiachuk/6588-events-data-provider' of github.com:…
AndriiDiachuk 47a4c19
Added check for msgIndex
AndriiDiachuk 8f1f99c
Merge branch 'UlyanaAndrukhiv/6585-block-data-provider' of github.com…
AndriiDiachuk 0abe203
Merge branch 'UlyanaAndrukhiv/6585-block-data-provider' of github.com…
AndriiDiachuk 8ffe023
changed handleResponse to generic
AndriiDiachuk 1384405
Added happy path for testing all subscribe methods
AndriiDiachuk 57a7c0f
Linted
AndriiDiachuk 3e48960
Merge branch 'master' into AndriiDiachuk/6588-events-data-provider
peterargue e076072
Changed order of params
AndriiDiachuk 89bc4c2
Fixed issues with params order
AndriiDiachuk 1cc0a74
Merge branch 'master' of github.com:The-K-R-O-K/flow-go into AndriiDi…
AndriiDiachuk a1d7aa7
Refactored parse function
AndriiDiachuk 8d9e090
Using json arrays instead of comma separeted lists
AndriiDiachuk 416ff58
Added heartbeat handling in handleResponse, updated type of expected …
AndriiDiachuk 2239802
Merge branch 'AndriiDiachuk/6588-events-data-provider' of github.com:…
AndriiDiachuk 0e85d16
Merged
AndriiDiachuk 179a664
Fixed small remarks
AndriiDiachuk e0aa808
Made parse function private
AndriiDiachuk 19bf3c9
Changed Arguments type and refactored code
AndriiDiachuk 8d0543d
Removed comment
AndriiDiachuk d6fc9df
Fixed parse function for event privoder
AndriiDiachuk 1bb4112
Linted
AndriiDiachuk 2e90387
Merge branch 'master' into AndriiDiachuk/6588-events-data-provider
AndriiDiachuk 30e27df
Merge branch 'master' of github.com:The-K-R-O-K/flow-go into AndriiDi…
AndriiDiachuk bb04f4a
Merge branch 'AndriiDiachuk/6588-events-data-provider' of github.com:…
AndriiDiachuk df387ec
Merge branch 'AndriiDiachuk/6588-events-data-provider' of github.com:…
AndriiDiachuk f81c794
Changed parse args function, added hearbeat for hadnling
AndriiDiachuk 50fd289
Fixed error msg
AndriiDiachuk 37836f9
Added happy path cases, fixed remarks from event provider PR
AndriiDiachuk 88e0333
Merge branch 'AndriiDiachuk/6587-accounts-data-provider' of github.co…
AndriiDiachuk 8432297
Fixed account statuses test
AndriiDiachuk 3ba47e8
Refacored events provider tests to use generic testHappyPath function
AndriiDiachuk 85e4464
Refactored account statuses test
AndriiDiachuk 03e5e42
Decreased expected events count
AndriiDiachuk 32d4850
Merge branch 'master' into AndriiDiachuk/6588-events-data-provider
AndriiDiachuk de5f077
Added description for missing func params
AndriiDiachuk e83930e
Refactored parse functions
AndriiDiachuk 5c9f16c
Refactored args filter init
AndriiDiachuk c002841
Created separate file for generic testHappyPath method and for testTy…
AndriiDiachuk 90d86fd
Merge branch 'master' of github.com:The-K-R-O-K/flow-go into AndriiDi…
AndriiDiachuk 2f0142d
Removed commented code
AndriiDiachuk cb81354
Merge branch 'master' into AndriiDiachuk/6588-events-data-provider
AndriiDiachuk 13c4102
Renamed util filke for tests
AndriiDiachuk 1360891
Merge branch 'AndriiDiachuk/6588-events-data-provider' of github.com:…
AndriiDiachuk 7b8ee77
Initializing msg index by 0
AndriiDiachuk dd23b79
Changed type of msgIndex in model to uint64:
AndriiDiachuk bf28e2f
Refactored parse arguments function
AndriiDiachuk f419a5c
Merge branch 'master' into AndriiDiachuk/6588-events-data-provider
AndriiDiachuk 907c708
Fixed msg index start from 0
AndriiDiachuk 101ff68
Merge branch 'master' into AndriiDiachuk/6588-events-data-provider
AndriiDiachuk f158e6f
Merge branch 'master' into AndriiDiachuk/6588-events-data-provider
AndriiDiachuk File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
2 changes: 1 addition & 1 deletion
2
...ne/access/rest/http/request/event_type.go → ...e/access/rest/common/parser/event_type.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
package request | ||
package parser | ||
|
||
import ( | ||
"fmt" | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
176 changes: 176 additions & 0 deletions
176
engine/access/rest/websockets/data_providers/account_statuses_provider.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,176 @@ | ||
package data_providers | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"strconv" | ||
|
||
"github.com/rs/zerolog" | ||
"google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/status" | ||
|
||
"github.com/onflow/flow-go/engine/access/rest/common/parser" | ||
"github.com/onflow/flow-go/engine/access/rest/http/request" | ||
"github.com/onflow/flow-go/engine/access/rest/websockets/models" | ||
"github.com/onflow/flow-go/engine/access/state_stream" | ||
"github.com/onflow/flow-go/engine/access/state_stream/backend" | ||
"github.com/onflow/flow-go/engine/access/subscription" | ||
"github.com/onflow/flow-go/model/flow" | ||
"github.com/onflow/flow-go/module/counters" | ||
) | ||
|
||
// accountStatusesArguments contains the arguments required for subscribing to account statuses | ||
type accountStatusesArguments struct { | ||
StartBlockID flow.Identifier // ID of the block to start subscription from | ||
StartBlockHeight uint64 // Height of the block to start subscription from | ||
Filter state_stream.AccountStatusFilter // Filter applied to events for a given subscription | ||
} | ||
|
||
type AccountStatusesDataProvider struct { | ||
*baseDataProvider | ||
|
||
logger zerolog.Logger | ||
stateStreamApi state_stream.API | ||
|
||
heartbeatInterval uint64 | ||
} | ||
|
||
var _ DataProvider = (*AccountStatusesDataProvider)(nil) | ||
|
||
// NewAccountStatusesDataProvider creates a new instance of AccountStatusesDataProvider. | ||
func NewAccountStatusesDataProvider( | ||
ctx context.Context, | ||
logger zerolog.Logger, | ||
stateStreamApi state_stream.API, | ||
topic string, | ||
arguments models.Arguments, | ||
send chan<- interface{}, | ||
chain flow.Chain, | ||
eventFilterConfig state_stream.EventFilterConfig, | ||
heartbeatInterval uint64, | ||
) (*AccountStatusesDataProvider, error) { | ||
p := &AccountStatusesDataProvider{ | ||
logger: logger.With().Str("component", "account-statuses-data-provider").Logger(), | ||
stateStreamApi: stateStreamApi, | ||
heartbeatInterval: heartbeatInterval, | ||
} | ||
|
||
// Initialize arguments passed to the provider. | ||
accountStatusesArgs, err := parseAccountStatusesArguments(arguments, chain, eventFilterConfig) | ||
if err != nil { | ||
return nil, fmt.Errorf("invalid arguments for account statuses data provider: %w", err) | ||
} | ||
|
||
subCtx, cancel := context.WithCancel(ctx) | ||
|
||
p.baseDataProvider = newBaseDataProvider( | ||
topic, | ||
cancel, | ||
send, | ||
p.createSubscription(subCtx, accountStatusesArgs), // Set up a subscription to account statuses based on arguments. | ||
) | ||
|
||
return p, nil | ||
} | ||
|
||
// Run starts processing the subscription for events and handles responses. | ||
// | ||
// No errors are expected during normal operations. | ||
func (p *AccountStatusesDataProvider) Run() error { | ||
return subscription.HandleSubscription(p.subscription, p.handleResponse()) | ||
} | ||
|
||
// createSubscription creates a new subscription using the specified input arguments. | ||
func (p *AccountStatusesDataProvider) createSubscription(ctx context.Context, args accountStatusesArguments) subscription.Subscription { | ||
if args.StartBlockID != flow.ZeroID { | ||
return p.stateStreamApi.SubscribeAccountStatusesFromStartBlockID(ctx, args.StartBlockID, args.Filter) | ||
} | ||
|
||
if args.StartBlockHeight != request.EmptyHeight { | ||
return p.stateStreamApi.SubscribeAccountStatusesFromStartHeight(ctx, args.StartBlockHeight, args.Filter) | ||
} | ||
|
||
return p.stateStreamApi.SubscribeAccountStatusesFromLatestBlock(ctx, args.Filter) | ||
} | ||
|
||
// handleResponse processes an account statuses and sends the formatted response. | ||
// | ||
// No errors are expected during normal operations. | ||
func (p *AccountStatusesDataProvider) handleResponse() func(accountStatusesResponse *backend.AccountStatusesResponse) error { | ||
blocksSinceLastMessage := uint64(0) | ||
messageIndex := counters.NewMonotonousCounter(0) | ||
|
||
return func(accountStatusesResponse *backend.AccountStatusesResponse) error { | ||
// check if there are any events in the response. if not, do not send a message unless the last | ||
// response was more than HeartbeatInterval blocks ago | ||
if len(accountStatusesResponse.AccountEvents) == 0 { | ||
blocksSinceLastMessage++ | ||
if blocksSinceLastMessage < p.heartbeatInterval { | ||
return nil | ||
} | ||
blocksSinceLastMessage = 0 | ||
} | ||
|
||
index := messageIndex.Value() | ||
if ok := messageIndex.Set(messageIndex.Value() + 1); !ok { | ||
return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) | ||
} | ||
|
||
p.send <- &models.AccountStatusesResponse{ | ||
BlockID: accountStatusesResponse.BlockID.String(), | ||
Height: strconv.FormatUint(accountStatusesResponse.Height, 10), | ||
AccountEvents: accountStatusesResponse.AccountEvents, | ||
MessageIndex: index, | ||
} | ||
|
||
return nil | ||
} | ||
} | ||
|
||
// parseAccountStatusesArguments validates and initializes the account statuses arguments. | ||
func parseAccountStatusesArguments( | ||
arguments models.Arguments, | ||
chain flow.Chain, | ||
eventFilterConfig state_stream.EventFilterConfig, | ||
) (accountStatusesArguments, error) { | ||
var args accountStatusesArguments | ||
|
||
// Parse block arguments | ||
startBlockID, startBlockHeight, err := ParseStartBlock(arguments) | ||
if err != nil { | ||
return args, err | ||
} | ||
args.StartBlockID = startBlockID | ||
args.StartBlockHeight = startBlockHeight | ||
|
||
// Parse 'event_types' as a JSON array | ||
var eventTypes parser.EventTypes | ||
if eventTypesIn, ok := arguments["event_types"]; ok && eventTypesIn != "" { | ||
result, ok := eventTypesIn.([]string) | ||
if !ok { | ||
return args, fmt.Errorf("'event_types' must be an array of string") | ||
} | ||
|
||
err := eventTypes.Parse(result) | ||
if err != nil { | ||
return args, fmt.Errorf("invalid 'event_types': %w", err) | ||
} | ||
} | ||
|
||
// Parse 'accountAddresses' as []string{} | ||
var accountAddresses []string | ||
if accountAddressesIn, ok := arguments["account_addresses"]; ok && accountAddressesIn != "" { | ||
accountAddresses, ok = accountAddressesIn.([]string) | ||
if !ok { | ||
return args, fmt.Errorf("'account_addresses' must be an array of string") | ||
} | ||
} | ||
|
||
// Initialize the event filter with the parsed arguments | ||
args.Filter, err = state_stream.NewAccountStatusFilter(eventFilterConfig, chain, eventTypes.Flow(), accountAddresses) | ||
if err != nil { | ||
return args, fmt.Errorf("failed to create event filter: %w", err) | ||
} | ||
|
||
return args, nil | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be outside of the check.
otherwise, it could behave strangely. for example, if
heartbeatInterval
is 5 and there are 4 empty blocks in a row,blocksSinceLastMessage
would be 4. Then if the next block contained events, a message would be sent without resettingblocksSinceLastMessage
, so on the next empty block, a heartbeat message would be sent.looks like we have this bug in all of the places. I opened #6837 to fix this everywhere. you can skip it in this PR