diff --git a/CHANGELOG.md b/CHANGELOG.md index 84b1105c1..ac1e1c455 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,7 +32,7 @@ - Add #[track_caller] to tracing/logging helpers ([#1651](https://github.com/0xMiden/node/pull/1651)). - Added support for generic account loading at genesis ([#1624](https://github.com/0xMiden/node/pull/1624)). - Improved tracing span fields ([#1650](https://github.com/0xMiden/node/pull/1650)) - - Replaced NTX Builder's in-memory state management with SQLite-backed persistence; account states, notes, and transaction effects are now stored in the database and inflight state is purged on startup ([#1662](https://github.com/0xMiden/node/pull/1662)). +- Replaced NTX Builder's in-memory state management with SQLite-backed persistence; account states, notes, and transaction effects are now stored in the database and inflight state is purged on startup ([#1662](https://github.com/0xMiden/node/pull/1662)). - [BREAKING] Reworked `miden-remote-prover`, removing the `worker`/`proxy` distinction and simplifying to a `worker` with a request queue ([#1688](https://github.com/0xMiden/node/pull/1688)). - [BREAKING] Renamed `NoteRoot` protobuf message used in `GetNoteScriptByRoot` gRPC endpoints into `NoteScriptRoot` ([#1722](https://github.com/0xMiden/node/pull/1722)). - NTX Builder actors now deactivate after being idle for a configurable idle timeout (`--ntx-builder.idle-timeout`, default 5 min) and are re-activated when new notes target their account ([#1705](https://github.com/0xMiden/node/pull/1705)). @@ -42,6 +42,7 @@ - NTX Builder now deactivates network accounts which crash repeatedly (configurable via `--ntx-builder.max-account-crashes`, default 10) ([#1712](https://github.com/0xMiden/miden-node/pull/1712)). - Removed gRPC reflection v1-alpha support ([#1795](https://github.com/0xMiden/node/pull/1795)). - [BREAKING] Rust requirement bumped from `v1.91` to `v1.93` ([#1803](https://github.com/0xMiden/node/pull/1803)). +- [BREAKING] Updated `SyncNotes` endpoint to returned multiple note updates ([#1809](https://github.com/0xMiden/node/issues/1809), ([#1851](https://github.com/0xMiden/node/pull/1851))). - [BREAKING] Refactored `NoteSyncRecord` to returned a fixed-size `NoteMetadataHeader` ([#1837](https://github.com/0xMiden/node/pull/1837)). ### Fixes diff --git a/bin/stress-test/src/store/mod.rs b/bin/stress-test/src/store/mod.rs index bb2d9b477..60ecba3b1 100644 --- a/bin/stress-test/src/store/mod.rs +++ b/bin/stress-test/src/store/mod.rs @@ -146,15 +146,25 @@ pub async fn bench_sync_nullifiers( }; let response = store_client.sync_notes(sync_request).await.unwrap().into_inner(); - let note_ids = response - .notes + let resp_block_range = response.block_range.expect("block_range should exist"); + let resp_chain_tip = resp_block_range.block_to.expect("block_to should exist"); + + if response.blocks.is_empty() { + break; + } + + // Collect note IDs from all blocks in the response. + let note_ids: Vec<_> = response + .blocks .iter() - .map(|n| n.inclusion_proof.as_ref().unwrap().note_id.unwrap()) - .collect::>(); + .flat_map(|b| { + b.notes.iter().map(|n| n.inclusion_proof.as_ref().unwrap().note_id.unwrap()) + }) + .collect(); - // Get the notes nullifiers, limiting to 20 notes maximum - let note_ids_to_fetch = - note_ids.iter().take(NOTE_IDS_PER_NULLIFIERS_CHECK).copied().collect::>(); + // Get the notes nullifiers, limiting to 20 notes maximum. + let note_ids_to_fetch: Vec<_> = + note_ids.iter().take(NOTE_IDS_PER_NULLIFIERS_CHECK).copied().collect(); if !note_ids_to_fetch.is_empty() { let notes = store_client .get_notes_by_id(proto::note::NoteIdList { ids: note_ids_to_fetch }) @@ -163,23 +173,18 @@ pub async fn bench_sync_nullifiers( .into_inner() .notes; - nullifier_prefixes.extend( - notes - .iter() - .filter_map(|n| { - // Private notes are filtered out because `n.details` is None - let details_bytes = n.note.as_ref()?.details.as_ref()?; - let details = NoteDetails::read_from_bytes(details_bytes).unwrap(); - Some(u32::from(details.nullifier().prefix())) - }) - .collect::>(), - ); + nullifier_prefixes.extend(notes.iter().filter_map(|n| { + let details_bytes = n.note.as_ref()?.details.as_ref()?; + let details = NoteDetails::read_from_bytes(details_bytes).unwrap(); + Some(u32::from(details.nullifier().prefix())) + })); } - // Update block number from pagination info - let pagination_info = response.pagination_info.expect("pagination_info should exist"); - current_block_num = pagination_info.block_num; - if pagination_info.chain_tip == current_block_num { + // Advance past the last block in the response. + let last_block = response.blocks.last().unwrap(); + current_block_num = + last_block.block_header.as_ref().map_or(resp_chain_tip, |h| h.block_num); + if current_block_num >= resp_chain_tip { break; } } diff --git a/crates/rpc/README.md b/crates/rpc/README.md index bfa790910..0e3db54a1 100644 --- a/crates/rpc/README.md +++ b/crates/rpc/README.md @@ -196,11 +196,9 @@ Returns info which can be used by the client to sync up to the tip of chain for **Limits:** `note_tag` (1000) -Client specifies the `note_tags` they are interested in, and the block range from which to search for matching notes. The request will then return the next block containing any note matching the provided tags within the specified range. +Client specifies the `note_tags` they are interested in, and the block range to search. The response contains all blocks with matching notes that fit within the response payload limit, along with each note's metadata, inclusion proof, and MMR authentication path. -The response includes each note's metadata and inclusion proof. - -A basic note sync can be implemented by repeatedly requesting the previous response's block until reaching the tip of the chain. +If `response.block_range.block_to` is less than the requested range end, make another request starting from `response.block_range.block_to + 1` to continue syncing. #### Error Handling diff --git a/crates/store/src/db/mod.rs b/crates/store/src/db/mod.rs index 2e2f32949..b00faa106 100644 --- a/crates/store/src/db/mod.rs +++ b/crates/store/src/db/mod.rs @@ -2,6 +2,7 @@ use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::mem::size_of; use std::ops::{Deref, DerefMut, RangeInclusive}; use std::path::PathBuf; +use std::sync::Arc; use anyhow::Context; use diesel::{Connection, QueryableByName, RunQueryDsl, SqliteConnection}; @@ -496,10 +497,10 @@ impl Db { pub async fn get_note_sync( &self, block_range: RangeInclusive, - note_tags: Vec, - ) -> Result<(NoteSyncUpdate, BlockNumber), NoteSyncError> { + note_tags: Arc<[u32]>, + ) -> Result, NoteSyncError> { self.transact("notes sync task", move |conn| { - queries::get_note_sync(conn, note_tags.as_slice(), block_range) + queries::get_note_sync(conn, ¬e_tags, block_range) }) .await } diff --git a/crates/store/src/db/models/queries/notes.rs b/crates/store/src/db/models/queries/notes.rs index b05ae4fee..933df6886 100644 --- a/crates/store/src/db/models/queries/notes.rs +++ b/crates/store/src/db/models/queries/notes.rs @@ -518,16 +518,19 @@ pub(crate) fn get_note_sync( conn: &mut SqliteConnection, note_tags: &[u32], block_range: RangeInclusive, -) -> Result<(NoteSyncUpdate, BlockNumber), NoteSyncError> { +) -> Result, NoteSyncError> { QueryParamNoteTagLimit::check(note_tags.len()).map_err(DatabaseError::from)?; - let (notes, last_included_block) = - select_notes_since_block_by_tag_and_sender(conn, &[], note_tags, block_range)?; + let (notes, _) = select_notes_since_block_by_tag_and_sender(conn, &[], note_tags, block_range)?; + + if notes.is_empty() { + return Ok(None); + } let block_header = select_block_header_by_block_num(conn, notes.first().map(|note| note.block_num))? .ok_or(NoteSyncError::EmptyBlockHeadersTable)?; - Ok((NoteSyncUpdate { notes, block_header }, last_included_block)) + Ok(Some(NoteSyncUpdate { notes, block_header })) } #[derive(Debug, Clone, PartialEq, Selectable, Queryable, QueryableByName)] diff --git a/crates/store/src/db/tests.rs b/crates/store/src/db/tests.rs index f3dd1c1d2..75a68db2f 100644 --- a/crates/store/src/db/tests.rs +++ b/crates/store/src/db/tests.rs @@ -912,6 +912,117 @@ fn notes() { assert_eq!(note_1.details, None); } +/// Creates notes across 3 blocks with different tags, then iterates +/// `select_notes_since_block_by_tag_and_sender` advancing the cursor each time, +/// verifying that each call returns the next block's notes. +#[test] +#[miden_node_test_macro::enable_logging] +fn note_sync_across_multiple_blocks() { + let mut conn = create_db(); + let conn = &mut conn; + + let sender = AccountId::try_from(ACCOUNT_ID_PRIVATE_SENDER).unwrap(); + + // Create 3 blocks with notes. + let tag = 42u32; + let note_index = BlockNoteIndex::new(0, 0).unwrap(); + + for block_num_raw in 1..=3u32 { + let block_num = BlockNumber::from(block_num_raw); + create_block(conn, block_num); + queries::upsert_accounts( + conn, + &[mock_block_account_update(sender, block_num_raw.into())], + block_num, + ) + .unwrap(); + + let new_note = create_note(sender); + let note_metadata = NoteMetadata::new(sender, NoteType::Public).with_tag(tag.into()); + let values = [(note_index, new_note.id(), ¬e_metadata)]; + let notes_db = BlockNoteTree::with_entries(values).unwrap(); + let inclusion_path = notes_db.open(note_index); + + let note = NoteRecord { + block_num, + note_index, + note_id: new_note.id().as_word(), + note_commitment: new_note.commitment(), + metadata: note_metadata, + details: Some(NoteDetails::from(&new_note)), + inclusion_path, + }; + queries::insert_scripts(conn, [¬e]).unwrap(); + queries::insert_notes(conn, &[(note, None)]).unwrap(); + } + + // Simulate the store batching loop: repeatedly call get_note_sync with advancing + // block_from, same as `State::sync_notes` does. + let mut collected_block_nums = Vec::new(); + let mut current_from = BlockNumber::GENESIS; + let end = BlockNumber::from(3); + + loop { + let range = current_from..=end; + let Some(update) = queries::get_note_sync(conn, &[tag], range).unwrap() else { + break; + }; + + // All notes in a single response come from the same block. + let block_num = update.block_header.block_num(); + assert!(update.notes.iter().all(|n| n.block_num == block_num)); + collected_block_nums.push(block_num); + + // The query uses `committed_at > block_range.start()` (exclusive), so + // advancing to the found block_num is sufficient to skip it. + current_from = block_num; + } + + assert_eq!( + collected_block_nums, + vec![BlockNumber::from(1), BlockNumber::from(2), BlockNumber::from(3)], + "should iterate through all 3 blocks with matching notes" + ); +} + +/// Tests that note sync returns an empty result when no notes match the requested tags. +#[test] +#[miden_node_test_macro::enable_logging] +fn note_sync_no_matching_tags() { + let mut conn = create_db(); + let conn = &mut conn; + + let sender = AccountId::try_from(ACCOUNT_ID_PRIVATE_SENDER).unwrap(); + let block_num = BlockNumber::from(1); + create_block(conn, block_num); + queries::upsert_accounts(conn, &[mock_block_account_update(sender, 0)], block_num).unwrap(); + + // Insert a note with tag 10. + let new_note = create_note(sender); + let note_index = BlockNoteIndex::new(0, 0).unwrap(); + let note_metadata = NoteMetadata::new(sender, NoteType::Public).with_tag(10u32.into()); + let values = [(note_index, new_note.id(), ¬e_metadata)]; + let notes_db = BlockNoteTree::with_entries(values).unwrap(); + let inclusion_path = notes_db.open(note_index); + + let note = NoteRecord { + block_num, + note_index, + note_id: new_note.id().as_word(), + note_commitment: new_note.commitment(), + metadata: note_metadata, + details: Some(NoteDetails::from(&new_note)), + inclusion_path, + }; + queries::insert_scripts(conn, [¬e]).unwrap(); + queries::insert_notes(conn, &[(note, None)]).unwrap(); + + // Query with a different tag — should return None. + let range = BlockNumber::GENESIS..=BlockNumber::from(1); + let result = queries::get_note_sync(conn, &[999], range).unwrap(); + assert!(result.is_none()); +} + fn insert_account_delta( conn: &mut SqliteConnection, account_id: AccountId, diff --git a/crates/store/src/server/rpc_api.rs b/crates/store/src/server/rpc_api.rs index 2e7087682..59b72b2e9 100644 --- a/crates/store/src/server/rpc_api.rs +++ b/crates/store/src/server/rpc_api.rs @@ -133,23 +133,31 @@ impl rpc_server::Rpc for StoreApi { let block_range = read_block_range::(request.block_range, "SyncNotesRequest")? .into_inclusive_range::(&chain_tip)?; + let block_from = block_range.start(); + // Clamp block_to to the chain tip to avoid erroring when opening the MMR proof + let block_to = block_range.end().min(&chain_tip); + let clamped_block_range = *block_from..=*block_to; // Validate note tags count check::(request.note_tags.len())?; - let (state, mmr_proof, last_block_included) = - self.state.sync_notes(request.note_tags, block_range).await?; + let results = self.state.sync_notes(request.note_tags, clamped_block_range).await?; - let notes = state.notes.into_iter().map(Into::into).collect(); + let blocks = results + .into_iter() + .map(|(state, mmr_proof)| proto::rpc::sync_notes_response::NoteSyncBlock { + block_header: Some(state.block_header.into()), + mmr_path: mmr_proof.map(|proof| proof.merkle_path().clone().into()), + notes: state.notes.into_iter().map(Into::into).collect(), + }) + .collect(); Ok(Response::new(proto::rpc::SyncNotesResponse { - pagination_info: Some(proto::rpc::PaginationInfo { - chain_tip: chain_tip.as_u32(), - block_num: last_block_included.as_u32(), + block_range: Some(proto::rpc::BlockRange { + block_from: block_from.as_u32(), + block_to: Some(block_to.as_u32()), }), - block_header: Some(state.block_header.into()), - mmr_path: Some(mmr_proof.merkle_path().clone().into()), - notes, + blocks, })) } diff --git a/crates/store/src/state/sync_state.rs b/crates/store/src/state/sync_state.rs index afb5212f7..ccbf1ad36 100644 --- a/crates/store/src/state/sync_state.rs +++ b/crates/store/src/state/sync_state.rs @@ -1,5 +1,7 @@ use std::ops::RangeInclusive; +use std::sync::Arc; +use miden_node_utils::limiter::MAX_RESPONSE_PAYLOAD_BYTES; use miden_protocol::account::AccountId; use miden_protocol::block::BlockNumber; use miden_protocol::crypto::merkle::mmr::{Forest, MmrDelta, MmrProof}; @@ -11,6 +13,17 @@ use crate::db::models::queries::StorageMapValuesPage; use crate::db::{AccountVaultValue, NoteSyncUpdate, NullifierInfo}; use crate::errors::{DatabaseError, NoteSyncError, StateSyncError}; +/// Estimated byte size of a [`NoteSyncBlock`] excluding its notes. +/// +/// `BlockHeader` (~341 bytes) + MMR proof with 32 siblings (~1216 bytes). +const BLOCK_OVERHEAD_BYTES: usize = 1600; + +/// Estimated byte size of a single [`NoteSyncRecord`]. +/// +/// Note ID (~38 bytes) + index + metadata (~26 bytes) + sparse merkle path with 16 +/// siblings (~608 bytes). +const NOTE_RECORD_BYTES: usize = 700; + // STATE SYNCHRONIZATION ENDPOINTS // ================================================================================================ @@ -64,29 +77,51 @@ impl State { /// Loads data to synchronize a client's notes. /// - /// The client's request contains a list of tags, this method will return the first - /// block with a matching tag, or the chain tip. All the other values are filter based on this - /// block range. - /// - /// # Arguments - /// - /// - `note_tags`: The tags the client is interested in, resulting notes are restricted to the - /// first block containing a matching note. - /// - `block_range`: The range of blocks from which to synchronize notes. + /// Returns as many blocks with matching notes as fit within the response payload limit + /// ([`MAX_RESPONSE_PAYLOAD_BYTES`](miden_node_utils::limiter::MAX_RESPONSE_PAYLOAD_BYTES)). + /// Each block includes its header and MMR proof at `block_range.end()`. #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)] pub async fn sync_notes( &self, note_tags: Vec, block_range: RangeInclusive, - ) -> Result<(NoteSyncUpdate, MmrProof, BlockNumber), NoteSyncError> { - let inner = self.inner.read().await; + ) -> Result)>, NoteSyncError> { + let checkpoint = *block_range.end(); + let note_tags: Arc<[u32]> = note_tags.into(); + + let mut results = Vec::new(); + let mut accumulated_size: usize = 0; + let mut current_from = *block_range.start(); - let (note_sync, last_included_block) = - self.db.get_note_sync(block_range, note_tags).await?; + loop { + let range = current_from..=checkpoint; + let Some(note_sync) = self.db.get_note_sync(range, Arc::clone(¬e_tags)).await? + else { + break; + }; - let mmr_proof = inner.blockchain.open(note_sync.block_header.block_num())?; + accumulated_size += BLOCK_OVERHEAD_BYTES + note_sync.notes.len() * NOTE_RECORD_BYTES; + + if !results.is_empty() && accumulated_size > MAX_RESPONSE_PAYLOAD_BYTES { + break; + } + + let block_num = note_sync.block_header.block_num(); + + // The MMR at `checkpoint` contains proofs for blocks 0..checkpoint-1. When we reach the + // last block, we return the note without a MMR proof. + if block_num >= checkpoint { + results.push((note_sync, None)); + break; + } + + let mmr_proof = self.inner.read().await.blockchain.open_at(block_num, checkpoint)?; + results.push((note_sync, Some(mmr_proof))); + + current_from = block_num; + } - Ok((note_sync, mmr_proof, last_included_block)) + Ok(results) } pub async fn sync_nullifiers( diff --git a/proto/proto/internal/store.proto b/proto/proto/internal/store.proto index 311c1482d..7de72ef0d 100644 --- a/proto/proto/internal/store.proto +++ b/proto/proto/internal/store.proto @@ -52,15 +52,9 @@ service Rpc { // Note that only 16-bit prefixes are supported at this time. rpc SyncNullifiers(rpc.SyncNullifiersRequest) returns (rpc.SyncNullifiersResponse) {} - // Returns info which can be used by the requester to sync up to the tip of chain for the notes they are interested in. + // Returns blocks containing notes matching the requested tags within the given range. // - // requester specifies the `note_tags` they are interested in, and the block height from which to search for new for - // matching notes for. The request will then return the next block containing any note matching the provided tags. - // - // The response includes each note's metadata and inclusion proof. - // - // A basic note sync can be implemented by repeatedly requesting the previous response's block until reaching the - // tip of the chain. + // The response batches as many blocks as fit within the response payload limit. rpc SyncNotes(rpc.SyncNotesRequest) returns (rpc.SyncNotesResponse) {} // Returns chain MMR updates within a block range. diff --git a/proto/proto/rpc.proto b/proto/proto/rpc.proto index 2bb6178e7..c0f912791 100644 --- a/proto/proto/rpc.proto +++ b/proto/proto/rpc.proto @@ -82,14 +82,12 @@ service Api { // Returns info which can be used by the client to sync up to the tip of chain for the notes // they are interested in. // - // Client specifies the `note_tags` they are interested in, and the block height from which to - // search for new for matching notes for. The request will then return the next block containing - // any note matching the provided tags. + // Client specifies the `note_tags` they are interested in, and the block range to search. + // The response contains all blocks with matching notes that fit within the response payload + // limit, along with each note's metadata and inclusion proof. // - // The response includes each note's metadata and inclusion proof. - // - // A basic note sync can be implemented by repeatedly requesting the previous response's block - // until reaching the tip of the chain. + // If `response.block_range.block_to` is less than the requested range end, make another + // request starting from `response.block_range.block_to + 1` to continue syncing. rpc SyncNotes(SyncNotesRequest) returns (SyncNotesResponse) {} // Returns a list of nullifiers that match the specified prefixes and are recorded in the node. @@ -461,8 +459,9 @@ message AccountVaultUpdate { // Note synchronization request. // -// Specifies note tags that requester is interested in. The server will return the first block which -// contains a note matching `note_tags` or the chain tip. +// Specifies note tags that the requester is interested in. The server will return blocks +// containing notes matching `note_tags` within the specified block range, batching as many +// blocks as fit within the response payload limit. message SyncNotesRequest { // Block range from which to start synchronizing. BlockRange block_range = 1; @@ -473,20 +472,33 @@ message SyncNotesRequest { // Represents the result of syncing notes request. message SyncNotesResponse { - // Pagination information. - PaginationInfo pagination_info = 1; + // A single block's worth of note sync data. + message NoteSyncBlock { + // Block header of the block containing the matching notes. + blockchain.BlockHeader block_header = 1; - // Block header of the block with the first note matching the specified criteria. - blockchain.BlockHeader block_header = 2; + // Merkle path to verify the block's inclusion in the MMR at the returned + // `block_range.block_to`. + // + // An MMR proof can be constructed for the leaf of index `block_header.block_num` of + // an MMR of forest `block_range.block_to` with this path. + primitives.MerklePath mmr_path = 2; - // Merkle path to verify the block's inclusion in the MMR at the returned `chain_tip`. + // List of notes matching the specified criteria in this block, together with the + // Merkle paths from `block_header.note_root`. + repeated note.NoteSyncRecord notes = 3; + } + + // The block range covered by this response. // - // An MMR proof can be constructed for the leaf of index `block_header.block_num` of - // an MMR of forest `chain_tip` with this path. - primitives.MerklePath mmr_path = 3; + // `block_from` matches the request's `block_range.block_from`. + // `block_to` matches the request's `block_range.block_to`, or the chain tip if it was + // not specified, or if the requested block number is beyond the tip. + BlockRange block_range = 1; - // List of all notes together with the Merkle paths from `response.block_header.note_root`. - repeated note.NoteSyncRecord notes = 4; + // Blocks containing matching notes, ordered by block number ascending. + // May be empty if no notes matched in the range. + repeated NoteSyncBlock blocks = 2; } // SYNC CHAIN MMR