Changes from 7 commits
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -32,7 +32,7 @@
- Add #[track_caller] to tracing/logging helpers ([#1651](https://github.com/0xMiden/node/pull/1651)).
- Added support for generic account loading at genesis ([#1624](https://github.com/0xMiden/node/pull/1624)).
- Improved tracing span fields ([#1650](https://github.com/0xMiden/node/pull/1650))
- Replaced NTX Builder's in-memory state management with SQLite-backed persistence; account states, notes, and transaction effects are now stored in the database and inflight state is purged on startup ([#1662](https://github.com/0xMiden/node/pull/1662)).
- Replaced NTX Builder's in-memory state management with SQLite-backed persistence; account states, notes, and transaction effects are now stored in the database and inflight state is purged on startup ([#1662](https://github.com/0xMiden/node/pull/1662)).
- [BREAKING] Reworked `miden-remote-prover`, removing the `worker`/`proxy` distinction and simplifying to a `worker` with a request queue ([#1688](https://github.com/0xMiden/node/pull/1688)).
- [BREAKING] Renamed `NoteRoot` protobuf message used in `GetNoteScriptByRoot` gRPC endpoints into `NoteScriptRoot` ([#1722](https://github.com/0xMiden/node/pull/1722)).
- NTX Builder actors now deactivate after being idle for a configurable idle timeout (`--ntx-builder.idle-timeout`, default 5 min) and are re-activated when new notes target their account ([#1705](https://github.com/0xMiden/node/pull/1705)).
@@ -42,6 +42,7 @@
- NTX Builder now deactivates network accounts which crash repeatedly (configurable via `--ntx-builder.max-account-crashes`, default 10) ([#1712](https://github.com/0xMiden/miden-node/pull/1712)).
- Removed gRPC reflection v1-alpha support ([#1795](https://github.com/0xMiden/node/pull/1795)).
- [BREAKING] Rust requirement bumped from `v1.91` to `v1.93` ([#1803](https://github.com/0xMiden/node/pull/1803)).
- [BREAKING] Updated `SyncNotes` endpoint to return multiple note updates ([#1809](https://github.com/0xMiden/node/issues/1809), [#1851](https://github.com/0xMiden/node/pull/1851)).

### Fixes

47 changes: 25 additions & 22 deletions bin/stress-test/src/store/mod.rs
@@ -146,15 +146,23 @@ pub async fn bench_sync_nullifiers(
};
let response = store_client.sync_notes(sync_request).await.unwrap().into_inner();

let note_ids = response
.notes
let resp_block_range = response.block_range.expect("block_range should exist");
let resp_chain_tip = resp_block_range.block_to.expect("block_to should exist");

if response.blocks.is_empty() {
break;
}

// Collect note IDs from all blocks in the response.
let note_ids: Vec<_> = response
.blocks
.iter()
.map(|n| n.note_id.unwrap())
.collect::<Vec<proto::note::NoteId>>();
.flat_map(|b| b.notes.iter().map(|n| n.note_id.unwrap()))
.collect();

// Get the notes nullifiers, limiting to 20 notes maximum
let note_ids_to_fetch =
note_ids.iter().take(NOTE_IDS_PER_NULLIFIERS_CHECK).copied().collect::<Vec<_>>();
// Get the notes nullifiers, limiting to 20 notes maximum.
let note_ids_to_fetch: Vec<_> =
note_ids.iter().take(NOTE_IDS_PER_NULLIFIERS_CHECK).copied().collect();
if !note_ids_to_fetch.is_empty() {
let notes = store_client
.get_notes_by_id(proto::note::NoteIdList { ids: note_ids_to_fetch })
Expand All @@ -163,23 +171,18 @@ pub async fn bench_sync_nullifiers(
.into_inner()
.notes;

nullifier_prefixes.extend(
notes
.iter()
.filter_map(|n| {
// Private notes are filtered out because `n.details` is None
let details_bytes = n.note.as_ref()?.details.as_ref()?;
let details = NoteDetails::read_from_bytes(details_bytes).unwrap();
Some(u32::from(details.nullifier().prefix()))
})
.collect::<Vec<u32>>(),
);
nullifier_prefixes.extend(notes.iter().filter_map(|n| {
let details_bytes = n.note.as_ref()?.details.as_ref()?;
let details = NoteDetails::read_from_bytes(details_bytes).unwrap();
Some(u32::from(details.nullifier().prefix()))
}));
}

// Update block number from pagination info
let pagination_info = response.pagination_info.expect("pagination_info should exist");
current_block_num = pagination_info.block_num;
if pagination_info.chain_tip == current_block_num {
// Advance past the last block in the response.
let last_block = response.blocks.last().unwrap();
current_block_num =
last_block.block_header.as_ref().map_or(resp_chain_tip, |h| h.block_num);
if current_block_num >= resp_chain_tip {
break;
}
}
6 changes: 2 additions & 4 deletions crates/rpc/README.md
@@ -196,11 +196,9 @@ Returns info which can be used by the client to sync up to the tip of chain for

**Limits:** `note_tag` (1000)

Client specifies the `note_tags` they are interested in, and the block range from which to search for matching notes. The request will then return the next block containing any note matching the provided tags within the specified range.
Client specifies the `note_tags` they are interested in, and the block range to search. The response contains all blocks with matching notes that fit within the response payload limit, along with each note's metadata, inclusion proof, and MMR authentication path.

The response includes each note's metadata and inclusion proof.

A basic note sync can be implemented by repeatedly requesting the previous response's block until reaching the tip of the chain.
If `response.block_range.block_to` is less than the requested range end, make another request starting from `response.block_range.block_to + 1` to continue syncing.
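A minimal client-side loop following this rule could look like the sketch below. The types here are hypothetical stand-ins for the generated protobuf structs and the gRPC client; only the continuation logic mirrors the description above.

```rust
// Hypothetical stand-ins for the generated protobuf response types.
struct BlockRange {
    block_from: u32,
    block_to: Option<u32>,
}
struct NoteSyncBlock {
    block_num: u32,
    note_ids: Vec<u64>,
}
struct SyncNotesResponse {
    block_range: BlockRange,
    blocks: Vec<NoteSyncBlock>,
}

/// Repeatedly queries `[from, end]`, restarting from `block_to + 1`,
/// until a response covers the requested range end.
fn sync_all_notes(
    mut fetch: impl FnMut(u32, u32) -> SyncNotesResponse,
    start: u32,
    end: u32,
) -> Vec<u64> {
    let mut collected = Vec::new();
    let mut from = start;
    loop {
        let resp = fetch(from, end);
        for block in &resp.blocks {
            collected.extend(block.note_ids.iter().copied());
        }
        // `block_to` reports how far this response actually got.
        let reached = resp.block_range.block_to.unwrap_or(end);
        if reached >= end {
            break;
        }
        from = reached + 1;
    }
    collected
}
```

In a real client, `fetch` would wrap the `SyncNotes` gRPC call and the loop would also verify each block's MMR path before trusting the notes.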

#### Error Handling

2 changes: 1 addition & 1 deletion crates/store/src/db/mod.rs
@@ -511,7 +511,7 @@ impl Db {
&self,
block_range: RangeInclusive<BlockNumber>,
note_tags: Vec<u32>,
) -> Result<(NoteSyncUpdate, BlockNumber), NoteSyncError> {
) -> Result<Option<NoteSyncUpdate>, NoteSyncError> {
self.transact("notes sync task", move |conn| {
queries::get_note_sync(conn, note_tags.as_slice(), block_range)
})
11 changes: 7 additions & 4 deletions crates/store/src/db/models/queries/notes.rs
@@ -518,16 +518,19 @@ pub(crate) fn get_note_sync(
conn: &mut SqliteConnection,
note_tags: &[u32],
block_range: RangeInclusive<BlockNumber>,
) -> Result<(NoteSyncUpdate, BlockNumber), NoteSyncError> {
) -> Result<Option<NoteSyncUpdate>, NoteSyncError> {
QueryParamNoteTagLimit::check(note_tags.len()).map_err(DatabaseError::from)?;

let (notes, last_included_block) =
select_notes_since_block_by_tag_and_sender(conn, &[], note_tags, block_range)?;
let (notes, _) = select_notes_since_block_by_tag_and_sender(conn, &[], note_tags, block_range)?;

if notes.is_empty() {
return Ok(None);
}

let block_header =
select_block_header_by_block_num(conn, notes.first().map(|note| note.block_num))?
.ok_or(NoteSyncError::EmptyBlockHeadersTable)?;
Ok((NoteSyncUpdate { notes, block_header }, last_included_block))
Ok(Some(NoteSyncUpdate { notes, block_header }))
}
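The exclusive-start cursor behaviour this query relies on can be modelled in isolation. This is a simplified sketch: `next_matching_block` is a hypothetical stand-in for the SQL query's `committed_at > block_range.start()` filter, not the real implementation.

```rust
/// Stand-in for the DB query: returns the earliest block strictly greater
/// than `cursor` (exclusive lower bound) and at most `end`.
fn next_matching_block(blocks: &[u32], cursor: u32, end: u32) -> Option<u32> {
    blocks.iter().copied().filter(|&b| b > cursor && b <= end).min()
}

/// Drains all matching blocks: setting the cursor to each found block is
/// enough to skip it on the next query, because the lower bound is exclusive.
fn collect_matching_blocks(blocks: &[u32], start: u32, end: u32) -> Vec<u32> {
    let mut found = Vec::new();
    let mut cursor = start;
    while let Some(b) = next_matching_block(blocks, cursor, end) {
        found.push(b);
        cursor = b;
    }
    found
}
```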

#[derive(Debug, Clone, PartialEq, Selectable, Queryable, QueryableByName)]
111 changes: 111 additions & 0 deletions crates/store/src/db/tests.rs
@@ -912,6 +912,117 @@ fn notes() {
assert_eq!(note_1.details, None);
}

/// Creates notes across 3 blocks with different tags, then iterates
/// `select_notes_since_block_by_tag_and_sender` advancing the cursor each time,
/// verifying that each call returns the next block's notes.
#[test]
#[miden_node_test_macro::enable_logging]
fn note_sync_across_multiple_blocks() {
let mut conn = create_db();
let conn = &mut conn;

let sender = AccountId::try_from(ACCOUNT_ID_PRIVATE_SENDER).unwrap();

// Create 3 blocks with notes.
let tag = 42u32;
let note_index = BlockNoteIndex::new(0, 0).unwrap();

for block_num_raw in 1..=3u32 {
let block_num = BlockNumber::from(block_num_raw);
create_block(conn, block_num);
queries::upsert_accounts(
conn,
&[mock_block_account_update(sender, block_num_raw.into())],
block_num,
)
.unwrap();

let new_note = create_note(sender);
let note_metadata = NoteMetadata::new(sender, NoteType::Public).with_tag(tag.into());
let values = [(note_index, new_note.id(), &note_metadata)];
let notes_db = BlockNoteTree::with_entries(values).unwrap();
let inclusion_path = notes_db.open(note_index);

let note = NoteRecord {
block_num,
note_index,
note_id: new_note.id().as_word(),
note_commitment: new_note.commitment(),
metadata: note_metadata,
details: Some(NoteDetails::from(&new_note)),
inclusion_path,
};
queries::insert_scripts(conn, [&note]).unwrap();
queries::insert_notes(conn, &[(note, None)]).unwrap();
}

// Simulate the store batching loop: repeatedly call get_note_sync with advancing
// block_from, same as `State::sync_notes` does.
let mut collected_block_nums = Vec::new();
let mut current_from = BlockNumber::GENESIS;
let end = BlockNumber::from(3);

loop {
let range = current_from..=end;
let Some(update) = queries::get_note_sync(conn, &[tag], range).unwrap() else {
break;
};

// All notes in a single response come from the same block.
let block_num = update.block_header.block_num();
assert!(update.notes.iter().all(|n| n.block_num == block_num));
collected_block_nums.push(block_num);

// The query uses `committed_at > block_range.start()` (exclusive), so
// advancing to the found block_num is sufficient to skip it.
current_from = block_num;
}

assert_eq!(
collected_block_nums,
vec![BlockNumber::from(1), BlockNumber::from(2), BlockNumber::from(3)],
"should iterate through all 3 blocks with matching notes"
);
}

/// Tests that note sync returns an empty result when no notes match the requested tags.
#[test]
#[miden_node_test_macro::enable_logging]
fn note_sync_no_matching_tags() {
let mut conn = create_db();
let conn = &mut conn;

let sender = AccountId::try_from(ACCOUNT_ID_PRIVATE_SENDER).unwrap();
let block_num = BlockNumber::from(1);
create_block(conn, block_num);
queries::upsert_accounts(conn, &[mock_block_account_update(sender, 0)], block_num).unwrap();

// Insert a note with tag 10.
let new_note = create_note(sender);
let note_index = BlockNoteIndex::new(0, 0).unwrap();
let note_metadata = NoteMetadata::new(sender, NoteType::Public).with_tag(10u32.into());
let values = [(note_index, new_note.id(), &note_metadata)];
let notes_db = BlockNoteTree::with_entries(values).unwrap();
let inclusion_path = notes_db.open(note_index);

let note = NoteRecord {
block_num,
note_index,
note_id: new_note.id().as_word(),
note_commitment: new_note.commitment(),
metadata: note_metadata,
details: Some(NoteDetails::from(&new_note)),
inclusion_path,
};
queries::insert_scripts(conn, [&note]).unwrap();
queries::insert_notes(conn, &[(note, None)]).unwrap();

// Query with a different tag — should return None.
let range = BlockNumber::GENESIS..=BlockNumber::from(1);
let result = queries::get_note_sync(conn, &[999], range).unwrap();
assert!(result.is_none());
}

fn insert_account_delta(
conn: &mut SqliteConnection,
account_id: AccountId,
26 changes: 17 additions & 9 deletions crates/store/src/server/rpc_api.rs
@@ -130,26 +130,34 @@ impl rpc_server::Rpc for StoreApi {
let request = request.into_inner();

let chain_tip = self.state.latest_block_num().await;
Collaborator: nit (feel free to disregard): There's no need to read the latest block number if the user set a block_to

let requested_block_to = request.block_range.as_ref().and_then(|r| r.block_to);
Collaborator: nit: I would move this down to be below line 134

let block_range =
read_block_range::<NoteSyncError>(request.block_range, "SyncNotesRequest")?
.into_inclusive_range::<NoteSyncError>(&chain_tip)?;

let block_from = block_range.start().as_u32();
let response_block_to = requested_block_to.unwrap_or(chain_tip.as_u32());

// Validate note tags count
check::<QueryParamNoteTagLimit>(request.note_tags.len())?;

let (state, mmr_proof, last_block_included) =
self.state.sync_notes(request.note_tags, block_range).await?;
let results = self.state.sync_notes(request.note_tags, block_range).await?;

let notes = state.notes.into_iter().map(Into::into).collect();
let blocks = results
.into_iter()
.map(|(state, mmr_proof)| proto::rpc::sync_notes_response::NoteSyncBlock {
block_header: Some(state.block_header.into()),
mmr_path: Some(mmr_proof.merkle_path().clone().into()),
notes: state.notes.into_iter().map(Into::into).collect(),
})
.collect();

Ok(Response::new(proto::rpc::SyncNotesResponse {
pagination_info: Some(proto::rpc::PaginationInfo {
chain_tip: chain_tip.as_u32(),
block_num: last_block_included.as_u32(),
block_range: Some(proto::rpc::BlockRange {
block_from,
block_to: Some(response_block_to),
}),
block_header: Some(state.block_header.into()),
mmr_path: Some(mmr_proof.merkle_path().clone().into()),
notes,
blocks,
}))
}

59 changes: 45 additions & 14 deletions crates/store/src/state/sync_state.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::ops::RangeInclusive;

use miden_node_utils::limiter::MAX_RESPONSE_PAYLOAD_BYTES;
use miden_protocol::account::AccountId;
use miden_protocol::block::BlockNumber;
use miden_protocol::crypto::merkle::mmr::{Forest, MmrDelta, MmrProof};
Expand All @@ -11,6 +12,17 @@ use crate::db::models::queries::StorageMapValuesPage;
use crate::db::{AccountVaultValue, NoteSyncUpdate, NullifierInfo};
use crate::errors::{DatabaseError, NoteSyncError, StateSyncError};

/// Estimated byte size of a [`NoteSyncBlock`] excluding its notes.
///
/// `BlockHeader` (~341 bytes) + MMR proof with 32 siblings (~1216 bytes).
const BLOCK_OVERHEAD_BYTES: usize = 1600;
Comment on lines +16 to +19
Contributor: Not for this PR, but it would be good to make this based on the actual serialization sizes. For example, we could have a `BlockHeader::SERIALIZED_SIZE` constant and use it here. Let's create an issue for this (and include `NOTE_RECORD_BYTES` in this issue too).


/// Estimated byte size of a single [`NoteSyncRecord`].
///
/// Note ID (~38 bytes) + index + metadata (~26 bytes) + sparse merkle path with 16
/// siblings (~608 bytes).
const NOTE_RECORD_BYTES: usize = 700;
Comment on lines +21 to +25
Contributor: This is fine for now, but most likely will be a significant overestimate because sparse Merkle paths get compressed, and in most cases shouldn't be more than a couple hundred bytes. But the compression depends on how many paths there are (the more paths, the worse the compression), so taking the worst case is fine for now.


// STATE SYNCHRONIZATION ENDPOINTS
// ================================================================================================

@@ -64,29 +76,48 @@ impl State {

/// Loads data to synchronize a client's notes.
///
/// The client's request contains a list of tags, this method will return the first
/// block with a matching tag, or the chain tip. All the other values are filter based on this
/// block range.
///
/// # Arguments
///
/// - `note_tags`: The tags the client is interested in, resulting notes are restricted to the
/// first block containing a matching note.
/// - `block_range`: The range of blocks from which to synchronize notes.
/// Returns as many blocks with matching notes as fit within the response payload
Collaborator: nit: You can reference the max payload size constant here

/// limit. Each block includes its header and MMR proof at `block_range.end()`.
#[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
pub async fn sync_notes(
&self,
note_tags: Vec<u32>,
block_range: RangeInclusive<BlockNumber>,
) -> Result<(NoteSyncUpdate, MmrProof, BlockNumber), NoteSyncError> {
) -> Result<Vec<(NoteSyncUpdate, MmrProof)>, NoteSyncError> {
let inner = self.inner.read().await;
Collaborator: I think we want to take the lock in the loop to avoid keeping it for the whole duration
Collaborator: (and it can be moved right next to the open_at() call)

let checkpoint = *block_range.end();

let mut results = Vec::new();
let mut accumulated_size: usize = 0;
let mut current_from = *block_range.start();

loop {
let range = current_from..=checkpoint;
let Some(note_sync) = self.db.get_note_sync(range, note_tags.clone()).await? else {
Collaborator: Wonder if get_note_sync could change to take something like &[u32] to avoid cloning every time here
Author: The issue is that get_note_sync then uses db.transact, which takes a move closure, so we cannot pass a &[u32]. I changed it to pass an Arc<[u32]> that is cheaper to clone.

break;
};

let (note_sync, last_included_block) =
self.db.get_note_sync(block_range, note_tags).await?;
accumulated_size += BLOCK_OVERHEAD_BYTES + note_sync.notes.len() * NOTE_RECORD_BYTES;

let mmr_proof = inner.blockchain.open(note_sync.block_header.block_num())?;
if accumulated_size > MAX_RESPONSE_PAYLOAD_BYTES {
break;
}
Collaborator: Returning a single block is always guaranteed, right? We should make sure this is the case because otherwise it might be problematic
Author: Added an extra check so the first block is always included, even if the size is exceeded.


let block_num = note_sync.block_header.block_num();

if block_num >= checkpoint {
break;
}
Comment on lines +111 to +113
Collaborator: We break here but AFAIK we are supposed to return notes for the checkpoint block, right? I think this might be a special case. If the user calls SyncMmr, which returns the chain tip header and the MmrDelta, the user already has the header (and it's already validated by default). So when block_num == checkpoint, we might just return the notes, and not return the merkle path at all (open_at would otherwise fail, I think). cc @bobbinth


let mmr_proof = inner.blockchain.open_at(block_num, checkpoint)?;
Collaborator: Before, I think if the user set a block_to beyond the chain tip, the response would be automatically valid because the store would find no note after the chain tip, but now because we open_at the user-set checkpoint, this might fail. I think this is fine because it's a wrong argument, but maybe at the RPC level we should check that the block_to is not beyond the chain tip and return early.

results.push((note_sync, mmr_proof));

// The DB query uses `committed_at > block_range.start()` (exclusive),
// so setting current_from to the found block is sufficient to skip it.
current_from = block_num;
}

Ok((note_sync, mmr_proof, last_included_block))
Ok(results)
}
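The size-capped batching above can be sketched independently of the store types. The constant values below mirror the ones in this file, but the payload cap is an assumption for illustration, and the always-include-the-first-block rule follows the author's reply in this thread.

```rust
const MAX_RESPONSE_PAYLOAD_BYTES: usize = 3 * 1024 * 1024; // assumed cap, for illustration only
const BLOCK_OVERHEAD_BYTES: usize = 1600;
const NOTE_RECORD_BYTES: usize = 700;

/// Greedily accepts blocks (given each block's note count) until the
/// estimated payload would exceed the budget. The first block is always
/// included, even if it alone exceeds the budget.
fn take_blocks_within_budget(notes_per_block: &[usize], budget: usize) -> usize {
    let mut estimated = 0usize;
    let mut taken = 0usize;
    for &notes in notes_per_block {
        estimated += BLOCK_OVERHEAD_BYTES + notes * NOTE_RECORD_BYTES;
        if estimated > budget && taken > 0 {
            break; // adding this block would overflow the budget
        }
        taken += 1;
        if estimated > budget {
            break; // first block alone exceeded the budget; still included
        }
    }
    taken
}
```

Because the per-note and per-block sizes are worst-case estimates, the actual serialized response may be well under the cap; the sketch only guarantees it never exceeds the budget by more than one block.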

pub async fn sync_nullifiers(
10 changes: 2 additions & 8 deletions proto/proto/internal/store.proto
@@ -52,15 +52,9 @@ service Rpc {
// Note that only 16-bit prefixes are supported at this time.
rpc SyncNullifiers(rpc.SyncNullifiersRequest) returns (rpc.SyncNullifiersResponse) {}

// Returns info which can be used by the requester to sync up to the tip of chain for the notes they are interested in.
// Returns blocks containing notes matching the requested tags within the given range.
//
// requester specifies the `note_tags` they are interested in, and the block height from which to search for new for
// matching notes for. The request will then return the next block containing any note matching the provided tags.
//
// The response includes each note's metadata and inclusion proof.
//
// A basic note sync can be implemented by repeatedly requesting the previous response's block until reaching the
// tip of the chain.
// The response batches as many blocks as fit within the response payload limit.
rpc SyncNotes(rpc.SyncNotesRequest) returns (rpc.SyncNotesResponse) {}

// Returns chain MMR updates within a block range.