Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
- NTX Builder now deactivates network accounts which crash repeatedly (configurable via `--ntx-builder.max-account-crashes`, default 10) ([#1712](https://github.com/0xMiden/miden-node/pull/1712)).
- Removed gRPC reflection v1-alpha support ([#1795](https://github.com/0xMiden/node/pull/1795)).
- [BREAKING] Rust requirement bumped from `v1.91` to `v1.93` ([#1803](https://github.com/0xMiden/node/pull/1803)).
- [BREAKING] Updated `SyncNotes` endpoint to returned multiple note updates ([#1809](https://github.com/0xMiden/node/issues/1809)).
- [BREAKING] Updated `SyncNotes` endpoint to returned multiple note updates ([#1809](https://github.com/0xMiden/node/issues/1809), ([#1851](https://github.com/0xMiden/node/pull/1851))).

### Fixes

Expand Down
18 changes: 13 additions & 5 deletions bin/stress-test/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,18 @@ pub async fn bench_sync_nullifiers(
let resp_block_range = response.block_range.expect("block_range should exist");
let resp_chain_tip = resp_block_range.block_to.expect("block_to should exist");

if response.notes.is_empty() {
if response.blocks.is_empty() {
break;
}

// Get the notes nullifiers, limiting to 20 notes maximum
let note_ids: Vec<_> = response.notes.iter().map(|n| n.note_id.unwrap()).collect();
// Collect note IDs from all blocks in the response.
let note_ids: Vec<_> = response
.blocks
.iter()
.flat_map(|b| b.notes.iter().map(|n| n.note_id.unwrap()))
.collect();

// Get the notes nullifiers, limiting to 20 notes maximum.
let note_ids_to_fetch: Vec<_> =
note_ids.iter().take(NOTE_IDS_PER_NULLIFIERS_CHECK).copied().collect();
if !note_ids_to_fetch.is_empty() {
Expand All @@ -172,8 +178,10 @@ pub async fn bench_sync_nullifiers(
}));
}

// The notes all come from the same block; use the block header to find it.
current_block_num = response.block_header.map_or(resp_chain_tip, |h| h.block_num);
// Advance past the last block in the response.
let last_block = response.blocks.last().unwrap();
current_block_num =
last_block.block_header.as_ref().map_or(resp_chain_tip, |h| h.block_num);
if current_block_num >= resp_chain_tip {
break;
}
Expand Down
93 changes: 2 additions & 91 deletions crates/rpc/src/server/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use miden_node_proto::generated::{self as proto};
use miden_node_proto::try_convert;
use miden_node_utils::ErrorReport;
use miden_node_utils::limiter::{
MAX_RESPONSE_PAYLOAD_BYTES,
QueryParamAccountIdLimit,
QueryParamLimiter,
QueryParamNoteIdLimit,
Expand All @@ -41,17 +40,6 @@ use url::Url;

use crate::COMPONENT;

/// Estimated byte size of a [`NoteSyncBlock`] excluding its notes.
///
/// `BlockHeader` (~341 bytes) + MMR proof with 32 siblings (~1216 bytes).
const BLOCK_OVERHEAD_BYTES: usize = 1600;

/// Estimated byte size of a single [`NoteSyncRecord`].
///
/// Note ID (~38 bytes) + index + metadata (~26 bytes) + sparse merkle path with 16
/// siblings (~608 bytes).
const NOTE_RECORD_BYTES: usize = 700;

// RPC SERVICE
// ================================================================================================

Expand Down Expand Up @@ -271,86 +259,9 @@ impl api_server::Api for RpcService {
) -> Result<Response<proto::rpc::SyncNotesResponse>, Status> {
debug!(target: COMPONENT, request = ?request.get_ref());

let request = request.into_inner();
check::<QueryParamNoteTagLimit>(request.note_tags.len())?;

let block_range = request
.block_range
.ok_or_else(|| Status::invalid_argument("missing block_range"))?;
let note_tags = request.note_tags;
check::<QueryParamNoteTagLimit>(request.get_ref().note_tags.len())?;

let mut sync_blocks = Vec::new();
let mut accumulated_size: usize = 0;
let mut current_block_from = block_range.block_from;
// The upper bound for the response: the requested block_to if specified,
// otherwise updated to the chain tip from the store on each iteration.
let mut response_block_to = block_range.block_to.unwrap_or(0);

loop {
let store_request = proto::rpc::SyncNotesRequest {
block_range: Some(proto::rpc::BlockRange {
block_from: current_block_from,
block_to: block_range.block_to,
}),
note_tags: note_tags.clone(),
};

let store_response =
self.store.clone().sync_notes(store_request.into_request()).await?.into_inner();

// When block_to was not specified, use the chain tip from the store.
if block_range.block_to.is_none() {
let store_chain_tip = store_response
.block_range
.ok_or_else(|| Status::internal("store response missing block_range"))?
.block_to
.ok_or_else(|| Status::internal("store response missing block_to"))?;
response_block_to = store_chain_tip;
}

// No notes means we've reached the end of the range without more matches.
if store_response.notes.is_empty() {
break;
}

let sync_block = proto::rpc::sync_notes_response::NoteSyncBlock {
block_header: store_response.block_header,
mmr_path: store_response.mmr_path,
notes: store_response.notes,
};

accumulated_size += BLOCK_OVERHEAD_BYTES + sync_block.notes.len() * NOTE_RECORD_BYTES;

// If we exceed the budget, drop this block and stop.
if accumulated_size > MAX_RESPONSE_PAYLOAD_BYTES {
break;
}

// The block number of the returned notes, used to advance the cursor.
let notes_block_num = sync_block
.block_header
.as_ref()
.ok_or_else(|| Status::internal("store response missing block_header"))?
.block_num;
sync_blocks.push(sync_block);

// Check if we've reached the end of the requested range.
if notes_block_num >= response_block_to {
break;
}

// Advance the cursor. The store query uses `committed_at > block_range.start()`
// (exclusive), so setting block_from to the current block is sufficient to skip it.
current_block_from = notes_block_num;
}

Ok(Response::new(proto::rpc::SyncNotesResponse {
block_range: Some(proto::rpc::BlockRange {
block_from: block_range.block_from,
block_to: Some(response_block_to),
}),
blocks: sync_blocks,
}))
self.store.clone().sync_notes(request).await
}

async fn get_notes_by_id(
Expand Down
2 changes: 1 addition & 1 deletion crates/store/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ impl Db {
&self,
block_range: RangeInclusive<BlockNumber>,
note_tags: Vec<u32>,
) -> Result<NoteSyncUpdate, NoteSyncError> {
) -> Result<Option<NoteSyncUpdate>, NoteSyncError> {
self.transact("notes sync task", move |conn| {
queries::get_note_sync(conn, note_tags.as_slice(), block_range)
})
Expand Down
11 changes: 7 additions & 4 deletions crates/store/src/db/models/queries/notes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,16 +518,19 @@ pub(crate) fn get_note_sync(
conn: &mut SqliteConnection,
note_tags: &[u32],
block_range: RangeInclusive<BlockNumber>,
) -> Result<NoteSyncUpdate, NoteSyncError> {
) -> Result<Option<NoteSyncUpdate>, NoteSyncError> {
QueryParamNoteTagLimit::check(note_tags.len()).map_err(DatabaseError::from)?;

let (notes, _last_included_block) =
select_notes_since_block_by_tag_and_sender(conn, &[], note_tags, block_range)?;
let (notes, _) = select_notes_since_block_by_tag_and_sender(conn, &[], note_tags, block_range)?;

if notes.is_empty() {
return Ok(None);
}

let block_header =
select_block_header_by_block_num(conn, notes.first().map(|note| note.block_num))?
.ok_or(NoteSyncError::EmptyBlockHeadersTable)?;
Ok(NoteSyncUpdate { notes, block_header })
Ok(Some(NoteSyncUpdate { notes, block_header }))
}

#[derive(Debug, Clone, PartialEq, Selectable, Queryable, QueryableByName)]
Expand Down
29 changes: 12 additions & 17 deletions crates/store/src/db/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -956,23 +956,21 @@ fn note_sync_across_multiple_blocks() {
queries::insert_notes(conn, &[(note, None)]).unwrap();
}

// Simulate the RPC batching loop: repeatedly query with advancing block_from.
let full_range = BlockNumber::GENESIS..=BlockNumber::from(3);
// Simulate the store batching loop: repeatedly call get_note_sync with advancing
// block_from, same as `State::sync_notes` does.
let mut collected_block_nums = Vec::new();
let mut current_from = *full_range.start();
let mut current_from = BlockNumber::GENESIS;
let end = BlockNumber::from(3);

loop {
let range = current_from..=*full_range.end();
let (notes, _last_included) =
queries::select_notes_since_block_by_tag_and_sender(conn, &[], &[tag], range).unwrap();

if notes.is_empty() {
let range = current_from..=end;
let Some(update) = queries::get_note_sync(conn, &[tag], range).unwrap() else {
break;
}
};

// All notes in a single response come from the same block.
let block_num = notes[0].block_num;
assert!(notes.iter().all(|n| n.block_num == block_num));
let block_num = update.block_header.block_num();
assert!(update.notes.iter().all(|n| n.block_num == block_num));
collected_block_nums.push(block_num);

// The query uses `committed_at > block_range.start()` (exclusive), so
Expand Down Expand Up @@ -1019,13 +1017,10 @@ fn note_sync_no_matching_tags() {
queries::insert_scripts(conn, [&note]).unwrap();
queries::insert_notes(conn, &[(note, None)]).unwrap();

// Query with a different tag — should return no notes.
// Query with a different tag — should return None.
let range = BlockNumber::GENESIS..=BlockNumber::from(1);
let (notes, last_included) =
queries::select_notes_since_block_by_tag_and_sender(conn, &[], &[999], range).unwrap();

assert!(notes.is_empty());
assert_eq!(last_included, BlockNumber::from(1));
let result = queries::get_note_sync(conn, &[999], range).unwrap();
assert!(result.is_none());
}

fn insert_account_delta(
Expand Down
30 changes: 15 additions & 15 deletions crates/store/src/server/rpc_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,38 +126,38 @@ impl rpc_server::Rpc for StoreApi {
async fn sync_notes(
&self,
request: Request<proto::rpc::SyncNotesRequest>,
) -> Result<Response<proto::store::StoreSyncNotesResponse>, Status> {
) -> Result<Response<proto::rpc::SyncNotesResponse>, Status> {
let request = request.into_inner();

let chain_tip = self.state.latest_block_num().await;
let requested_block_to = request.block_range.as_ref().and_then(|r| r.block_to);
let block_range =
read_block_range::<NoteSyncError>(request.block_range, "SyncNotesRequest")?
.into_inclusive_range::<NoteSyncError>(&chain_tip)?;

let block_from = block_range.start().as_u32();
let response_block_to = requested_block_to.unwrap_or(chain_tip.as_u32());

// Validate note tags count
check::<QueryParamNoteTagLimit>(request.note_tags.len())?;

let result = self.state.sync_notes(request.note_tags, block_range).await?;
let results = self.state.sync_notes(request.note_tags, block_range).await?;

let (block_header, mmr_path, notes) = match result {
Some((state, mmr_proof)) => (
Some(state.block_header.into()),
Some(mmr_proof.merkle_path().clone().into()),
state.notes.into_iter().map(Into::into).collect(),
),
None => (None, None, Vec::new()),
};
let blocks = results
.into_iter()
.map(|(state, mmr_proof)| proto::rpc::sync_notes_response::NoteSyncBlock {
block_header: Some(state.block_header.into()),
mmr_path: Some(mmr_proof.merkle_path().clone().into()),
notes: state.notes.into_iter().map(Into::into).collect(),
})
.collect();

Ok(Response::new(proto::store::StoreSyncNotesResponse {
Ok(Response::new(proto::rpc::SyncNotesResponse {
block_range: Some(proto::rpc::BlockRange {
block_from,
block_to: Some(chain_tip.as_u32()),
block_to: Some(response_block_to),
}),
block_header,
mmr_path,
notes,
blocks,
}))
}

Expand Down
59 changes: 43 additions & 16 deletions crates/store/src/state/sync_state.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::ops::RangeInclusive;

use miden_node_utils::limiter::MAX_RESPONSE_PAYLOAD_BYTES;
use miden_protocol::account::AccountId;
use miden_protocol::block::BlockNumber;
use miden_protocol::crypto::merkle::mmr::{Forest, MmrDelta, MmrProof};
Expand All @@ -11,6 +12,17 @@ use crate::db::models::queries::StorageMapValuesPage;
use crate::db::{AccountVaultValue, NoteSyncUpdate, NullifierInfo};
use crate::errors::{DatabaseError, NoteSyncError, StateSyncError};

/// Estimated byte size of a [`NoteSyncBlock`] excluding its notes.
///
/// `BlockHeader` (~341 bytes) + MMR proof with 32 siblings (~1216 bytes).
const BLOCK_OVERHEAD_BYTES: usize = 1600;

/// Estimated byte size of a single [`NoteSyncRecord`].
///
/// Note ID (~38 bytes) + index + metadata (~26 bytes) + sparse merkle path with 16
/// siblings (~608 bytes).
const NOTE_RECORD_BYTES: usize = 700;

// STATE SYNCHRONIZATION ENDPOINTS
// ================================================================================================

Expand Down Expand Up @@ -64,33 +76,48 @@ impl State {

/// Loads data to synchronize a client's notes.
///
/// The client's request contains a list of tags, this method will return the first
/// block with a matching tag, or the chain tip. All the other values are filter based on this
/// block range.
///
/// # Arguments
///
/// - `note_tags`: The tags the client is interested in, resulting notes are restricted to the
/// first block containing a matching note.
/// - `block_range`: The range of blocks from which to synchronize notes.
/// Returns as many blocks with matching notes as fit within the response payload
/// limit. Each block includes its header and MMR proof at `block_range.end()`.
#[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
pub async fn sync_notes(
&self,
note_tags: Vec<u32>,
block_range: RangeInclusive<BlockNumber>,
) -> Result<Option<(NoteSyncUpdate, MmrProof)>, NoteSyncError> {
) -> Result<Vec<(NoteSyncUpdate, MmrProof)>, NoteSyncError> {
let inner = self.inner.read().await;
let checkpoint = *block_range.end();

let note_sync = self.db.get_note_sync(block_range, note_tags).await?;
let mut results = Vec::new();
let mut accumulated_size: usize = 0;
let mut current_from = *block_range.start();

if note_sync.notes.is_empty() {
return Ok(None);
}
loop {
let range = current_from..=checkpoint;
let Some(note_sync) = self.db.get_note_sync(range, note_tags.clone()).await? else {
break;
};

accumulated_size += BLOCK_OVERHEAD_BYTES + note_sync.notes.len() * NOTE_RECORD_BYTES;

let mmr_proof = inner.blockchain.open_at(note_sync.block_header.block_num(), checkpoint)?;
if accumulated_size > MAX_RESPONSE_PAYLOAD_BYTES {
break;
}

let block_num = note_sync.block_header.block_num();

if block_num >= checkpoint {
break;
}

let mmr_proof = inner.blockchain.open_at(block_num, checkpoint)?;
results.push((note_sync, mmr_proof));

// The DB query uses `committed_at > block_range.start()` (exclusive),
// so setting current_from to the found block is sufficient to skip it.
current_from = block_num;
}

Ok(Some((note_sync, mmr_proof)))
Ok(results)
}

pub async fn sync_nullifiers(
Expand Down
Loading
Loading