diff --git a/Cargo.lock b/Cargo.lock index c6b7cc03b..9d5a595d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2948,8 +2948,7 @@ dependencies = [ [[package]] name = "miden-note-transport-proto-build" version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86d7a7b3a64c71d33f771d32cde58559207819a64ada9add0acb31857e111b9d" +source = "git+https://github.com/0xMiden/miden-note-transport?branch=ev%2Ffetch-limit#e8e70d80ed189434051ce66bcd076de6e3f3a410" dependencies = [ "fs-err", "miette", diff --git a/Cargo.toml b/Cargo.toml index eb26e0b49..93443fe32 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,7 +41,7 @@ miden-node-proto-build = { default-features = false, version = "0.12" miden-node-rpc = { version = "0.12" } miden-node-store = { version = "0.12" } miden-node-utils = { version = "0.12" } -miden-note-transport-proto-build = { default-features = false, version = "0.1" } +miden-note-transport-proto-build = { default-features = false, git = "https://github.com/0xMiden/miden-note-transport", branch = "ev/fetch-limit" } miden-remote-prover = { features = ["concurrent"], version = "0.12" } miden-remote-prover-client = { default-features = false, features = ["tx-prover"], version = "0.12" } diff --git a/crates/rust-client/src/note_transport/generated/nostd/miden_note_transport.rs b/crates/rust-client/src/note_transport/generated/nostd/miden_note_transport.rs index 0ba8cccab..0e9b1193a 100644 --- a/crates/rust-client/src/note_transport/generated/nostd/miden_note_transport.rs +++ b/crates/rust-client/src/note_transport/generated/nostd/miden_note_transport.rs @@ -26,6 +26,8 @@ pub struct FetchNotesRequest { pub tags: ::prost::alloc::vec::Vec, #[prost(fixed64, tag = "2")] pub cursor: u64, + #[prost(fixed32, optional, tag = "3")] + pub limit: ::core::option::Option, } /// API response for fetching notes #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/crates/rust-client/src/note_transport/generated/std/miden_note_transport.rs b/crates/rust-client/src/note_transport/generated/std/miden_note_transport.rs index 684ff2478..99abd2f1a 100644 --- a/crates/rust-client/src/note_transport/generated/std/miden_note_transport.rs +++ b/crates/rust-client/src/note_transport/generated/std/miden_note_transport.rs @@ -26,6 +26,8 @@ pub struct FetchNotesRequest { pub tags: ::prost::alloc::vec::Vec, #[prost(fixed64, tag = "2")] pub cursor: u64, + #[prost(fixed32, optional, tag = "3")] + pub limit: ::core::option::Option, } /// API response for fetching notes #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/crates/rust-client/src/note_transport/grpc.rs b/crates/rust-client/src/note_transport/grpc.rs index 694b7e217..51c1a12f2 100644 --- a/crates/rust-client/src/note_transport/grpc.rs +++ b/crates/rust-client/src/note_transport/grpc.rs @@ -111,9 +111,14 @@ impl GrpcNoteTransportClient { &self, tags: &[NoteTag], cursor: NoteTransportCursor, + limit: Option, ) -> Result<(Vec, NoteTransportCursor), NoteTransportError> { let tags_int = tags.iter().map(NoteTag::as_u32).collect(); - let request = FetchNotesRequest { tags: tags_int, cursor: cursor.value() }; + let request = FetchNotesRequest { + tags: tags_int, + cursor: cursor.value(), + limit, + }; let response = self .api() @@ -199,8 +204,9 @@ impl super::NoteTransportClient for GrpcNoteTransportClient { &self, tags: &[NoteTag], cursor: NoteTransportCursor, + limit: Option, ) -> Result<(Vec, NoteTransportCursor), NoteTransportError> { - self.fetch_notes(tags, cursor).await + self.fetch_notes(tags, cursor, limit).await } async fn stream_notes( diff --git a/crates/rust-client/src/note_transport/mod.rs b/crates/rust-client/src/note_transport/mod.rs index b89e6c8f4..49f4d639a 100644 --- a/crates/rust-client/src/note_transport/mod.rs +++ b/crates/rust-client/src/note_transport/mod.rs @@ -69,7 +69,7 @@ impl Client { // Get global cursor let cursor = self.store.get_note_transport_cursor().await?; - let update = NoteTransport::new(api).fetch_notes(cursor, note_tags).await?; + let update = NoteTransport::new(api).fetch_notes(note_tags, cursor, None).await?; self.store.apply_note_transport_update(update).await?; @@ -87,7 +87,7 @@ impl Client { let note_tags = self.store.get_unique_note_tags().await?; let update = NoteTransport::new(api) - .fetch_notes(NoteTransportCursor::init(), note_tags) + .fetch_notes(note_tags, NoteTransportCursor::init(), None) .await?; self.store.apply_note_transport_update(update).await?; @@ -141,18 +141,22 @@ impl NoteTransport { /// Fetch notes for provided note tags with pagination /// /// Only notes after the provided cursor are requested. + /// If a limit is provided, then only a number of notes up to that limit will be downloaded. pub(crate) async fn fetch_notes( &mut self, - cursor: NoteTransportCursor, tags: I, + cursor: NoteTransportCursor, + limit: Option, ) -> Result where I: IntoIterator, { let mut note_updates = vec![]; // Fetch notes - let (note_infos, rcursor) = - self.api.fetch_notes(&tags.into_iter().collect::>(), cursor).await?; + let (note_infos, rcursor) = self + .api + .fetch_notes(&tags.into_iter().collect::>(), cursor, limit) + .await?; for note_info in ¬e_infos { // e2ee impl hint: // for key in self.store.decryption_keys() try @@ -207,6 +211,7 @@ pub trait NoteTransportClient: Send + Sync { &self, tag: &[NoteTag], cursor: NoteTransportCursor, + limit: Option, ) -> Result<(Vec, NoteTransportCursor), NoteTransportError>; /// Stream notes for a given tag diff --git a/crates/rust-client/src/sync/mod.rs b/crates/rust-client/src/sync/mod.rs index 0c95c15b3..1691b95dd 100644 --- a/crates/rust-client/src/sync/mod.rs +++ b/crates/rust-client/src/sync/mod.rs @@ -166,7 +166,7 @@ where // TODO We can run both sync_state, fetch_notes futures in parallel let note_transport_update = if let Some(mut note_transport) = note_transport { let cursor = self.store.get_note_transport_cursor().await?; - Some(note_transport.fetch_notes(cursor, note_tags).await?) + Some(note_transport.fetch_notes(note_tags, cursor, None).await?) } else { None }; diff --git a/crates/rust-client/src/test_utils/note_transport.rs b/crates/rust-client/src/test_utils/note_transport.rs index ed35c5d78..cc0ca3efc 100644 --- a/crates/rust-client/src/test_utils/note_transport.rs +++ b/crates/rust-client/src/test_utils/note_transport.rs @@ -45,33 +45,35 @@ impl MockNoteTransportNode { &self, tags: &[NoteTag], cursor: NoteTransportCursor, + limit: Option, ) -> (Vec, NoteTransportCursor) { - let mut notes = vec![]; - let mut rcursor = NoteTransportCursor::init(); + let mut notesc_unlimited = Vec::new(); + for tag in tags { // Assumes stored notes are ordered by cursor - let tnotes = self - .notes - .get(tag) - .map(|pg_notes| { - // Find first element after cursor - if let Some(pos) = pg_notes.iter().position(|(_, tcursor)| *tcursor > cursor) { - &pg_notes[pos..] - } else { - &[] + if let Some(tag_notes) = self.notes.get(tag) { + // Find first element after cursor + if let Some(pos) = tag_notes.iter().position(|(_, tcursor)| *tcursor > cursor) { + for (note, note_cursor) in &tag_notes[pos..] { + notesc_unlimited.push((note.clone(), *note_cursor)); } - }) - .map(Vec::from) - .unwrap_or_default(); - rcursor = rcursor.max( - tnotes - .iter() - .map(|(_, cursor)| *cursor) - .max() - .unwrap_or(NoteTransportCursor::init()), - ); - notes.extend(tnotes.into_iter().map(|(note, _)| note).collect::>()); + } + } } + + // Sort mixed-tagged notes by cursor + notesc_unlimited.sort_by_key(|(_, cursor)| *cursor); + + // Apply limit if specified + let limit_usize = limit.map(|l| l as usize); + let notesc_limited: Vec<_> = + notesc_unlimited.iter().take(limit_usize.unwrap_or(usize::MAX)).collect(); + + let rcursor = notesc_limited.last().map(|(_, cursor)| *cursor).unwrap_or(cursor); + + // Extract notes + let notes: Vec = notesc_limited.iter().map(|(note, _)| note.clone()).collect(); + (notes, rcursor) } } @@ -105,8 +107,9 @@ impl MockNoteTransportApi { &self, tags: &[NoteTag], cursor: NoteTransportCursor, + limit: Option, ) -> (Vec, NoteTransportCursor) { - self.mock_node.read().get_notes(tags, cursor) + self.mock_node.read().get_notes(tags, cursor, limit) } } @@ -136,8 +139,9 @@ impl NoteTransportClient for MockNoteTransportApi { &self, tags: &[NoteTag], cursor: NoteTransportCursor, + limit: Option, ) -> Result<(Vec, NoteTransportCursor), NoteTransportError> { - Ok(self.fetch_notes(tags, cursor)) + Ok(self.fetch_notes(tags, cursor, limit)) } async fn stream_notes(