8 changes: 7 additions & 1 deletion crates/node/src/database/maintenance.rs
@@ -62,16 +62,22 @@ impl DatabaseMaintenance {
 #[cfg(test)]
 mod tests {
     use chrono::Utc;
+    use miden_objects::account::AccountId;
+    use miden_objects::testing::account_id::ACCOUNT_ID_MAX_ZEROES;
     use serial_test::serial;

     use super::*;
     use crate::metrics::Metrics;
     use crate::test_utils::test_note_header;
     use crate::types::StoredNote;

+    fn default_test_account_id() -> AccountId {
+        AccountId::try_from(ACCOUNT_ID_MAX_ZEROES).unwrap()
+    }
+
     fn note_at(age: Duration) -> StoredNote {
         StoredNote {
-            header: test_note_header(),
+            header: test_note_header(default_test_account_id()),
             details: vec![1, 2, 3, 4],
             created_at: Utc::now() - age,
         }
183 changes: 168 additions & 15 deletions crates/node/src/database/mod.rs
@@ -22,11 +22,17 @@ pub trait DatabaseBackend: Send + Sync {
     /// Store a new note
     async fn store_note(&self, note: &StoredNote) -> Result<(), DatabaseError>;

-    /// Fetch notes by tag
+    /// Fetch notes by tags
     ///
     /// Fetched notes must be after the provided cursor, up to some limit of notes.
+    /// If limit is None, no limit is applied.
+    /// Notes from all tags are combined, ordered by timestamp globally, and the limit
+    /// is applied to the combined set.
     async fn fetch_notes(
         &self,
-        tag: NoteTag,
+        tags: &[NoteTag],
         cursor: u64,
+        limit: Option<u32>,
     ) -> Result<Vec<StoredNote>, DatabaseError>;

     /// Get statistics about the database
@@ -78,13 +84,17 @@ impl Database {
         Ok(())
     }

-    /// Fetch notes by tag with cursor-based pagination
+    /// Fetch notes by tags with cursor-based pagination
+    ///
+    /// Notes from all tags are combined, ordered by timestamp globally, and the limit
+    /// is applied to the combined set.
     pub async fn fetch_notes(
         &self,
-        tag: NoteTag,
+        tags: &[NoteTag],
         cursor: u64,
+        limit: Option<u32>,
     ) -> Result<Vec<StoredNote>, DatabaseError> {
-        self.backend.fetch_notes(tag, cursor).await
+        self.backend.fetch_notes(tags, cursor, limit).await
     }

     /// Get statistics about the database
@@ -106,13 +116,19 @@ impl Database {
 #[cfg(test)]
 mod tests {
     use chrono::Utc;
+    use miden_objects::account::AccountId;
+    use miden_objects::testing::account_id::ACCOUNT_ID_MAX_ZEROES;

     use super::*;
     use crate::metrics::Metrics;
-    use crate::test_utils::test_note_header;
+    use crate::test_utils::{random_account_id, test_note_header};

     const TAG_LOCAL_ANY: u32 = 0xc000_0000;

+    fn default_test_account_id() -> AccountId {
+        AccountId::try_from(ACCOUNT_ID_MAX_ZEROES).unwrap()
+    }
+
     #[tokio::test]
     async fn test_sqlite_database() {
         let db = Database::connect(DatabaseConfig::default(), Metrics::default().db)
@@ -121,17 +137,15 @@ mod tests {
         let start = Utc::now();

         let note = StoredNote {
-            header: test_note_header(),
+            header: test_note_header(default_test_account_id()),
             details: vec![1, 2, 3, 4],
             created_at: Utc::now(),
         };

         db.store_note(&note).await.unwrap();

-        let fetched_notes = db
-            .fetch_notes(TAG_LOCAL_ANY.into(), start.timestamp_micros().try_into().unwrap())
-            .await
-            .unwrap();
+        let cursor = start.timestamp_micros().try_into().unwrap();
+        let fetched_notes = db.fetch_notes(&[TAG_LOCAL_ANY.into()], cursor, None).await.unwrap();
         assert_eq!(fetched_notes.len(), 1);
         assert_eq!(fetched_notes[0].header.id(), note.header.id());

@@ -145,15 +159,15 @@ mod tests {
     }

     #[tokio::test]
-    async fn test_fetch_notes_timestamp_filtering() {
+    async fn test_fetch_notes_pagination() {
         let db = Database::connect(DatabaseConfig::default(), Metrics::default().db)
             .await
             .unwrap();

         // Create a note with a specific received_at time
         let received_time = Utc::now();
         let note = StoredNote {
-            header: test_note_header(),
+            header: test_note_header(default_test_account_id()),
             details: vec![1, 2, 3, 4],
             created_at: received_time,
         };
@@ -165,7 +179,8 @@ mod tests {
             .timestamp_micros()
             .try_into()
             .unwrap();
-        let fetched_notes = db.fetch_notes(TAG_LOCAL_ANY.into(), before_cursor).await.unwrap();
+        let fetched_notes =
+            db.fetch_notes(&[TAG_LOCAL_ANY.into()], before_cursor, None).await.unwrap();
         assert_eq!(fetched_notes.len(), 1);
         assert_eq!(fetched_notes[0].header.id(), note.header.id());

@@ -174,7 +189,145 @@ mod tests {
             .timestamp_micros()
             .try_into()
             .unwrap();
-        let fetched_notes = db.fetch_notes(TAG_LOCAL_ANY.into(), after_cursor).await.unwrap();
+        let fetched_notes =
+            db.fetch_notes(&[TAG_LOCAL_ANY.into()], after_cursor, None).await.unwrap();
         assert_eq!(fetched_notes.len(), 0);
     }

+    #[tokio::test]
+    async fn test_fetch_notes_limit() {
+        let db = Database::connect(DatabaseConfig::default(), Metrics::default().db)
+            .await
+            .unwrap();
+        let start = Utc::now();
+
+        // Create 5 notes with slightly different timestamps
+        let mut note_ids = Vec::new();
+        for i in 0..5u8 {
+            let note = StoredNote {
+                header: test_note_header(default_test_account_id()),
+                details: vec![i],
+                created_at: start + chrono::Duration::milliseconds(i64::from(i) * 10),
+            };
+            note_ids.push(note.header.id());
+            db.store_note(&note).await.unwrap();
+        }
+
+        let cursor = 0;
+
+        // Limit = 2
+        let fetched_notes = db.fetch_notes(&[TAG_LOCAL_ANY.into()], cursor, Some(2)).await.unwrap();
+        assert_eq!(fetched_notes.len(), 2);
+        // Verify they are the first two notes in order
+        assert_eq!(fetched_notes[0].header.id(), note_ids[0]);
+        assert_eq!(fetched_notes[1].header.id(), note_ids[1]);
+
+        // Limit larger than available notes
+        let fetched_notes =
+            db.fetch_notes(&[TAG_LOCAL_ANY.into()], cursor, Some(10)).await.unwrap();
+        assert_eq!(fetched_notes.len(), 5);
+        // Verify all notes are returned in order
+        for (i, note) in fetched_notes.iter().enumerate() {
+            assert_eq!(note.header.id(), note_ids[i]);
+        }
+
+        // No limit
+        let fetched_notes = db.fetch_notes(&[TAG_LOCAL_ANY.into()], cursor, None).await.unwrap();
+        assert_eq!(fetched_notes.len(), 5);
+        // Verify all notes are returned in order
+        for (i, note) in fetched_notes.iter().enumerate() {
+            assert_eq!(note.header.id(), note_ids[i]);
+        }
+
+        // Limit = 0
+        let fetched_notes = db.fetch_notes(&[TAG_LOCAL_ANY.into()], cursor, Some(0)).await.unwrap();
+        assert_eq!(fetched_notes.len(), 0);
+    }
+
+    #[tokio::test]
+    async fn test_fetch_notes_multiple_tags() {
+        let db = Database::connect(DatabaseConfig::default(), Metrics::default().db)
+            .await
+            .unwrap();
+        let start = Utc::now();
+
+        let account_id1 = random_account_id();
+        let tag1 = NoteTag::from_account_id(account_id1);
+
+        let account_id2 = random_account_id();
+        let tag2 = NoteTag::from_account_id(account_id2);
+
+        let account_id3 = random_account_id();
+        let tag3 = NoteTag::from_account_id(account_id3);
+
+        // Create notes with interleaved timestamps across 3 tags
+        // Tag1: 0, 30, 60
+        // Tag2: 10, 40, 70
+        // Tag3: 20, 50, 80
+
+        let mut tag1_note_ids = Vec::new();
+        let mut tag2_note_ids = Vec::new();
+        let mut tag3_note_ids = Vec::new();
+
+        // Create notes for tag1 at 0, 30, 60
+        for i in 0..3u8 {
+            let note = StoredNote {
+                header: test_note_header(account_id1),
+                details: vec![i],
+                created_at: start + chrono::Duration::milliseconds(i64::from(i) * 30),
+            };
+            tag1_note_ids.push(note.header.id());
+            db.store_note(&note).await.unwrap();
+
+            let note = StoredNote {
+                header: test_note_header(account_id2),
+                details: vec![i + 10],
+                created_at: start + chrono::Duration::milliseconds(i64::from(i) * 30 + 10),
+            };
+            tag2_note_ids.push(note.header.id());
+            db.store_note(&note).await.unwrap();
+
+            let note = StoredNote {
+                header: test_note_header(account_id3),
+                details: vec![i + 20],
+                created_at: start + chrono::Duration::milliseconds(i64::from(i) * 30 + 20),
+            };
+            tag3_note_ids.push(note.header.id());
+            db.store_note(&note).await.unwrap();
+        }
+
+        let cursor = 0;
+
+        // Fetch all tags, no limit
+        let fetched_notes = db.fetch_notes(&[tag1, tag2, tag3], cursor, None).await.unwrap();
+        assert_eq!(fetched_notes.len(), 9);
+        assert_eq!(fetched_notes[0].header.id(), tag1_note_ids[0]);
+        assert_eq!(fetched_notes[1].header.id(), tag2_note_ids[0]);
+        assert_eq!(fetched_notes[2].header.id(), tag3_note_ids[0]);
+        assert_eq!(fetched_notes[3].header.id(), tag1_note_ids[1]);
+        assert_eq!(fetched_notes[4].header.id(), tag2_note_ids[1]);
+        assert_eq!(fetched_notes[5].header.id(), tag3_note_ids[1]);
+        assert_eq!(fetched_notes[6].header.id(), tag1_note_ids[2]);
+        assert_eq!(fetched_notes[7].header.id(), tag2_note_ids[2]);
+        assert_eq!(fetched_notes[8].header.id(), tag3_note_ids[2]);
+
+        // Fetch all tags, limit of 5 notes
+        let fetched_notes = db.fetch_notes(&[tag1, tag2, tag3], cursor, Some(5)).await.unwrap();
+        assert_eq!(fetched_notes.len(), 5);
+        assert_eq!(fetched_notes[0].header.id(), tag1_note_ids[0]);
+        assert_eq!(fetched_notes[1].header.id(), tag2_note_ids[0]);
+        assert_eq!(fetched_notes[2].header.id(), tag3_note_ids[0]);
+        assert_eq!(fetched_notes[3].header.id(), tag1_note_ids[1]);
+        assert_eq!(fetched_notes[4].header.id(), tag2_note_ids[1]);
+
+        // Fetch only 2 tags, no limit
+        let fetched_notes = db.fetch_notes(&[tag1, tag2], cursor, None).await.unwrap();
+        assert_eq!(fetched_notes.len(), 6);
+        assert_eq!(fetched_notes[0].header.id(), tag1_note_ids[0]);
+        assert_eq!(fetched_notes[1].header.id(), tag2_note_ids[0]);
+        assert_eq!(fetched_notes[2].header.id(), tag1_note_ids[1]);
+        assert_eq!(fetched_notes[3].header.id(), tag2_note_ids[1]);
+        assert_eq!(fetched_notes[4].header.id(), tag1_note_ids[2]);
+        assert_eq!(fetched_notes[5].header.id(), tag2_note_ids[2]);
+    }
 }
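
Review note: the documented fetch_notes contract (notes from all requested tags combined, filtered strictly after the cursor, ordered globally by creation time, with the limit applied to the combined set) is exactly what the new tests assert. As a reading aid, here is a minimal in-memory sketch of the same semantics. It is illustrative only: the Note struct is a hypothetical stand-in for the crate's StoredNote, and none of this code is part of the PR.

#[derive(Debug, Clone)]
struct Note {
    tag: u32,
    created_at_micros: u64,
}

// Combine notes across all requested tags, keep only those strictly after
// the cursor, order them globally by timestamp, then truncate to the limit.
// This mirrors the contract documented on DatabaseBackend::fetch_notes.
fn fetch_notes_reference(
    all_notes: &[Note],
    tags: &[u32],
    cursor: u64,
    limit: Option<u32>,
) -> Vec<Note> {
    let mut matched: Vec<Note> = all_notes
        .iter()
        .filter(|n| tags.contains(&n.tag) && n.created_at_micros > cursor)
        .cloned()
        .collect();
    matched.sort_by_key(|n| n.created_at_micros);
    if let Some(limit) = limit {
        matched.truncate(limit as usize);
    }
    matched
}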
22 changes: 17 additions & 5 deletions crates/node/src/database/sqlite/mod.rs
@@ -98,24 +98,36 @@ impl DatabaseBackend for SqliteDatabase {
     #[tracing::instrument(skip(self), fields(operation = "db.fetch_notes"))]
     async fn fetch_notes(
         &self,
-        tag: NoteTag,
+        tags: &[NoteTag],
         cursor: u64,
+        limit: Option<u32>,
     ) -> Result<Vec<StoredNote>, DatabaseError> {
         let timer = self.metrics.db_fetch_notes();

+        if tags.is_empty() {
+            return Ok(Vec::new());
+        }
+
         let cursor_i64: i64 = cursor.try_into().map_err(|_| {
             DatabaseError::QueryExecution("Cursor too large for SQLite".to_string())
         })?;

-        let tag_value = i64::from(tag.as_u32());
+        let tag_values: Vec<i64> = tags.iter().map(|tag| i64::from(tag.as_u32())).collect();
         let notes: Vec<Note> = self
             .transact("fetch notes", move |conn| {
                 use schema::notes::dsl::{created_at, notes, tag};
-                let fetched_notes = notes
-                    .filter(tag.eq(tag_value))
+                let mut query = notes
+                    .filter(tag.eq_any(tag_values))
                     .filter(created_at.gt(cursor_i64))
                     .order(created_at.asc())
-                    .load::<Note>(conn)?;
+                    .into_boxed();
+
+                if let Some(limit_val) = limit {
+                    let limit_i64 = i64::from(limit_val);
+                    query = query.limit(limit_i64);
+                }
+
+                let fetched_notes = query.load::<Note>(conn)?;
                 Ok(fetched_notes)
             })
             .await?;
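
Review note: the conditional LIMIT works because Diesel's into_boxed() erases the statement's concrete type, so the branches with and without a limit can flow through one variable. A minimal sketch of the pattern, assuming an illustrative notes table with BigInt tag and created_at columns rather than the crate's actual schema:

use diesel::prelude::*;

diesel::table! {
    notes (id) {
        id -> BigInt,
        tag -> BigInt,
        created_at -> BigInt,
    }
}

// Build the base query, box it, then attach LIMIT only when requested.
// Without into_boxed(), the limited and unlimited queries would have
// different types and could not be assigned to the same binding.
fn build_query(
    tag_values: Vec<i64>,
    cursor: i64,
    limit: Option<u32>,
) -> notes::BoxedQuery<'static, diesel::sqlite::Sqlite> {
    let mut query = notes::table
        .filter(notes::tag.eq_any(tag_values))
        .filter(notes::created_at.gt(cursor))
        .order(notes::created_at.asc())
        .into_boxed();
    if let Some(limit) = limit {
        query = query.limit(i64::from(limit));
    }
    query
}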
36 changes: 17 additions & 19 deletions crates/node/src/node/grpc/mod.rs
@@ -1,6 +1,5 @@
 mod streaming;

-use std::collections::BTreeSet;
 use std::net::SocketAddr;
 use std::sync::Arc;
 use std::time::Duration;
@@ -165,29 +164,28 @@ impl miden_note_transport_proto::miden_note_transport::miden_note_transport_serv
         let timer = self.metrics.grpc_fetch_notes_request();

         let request_data = request.into_inner();
-        let tags = request_data.tags.into_iter().collect::<BTreeSet<_>>();
+        let tags: Vec<_> = request_data.tags.into_iter().map(std::convert::Into::into).collect();
         let cursor = request_data.cursor;
+        let limit = request_data.limit;
+
+        let stored_notes = self
+            .database
+            .fetch_notes(&tags, cursor, limit)
+            .await
+            .map_err(|e| tonic::Status::internal(format!("Failed to fetch notes: {e:?}")))?;

         let mut rcursor = cursor;
-        let mut proto_notes = vec![];
-        for tag in tags {
-            let stored_notes = self
-                .database
-                .fetch_notes(tag.into(), cursor)
-                .await.map_err(|e| tonic::Status::internal(format!("Failed to fetch notes: {e:?}")))?;
-
-            for stored_note in &stored_notes {
-                let ts_cursor: u64 = stored_note
-                    .created_at
-                    .timestamp_micros()
-                    .try_into()
-                    .map_err(|_| tonic::Status::internal("Timestamp too large for cursor"))?;
-                rcursor = rcursor.max(ts_cursor);
-            }
-
-            proto_notes.extend(stored_notes.into_iter().map(TransportNote::from));
+        for stored_note in &stored_notes {
+            let ts_cursor: u64 = stored_note
+                .created_at
+                .timestamp_micros()
+                .try_into()
+                .map_err(|_| tonic::Status::internal("Timestamp too large for cursor"))?;
+            rcursor = rcursor.max(ts_cursor);
         }
+
+        let proto_notes: Vec<_> = stored_notes.into_iter().map(TransportNote::from).collect();

         timer.finish("ok");

         let proto_notes_size = proto_notes.iter().map(|pnote| (pnote.header.len() + pnote.details.len()) as u64).sum();
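
Review note: the handler responds with rcursor, the maximum created_at timestamp in microseconds among the delivered notes, falling back to the request cursor when nothing matched. Because the SQLite query filters created_at strictly greater than the cursor, a client that re-requests with the returned value should not receive the newest delivered note again. A self-contained sketch of that cursor advance over plain u64 timestamps:

/// Advance a pagination cursor the way the FetchNotes handler does: take
/// the maximum delivered-note timestamp, or keep the request cursor when
/// no notes were returned.
fn next_cursor(request_cursor: u64, delivered_timestamps_micros: &[u64]) -> u64 {
    delivered_timestamps_micros
        .iter()
        .copied()
        .fold(request_cursor, u64::max)
}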
2 changes: 1 addition & 1 deletion crates/node/src/node/grpc/streaming.rs
@@ -79,7 +79,7 @@ impl NoteStreamerManager {

         let mut updates = vec![];
         for (tag, tag_data) in &self.tags {
-            let snotes = self.database.fetch_notes(*tag, tag_data.lts).await?;
+            let snotes = self.database.fetch_notes(&[*tag], tag_data.lts, None).await?;
             let mut cursor = tag_data.lts;
             for snote in &snotes {