diff --git a/crates/node/src/database/maintenance.rs b/crates/node/src/database/maintenance.rs index 08fc4fb..9441be8 100644 --- a/crates/node/src/database/maintenance.rs +++ b/crates/node/src/database/maintenance.rs @@ -62,6 +62,8 @@ impl DatabaseMaintenance { #[cfg(test)] mod tests { use chrono::Utc; + use miden_objects::account::AccountId; + use miden_objects::testing::account_id::ACCOUNT_ID_MAX_ZEROES; use serial_test::serial; use super::*; @@ -69,9 +71,13 @@ mod tests { use crate::test_utils::test_note_header; use crate::types::StoredNote; + fn default_test_account_id() -> AccountId { + AccountId::try_from(ACCOUNT_ID_MAX_ZEROES).unwrap() + } + fn note_at(age: Duration) -> StoredNote { StoredNote { - header: test_note_header(), + header: test_note_header(default_test_account_id()), details: vec![1, 2, 3, 4], created_at: Utc::now() - age, } diff --git a/crates/node/src/database/mod.rs b/crates/node/src/database/mod.rs index a645c8a..3055caf 100644 --- a/crates/node/src/database/mod.rs +++ b/crates/node/src/database/mod.rs @@ -22,11 +22,17 @@ pub trait DatabaseBackend: Send + Sync { /// Store a new note async fn store_note(&self, note: &StoredNote) -> Result<(), DatabaseError>; - /// Fetch notes by tag + /// Fetch notes by tags + /// + /// Fetched notes must be after the provided cursor, up to some limit of notes. + /// If limit is None, no limit is applied. + /// Notes from all tags are combined, ordered by timestamp globally, and the limit + /// is applied to the combined set. async fn fetch_notes( &self, - tag: NoteTag, + tags: &[NoteTag], cursor: u64, + limit: Option, ) -> Result, DatabaseError>; /// Get statistics about the database @@ -78,13 +84,17 @@ impl Database { Ok(()) } - /// Fetch notes by tag with cursor-based pagination + /// Fetch notes by tags with cursor-based pagination + /// + /// Notes from all tags are combined, ordered by timestamp globally, and the limit + /// is applied to the combined set. pub async fn fetch_notes( &self, - tag: NoteTag, + tags: &[NoteTag], cursor: u64, + limit: Option, ) -> Result, DatabaseError> { - self.backend.fetch_notes(tag, cursor).await + self.backend.fetch_notes(tags, cursor, limit).await } /// Get statistics about the database @@ -106,13 +116,19 @@ impl Database { #[cfg(test)] mod tests { use chrono::Utc; + use miden_objects::account::AccountId; + use miden_objects::testing::account_id::ACCOUNT_ID_MAX_ZEROES; use super::*; use crate::metrics::Metrics; - use crate::test_utils::test_note_header; + use crate::test_utils::{random_account_id, test_note_header}; const TAG_LOCAL_ANY: u32 = 0xc000_0000; + fn default_test_account_id() -> AccountId { + AccountId::try_from(ACCOUNT_ID_MAX_ZEROES).unwrap() + } + #[tokio::test] async fn test_sqlite_database() { let db = Database::connect(DatabaseConfig::default(), Metrics::default().db) @@ -121,17 +137,15 @@ mod tests { let start = Utc::now(); let note = StoredNote { - header: test_note_header(), + header: test_note_header(default_test_account_id()), details: vec![1, 2, 3, 4], created_at: Utc::now(), }; db.store_note(¬e).await.unwrap(); - let fetched_notes = db - .fetch_notes(TAG_LOCAL_ANY.into(), start.timestamp_micros().try_into().unwrap()) - .await - .unwrap(); + let cursor = start.timestamp_micros().try_into().unwrap(); + let fetched_notes = db.fetch_notes(&[TAG_LOCAL_ANY.into()], cursor, None).await.unwrap(); assert_eq!(fetched_notes.len(), 1); assert_eq!(fetched_notes[0].header.id(), note.header.id()); @@ -145,7 +159,7 @@ mod tests { } #[tokio::test] - async fn test_fetch_notes_timestamp_filtering() { + async fn test_fetch_notes_pagination() { let db = Database::connect(DatabaseConfig::default(), Metrics::default().db) .await .unwrap(); @@ -153,7 +167,7 @@ mod tests { // Create a note with a specific received_at time let received_time = Utc::now(); let note = StoredNote { - header: test_note_header(), + header: test_note_header(default_test_account_id()), details: vec![1, 2, 3, 4], created_at: received_time, }; @@ -165,7 +179,8 @@ mod tests { .timestamp_micros() .try_into() .unwrap(); - let fetched_notes = db.fetch_notes(TAG_LOCAL_ANY.into(), before_cursor).await.unwrap(); + let fetched_notes = + db.fetch_notes(&[TAG_LOCAL_ANY.into()], before_cursor, None).await.unwrap(); assert_eq!(fetched_notes.len(), 1); assert_eq!(fetched_notes[0].header.id(), note.header.id()); @@ -174,7 +189,145 @@ mod tests { .timestamp_micros() .try_into() .unwrap(); - let fetched_notes = db.fetch_notes(TAG_LOCAL_ANY.into(), after_cursor).await.unwrap(); + let fetched_notes = + db.fetch_notes(&[TAG_LOCAL_ANY.into()], after_cursor, None).await.unwrap(); + assert_eq!(fetched_notes.len(), 0); + } + + #[tokio::test] + async fn test_fetch_notes_limit() { + let db = Database::connect(DatabaseConfig::default(), Metrics::default().db) + .await + .unwrap(); + let start = Utc::now(); + + // Create 5 notes with slightly different timestamps + let mut note_ids = Vec::new(); + for i in 0..5u8 { + let note = StoredNote { + header: test_note_header(default_test_account_id()), + details: vec![i], + created_at: start + chrono::Duration::milliseconds(i64::from(i) * 10), + }; + note_ids.push(note.header.id()); + db.store_note(¬e).await.unwrap(); + } + + let cursor = 0; + + // Limit = 2 + let fetched_notes = db.fetch_notes(&[TAG_LOCAL_ANY.into()], cursor, Some(2)).await.unwrap(); + assert_eq!(fetched_notes.len(), 2); + // Verify they are the first two notes in order + assert_eq!(fetched_notes[0].header.id(), note_ids[0]); + assert_eq!(fetched_notes[1].header.id(), note_ids[1]); + + // Limit larger than available notes + let fetched_notes = + db.fetch_notes(&[TAG_LOCAL_ANY.into()], cursor, Some(10)).await.unwrap(); + assert_eq!(fetched_notes.len(), 5); + // Verify all notes are returned in order + for (i, note) in fetched_notes.iter().enumerate() { + assert_eq!(note.header.id(), note_ids[i]); + } + + // No limit + let fetched_notes = db.fetch_notes(&[TAG_LOCAL_ANY.into()], cursor, None).await.unwrap(); + assert_eq!(fetched_notes.len(), 5); + // Verify all notes are returned in order + for (i, note) in fetched_notes.iter().enumerate() { + assert_eq!(note.header.id(), note_ids[i]); + } + + // Limit = 0 + let fetched_notes = db.fetch_notes(&[TAG_LOCAL_ANY.into()], cursor, Some(0)).await.unwrap(); assert_eq!(fetched_notes.len(), 0); } + + #[tokio::test] + async fn test_fetch_notes_multiple_tags() { + let db = Database::connect(DatabaseConfig::default(), Metrics::default().db) + .await + .unwrap(); + let start = Utc::now(); + + let account_id1 = random_account_id(); + let tag1 = NoteTag::from_account_id(account_id1); + + let account_id2 = random_account_id(); + let tag2 = NoteTag::from_account_id(account_id2); + + let account_id3 = random_account_id(); + let tag3 = NoteTag::from_account_id(account_id3); + + // Create notes with interleaved timestamps across 3 tags + // Tag1: 0, 30, 60 + // Tag2: 10, 40, 70 + // Tag3: 20, 50, 80 + + let mut tag1_note_ids = Vec::new(); + let mut tag2_note_ids = Vec::new(); + let mut tag3_note_ids = Vec::new(); + + // Create notes for tag1 at 0, 30, 60 + for i in 0..3u8 { + let note = StoredNote { + header: test_note_header(account_id1), + details: vec![i], + created_at: start + chrono::Duration::milliseconds(i64::from(i) * 30), + }; + tag1_note_ids.push(note.header.id()); + db.store_note(¬e).await.unwrap(); + + let note = StoredNote { + header: test_note_header(account_id2), + details: vec![i + 10], + created_at: start + chrono::Duration::milliseconds(i64::from(i) * 30 + 10), + }; + tag2_note_ids.push(note.header.id()); + db.store_note(¬e).await.unwrap(); + + let note = StoredNote { + header: test_note_header(account_id3), + details: vec![i + 20], + created_at: start + chrono::Duration::milliseconds(i64::from(i) * 30 + 20), + }; + tag3_note_ids.push(note.header.id()); + db.store_note(¬e).await.unwrap(); + } + + let cursor = 0; + + // Fetch all tags, no limit + let fetched_notes = db.fetch_notes(&[tag1, tag2, tag3], cursor, None).await.unwrap(); + assert_eq!(fetched_notes.len(), 9); + assert_eq!(fetched_notes[0].header.id(), tag1_note_ids[0]); + assert_eq!(fetched_notes[1].header.id(), tag2_note_ids[0]); + assert_eq!(fetched_notes[2].header.id(), tag3_note_ids[0]); + assert_eq!(fetched_notes[3].header.id(), tag1_note_ids[1]); + assert_eq!(fetched_notes[4].header.id(), tag2_note_ids[1]); + assert_eq!(fetched_notes[5].header.id(), tag3_note_ids[1]); + assert_eq!(fetched_notes[6].header.id(), tag1_note_ids[2]); + assert_eq!(fetched_notes[7].header.id(), tag2_note_ids[2]); + assert_eq!(fetched_notes[8].header.id(), tag3_note_ids[2]); + + // Fetch all tags, limit of 5 notes + let fetched_notes = db.fetch_notes(&[tag1, tag2, tag3], cursor, Some(5)).await.unwrap(); + assert_eq!(fetched_notes.len(), 5); + assert_eq!(fetched_notes[0].header.id(), tag1_note_ids[0]); + assert_eq!(fetched_notes[1].header.id(), tag2_note_ids[0]); + assert_eq!(fetched_notes[2].header.id(), tag3_note_ids[0]); + assert_eq!(fetched_notes[3].header.id(), tag1_note_ids[1]); + assert_eq!(fetched_notes[4].header.id(), tag2_note_ids[1]); + + // Fetch only 2 tags, no limit + let fetched_notes = db.fetch_notes(&[tag1, tag2], cursor, None).await.unwrap(); + assert_eq!(fetched_notes.len(), 6); + assert_eq!(fetched_notes[0].header.id(), tag1_note_ids[0]); + assert_eq!(fetched_notes[1].header.id(), tag2_note_ids[0]); + assert_eq!(fetched_notes[2].header.id(), tag1_note_ids[1]); + assert_eq!(fetched_notes[3].header.id(), tag2_note_ids[1]); + assert_eq!(fetched_notes[4].header.id(), tag1_note_ids[2]); + assert_eq!(fetched_notes[5].header.id(), tag2_note_ids[2]); + } } diff --git a/crates/node/src/database/sqlite/mod.rs b/crates/node/src/database/sqlite/mod.rs index d0c3ecc..9062e02 100644 --- a/crates/node/src/database/sqlite/mod.rs +++ b/crates/node/src/database/sqlite/mod.rs @@ -98,24 +98,36 @@ impl DatabaseBackend for SqliteDatabase { #[tracing::instrument(skip(self), fields(operation = "db.fetch_notes"))] async fn fetch_notes( &self, - tag: NoteTag, + tags: &[NoteTag], cursor: u64, + limit: Option, ) -> Result, DatabaseError> { let timer = self.metrics.db_fetch_notes(); + if tags.is_empty() { + return Ok(Vec::new()); + } + let cursor_i64: i64 = cursor.try_into().map_err(|_| { DatabaseError::QueryExecution("Cursor too large for SQLite".to_string()) })?; - let tag_value = i64::from(tag.as_u32()); + let tag_values: Vec = tags.iter().map(|tag| i64::from(tag.as_u32())).collect(); let notes: Vec = self .transact("fetch notes", move |conn| { use schema::notes::dsl::{created_at, notes, tag}; - let fetched_notes = notes - .filter(tag.eq(tag_value)) + let mut query = notes + .filter(tag.eq_any(tag_values)) .filter(created_at.gt(cursor_i64)) .order(created_at.asc()) - .load::(conn)?; + .into_boxed(); + + if let Some(limit_val) = limit { + let limit_i64 = i64::from(limit_val); + query = query.limit(limit_i64); + } + + let fetched_notes = query.load::(conn)?; Ok(fetched_notes) }) .await?; diff --git a/crates/node/src/node/grpc/mod.rs b/crates/node/src/node/grpc/mod.rs index 41b49b2..358557e 100644 --- a/crates/node/src/node/grpc/mod.rs +++ b/crates/node/src/node/grpc/mod.rs @@ -1,6 +1,5 @@ mod streaming; -use std::collections::BTreeSet; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; @@ -165,29 +164,28 @@ impl miden_note_transport_proto::miden_note_transport::miden_note_transport_serv let timer = self.metrics.grpc_fetch_notes_request(); let request_data = request.into_inner(); - let tags = request_data.tags.into_iter().collect::>(); + let tags: Vec<_> = request_data.tags.into_iter().map(std::convert::Into::into).collect(); let cursor = request_data.cursor; + let limit = request_data.limit; + + let stored_notes = self + .database + .fetch_notes(&tags, cursor, limit) + .await + .map_err(|e| tonic::Status::internal(format!("Failed to fetch notes: {e:?}")))?; let mut rcursor = cursor; - let mut proto_notes = vec![]; - for tag in tags { - let stored_notes = self - .database - .fetch_notes(tag.into(), cursor) - .await.map_err(|e| tonic::Status::internal(format!("Failed to fetch notes: {e:?}")))?; - - for stored_note in &stored_notes { - let ts_cursor: u64 = stored_note - .created_at - .timestamp_micros() - .try_into() - .map_err(|_| tonic::Status::internal("Timestamp too large for cursor"))?; - rcursor = rcursor.max(ts_cursor); - } - - proto_notes.extend(stored_notes.into_iter().map(TransportNote::from)); + for stored_note in &stored_notes { + let ts_cursor: u64 = stored_note + .created_at + .timestamp_micros() + .try_into() + .map_err(|_| tonic::Status::internal("Timestamp too large for cursor"))?; + rcursor = rcursor.max(ts_cursor); } + let proto_notes: Vec<_> = stored_notes.into_iter().map(TransportNote::from).collect(); + timer.finish("ok"); let proto_notes_size = proto_notes.iter().map(|pnote| (pnote.header.len() + pnote.details.len()) as u64).sum(); diff --git a/crates/node/src/node/grpc/streaming.rs b/crates/node/src/node/grpc/streaming.rs index eed39ac..edaadcc 100644 --- a/crates/node/src/node/grpc/streaming.rs +++ b/crates/node/src/node/grpc/streaming.rs @@ -79,7 +79,7 @@ impl NoteStreamerManager { let mut updates = vec![]; for (tag, tag_data) in &self.tags { - let snotes = self.database.fetch_notes(*tag, tag_data.lts).await?; + let snotes = self.database.fetch_notes(&[*tag], tag_data.lts, None).await?; let mut cursor = tag_data.lts; for snote in &snotes { let lcursor = snote diff --git a/crates/node/src/test_utils.rs b/crates/node/src/test_utils.rs index e0a8253..717072a 100644 --- a/crates/node/src/test_utils.rs +++ b/crates/node/src/test_utils.rs @@ -1,9 +1,16 @@ -use miden_objects::account::AccountId; +use miden_objects::account::{AccountId, AccountStorageMode}; use miden_objects::note::{NoteExecutionHint, NoteHeader, NoteId, NoteMetadata, NoteTag, NoteType}; -use miden_objects::testing::account_id::ACCOUNT_ID_MAX_ZEROES; +use miden_objects::testing::account_id::{ACCOUNT_ID_MAX_ZEROES, AccountIdBuilder}; use miden_objects::{Felt, Word}; use rand::Rng; +/// Generate a random [`AccountId`] +pub fn random_account_id() -> AccountId { + AccountIdBuilder::new() + .storage_mode(AccountStorageMode::Private) + .build_with_rng(&mut rand::rng()) +} + /// Generate a random [`NoteId`] pub fn random_note_id() -> NoteId { let mut rng = rand::rng(); @@ -24,12 +31,15 @@ pub fn random_note_id() -> NoteId { NoteId::new(recipient, asset_commitment) } -/// Generate a private [`NoteHeader`] with random sender -pub fn test_note_header() -> NoteHeader { +/// Generate a private [`NoteHeader`] with the tag derived from the given recipient account ID. +/// +/// The tag is created using [`NoteTag::from_account_id`] with the provided `recipient` account ID. +/// This allows tests to create notes with different tags by passing different account IDs. +pub fn test_note_header(recipient: AccountId) -> NoteHeader { let id = random_note_id(); let sender = AccountId::try_from(ACCOUNT_ID_MAX_ZEROES).unwrap(); let note_type = NoteType::Private; - let tag = NoteTag::from_account_id(sender); + let tag = NoteTag::from_account_id(recipient); let aux = Felt::try_from(0xffff_ffff_0000_0000u64).unwrap(); let execution_hint = NoteExecutionHint::None; diff --git a/crates/proto/src/generated/miden_note_transport.rs b/crates/proto/src/generated/miden_note_transport.rs index f9da3ea..08d4c13 100644 --- a/crates/proto/src/generated/miden_note_transport.rs +++ b/crates/proto/src/generated/miden_note_transport.rs @@ -26,6 +26,8 @@ pub struct FetchNotesRequest { pub tags: ::prost::alloc::vec::Vec, #[prost(fixed64, tag = "2")] pub cursor: u64, + #[prost(fixed32, optional, tag = "3")] + pub limit: ::core::option::Option, } /// API response for fetching notes #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/proto/proto/miden_note_transport.proto b/proto/proto/miden_note_transport.proto index fbfc801..f879418 100644 --- a/proto/proto/miden_note_transport.proto +++ b/proto/proto/miden_note_transport.proto @@ -26,6 +26,7 @@ message SendNoteResponse { } message FetchNotesRequest { repeated fixed32 tags = 1; fixed64 cursor = 2; + optional fixed32 limit = 3; } // API response for fetching notes