-
Notifications
You must be signed in to change notification settings - Fork 6
WIP: Rust Indexer/Operator/Relayer #299
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 58 commits
a4ec9b3
4c7582d
1047caf
cd0ad87
0eedfad
7e332f6
6307c17
9a97d5d
9826a50
bbc019d
e3bb9a8
d4a2229
3d155f4
4b52feb
ec0b13e
9bbc73b
9560e6c
e5879da
9b8e6ea
a79ead3
4e36b32
3b9760d
09d1054
a02cbea
ce3fce4
00d88c9
bc57e36
e51f66a
ea020a0
7f43c80
cf74716
e7c5d18
ed2dbe6
f092561
3e650df
683f887
e4919d9
88227d8
ccfc34b
916e42b
1751c0c
8c9161a
20b9bf2
01a7321
97fe987
85fc220
bed95b2
5766be1
48f162a
bfadaa8
bc2e407
bc93181
5624c06
8dc05d7
0d250e0
cf1b4a4
b14dfa6
016ebd0
2a08d9d
49db338
392c1ce
1e933b9
9fc8be8
9d720c5
9e1c1ee
1b02a1e
d4b3c53
a9508f8
af548f8
f825036
131c351
4a5cc5c
4894f4a
290bb85
263cb0e
b9b81fb
62994a8
5da950a
58d2454
c5359dc
f1624fd
5bf79ef
90c0e70
1433aac
9293854
ce2d6e6
2d9d590
ed43e71
0ed696f
ec06783
20a711c
b19f68e
20ef139
fbedff6
2931fee
5cbfb64
22af924
bbba6d4
99ad04b
9287a5c
f288183
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| [package] | ||
| name = "core-rs" | ||
| version = "0.1.0" | ||
| edition = "2021" | ||
|
|
||
| [dependencies] | ||
| tokio = { workspace = true } | ||
| async-trait = { workspace = true } | ||
| thiserror = "1.0" | ||
| anyhow = { workspace = true } | ||
|
|
||
| alloy = { workspace = true } | ||
| mockall = "0.13.0" | ||
|
|
||
| serde = { workspace = true } | ||
| serde_json = { workspace = true } | ||
| sha2 = "0.10.6" | ||
|
|
||
| log = { workspace = true } | ||
| futures-util = { workspace = true } | ||
|
|
||
| [dev-dependencies] | ||
| tokio-test = "0.4" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| // pub mod chainio; | ||
| // pub mod config; | ||
| // pub mod metricable; | ||
| // pub mod smt; | ||
| // pub mod types; | ||
| pub mod safeclient; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,156 @@ | ||
| use alloy::eips::BlockNumberOrTag; | ||
| use alloy::primitives::{B256, U256}; | ||
| use alloy::providers::{Provider, ProviderBuilder, RootProvider, WsConnect}; | ||
| use alloy::pubsub::PubSubFrontend; | ||
| use alloy::rpc::types::{Block, Filter, Header, Log, Transaction, TransactionReceipt}; | ||
| use anyhow::Result; | ||
| use async_trait::async_trait; | ||
| use std::sync::Arc; | ||
| use std::time::Duration; | ||
| use tokio::sync::broadcast; | ||
| use futures_util::StreamExt; | ||
| use log::{error, warn}; | ||
|
|
||
|
|
||
| #[async_trait] | ||
| pub trait SafeClient: Send + Sync { | ||
| async fn block_number(&self) -> Result<U256>; | ||
| async fn get_block(&self, block_number: BlockNumberOrTag) -> Result<Option<Block>>; | ||
| async fn get_transaction(&self, tx_hash: B256) -> Result<Option<Transaction>>; | ||
| async fn get_transaction_receipt(&self, tx_hash: B256) -> Result<Option<TransactionReceipt>>; | ||
| async fn get_logs(&self, filter: Filter) -> Result<Vec<Log>>; | ||
| async fn subscribe_logs(&self, filter: Filter) -> Result<broadcast::Receiver<Log>>; | ||
| async fn subscribe_new_heads(&self) -> Result<broadcast::Receiver<Header>>; | ||
| fn close(&self); | ||
| } | ||
|
|
||
| pub struct SafeEthClient { | ||
| provider: Arc<RootProvider<PubSubFrontend>>, | ||
| log_resub_interval: Duration, | ||
| header_timeout: Duration, | ||
| block_chunk_size: u64, | ||
| block_max_range: u64, | ||
| close_sender: broadcast::Sender<()>, | ||
| } | ||
|
|
||
| pub struct SafeEthClientOptions { | ||
| pub log_resub_interval: Duration, | ||
| pub header_timeout: Duration, | ||
| pub block_chunk_size: u64, | ||
| pub block_max_range: u64, | ||
| } | ||
|
|
||
| impl Default for SafeEthClientOptions { | ||
| fn default() -> Self { | ||
| Self { | ||
| log_resub_interval: Duration::from_secs(300), | ||
| header_timeout: Duration::from_secs(30), | ||
| block_chunk_size: 100, | ||
| block_max_range: 100, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl SafeEthClient { | ||
| pub async fn new(ws_url: &str, options: SafeEthClientOptions) -> Result<Self> { | ||
| let ws = WsConnect::new(ws_url); | ||
| let provider = ProviderBuilder::new().on_ws(ws).await?; | ||
| let (close_sender, _) = broadcast::channel(1); | ||
|
|
||
| Ok(Self { | ||
| provider: Arc::new(provider), | ||
| log_resub_interval: options.log_resub_interval, | ||
| header_timeout: options.header_timeout, | ||
| block_chunk_size: options.block_chunk_size, | ||
| block_max_range: options.block_max_range, | ||
| close_sender, | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| #[async_trait] | ||
| impl SafeClient for SafeEthClient { | ||
| async fn block_number(&self) -> Result<U256> { | ||
| Ok(U256::from(self.provider.get_block_number().await?)) | ||
| } | ||
|
|
||
| async fn get_block(&self, block_number: BlockNumberOrTag) -> Result<Option<Block>> { | ||
| Ok(self.provider.get_block(alloy::eips::BlockId::Number(block_number), alloy::rpc::types::BlockTransactionsKind::Hashes).await?) | ||
| } | ||
|
|
||
| async fn get_transaction(&self, tx_hash: B256) -> Result<Option<Transaction>> { | ||
| Ok(self.provider.get_transaction_by_hash(tx_hash).await?) | ||
| } | ||
|
|
||
| async fn get_transaction_receipt(&self, tx_hash: B256) -> Result<Option<TransactionReceipt>> { | ||
| Ok(self.provider.get_transaction_receipt(tx_hash).await?) | ||
| } | ||
|
|
||
| async fn get_logs(&self, filter: Filter) -> Result<Vec<Log>> { | ||
| Ok(self.provider.get_logs(&filter).await?) | ||
| } | ||
|
|
||
| async fn subscribe_logs(&self, filter: Filter) -> Result<broadcast::Receiver<Log>> { | ||
| let (tx, rx) = broadcast::channel(100); | ||
| let log_resub_interval = self.log_resub_interval; | ||
| let subscription = self.provider.subscribe_logs(&filter).await?; | ||
| let mut stream = subscription.into_stream(); | ||
|
|
||
| tokio::spawn(async move { | ||
| loop { | ||
| tokio::select! { | ||
| Some(log) = stream.next() => { | ||
| if tx.send(log).is_err() { | ||
| error!("Error sending log: channel closed"); | ||
| break; | ||
| } | ||
| } | ||
| _ = tokio::time::sleep(log_resub_interval) => { | ||
| warn!("Timeout waiting for new log"); | ||
| break; | ||
| } | ||
| else => { | ||
| error!("Log stream ended unexpectedly"); | ||
| break; | ||
| } | ||
| } | ||
| } | ||
| }); | ||
|
|
||
| Ok(rx) | ||
| } | ||
|
|
||
| async fn subscribe_new_heads(&self) -> Result<broadcast::Receiver<Header>> { | ||
| let (tx, rx) = broadcast::channel(100); | ||
| let header_timeout = self.header_timeout; | ||
| let subscription = self.provider.subscribe_blocks().await?; | ||
| let mut stream = subscription.into_stream(); | ||
|
|
||
| tokio::spawn(async move { | ||
| loop { | ||
| tokio::select! { | ||
| Some(block) = stream.next() => { | ||
| if tx.send(block.header).is_err() { | ||
| error!("Error sending header: channel closed"); | ||
| break; | ||
| } | ||
| } | ||
| _ = tokio::time::sleep(header_timeout) => { | ||
| warn!("Timeout waiting for new header"); | ||
| break; | ||
| } | ||
| else => { | ||
| error!("Header stream ended unexpectedly"); | ||
| break; | ||
| } | ||
| } | ||
| } | ||
| }); | ||
|
|
||
| Ok(rx) | ||
| } | ||
|
|
||
| fn close(&self) { | ||
| let _ = self.close_sender.send(()); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| mod client; | ||
| mod utils; | ||
|
|
||
| pub use client::{SafeClient, SafeEthClient, SafeEthClientOptions}; | ||
| pub use utils::*; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,26 @@ | ||
| use alloy::{primitives::B256, rpc::types::Log}; | ||
| use sha2::{Digest, Sha256}; | ||
|
|
||
| pub fn hash_log(log: &Log) -> B256 { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this used? Are logs hashed with SHA256 or Keccak? |
||
| let mut hasher = Sha256::new(); | ||
|
|
||
| // Hash the log fields | ||
| hasher.update(log.address().as_slice()); | ||
| for topic in log.topics() { | ||
| hasher.update(topic.as_slice()); | ||
| } | ||
| hasher.update(log.data().data.clone()); | ||
|
|
||
| // Hash additional block and tx info | ||
| if let Some(block_hash) = log.block_hash { | ||
| hasher.update(block_hash.as_slice()); | ||
| } | ||
| if let Some(transaction_hash) = log.transaction_hash { | ||
| hasher.update(transaction_hash.as_slice()); | ||
| } | ||
| if let Some(log_index) = log.log_index { | ||
| hasher.update(&log_index.to_be_bytes()); | ||
| } | ||
|
|
||
| B256::from_slice(&hasher.finalize()) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,29 +9,33 @@ edition = "2021" | |
| actix = "0.13.1" | ||
| actix-web = "4.5.1" | ||
| futures = "0.3.5" | ||
| tokio = { version = "1.28.2", features = ["sync", "time"] } | ||
| tokio = { workspace = true } | ||
| deadpool = "0.10.0" | ||
| lapin = "2.3.1" | ||
| deadpool-lapin = "0.11.0" | ||
| tokio-executor-trait = "2.1.0" | ||
| tokio-reactor-trait = "1.1.0" | ||
| prometheus = "0.13.3" | ||
| prometheus = { workspace = true } | ||
|
|
||
| clap = { version = "4.4.11", features = ["color", "derive", "env"] } | ||
| clap = { workspace = true } | ||
| openssl-probe = "0.1.4" | ||
| serde = { version = "1", features = ["derive"] } | ||
| serde_json = "1.0.68" | ||
| serde = { workspace = true } | ||
| serde_json = { workspace = true } | ||
|
|
||
| tracing = { version = "0.1.36", features = ["std"] } | ||
| tracing = { workspace = true } | ||
| thiserror = "1.0.56" | ||
| anyhow = "1.0.79" | ||
| anyhow = { workspace = true } | ||
|
|
||
| near-indexer = { git = "https://github.com/near/nearcore", rev = "b3d767e7664d8e123a35313ccc66c8ac1afb2058" } | ||
| near-client = { git = "https://github.com/near/nearcore", rev = "b3d767e7664d8e123a35313ccc66c8ac1afb2058" } | ||
| near-o11y = { git = "https://github.com/near/nearcore", rev = "b3d767e7664d8e123a35313ccc66c8ac1afb2058" } | ||
| near-client-primitives = { git = "https://github.com/near/nearcore", rev = "b3d767e7664d8e123a35313ccc66c8ac1afb2058" } | ||
| reqwest = { version = "0.12.7", features = ["json"] } | ||
| borsh = { version = "1.0.0", features = ["derive", "rc"] } | ||
| serde_yaml = "0.9.34" | ||
| serde_yaml = { workspace = true } | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. gud |
||
|
|
||
| [dev-dependencies] | ||
| near-crypto = { git = "https://github.com/near/nearcore", rev = "b3d767e7664d8e123a35313ccc66c8ac1afb2058" } | ||
|
|
||
| [features] | ||
| use_fastnear = [] | ||
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,47 @@ | ||||||||||
| FROM rust:1.79 AS builder | ||||||||||
| WORKDIR /tmp/indexer | ||||||||||
|
||||||||||
| FROM rust:1.79 AS builder | |
| WORKDIR /tmp/indexer | |
| FROM rust:1.79.0-bookworm AS builder | |
| WORKDIR /tmp/indexer |
This will make sure the builder doesn't break since it's using bookworm. You could also elect to use bookworm-slim
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a likelihood of more than 100 here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good one - depends on how successful NFFL is really :) Maybe I'll make this into a runtime config.