diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d4b14607..d82a7c4db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ - Added `GetLimits` endpoint to the RPC server ([#1410](https://github.com/0xMiden/miden-node/pull/1410)). - Added gRPC-Web probe support to the `miden-network-monitor` binary ([#1484](https://github.com/0xMiden/miden-node/pull/1484)). - Add DB schema change check ([#1268](https://github.com/0xMiden/miden-node/pull/1485)). +- Decoupled ntx-builder from block-producer startup by loading network accounts asynchronously via a background task ([#????](https://github.com/0xMiden/miden-node/pull/????)). - Add foreign account support to validator ([#1493](https://github.com/0xMiden/miden-node/pull/1493)). - Improve DB query performance for account queries ([#1496](https://github.com/0xMiden/miden-node/pull/1496). - Limit number of storage map keys in `GetAccount` requests ([#1517](https://github.com/0xMiden/miden-node/pull/1517)). diff --git a/bin/node/src/commands/block_producer.rs b/bin/node/src/commands/block_producer.rs index d50182d87..5cfbc78fc 100644 --- a/bin/node/src/commands/block_producer.rs +++ b/bin/node/src/commands/block_producer.rs @@ -1,10 +1,8 @@ -use std::sync::Arc; use std::time::Duration; use anyhow::Context; use miden_node_block_producer::BlockProducer; use miden_node_utils::grpc::UrlExt; -use tokio::sync::Barrier; use url::Url; use super::{ENV_BLOCK_PRODUCER_URL, ENV_STORE_BLOCK_PRODUCER_URL}; @@ -93,7 +91,6 @@ impl BlockProducerCommand { block_interval: block_producer.block_interval, max_txs_per_batch: block_producer.max_txs_per_batch, max_batches_per_block: block_producer.max_batches_per_block, - production_checkpoint: Arc::new(Barrier::new(1)), grpc_timeout, mempool_tx_capacity: block_producer.mempool_tx_capacity, } diff --git a/bin/node/src/commands/bundled.rs b/bin/node/src/commands/bundled.rs index 1a864a381..28fba84e9 100644 --- a/bin/node/src/commands/bundled.rs +++ b/bin/node/src/commands/bundled.rs @@ -1,6 +1,5 @@ use std::collections::HashMap; use std::path::PathBuf; -use std::sync::Arc; use std::time::Duration; use anyhow::Context; @@ -14,7 +13,6 @@ use miden_protocol::block::BlockSigner; use miden_protocol::crypto::dsa::ecdsa_k256_keccak::SecretKey; use miden_protocol::utils::Deserializable; use tokio::net::TcpListener; -use tokio::sync::Barrier; use tokio::task::JoinSet; use url::Url; @@ -222,19 +220,11 @@ impl BundledCommand { }) .id(); - // A sync point between the ntx-builder and block-producer components. let should_start_ntx_builder = !ntx_builder.disabled; - let checkpoint = if should_start_ntx_builder { - Barrier::new(2) - } else { - Barrier::new(1) - }; - let checkpoint = Arc::new(checkpoint); // Start block-producer. The block-producer's endpoint is available after loading completes. let block_producer_id = join_set .spawn({ - let checkpoint = Arc::clone(&checkpoint); let store_url = Url::parse(&format!("http://{store_block_producer_address}")) .context("Failed to parse URL")?; let validator_url = Url::parse(&format!("http://{validator_address}")) @@ -250,7 +240,6 @@ impl BundledCommand { block_interval: block_producer.block_interval, max_batches_per_block: block_producer.max_batches_per_block, max_txs_per_batch: block_producer.max_txs_per_batch, - production_checkpoint: checkpoint, grpc_timeout, mempool_tx_capacity: block_producer.mempool_tx_capacity, } @@ -323,8 +312,6 @@ impl BundledCommand { block_producer_url, validator_url, ntx_builder.tx_prover_url, - ntx_builder.ticker_interval, - checkpoint, ntx_builder.script_cache_size, ) .run() diff --git a/crates/block-producer/src/server/mod.rs b/crates/block-producer/src/server/mod.rs index d8d6bdd97..8245c1ee6 100644 --- a/crates/block-producer/src/server/mod.rs +++ b/crates/block-producer/src/server/mod.rs @@ -18,7 +18,7 @@ use miden_protocol::block::BlockNumber; use miden_protocol::transaction::ProvenTransaction; use miden_protocol::utils::serde::Deserializable; use tokio::net::TcpListener; -use tokio::sync::{Barrier, Mutex, RwLock}; +use tokio::sync::{Mutex, RwLock}; use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream}; use tonic::Status; use tower_http::trace::TraceLayer; @@ -65,11 +65,6 @@ pub struct BlockProducer { pub max_txs_per_batch: usize, /// The maximum number of batches per block. pub max_batches_per_block: usize, - /// Block production only begins after this checkpoint barrier completes. - /// - /// The block-producers gRPC endpoint will be available before this point, so this lets the - /// mempool synchronize its event stream without risking a race condition. - pub production_checkpoint: Arc, /// Server-side timeout for an individual gRPC request. /// /// If the handler takes longer than this duration, the server cancels the call. @@ -155,12 +150,7 @@ impl BlockProducer { // any complete or fail, we can shutdown the rest (somewhat) gracefully. let mut tasks = tokio::task::JoinSet::new(); - // Launch the gRPC server and wait at the checkpoint for any other components to be in sync. - // - // This is used to ensure the ntx-builder can subscribe to the mempool events without - // playing catch up caused by block-production. - // - // This is a temporary work-around until the ntx-builder can resync on the fly. + // Launch the gRPC server. let rpc_id = tasks .spawn({ let mempool = mempool.clone(); @@ -171,7 +161,6 @@ impl BlockProducer { } }) .id(); - self.production_checkpoint.wait().await; let batch_builder_id = tasks .spawn({ diff --git a/crates/ntx-builder/src/block_producer.rs b/crates/ntx-builder/src/block_producer.rs index 87d3da7e6..ce4d7b9c6 100644 --- a/crates/ntx-builder/src/block_producer.rs +++ b/crates/ntx-builder/src/block_producer.rs @@ -67,9 +67,9 @@ impl BlockProducerClient { loop { match self.subscribe_to_mempool(chain_tip).await { Err(err) if err.code() == tonic::Code::Unavailable => { - // exponential backoff with base 500ms and max 30s + // Exponential backoff with base 500ms and max 30s. let backoff = Duration::from_millis(500) - .saturating_mul(1 << retry_counter) + .saturating_mul(1 << retry_counter.min(6)) .min(Duration::from_secs(30)); tracing::warn!( diff --git a/crates/ntx-builder/src/builder.rs b/crates/ntx-builder/src/builder.rs index 84c711385..c564a5012 100644 --- a/crates/ntx-builder/src/builder.rs +++ b/crates/ntx-builder/src/builder.rs @@ -1,6 +1,5 @@ use std::num::NonZeroUsize; use std::sync::Arc; -use std::time::Duration; use anyhow::Context; use futures::TryStreamExt; @@ -13,8 +12,7 @@ use miden_protocol::block::BlockHeader; use miden_protocol::crypto::merkle::mmr::PartialMmr; use miden_protocol::note::NoteScript; use miden_protocol::transaction::PartialBlockchain; -use tokio::sync::{Barrier, RwLock}; -use tokio::time; +use tokio::sync::{RwLock, mpsc}; use url::Url; use crate::MAX_IN_PROGRESS_TXS; @@ -78,13 +76,6 @@ pub struct NetworkTransactionBuilder { /// Address of the remote prover. If `None`, transactions will be proven locally, which is /// undesirable due to the performance impact. tx_prover_url: Option, - /// Interval for checking pending notes and executing network transactions. - ticker_interval: Duration, - /// A checkpoint used to sync start-up process with the block-producer. - /// - /// This informs the block-producer when we have subscribed to mempool events and that it is - /// safe to begin block-production. - bp_checkpoint: Arc, /// Shared LRU cache for storing retrieved note scripts to avoid repeated store calls. /// This cache is shared across all account actors. script_cache: LruCache, @@ -93,14 +84,15 @@ pub struct NetworkTransactionBuilder { } impl NetworkTransactionBuilder { + /// Channel capacity for account loading. + const ACCOUNT_CHANNEL_CAPACITY: usize = 1_000; + /// Creates a new instance of the network transaction builder. pub fn new( store_url: Url, block_producer_url: Url, validator_url: Url, tx_prover_url: Option, - ticker_interval: Duration, - bp_checkpoint: Arc, script_cache_size: NonZeroUsize, ) -> Self { let script_cache = LruCache::new(script_cache_size); @@ -110,8 +102,6 @@ impl NetworkTransactionBuilder { block_producer_url, validator_url, tx_prover_url, - ticker_interval, - bp_checkpoint, script_cache, coordinator, } @@ -131,15 +121,6 @@ impl NetworkTransactionBuilder { .await .context("failed to subscribe to mempool events")?; - // Unlock the block-producer's block production. The block-producer is prevented from - // producing blocks until we have subscribed to mempool events. - // - // This is a temporary work-around until the ntx-builder can resync on the fly. - self.bp_checkpoint.wait().await; - - let mut interval = tokio::time::interval(self.ticker_interval); - interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip); - // Create chain state that will be updated by the coordinator and read by actors. let chain_state = Arc::new(RwLock::new(ChainState::new(chain_tip_header, chain_mmr))); @@ -152,15 +133,17 @@ impl NetworkTransactionBuilder { script_cache: self.script_cache.clone(), }; - // Create initial set of actors based on all known network accounts. - let account_ids = store.get_network_account_ids().await?; - for account_id in account_ids { - if let Ok(account_id) = NetworkAccountId::try_from(account_id) { - self.coordinator - .spawn_actor(AccountOrigin::store(account_id), &actor_context) - .await?; - } - } + // Spawn a background task to load network accounts from the store. + // Accounts are sent through a channel in batches and processed in the main event loop. + let (account_tx, mut account_rx) = + mpsc::channel::(Self::ACCOUNT_CHANNEL_CAPACITY); + let account_loader_store = store.clone(); + let mut account_loader_handle = tokio::spawn(async move { + account_loader_store + .stream_network_account_ids(account_tx) + .await + .context("failed to load network accounts from store") + }); // Main loop which manages actors and passes mempool events to them. loop { @@ -181,10 +164,43 @@ impl NetworkTransactionBuilder { chain_state.clone(), ).await?; }, + // Handle account batches loaded from the store. + // Once all accounts are loaded, the channel closes and this branch + // becomes inactive (recv returns None and we stop matching). + Some(account_id) = account_rx.recv() => { + self.handle_loaded_account(account_id, &actor_context).await?; + }, + // Handle account loader task completion/failure. + // If the task fails, we abort since the builder would be in a degraded state + // where existing notes against network accounts won't be processed. + result = &mut account_loader_handle => { + result + .context("account loader task panicked") + .flatten()?; + + tracing::info!("account loading from store completed"); + account_loader_handle = tokio::spawn(std::future::pending()); + }, } } } + /// Handles a batch of account IDs loaded from the store by spawning actors for them. + #[tracing::instrument( + name = "ntx.builder.handle_loaded_accounts", + skip(self, account_id, actor_context) + )] + async fn handle_loaded_account( + &mut self, + account_id: NetworkAccountId, + actor_context: &AccountActorContext, + ) -> Result<(), anyhow::Error> { + self.coordinator + .spawn_actor(AccountOrigin::store(account_id), actor_context) + .await?; + Ok(()) + } + /// Handles mempool events by sending them to actors via the coordinator and/or spawning new /// actors as required. #[tracing::instrument( diff --git a/crates/ntx-builder/src/store.rs b/crates/ntx-builder/src/store.rs index 784a27101..1a7c7b309 100644 --- a/crates/ntx-builder/src/store.rs +++ b/crates/ntx-builder/src/store.rs @@ -1,3 +1,4 @@ +use std::ops::RangeInclusive; use std::time::Duration; use miden_node_proto::clients::{Builder, StoreNtxBuilderClient}; @@ -7,6 +8,7 @@ use miden_node_proto::errors::ConversionError; use miden_node_proto::generated::rpc::BlockRange; use miden_node_proto::generated::{self as proto}; use miden_node_proto::try_convert; +use miden_node_utils::tracing::OpenTelemetrySpanExt; use miden_protocol::Word; use miden_protocol::account::{Account, AccountId}; use miden_protocol::block::{BlockHeader, BlockNumber}; @@ -57,7 +59,7 @@ impl StoreClient { Err(StoreError::GrpcClientError(err)) => { // Exponential backoff with base 500ms and max 30s. let backoff = Duration::from_millis(500) - .saturating_mul(1 << retry_counter) + .saturating_mul(1 << retry_counter.min(6)) .min(Duration::from_secs(30)); tracing::warn!( @@ -173,62 +175,137 @@ impl StoreClient { Ok(all_notes) } - /// Get all network account IDs. + /// Streams network account IDs to the provided sender. /// - /// Since the `GetNetworkAccountIds` method is paginated, we loop through all pages until we - /// reach the end. + /// This method is designed to be run in a background task, sending accounts to the main event + /// loop as they are loaded. This allows the ntx-builder to start processing mempool events + /// without waiting for all accounts to be preloaded. + pub async fn stream_network_account_ids( + &self, + sender: tokio::sync::mpsc::Sender, + ) -> Result<(), StoreError> { + let mut block_range = BlockNumber::from(0)..=BlockNumber::from(u32::MAX); + + while let Some(next_start) = self.load_accounts_page(block_range, &sender).await? { + block_range = next_start..=BlockNumber::from(u32::MAX); + } + + Ok(()) + } + + /// Loads a single page of network accounts and submits them to the sender. /// - /// Each page can return up to `MAX_RESPONSE_PAYLOAD_BYTES / AccountId::SERIALIZED_SIZE` - /// accounts (~289,000). With `100_000` iterations, which is assumed to be sufficient for the - /// foreseeable future. - #[instrument(target = COMPONENT, name = "store.client.get_network_account_ids", skip_all, err)] - pub async fn get_network_account_ids(&self) -> Result, StoreError> { - const MAX_ITERATIONS: u32 = 100_000; + /// Returns the next block number to fetch from, or `None` if the chain tip has been reached. + #[instrument(target = COMPONENT, name = "store.client.load_accounts_page", skip_all, err)] + async fn load_accounts_page( + &self, + block_range: RangeInclusive, + sender: &tokio::sync::mpsc::Sender, + ) -> Result, StoreError> { + let (accounts, pagination_info) = self.fetch_network_account_ids_page(block_range).await?; - let mut block_range = BlockNumber::from(0)..=BlockNumber::from(u32::MAX); + let chain_tip = pagination_info.chain_tip; + let current_height = pagination_info.block_num; - let mut ids = Vec::new(); - let mut iterations_count = 0; + self.send_accounts_to_channel(accounts, sender).await?; - loop { - let response = self + if current_height >= chain_tip { + Ok(None) + } else { + Ok(Some(BlockNumber::from(current_height))) + } + } + + #[instrument(target = COMPONENT, name = "store.client.fetch_network_account_ids_page", skip_all, err)] + async fn fetch_network_account_ids_page( + &self, + block_range: std::ops::RangeInclusive, + ) -> Result<(Vec, proto::rpc::PaginationInfo), StoreError> { + self.fetch_network_account_ids_page_inner(block_range) + .await + .inspect_err(|err| tracing::Span::current().set_error(err)) + } + + async fn fetch_network_account_ids_page_inner( + &self, + block_range: std::ops::RangeInclusive, + ) -> Result<(Vec, proto::rpc::PaginationInfo), StoreError> { + let mut retry_counter = 0u32; + + let response = loop { + match self .inner .clone() .get_network_account_ids(Into::::into(block_range.clone())) - .await? - .into_inner(); - - let accounts: Result, ConversionError> = response - .account_ids - .into_iter() - .map(|account_id| { - AccountId::read_from_bytes(&account_id.id) - .map_err(|err| ConversionError::deserialization_error("account_id", err)) - }) - .collect(); + .await + { + Ok(response) => break response.into_inner(), + Err(err) => { + // Exponential backoff with base 500ms and max 30s. + let backoff = Duration::from_millis(500) + .saturating_mul(1 << retry_counter.min(6)) + .min(Duration::from_secs(30)); + + tracing::warn!( + ?backoff, + %retry_counter, + %err, + "store connection failed while fetching committed accounts page, retrying" + ); - let pagination_info = response.pagination_info.ok_or( - ConversionError::MissingFieldInProtobufRepresentation { - entity: "NetworkAccountIdList", - field_name: "pagination_info", + retry_counter += 1; + tokio::time::sleep(backoff).await; }, - )?; + } + }; + + let accounts = response + .account_ids + .into_iter() + .map(|account_id| { + let account_id = AccountId::read_from_bytes(&account_id.id).map_err(|err| { + StoreError::DeserializationError(ConversionError::deserialization_error( + "account_id", + err, + )) + })?; + NetworkAccountId::try_from(account_id).map_err(|_| { + StoreError::MalformedResponse( + "account id is not a valid network account".into(), + ) + }) + }) + .collect::, StoreError>>()?; - ids.extend(accounts?); - iterations_count += 1; - block_range = - BlockNumber::from(pagination_info.block_num)..=BlockNumber::from(u32::MAX); + let pagination_info = response.pagination_info.ok_or( + ConversionError::MissingFieldInProtobufRepresentation { + entity: "NetworkAccountIdList", + field_name: "pagination_info", + }, + )?; - if pagination_info.block_num >= pagination_info.chain_tip { - break; - } + Ok((accounts, pagination_info)) + } - if iterations_count >= MAX_ITERATIONS { - return Err(StoreError::MaxIterationsReached("GetNetworkAccountIds".to_string())); + #[instrument( + target = COMPONENT, + name = "store.client.send_accounts_to_channel", + skip_all + )] + async fn send_accounts_to_channel( + &self, + accounts: Vec, + sender: &tokio::sync::mpsc::Sender, + ) -> Result<(), StoreError> { + for account in accounts { + // If the receiver is dropped, stop loading. + if sender.send(account).await.is_err() { + tracing::warn!("Account receiver dropped"); + return Ok(()); } } - Ok(ids) + Ok(()) } #[instrument(target = COMPONENT, name = "store.client.get_note_script_by_root", skip_all, err)] @@ -268,6 +345,4 @@ pub enum StoreError { MalformedResponse(String), #[error("failed to parse response")] DeserializationError(#[from] ConversionError), - #[error("max iterations reached: {0}")] - MaxIterationsReached(String), } diff --git a/crates/rpc/src/server/api.rs b/crates/rpc/src/server/api.rs index 1fc0c266b..45e4bf895 100644 --- a/crates/rpc/src/server/api.rs +++ b/crates/rpc/src/server/api.rs @@ -131,9 +131,9 @@ impl RpcService { return Ok(header); }, Err(err) if err.code() == tonic::Code::Unavailable => { - // exponential backoff with base 500ms and max 30s + // Exponential backoff with base 500ms and max 30s. let backoff = Duration::from_millis(500) - .saturating_mul(1 << retry_counter) + .saturating_mul(1 << retry_counter.min(6)) .min(Duration::from_secs(30)); tracing::warn!(