Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
- Added `GetLimits` endpoint to the RPC server ([#1410](https://github.com/0xMiden/miden-node/pull/1410)).
- Added gRPC-Web probe support to the `miden-network-monitor` binary ([#1484](https://github.com/0xMiden/miden-node/pull/1484)).
- Add DB schema change check ([#1268](https://github.com/0xMiden/miden-node/pull/1485)).
- Decoupled ntx-builder from block-producer startup by loading network accounts asynchronously via a background task ([#????](https://github.com/0xMiden/miden-node/pull/????)).
- Improve DB query performance for account queries ([#1496](https://github.com/0xMiden/miden-node/pull/1496)).

### Changes
Expand Down
3 changes: 0 additions & 3 deletions bin/node/src/commands/block_producer.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use miden_node_block_producer::BlockProducer;
use miden_node_utils::grpc::UrlExt;
use tokio::sync::Barrier;
use url::Url;

use super::{ENV_BLOCK_PRODUCER_URL, ENV_STORE_BLOCK_PRODUCER_URL};
Expand Down Expand Up @@ -93,7 +91,6 @@ impl BlockProducerCommand {
block_interval: block_producer.block_interval,
max_txs_per_batch: block_producer.max_txs_per_batch,
max_batches_per_block: block_producer.max_batches_per_block,
production_checkpoint: Arc::new(Barrier::new(1)),
grpc_timeout,
mempool_tx_capacity: block_producer.mempool_tx_capacity,
}
Expand Down
13 changes: 0 additions & 13 deletions bin/node/src/commands/bundled.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
Expand All @@ -14,7 +13,6 @@ use miden_protocol::block::BlockSigner;
use miden_protocol::crypto::dsa::ecdsa_k256_keccak::SecretKey;
use miden_protocol::utils::Deserializable;
use tokio::net::TcpListener;
use tokio::sync::Barrier;
use tokio::task::JoinSet;
use url::Url;

Expand Down Expand Up @@ -222,19 +220,11 @@ impl BundledCommand {
})
.id();

// A sync point between the ntx-builder and block-producer components.
let should_start_ntx_builder = !ntx_builder.disabled;
let checkpoint = if should_start_ntx_builder {
Barrier::new(2)
} else {
Barrier::new(1)
};
let checkpoint = Arc::new(checkpoint);

// Start block-producer. The block-producer's endpoint is available after loading completes.
let block_producer_id = join_set
.spawn({
let checkpoint = Arc::clone(&checkpoint);
let store_url = Url::parse(&format!("http://{store_block_producer_address}"))
.context("Failed to parse URL")?;
let validator_url = Url::parse(&format!("http://{validator_address}"))
Expand All @@ -250,7 +240,6 @@ impl BundledCommand {
block_interval: block_producer.block_interval,
max_batches_per_block: block_producer.max_batches_per_block,
max_txs_per_batch: block_producer.max_txs_per_batch,
production_checkpoint: checkpoint,
grpc_timeout,
mempool_tx_capacity: block_producer.mempool_tx_capacity,
}
Expand Down Expand Up @@ -320,8 +309,6 @@ impl BundledCommand {
store_ntx_builder_url,
block_producer_url,
ntx_builder.tx_prover_url,
ntx_builder.ticker_interval,
checkpoint,
ntx_builder.script_cache_size,
)
.run()
Expand Down
15 changes: 2 additions & 13 deletions crates/block-producer/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use miden_protocol::block::BlockNumber;
use miden_protocol::transaction::ProvenTransaction;
use miden_protocol::utils::serde::Deserializable;
use tokio::net::TcpListener;
use tokio::sync::{Barrier, Mutex, RwLock};
use tokio::sync::{Mutex, RwLock};
use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
use tonic::Status;
use tower_http::trace::TraceLayer;
Expand Down Expand Up @@ -65,11 +65,6 @@ pub struct BlockProducer {
pub max_txs_per_batch: usize,
/// The maximum number of batches per block.
pub max_batches_per_block: usize,
/// Block production only begins after this checkpoint barrier completes.
///
/// The block-producer's gRPC endpoint will be available before this point, so this lets the
/// mempool synchronize its event stream without risking a race condition.
pub production_checkpoint: Arc<Barrier>,
/// Server-side timeout for an individual gRPC request.
///
/// If the handler takes longer than this duration, the server cancels the call.
Expand Down Expand Up @@ -155,12 +150,7 @@ impl BlockProducer {
// any complete or fail, we can shutdown the rest (somewhat) gracefully.
let mut tasks = tokio::task::JoinSet::new();

// Launch the gRPC server and wait at the checkpoint for any other components to be in sync.
//
// This is used to ensure the ntx-builder can subscribe to the mempool events without
// playing catch up caused by block-production.
//
// This is a temporary work-around until the ntx-builder can resync on the fly.
// Launch the gRPC server.
let rpc_id = tasks
.spawn({
let mempool = mempool.clone();
Expand All @@ -171,7 +161,6 @@ impl BlockProducer {
}
})
.id();
self.production_checkpoint.wait().await;

let batch_builder_id = tasks
.spawn({
Expand Down
4 changes: 2 additions & 2 deletions crates/ntx-builder/src/block_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ impl BlockProducerClient {
loop {
match self.subscribe_to_mempool(chain_tip).await {
Err(err) if err.code() == tonic::Code::Unavailable => {
// exponential backoff with base 500ms and max 30s
// Exponential backoff with base 500ms and max 30s.
let backoff = Duration::from_millis(500)
.saturating_mul(1 << retry_counter)
.saturating_mul(1 << retry_counter.min(6))
.min(Duration::from_secs(30));

tracing::warn!(
Expand Down
82 changes: 50 additions & 32 deletions crates/ntx-builder/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use futures::TryStreamExt;
Expand All @@ -13,8 +12,7 @@ use miden_protocol::block::BlockHeader;
use miden_protocol::crypto::merkle::mmr::PartialMmr;
use miden_protocol::note::NoteScript;
use miden_protocol::transaction::PartialBlockchain;
use tokio::sync::{Barrier, RwLock};
use tokio::time;
use tokio::sync::{RwLock, mpsc};
use url::Url;

use crate::MAX_IN_PROGRESS_TXS;
Expand Down Expand Up @@ -76,13 +74,6 @@ pub struct NetworkTransactionBuilder {
/// Address of the remote prover. If `None`, transactions will be proven locally, which is
/// undesirable due to the performance impact.
tx_prover_url: Option<Url>,
/// Interval for checking pending notes and executing network transactions.
ticker_interval: Duration,
/// A checkpoint used to sync start-up process with the block-producer.
///
/// This informs the block-producer when we have subscribed to mempool events and that it is
/// safe to begin block-production.
bp_checkpoint: Arc<Barrier>,
/// Shared LRU cache for storing retrieved note scripts to avoid repeated store calls.
/// This cache is shared across all account actors.
script_cache: LruCache<Word, NoteScript>,
Expand All @@ -91,13 +82,14 @@ pub struct NetworkTransactionBuilder {
}

impl NetworkTransactionBuilder {
/// Channel capacity for account loading.
const ACCOUNT_CHANNEL_CAPACITY: usize = 1_000;

/// Creates a new instance of the network transaction builder.
pub fn new(
store_url: Url,
block_producer_url: Url,
tx_prover_url: Option<Url>,
ticker_interval: Duration,
bp_checkpoint: Arc<Barrier>,
script_cache_size: NonZeroUsize,
) -> Self {
let script_cache = LruCache::new(script_cache_size);
Expand All @@ -106,8 +98,6 @@ impl NetworkTransactionBuilder {
store_url,
block_producer_url,
tx_prover_url,
ticker_interval,
bp_checkpoint,
script_cache,
coordinator,
}
Expand All @@ -127,15 +117,6 @@ impl NetworkTransactionBuilder {
.await
.context("failed to subscribe to mempool events")?;

// Unlock the block-producer's block production. The block-producer is prevented from
// producing blocks until we have subscribed to mempool events.
//
// This is a temporary work-around until the ntx-builder can resync on the fly.
self.bp_checkpoint.wait().await;

let mut interval = tokio::time::interval(self.ticker_interval);
interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);

// Create chain state that will be updated by the coordinator and read by actors.
let chain_state = Arc::new(RwLock::new(ChainState::new(chain_tip_header, chain_mmr)));

Expand All @@ -147,15 +128,18 @@ impl NetworkTransactionBuilder {
script_cache: self.script_cache.clone(),
};

// Create initial set of actors based on all known network accounts.
let account_ids = store.get_network_account_ids().await?;
for account_id in account_ids {
if let Ok(account_prefix) = NetworkAccountPrefix::try_from(account_id) {
self.coordinator
.spawn_actor(AccountOrigin::store(account_prefix), &actor_context)
.await?;
}
}
// Spawn a background task to load network accounts from the store.
// Each account is sent through a bounded channel and processed in the main event loop.
let (account_tx, mut account_rx) =
mpsc::channel::<NetworkAccountPrefix>(Self::ACCOUNT_CHANNEL_CAPACITY);
let account_loader_store = store.clone();
let account_loader_handle = tokio::spawn(async move {
account_loader_store
.stream_network_account_ids(account_tx)
.await
.context("failed to load network accounts from store")
});
let mut account_loader = Some(account_loader_handle);

// Main loop which manages actors and passes mempool events to them.
loop {
Expand All @@ -176,10 +160,44 @@ impl NetworkTransactionBuilder {
chain_state.clone(),
).await?;
},
// Handle an account loaded from the store.
// Once all accounts are loaded, the channel closes and this branch becomes
// inactive (`recv` returns `None`, which no longer matches the `Some(..)` pattern).
Some(account_id) = account_rx.recv() => {
self.handle_loaded_account(account_id, &actor_context).await?;
},
// Handle account loader task completion/failure.
// If the task fails, we abort since the builder would be in a degraded state
// where existing notes against network accounts won't be processed.
Some(result) = async {
match account_loader.as_mut() {
Some(handle) => Some(handle.await),
None => std::future::pending().await,
}
} => {
account_loader = None;
result.context("account loader task panicked")??;
},
}
}
}

/// Handles a single network account loaded from the store by spawning an actor for it.
///
/// The account is registered with the coordinator using a store origin
/// (`AccountOrigin::store`), using the shared `actor_context`.
///
/// # Errors
///
/// Returns an error if the coordinator fails to spawn the actor.
#[tracing::instrument(
name = "ntx.builder.handle_loaded_accounts",
skip(self, account_prefix, actor_context)
)]
async fn handle_loaded_account(
&mut self,
account_prefix: NetworkAccountPrefix,
actor_context: &AccountActorContext,
) -> Result<(), anyhow::Error> {
self.coordinator
.spawn_actor(AccountOrigin::store(account_prefix), actor_context)
.await?;
Ok(())
}

/// Handles mempool events by sending them to actors via the coordinator and/or spawning new
/// actors as required.
#[tracing::instrument(
Expand Down
Loading