-
Notifications
You must be signed in to change notification settings - Fork 82
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make a crate of grpc_stream #397
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apologies if I left comments for things that are just copy pasted. If this isn't the right place to address them, please open issues / a tracker item for those rather than fixing them here.
} | ||
} | ||
|
||
/// These metrics are temporary (suffixed with _temp to avoid conflict with metrics in processor crate) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it matter if they clash, don't they still mean the same thing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prometheus will try to register the metric with the same name and will error. I can't import the metrics from processor crate because of cyclic dep.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest renaming them and changing them- ideally each crate is responsible for emitting its own metrics; if anything, we can allow for a "metrics_prefix" and then allow dynamically creating them (instead of a single global lazy static) per instance of the streamer.
We can then create nice and specific metrics- maybe an "items received", and then a "filtered" either book or separate series, etc
Ie there should not be any overlap between crates and the SDK because the SDK will theoretically never have the same context on what's happening inside a given step as the step does (especially for complicated things like this)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ie we should move away from thinking "this is the processor GRPC client" and think of it as "this is the way anyone in any application, even python, should use our GRPC APIs, and will put it on crates.io and it'll go viral and the world will love us"- I find that generalized approach helps create nice, hermetic, batteries-included crates, vs crates that rely on specific behavior from other implementing crates for our own purposes (within reason ofc)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also duplicate metrics- ie the GRPC emitting channel size and also processor with the instrumented channel- it's fine :-)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it, didn't realize you'd be using this crate alongside the old code and the registration itself would clash.
These clients support setting a prefix. If we don't do that today, I agree we should. Example from API Gateway: https://github.com/aptos-labs/api-gateway/blob/fa335677d1c80de9dc103396d563d4e47006a661/src/metrics/labels.rs#L37.
ReceivedTxnsFromGrpc, | ||
// Received transactions from GRPC. Sending transactions to channel. | ||
ProcessedBatch, | ||
// Processor finished processing one batch of transaction | ||
ProcessedMultipleBatches, // Processor finished processing multiple batches of transactions |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we do this instead:
pub enum ProcessorStep {
/// Received transactions from GRPC. Sending transactions to channel.
ReceivedTxnsFromGrpc,
/// Processor finished processing one batch of transaction
ProcessedBatch,
/// Processor finished processing multiple batches of transactions
ProcessedMultipleBatches,
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also what do we mean by "Sending transactions to channel.". Did that happen as part of this step or is that what happens next?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sending transactions to channel
is what happens next. This is copy/pasted from the processors crate. We'll redo all these metrics as part of SDK, so this is just throwaway for backwards compatibility for our dashboards.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we kill this enum and giant, single overloaded logging method as a pattern? It's quite awful and confusing hahaha
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
giant, single overloaded logging method as a pattern
We can kill it in this crate. For more context, "giant, single overloaded logging method as a pattern" got introduced in the indexer grpc stack to reduce duplicated code and make sure we are logging all the same metrics in every relevant place. I'm open to other suggestions on how to do this :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I actually don't mind this enum ¯_(ツ)_/¯. I like it for the reason Renee mentions, it's easier to do things consistently. If anything I'd go further and wrap this in a struct that also contains all the relevant labels, and then some methods on that struct bump the relevant counts (so the counters become private to this file). You can see the API Gateway code for some inspiration on that: https://github.com/aptos-labs/api-gateway/blob/fa335677d1c80de9dc103396d563d4e47006a661/src/metrics/labels.rs#L134-L135. In this case we only bump the relevant metrics when the struct is dropped, but you could also have explicit functions for it (which makes more sense in this case).
rust/aptos-indexer-transaction-stream/src/transaction_stream.rs
Outdated
Show resolved
Hide resolved
rust/aptos-indexer-transaction-stream/src/transaction_stream.rs
Outdated
Show resolved
Hide resolved
); | ||
|
||
transaction_stream.init_stream().await; | ||
let mut send_ma = MovingAverage::new(3000); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this 3k value be configurable? (with a default).
Description
Clean up
transaction_stream.rs' (renamed from
grpc_stream.rs`) so we can make a crate of it, which will be used in the SDK.TransactionStream
transaction_stream.rs
defines a structTransactionStream
that has these methods:init_stream
establishes a connection with the grpc streamget_next_transaction_batch
fetches the next transaction response from grpcis_end_of_stream
signals if you've reached the end of the streamreconnect_to_grpc
will attempt to reconnect with grpc if there's an error with the streamreconnect_to_grpc_with_retries
is a wrapper aroundreconnect_to_grpc
that adds retry logicHow processors use TransactionStream
In
worker.rs
,fetch_transactions_from_transaction_stream
will initialize and manage theTransactionStream
, filtering transactions, sending the transactions to channel, handle reconnection if necessary.Testing
cargo run --release -- -c config.yaml