Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
5267cbf
chore: decouple ntx builder
SantiagoPittella Jan 9, 2026
2c0a1b2
review: send one account at a time
SantiagoPittella Jan 12, 2026
9ff989d
review: increase channel capacity, remove pagination reference
SantiagoPittella Jan 12, 2026
cd08232
review: return NetworkAccountPrefix
SantiagoPittella Jan 12, 2026
5a58026
review: improve tracing
SantiagoPittella Jan 12, 2026
05909a9
review: add exponential backoff
SantiagoPittella Jan 12, 2026
f15f313
review: remove max iterations
SantiagoPittella Jan 12, 2026
05a1a0f
Merge branch 'next' into santiagopittella-decouple-ntx-builder
SantiagoPittella Jan 13, 2026
3103972
review: reduce channel capacity
SantiagoPittella Jan 14, 2026
ec65239
review: abort on failure
SantiagoPittella Jan 14, 2026
bf5a7b8
fix: exponential backoff overflow
SantiagoPittella Jan 14, 2026
c5e8c7d
review: fix instrumentation
SantiagoPittella Jan 14, 2026
7b2b392
review: add warn! when receiver drops
SantiagoPittella Jan 20, 2026
4769732
review: flatten double result
SantiagoPittella Jan 20, 2026
628ecf0
review: remove Option<Handle>
SantiagoPittella Jan 20, 2026
1cdbf26
review: remove fields from trace
SantiagoPittella Jan 20, 2026
bee1f9e
review: remove parameters from submit_page
SantiagoPittella Jan 20, 2026
aeefdf6
review: log full errors
SantiagoPittella Jan 20, 2026
e76ea6a
review: rename submit and fetch page functions
SantiagoPittella Jan 20, 2026
eed16e9
Merge branch 'next' into santiagopittella-decouple-ntx-builder
SantiagoPittella Jan 20, 2026
190ca56
Merge branch 'next' into santiagopittella-decouple-ntx-builder
SantiagoPittella Jan 21, 2026
9785ea9
review: use _inner approach for errors
SantiagoPittella Jan 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
- Added `GetLimits` endpoint to the RPC server ([#1410](https://github.com/0xMiden/miden-node/pull/1410)).
- Added gRPC-Web probe support to the `miden-network-monitor` binary ([#1484](https://github.com/0xMiden/miden-node/pull/1484)).
- Add DB schema change check ([#1268](https://github.com/0xMiden/miden-node/pull/1485)).
- Decoupled ntx-builder from block-producer startup by loading network accounts asynchronously via a background task ([#????](https://github.com/0xMiden/miden-node/pull/????)).

### Changes

Expand Down
3 changes: 0 additions & 3 deletions bin/node/src/commands/block_producer.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use miden_node_block_producer::BlockProducer;
use miden_node_utils::grpc::UrlExt;
use tokio::sync::Barrier;
use url::Url;

use super::{ENV_BLOCK_PRODUCER_URL, ENV_STORE_BLOCK_PRODUCER_URL};
Expand Down Expand Up @@ -93,7 +91,6 @@ impl BlockProducerCommand {
block_interval: block_producer.block_interval,
max_txs_per_batch: block_producer.max_txs_per_batch,
max_batches_per_block: block_producer.max_batches_per_block,
production_checkpoint: Arc::new(Barrier::new(1)),
grpc_timeout,
mempool_tx_capacity: block_producer.mempool_tx_capacity,
}
Expand Down
13 changes: 0 additions & 13 deletions bin/node/src/commands/bundled.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
Expand All @@ -14,7 +13,6 @@ use miden_protocol::block::BlockSigner;
use miden_protocol::crypto::dsa::ecdsa_k256_keccak::SecretKey;
use miden_protocol::utils::Deserializable;
use tokio::net::TcpListener;
use tokio::sync::Barrier;
use tokio::task::JoinSet;
use url::Url;

Expand Down Expand Up @@ -222,19 +220,11 @@ impl BundledCommand {
})
.id();

// A sync point between the ntx-builder and block-producer components.
let should_start_ntx_builder = !ntx_builder.disabled;
let checkpoint = if should_start_ntx_builder {
Barrier::new(2)
} else {
Barrier::new(1)
};
let checkpoint = Arc::new(checkpoint);

// Start block-producer. The block-producer's endpoint is available after loading completes.
let block_producer_id = join_set
.spawn({
let checkpoint = Arc::clone(&checkpoint);
let store_url = Url::parse(&format!("http://{store_block_producer_address}"))
.context("Failed to parse URL")?;
let validator_url = Url::parse(&format!("http://{validator_address}"))
Expand All @@ -250,7 +240,6 @@ impl BundledCommand {
block_interval: block_producer.block_interval,
max_batches_per_block: block_producer.max_batches_per_block,
max_txs_per_batch: block_producer.max_txs_per_batch,
production_checkpoint: checkpoint,
grpc_timeout,
mempool_tx_capacity: block_producer.mempool_tx_capacity,
}
Expand Down Expand Up @@ -320,8 +309,6 @@ impl BundledCommand {
store_ntx_builder_url,
block_producer_url,
ntx_builder.tx_prover_url,
ntx_builder.ticker_interval,
checkpoint,
ntx_builder.script_cache_size,
)
.run()
Expand Down
15 changes: 2 additions & 13 deletions crates/block-producer/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use miden_protocol::block::BlockNumber;
use miden_protocol::transaction::ProvenTransaction;
use miden_protocol::utils::serde::Deserializable;
use tokio::net::TcpListener;
use tokio::sync::{Barrier, Mutex, RwLock};
use tokio::sync::{Mutex, RwLock};
use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
use tonic::Status;
use tower_http::trace::TraceLayer;
Expand Down Expand Up @@ -65,11 +65,6 @@ pub struct BlockProducer {
pub max_txs_per_batch: usize,
/// The maximum number of batches per block.
pub max_batches_per_block: usize,
/// Block production only begins after this checkpoint barrier completes.
///
/// The block-producers gRPC endpoint will be available before this point, so this lets the
/// mempool synchronize its event stream without risking a race condition.
pub production_checkpoint: Arc<Barrier>,
/// Server-side timeout for an individual gRPC request.
///
/// If the handler takes longer than this duration, the server cancels the call.
Expand Down Expand Up @@ -155,12 +150,7 @@ impl BlockProducer {
// any complete or fail, we can shutdown the rest (somewhat) gracefully.
let mut tasks = tokio::task::JoinSet::new();

// Launch the gRPC server and wait at the checkpoint for any other components to be in sync.
//
// This is used to ensure the ntx-builder can subscribe to the mempool events without
// playing catch up caused by block-production.
//
// This is a temporary work-around until the ntx-builder can resync on the fly.
// Launch the gRPC server.
let rpc_id = tasks
.spawn({
let mempool = mempool.clone();
Expand All @@ -171,7 +161,6 @@ impl BlockProducer {
}
})
.id();
self.production_checkpoint.wait().await;

let batch_builder_id = tasks
.spawn({
Expand Down
74 changes: 43 additions & 31 deletions crates/ntx-builder/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use futures::TryStreamExt;
use miden_node_proto::domain::account::NetworkAccountPrefix;
use miden_node_proto::domain::mempool::MempoolEvent;
use miden_node_utils::lru_cache::LruCache;
use miden_protocol::Word;
use miden_protocol::account::AccountId;
use miden_protocol::account::delta::AccountUpdateDetails;
use miden_protocol::block::BlockHeader;
use miden_protocol::crypto::merkle::mmr::PartialMmr;
use miden_protocol::note::NoteScript;
use miden_protocol::transaction::PartialBlockchain;
use tokio::sync::{Barrier, RwLock};
use tokio::time;
use tokio::sync::{RwLock, mpsc};
use url::Url;

use crate::MAX_IN_PROGRESS_TXS;
Expand Down Expand Up @@ -76,13 +75,6 @@ pub struct NetworkTransactionBuilder {
/// Address of the remote prover. If `None`, transactions will be proven locally, which is
/// undesirable due to the performance impact.
tx_prover_url: Option<Url>,
/// Interval for checking pending notes and executing network transactions.
ticker_interval: Duration,
/// A checkpoint used to sync start-up process with the block-producer.
///
/// This informs the block-producer when we have subscribed to mempool events and that it is
/// safe to begin block-production.
bp_checkpoint: Arc<Barrier>,
/// Shared LRU cache for storing retrieved note scripts to avoid repeated store calls.
/// This cache is shared across all account actors.
script_cache: LruCache<Word, NoteScript>,
Expand All @@ -91,13 +83,16 @@ pub struct NetworkTransactionBuilder {
}

impl NetworkTransactionBuilder {
/// Channel capacity for account loading. Each item in the channel is a batch of account IDs
/// loaded from a single page. A small capacity is sufficient since each page can contain
/// up to ~289,000 accounts.
///
/// The channel is bounded, so the background loader waits in `send` once this many batches
/// are queued and not yet consumed by the main event loop.
const ACCOUNT_CHANNEL_CAPACITY: usize = 4;

/// Creates a new instance of the network transaction builder.
pub fn new(
store_url: Url,
block_producer_url: Url,
tx_prover_url: Option<Url>,
ticker_interval: Duration,
bp_checkpoint: Arc<Barrier>,
script_cache_size: NonZeroUsize,
) -> Self {
let script_cache = LruCache::new(script_cache_size);
Expand All @@ -106,8 +101,6 @@ impl NetworkTransactionBuilder {
store_url,
block_producer_url,
tx_prover_url,
ticker_interval,
bp_checkpoint,
script_cache,
coordinator,
}
Expand All @@ -127,15 +120,6 @@ impl NetworkTransactionBuilder {
.await
.context("failed to subscribe to mempool events")?;

// Unlock the block-producer's block production. The block-producer is prevented from
// producing blocks until we have subscribed to mempool events.
//
// This is a temporary work-around until the ntx-builder can resync on the fly.
self.bp_checkpoint.wait().await;

let mut interval = tokio::time::interval(self.ticker_interval);
interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);

// Create chain state that will be updated by the coordinator and read by actors.
let chain_state = Arc::new(RwLock::new(ChainState::new(chain_tip_header, chain_mmr)));

Expand All @@ -147,15 +131,16 @@ impl NetworkTransactionBuilder {
script_cache: self.script_cache.clone(),
};

// Create initial set of actors based on all known network accounts.
let account_ids = store.get_network_account_ids().await?;
for account_id in account_ids {
if let Ok(account_prefix) = NetworkAccountPrefix::try_from(account_id) {
self.coordinator
.spawn_actor(AccountOrigin::store(account_prefix), &actor_context)
.await?;
// Spawn a background task to load network accounts from the store.
// Accounts are sent through a channel in batches and processed in the main event loop.
let (account_tx, mut account_rx) =
mpsc::channel::<Vec<AccountId>>(Self::ACCOUNT_CHANNEL_CAPACITY);
let account_loader_store = store.clone();
tokio::spawn(async move {
if let Err(err) = account_loader_store.stream_network_account_ids(account_tx).await {
tracing::error!(%err, "failed to load network accounts from store");
}
}
});

// Main loop which manages actors and passes mempool events to them.
loop {
Expand All @@ -176,8 +161,35 @@ impl NetworkTransactionBuilder {
chain_state.clone(),
).await?;
},
// Handle account batches loaded from the store.
// Once all accounts are loaded, the channel closes and this branch
// becomes inactive (recv returns None and we stop matching).
Some(account_ids) = account_rx.recv() => {
self.handle_loaded_accounts(account_ids, &actor_context).await?;
},
}
}
}

/// Spawns an actor for each account in a batch of IDs received from the store loader.
///
/// IDs for which the conversion into a [`NetworkAccountPrefix`] fails are skipped. The first
/// error returned while spawning an actor is propagated to the caller, and the remainder of
/// the batch is not processed.
#[tracing::instrument(
    name = "ntx.builder.handle_loaded_accounts",
    skip(self, account_ids, actor_context),
    fields(count = account_ids.len())
)]
async fn handle_loaded_accounts(
    &mut self,
    account_ids: Vec<AccountId>,
    actor_context: &AccountActorContext,
) -> Result<(), anyhow::Error> {
    for id in account_ids {
        // Guard clause: IDs that do not convert to a prefix are ignored.
        let Ok(prefix) = NetworkAccountPrefix::try_from(id) else {
            continue;
        };

        let origin = AccountOrigin::store(prefix);
        self.coordinator.spawn_actor(origin, actor_context).await?;
    }

    Ok(())
}

/// Handles mempool events by sending them to actors via the coordinator and/or spawning new
Expand Down
35 changes: 22 additions & 13 deletions crates/ntx-builder/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,21 +173,24 @@ impl StoreClient {
Ok(all_notes)
}

/// Get all network account IDs.
/// Streams network account IDs to the provided sender, page-by-page.
///
/// Since the `GetNetworkAccountIds` method is paginated, we loop through all pages until we
/// reach the end.
/// This method is designed to be run in a background task, sending accounts to the main event
/// loop as they are loaded. This allows the ntx-builder to start processing mempool events
/// without waiting for all accounts to be preloaded.
///
/// Each page can return up to `MAX_RESPONSE_PAYLOAD_BYTES / AccountId::SERIALIZED_SIZE`
/// accounts (~289,000). With `100_000` iterations, which is assumed to be sufficient for the
/// foreseeable future.
#[instrument(target = COMPONENT, name = "store.client.get_network_account_ids", skip_all, err)]
pub async fn get_network_account_ids(&self) -> Result<Vec<AccountId>, StoreError> {
const MAX_ITERATIONS: u32 = 100_000;
/// accounts (~289,000). With 1000 iterations, this supports up to ~289 million network
/// accounts, which is assumed to be sufficient for the foreseeable future.
#[instrument(target = COMPONENT, name = "store.client.stream_network_account_ids", skip_all, err)]
pub async fn stream_network_account_ids(
&self,
sender: tokio::sync::mpsc::Sender<Vec<AccountId>>,
) -> Result<(), StoreError> {
const MAX_ITERATIONS: u32 = 1000;

let mut block_range = BlockNumber::from(0)..=BlockNumber::from(u32::MAX);

let mut ids = Vec::new();
let mut iterations_count = 0;

loop {
Expand All @@ -198,14 +201,14 @@ impl StoreClient {
.await?
.into_inner();

let accounts: Result<Vec<AccountId>, ConversionError> = response
let accounts = response
.account_ids
.into_iter()
.map(|account_id| {
AccountId::read_from_bytes(&account_id.id)
.map_err(|err| ConversionError::deserialization_error("account_id", err))
})
.collect();
.collect::<Result<Vec<AccountId>, ConversionError>>()?;

let pagination_info = response.pagination_info.ok_or(
ConversionError::MissingFieldInProtobufRepresentation {
Expand All @@ -214,7 +217,13 @@ impl StoreClient {
},
)?;

ids.extend(accounts?);
if !accounts.is_empty() {
// Send the entire batch at once. If the receiver is dropped, stop loading.
if sender.send(accounts).await.is_err() {
return Ok(());
}
}

iterations_count += 1;
block_range =
BlockNumber::from(pagination_info.block_num)..=BlockNumber::from(u32::MAX);
Expand All @@ -228,7 +237,7 @@ impl StoreClient {
}
}

Ok(ids)
Ok(())
}

#[instrument(target = COMPONENT, name = "store.client.get_note_script_by_root", skip_all, err)]
Expand Down