Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
- Add #[track_caller] to tracing/logging helpers ([#1651](https://github.com/0xMiden/node/pull/1651)).
- Added support for generic account loading at genesis ([#1624](https://github.com/0xMiden/node/pull/1624)).
- Improved tracing span fields ([#1650](https://github.com/0xMiden/node/pull/1650))
- Replaced NTX Builder's in-memory state management with SQLite-backed persistence; account states, notes, and transaction effects are now stored in the database and inflight state is purged on startup ([#1662](https://github.com/0xMiden/node/pull/1662)).
- Replaced NTX Builder's in-memory state management with SQLite-backed persistence; account states, notes, and transaction effects are now stored in the database and inflight state is purged on startup ([#1662](https://github.com/0xMiden/node/pull/1662)).
- [BREAKING] Reworked `miden-remote-prover`, removing the `worker`/`proxy` distinction and simplifying to a `worker` with a request queue ([#1688](https://github.com/0xMiden/node/pull/1688)).
- [BREAKING] Renamed `NoteRoot` protobuf message used in `GetNoteScriptByRoot` gRPC endpoints into `NoteScriptRoot` ([#1722](https://github.com/0xMiden/node/pull/1722)).
- NTX Builder actors now deactivate after being idle for a configurable idle timeout (`--ntx-builder.idle-timeout`, default 5 min) and are re-activated when new notes target their account ([#1705](https://github.com/0xMiden/node/pull/1705)).
Expand All @@ -41,6 +41,7 @@
- NTX Builder now deactivates network accounts which crash repeatedly (configurable via `--ntx-builder.max-account-crashes`, default 10) ([#1712](https://github.com/0xMiden/miden-node/pull/1712)).
- Removed gRPC reflection v1-alpha support ([#1795](https://github.com/0xMiden/node/pull/1795)).
- [BREAKING] Rust requirement bumped from `v1.91` to `v1.93` ([#1803](https://github.com/0xMiden/node/pull/1803)).
- [BREAKING] Updated `SyncNotes` endpoint to returned multiple note updates ([#1809](https://github.com/0xMiden/node/issues/1809)).

### Fixes

Expand Down
39 changes: 17 additions & 22 deletions bin/stress-test/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,15 +146,17 @@ pub async fn bench_sync_nullifiers(
};
let response = store_client.sync_notes(sync_request).await.unwrap().into_inner();

let note_ids = response
.notes
.iter()
.map(|n| n.note_id.unwrap())
.collect::<Vec<proto::note::NoteId>>();
let resp_block_range = response.block_range.expect("block_range should exist");
let resp_chain_tip = resp_block_range.block_to.expect("block_to should exist");

if response.notes.is_empty() {
break;
}

// Get the notes nullifiers, limiting to 20 notes maximum
let note_ids_to_fetch =
note_ids.iter().take(NOTE_IDS_PER_NULLIFIERS_CHECK).copied().collect::<Vec<_>>();
let note_ids: Vec<_> = response.notes.iter().map(|n| n.note_id.unwrap()).collect();
let note_ids_to_fetch: Vec<_> =
note_ids.iter().take(NOTE_IDS_PER_NULLIFIERS_CHECK).copied().collect();
if !note_ids_to_fetch.is_empty() {
let notes = store_client
.get_notes_by_id(proto::note::NoteIdList { ids: note_ids_to_fetch })
Expand All @@ -163,23 +165,16 @@ pub async fn bench_sync_nullifiers(
.into_inner()
.notes;

nullifier_prefixes.extend(
notes
.iter()
.filter_map(|n| {
// Private notes are filtered out because `n.details` is None
let details_bytes = n.note.as_ref()?.details.as_ref()?;
let details = NoteDetails::read_from_bytes(details_bytes).unwrap();
Some(u32::from(details.nullifier().prefix()))
})
.collect::<Vec<u32>>(),
);
nullifier_prefixes.extend(notes.iter().filter_map(|n| {
let details_bytes = n.note.as_ref()?.details.as_ref()?;
let details = NoteDetails::read_from_bytes(details_bytes).unwrap();
Some(u32::from(details.nullifier().prefix()))
}));
}

// Update block number from pagination info
let pagination_info = response.pagination_info.expect("pagination_info should exist");
current_block_num = pagination_info.block_num;
if pagination_info.chain_tip == current_block_num {
// The notes all come from the same block; use the block header to find it.
current_block_num = response.block_header.map_or(resp_chain_tip, |h| h.block_num);
if current_block_num >= resp_chain_tip {
break;
}
}
Expand Down
6 changes: 2 additions & 4 deletions crates/rpc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,9 @@ Returns info which can be used by the client to sync up to the tip of chain for

**Limits:** `note_tag` (1000)

Client specifies the `note_tags` they are interested in, and the block range from which to search for matching notes. The request will then return the next block containing any note matching the provided tags within the specified range.
Client specifies the `note_tags` they are interested in, and the block range to search. The response contains all blocks with matching notes that fit within the response payload limit, along with each note's metadata, inclusion proof, and MMR authentication path.

The response includes each note's metadata and inclusion proof.

A basic note sync can be implemented by repeatedly requesting the previous response's block until reaching the tip of the chain.
If `response.block_range.block_to` is less than the requested range end, make another request starting from `response.block_range.block_to + 1` to continue syncing.

#### Error Handling

Expand Down
92 changes: 90 additions & 2 deletions crates/rpc/src/server/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use miden_node_proto::generated::{self as proto};
use miden_node_proto::try_convert;
use miden_node_utils::ErrorReport;
use miden_node_utils::limiter::{
MAX_RESPONSE_PAYLOAD_BYTES,
QueryParamAccountIdLimit,
QueryParamLimiter,
QueryParamNoteIdLimit,
Expand All @@ -40,6 +41,16 @@ use url::Url;

use crate::COMPONENT;

/// Estimated byte size of a [`NoteSyncBlock`] excluding its notes.
///
/// `BlockHeader` (~288 bytes) + MMR proof (~1024 bytes).
const BLOCK_OVERHEAD_BYTES: usize = 1312;

/// Estimated byte size of a single [`NoteSyncRecord`].
///
/// `note_id` (32) + `note_index` (4) + metadata (23) + sparse merkle path (520).
const NOTE_RECORD_BYTES: usize = 579;

// RPC SERVICE
// ================================================================================================

Expand Down Expand Up @@ -259,9 +270,86 @@ impl api_server::Api for RpcService {
) -> Result<Response<proto::rpc::SyncNotesResponse>, Status> {
debug!(target: COMPONENT, request = ?request.get_ref());

check::<QueryParamNoteTagLimit>(request.get_ref().note_tags.len())?;
let request = request.into_inner();
check::<QueryParamNoteTagLimit>(request.note_tags.len())?;

let block_range = request
.block_range
.ok_or_else(|| Status::invalid_argument("missing block_range"))?;
let note_tags = request.note_tags;

let mut blocks = Vec::new();
let mut accumulated_size: usize = 0;
let mut current_block_from = block_range.block_from;

// The chain tip as reported by the store, updated on each iteration.
let chain_tip;

loop {
let store_request = proto::rpc::SyncNotesRequest {
block_range: Some(proto::rpc::BlockRange {
block_from: current_block_from,
block_to: block_range.block_to,
}),
note_tags: note_tags.clone(),
};

let store_response =
self.store.clone().sync_notes(store_request.into_request()).await?.into_inner();

let store_block_range = store_response
.block_range
.ok_or_else(|| Status::internal("store response missing block_range"))?;
let store_chain_tip = store_block_range
.block_to
.ok_or_else(|| Status::internal("store response missing block_to"))?;

// No notes means we've reached the end of the range without more matches.
if store_response.notes.is_empty() {
chain_tip = store_chain_tip;
break;
}

let block = proto::rpc::sync_notes_response::NoteSyncBlock {
block_header: store_response.block_header,
mmr_path: store_response.mmr_path,
notes: store_response.notes,
};

accumulated_size += BLOCK_OVERHEAD_BYTES + block.notes.len() * NOTE_RECORD_BYTES;

// If we exceed the budget, drop this block and stop.
if accumulated_size > MAX_RESPONSE_PAYLOAD_BYTES {
chain_tip = store_chain_tip;
break;
}

// The block number of the returned notes, used to advance the cursor.
let notes_block_num = block
.block_header
.as_ref()
.ok_or_else(|| Status::internal("store response missing block_header"))?
.block_num;
blocks.push(block);

// Check if we've reached the end of the requested range or the chain tip.
if notes_block_num >= block_range.block_to.unwrap_or(store_chain_tip) {
chain_tip = store_chain_tip;
break;
}

self.store.clone().sync_notes(request).await
// Advance the cursor. The store query uses `committed_at > block_range.start()`
// (exclusive), so setting block_from to the current block is sufficient to skip it.
current_block_from = notes_block_num;
}

Ok(Response::new(proto::rpc::SyncNotesResponse {
block_range: Some(proto::rpc::BlockRange {
block_from: block_range.block_from,
block_to: Some(chain_tip),
}),
blocks,
}))
}

async fn get_notes_by_id(
Expand Down
2 changes: 1 addition & 1 deletion crates/store/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ impl Db {
&self,
block_range: RangeInclusive<BlockNumber>,
note_tags: Vec<u32>,
) -> Result<(NoteSyncUpdate, BlockNumber), NoteSyncError> {
) -> Result<NoteSyncUpdate, NoteSyncError> {
self.transact("notes sync task", move |conn| {
queries::get_note_sync(conn, note_tags.as_slice(), block_range)
})
Expand Down
6 changes: 3 additions & 3 deletions crates/store/src/db/models/queries/notes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,16 +518,16 @@ pub(crate) fn get_note_sync(
conn: &mut SqliteConnection,
note_tags: &[u32],
block_range: RangeInclusive<BlockNumber>,
) -> Result<(NoteSyncUpdate, BlockNumber), NoteSyncError> {
) -> Result<NoteSyncUpdate, NoteSyncError> {
QueryParamNoteTagLimit::check(note_tags.len()).map_err(DatabaseError::from)?;

let (notes, last_included_block) =
let (notes, _last_included_block) =
select_notes_since_block_by_tag_and_sender(conn, &[], note_tags, block_range)?;

let block_header =
select_block_header_by_block_num(conn, notes.first().map(|note| note.block_num))?
.ok_or(NoteSyncError::EmptyBlockHeadersTable)?;
Ok((NoteSyncUpdate { notes, block_header }, last_included_block))
Ok(NoteSyncUpdate { notes, block_header })
}

#[derive(Debug, Clone, PartialEq, Selectable, Queryable, QueryableByName)]
Expand Down
116 changes: 116 additions & 0 deletions crates/store/src/db/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,122 @@ fn notes() {
assert_eq!(note_1.details, None);
}

/// Creates notes across 3 blocks with different tags, then iterates
/// `select_notes_since_block_by_tag_and_sender` advancing the cursor each time,
/// verifying that each call returns the next block's notes.
#[test]
#[miden_node_test_macro::enable_logging]
fn note_sync_across_multiple_blocks() {
let mut conn = create_db();
let conn = &mut conn;

let sender = AccountId::try_from(ACCOUNT_ID_PRIVATE_SENDER).unwrap();

// Create 3 blocks with notes.
let tag = 42u32;
let note_index = BlockNoteIndex::new(0, 0).unwrap();

for block_num_raw in 1..=3u32 {
let block_num = BlockNumber::from(block_num_raw);
create_block(conn, block_num);
queries::upsert_accounts(
conn,
&[mock_block_account_update(sender, block_num_raw.into())],
block_num,
)
.unwrap();

let new_note = create_note(sender);
let note_metadata = NoteMetadata::new(sender, NoteType::Public).with_tag(tag.into());
let values = [(note_index, new_note.id(), &note_metadata)];
let notes_db = BlockNoteTree::with_entries(values).unwrap();
let inclusion_path = notes_db.open(note_index);

let note = NoteRecord {
block_num,
note_index,
note_id: new_note.id().as_word(),
note_commitment: new_note.commitment(),
metadata: note_metadata,
details: Some(NoteDetails::from(&new_note)),
inclusion_path,
};
queries::insert_scripts(conn, [&note]).unwrap();
queries::insert_notes(conn, &[(note, None)]).unwrap();
}

// Simulate the RPC batching loop: repeatedly query with advancing block_from.
let full_range = BlockNumber::GENESIS..=BlockNumber::from(3);
let mut collected_block_nums = Vec::new();
let mut current_from = *full_range.start();

loop {
let range = current_from..=*full_range.end();
let (notes, _last_included) =
queries::select_notes_since_block_by_tag_and_sender(conn, &[], &[tag], range).unwrap();

if notes.is_empty() {
break;
}

// All notes in a single response come from the same block.
let block_num = notes[0].block_num;
assert!(notes.iter().all(|n| n.block_num == block_num));
collected_block_nums.push(block_num);

// The query uses `committed_at > block_range.start()` (exclusive), so
// advancing to the found block_num is sufficient to skip it.
current_from = block_num;
}

assert_eq!(
collected_block_nums,
vec![BlockNumber::from(1), BlockNumber::from(2), BlockNumber::from(3)],
"should iterate through all 3 blocks with matching notes"
);
}

/// Tests that note sync returns an empty result when no notes match the requested tags.
#[test]
#[miden_node_test_macro::enable_logging]
fn note_sync_no_matching_tags() {
let mut conn = create_db();
let conn = &mut conn;

let sender = AccountId::try_from(ACCOUNT_ID_PRIVATE_SENDER).unwrap();
let block_num = BlockNumber::from(1);
create_block(conn, block_num);
queries::upsert_accounts(conn, &[mock_block_account_update(sender, 0)], block_num).unwrap();

// Insert a note with tag 10.
let new_note = create_note(sender);
let note_index = BlockNoteIndex::new(0, 0).unwrap();
let note_metadata = NoteMetadata::new(sender, NoteType::Public).with_tag(10u32.into());
let values = [(note_index, new_note.id(), &note_metadata)];
let notes_db = BlockNoteTree::with_entries(values).unwrap();
let inclusion_path = notes_db.open(note_index);

let note = NoteRecord {
block_num,
note_index,
note_id: new_note.id().as_word(),
note_commitment: new_note.commitment(),
metadata: note_metadata,
details: Some(NoteDetails::from(&new_note)),
inclusion_path,
};
queries::insert_scripts(conn, [&note]).unwrap();
queries::insert_notes(conn, &[(note, None)]).unwrap();

// Query with a different tag — should return no notes.
let range = BlockNumber::GENESIS..=BlockNumber::from(1);
let (notes, last_included) =
queries::select_notes_since_block_by_tag_and_sender(conn, &[], &[999], range).unwrap();

assert!(notes.is_empty());
assert_eq!(last_included, BlockNumber::from(1));
}

fn insert_account_delta(
conn: &mut SqliteConnection,
account_id: AccountId,
Expand Down
Loading
Loading