From f1c385d3e3ed25abd415ede69bc40c88d8086e4b Mon Sep 17 00:00:00 2001
From: Bernhard Schuster
Date: Wed, 15 Oct 2025 17:49:11 +0200
Subject: [PATCH 1/3] initial impl of account storage cleanup

---
 crates/store/src/db/mod.rs                    | 195 +++++++++--
 .../store/src/db/models/queries/accounts.rs   | 175 ++++++++++
 crates/store/src/db/tests.rs                  | 322 +++++++++++++++++-
 crates/store/src/errors.rs                    |   2 +
 4 files changed, 674 insertions(+), 20 deletions(-)

diff --git a/crates/store/src/db/mod.rs b/crates/store/src/db/mod.rs
index 74277ee83..fae6915df 100644
--- a/crates/store/src/db/mod.rs
+++ b/crates/store/src/db/mod.rs
@@ -47,8 +47,10 @@ pub(crate) mod schema;
 
 pub type Result = std::result::Result;
 
+#[derive(Clone)]
 pub struct Db {
     pool: deadpool_diesel::Pool>,
+    notify_cleanup_task: tokio::sync::mpsc::Sender<BlockNumber>,
 }
 
 /// Describes the value of an asset for an account ID at `block_num` specifically.
@@ -303,6 +305,8 @@ impl Db {
     }
 
     /// Open a connection to the DB and apply any pending migrations.
+    ///
+    /// This also spawns a background task that handles periodic cleanup of old account data.
     #[instrument(target = COMPONENT, skip_all)]
     pub async fn load(database_filepath: PathBuf) -> Result {
         let manager = ConnectionManager::new(database_filepath.to_str().unwrap());
@@ -314,8 +318,19 @@ impl Db {
             "Connected to the database"
         );
 
-        let me = Db { pool };
+        // Create a channel for cleanup notifications. A small buffer suffices: the cleanup
+        // task coalesces pending block numbers and `apply_block` ignores `try_send` failures.
+        let (notify_cleanup_task, rx) = tokio::sync::mpsc::channel(2);
+
+        let me = Db { pool, notify_cleanup_task };
+        let me2 = me.clone();
+
+        // Spawn background cleanup task
+        let _cleanup_task_handle =
+            tokio::spawn(async move { Self::periodic_cleanup_task(me2, rx).await });
+
         me.query("migrations", apply_migrations).await?;
+
         Ok(me)
     }
 
@@ -477,6 +492,10 @@ impl Db {
     ///
     /// `allow_acquire` and `acquire_done` are used to synchronize writes to the DB with writes to
     /// the in-memory trees. Further details available on [`super::state::State::apply_block`].
+    ///
+    /// After successfully applying the block, this function notifies the background cleanup task
+    /// about the new block number. The cleanup task coalesces these notifications and runs
+    /// cleanup for the most recent block number it has received.
     // TODO: This span is logged in a root span, we should connect it to the parent one.
     #[instrument(target = COMPONENT, skip_all, err)]
     pub async fn apply_block(
@@ -486,28 +505,166 @@ impl Db {
         block: ProvenBlock,
         notes: Vec<(NoteRecord, Option<Nullifier>)>,
     ) -> Result<()> {
-        self.transact("apply block", move |conn| -> Result<()> {
-            // TODO: This span is logged in a root span, we should connect it to the parent one.
-            let _span = info_span!(target: COMPONENT, "write_block_to_db").entered();
+        let block_num = block.header().block_num();
 
-            models::queries::apply_block(
-                conn,
-                block.header(),
-                &notes,
-                block.created_nullifiers(),
-                block.updated_accounts(),
-                block.transactions(),
-            )?;
+        let result = self
+            .transact("apply block", move |conn| -> Result<()> {
+                // TODO: This span is logged in a root span, we should connect it to the parent one.
+                let _span = info_span!(target: COMPONENT, "write_block_to_db").entered();
 
-            // XXX FIXME TODO free floating mutex MUST NOT exist
-            // it doesn't bind it properly to the data locked!
-            let _ = allow_acquire.send(());
+                models::queries::apply_block(
+                    conn,
+                    block.header(),
+                    &notes,
+                    block.created_nullifiers(),
+                    block.updated_accounts(),
+                    block.transactions(),
+                )?;
 
-            acquire_done.blocking_recv()?;
+                // XXX FIXME TODO free floating mutex MUST NOT exist
+                // it doesn't bind it properly to the data locked!
+                let _ = allow_acquire.send(());
 
-            Ok(())
-        })
-        .await
+                acquire_done.blocking_recv()?;
+
+                Ok(())
+            })
+            .await;
+
+        // Notify the cleanup task of the latest applied block
+        // Ignore errors since cleanup is non-critical and shouldn't block block application
+        let _res = self.notify_cleanup_task.try_send(block_num);
+
+        result
+    }
+
+    /// Background task that handles periodic cleanup of old account data.
+    ///
+    /// This task runs indefinitely, receiving block numbers from the `apply_block` method
+    /// and triggering cleanup whenever new blocks are available. The cleanup process:
+    ///
+    /// 1. Batches incoming notifications using `recv_many` to avoid excessive cleanup operations
+    /// 2. Only processes the most recent block number from the batch (coalescing multiple updates)
+    /// 3. Runs cleanup with a 30-second timeout to prevent blocking
+    /// 4. Logs success or failure but continues running regardless of cleanup outcome
+    ///
+    /// # Batching Strategy
+    ///
+    /// The batching approach ensures that if multiple blocks are applied quickly (e.g., during
+    /// initial sync), only the latest block number triggers cleanup. This prevents redundant
+    /// cleanup operations while ensuring cleanup runs on the most recent state.
+    ///
+    /// # Error Handling
+    ///
+    /// This task never exits on cleanup errors. Cleanup failures are logged but the task
+    /// continues to process future blocks. This ensures that temporary issues (like database
+    /// locks or high load) don't permanently disable the cleanup mechanism.
+    ///
+    /// The task only exits if the channel is closed (i.e., all `Db` instances are dropped),
+    /// which typically happens during application shutdown.
+    async fn periodic_cleanup_task(db: Self, mut notify: tokio::sync::mpsc::Receiver<BlockNumber>) {
+        let mut buf = Vec::with_capacity(128);
+
+        loop {
+            // Receive many notifications at once to batch them
+            // If the channel is closed (returns 0), exit the task
+            let received = notify.recv_many(&mut buf, 128).await;
+            if received == 0 {
+                tracing::info!(target: COMPONENT, "Cleanup task shutting down: channel closed");
+                break;
+            }
+
+            // Only process the most recent block number from the batch
+            // This coalesces multiple cleanup requests during fast block processing
+            if let Some(block_num) = buf.pop() {
+                match db.run_periodic_cleanup(block_num).await {
+                    Ok((vault_deleted, storage_deleted)) => {
+                        tracing::info!(
+                            target: COMPONENT,
+                            block_num = block_num.as_u32(),
+                            vault_assets_deleted = vault_deleted,
+                            storage_map_values_deleted = storage_deleted,
+                            "Periodic cleanup completed successfully"
+                        );
+                    },
+                    Err(e) => {
+                        tracing::warn!(
+                            target: COMPONENT,
+                            block_num = block_num.as_u32(),
+                            error = %e,
+                            "Periodic cleanup failed, will retry on next block"
+                        );
+                    },
+                }
+            }
+
+            // Clear the buffer for the next batch
+            buf.clear();
+        }
+    }
+
+    /// Runs periodic cleanup of old account data with a timeout.
+    ///
+    /// This function cleans up old vault asset and storage map value entries for all accounts,
+    /// keeping only the latest entry and up to MAX_HISTORICAL_ENTRIES_PER_ACCOUNT historical
+    /// entries per key.
+    ///
+    /// The cleanup operation has a 30-second timeout to prevent it from blocking for too long.
+    /// If the timeout is reached, the cleanup is aborted and an error is returned.
+    ///
+    /// # Parameters
+    /// * `block_num` - The block number at which cleanup was triggered (used for logging)
+    ///
+    /// # Returns
+    /// A tuple of (vault_assets_deleted, storage_map_values_deleted) on success, or an error
+    /// if the operation fails or times out.
+    #[instrument(level = "debug", target = COMPONENT, skip(self), fields(block_num = %block_num.as_u32()))]
+    async fn run_periodic_cleanup(&self, block_num: BlockNumber) -> Result<(usize, usize)> {
+        use std::time::Duration;
+
+        let cleanup_timeout = Duration::from_secs(30);
+        let start = std::time::Instant::now();
+
+        let cleanup_task = self
+            .transact("periodic cleanup", move |conn| models::queries::cleanup_all_accounts(conn));
+
+        // Run cleanup with timeout
+        let result = tokio::time::timeout(cleanup_timeout, cleanup_task).await;
+
+        let duration = start.elapsed();
+
+        match result {
+            Ok(Ok((vault_deleted, storage_deleted))) => {
+                tracing::info!(
+                    target: COMPONENT,
+                    block_num = block_num.as_u32(),
+                    vault_assets_deleted = vault_deleted,
+                    storage_map_values_deleted = storage_deleted,
+                    duration_ms = duration.as_millis(),
+                    "Cleanup completed within timeout"
+                );
+                Ok((vault_deleted, storage_deleted))
+            },
+            Ok(Err(e)) => {
+                tracing::error!(
+                    target: COMPONENT,
+                    block_num = block_num.as_u32(),
+                    duration_ms = duration.as_millis(),
+                    error = %e,
+                    "Cleanup failed"
+                );
+                Err(e)
+            },
+            Err(_timeout_err) => {
+                tracing::warn!(
+                    target: COMPONENT,
+                    block_num = block_num.as_u32(),
+                    timeout_ms = cleanup_timeout.as_millis(),
+                    "Cleanup timed out - operation was aborted"
+                );
+                Err(DatabaseError::QueryTimeout("periodic cleanup".to_string()))
+            },
+        }
     }
 
     /// Selects storage map values for syncing storage maps for a specific account ID.
diff --git a/crates/store/src/db/models/queries/accounts.rs b/crates/store/src/db/models/queries/accounts.rs
index 4f3946062..108a43169 100644
--- a/crates/store/src/db/models/queries/accounts.rs
+++ b/crates/store/src/db/models/queries/accounts.rs
@@ -47,6 +47,15 @@ use crate::db::models::{serialize_vec, vec_raw_try_into};
 use crate::db::{AccountVaultValue, schema};
 use crate::errors::DatabaseError;
 
+// CONSTANTS
+// ================================================================================================
+
+/// Maximum number of historical entries to retain per account for vault assets and storage map
+/// values. This ensures we don't accumulate unbounded historical data in the database.
+/// The latest entry (marked with is_latest_update=true) is always retained regardless of this
+/// limit.
+pub const MAX_HISTORICAL_ENTRIES_PER_ACCOUNT: usize = 50;
+
 /// Select the latest account details by account id from the DB using the given
 /// [`SqliteConnection`].
 ///
@@ -915,3 +924,169 @@ pub(crate) struct AccountStorageMapRowInsert {
     pub(crate) value: Vec,
     pub(crate) is_latest_update: bool,
 }
+
+// CLEANUP FUNCTIONS
+// ================================================================================================
+
+/// Clean up old vault asset entries for a specific account, keeping only the latest entry
+/// (is_latest_update=true) and up to MAX_HISTORICAL_ENTRIES_PER_ACCOUNT older entries per
+/// (account_id, vault_key) combination.
+///
+/// # Parameters
+/// * `conn`: Database connection
+/// * `account_id`: The account to clean up
+///
+/// # Returns
+/// The number of rows deleted
+///
+/// # Notes
+/// This function ensures we don't accumulate unbounded historical data while preserving:
+/// - The latest state for each vault key (is_latest_update=true)
+/// - Up to MAX_HISTORICAL_ENTRIES_PER_ACCOUNT historical entries per vault key
+pub(crate) fn cleanup_old_account_vault_assets(
+    conn: &mut SqliteConnection,
+    account_id: AccountId,
+) -> Result<usize, DatabaseError> {
+    // Strategy: For each vault_key, keep the latest entry (is_latest_update=true) and the
+    // most recent MAX_HISTORICAL_ENTRIES_PER_ACCOUNT historical entries, deleting the rest.
+    //
+    // Unfortunately, Diesel doesn't support window functions (ROW_NUMBER OVER) in its type-safe
+    // API, so we must use raw SQL for this complex deletion pattern. This is necessary because:
+    // 1. We need to partition by vault_key and order by block_num
+    // 2. We need to delete based on row ranking within each partition
+    // 3. The table is created WITHOUT ROWID, so we use the primary key (account_id, block_num,
+    //    vault_key)
+
+    let account_id_bytes = account_id.to_bytes();
+    let limit = i64::try_from(MAX_HISTORICAL_ENTRIES_PER_ACCOUNT)
+        .expect("MAX_HISTORICAL_ENTRIES_PER_ACCOUNT should fit in i64");
+
+    // This query:
+    // 1. For each vault_key, ranks only non-latest entries by block_num descending (newest first)
+    // 2. Keeps entries where rank <= MAX_HISTORICAL_ENTRIES_PER_ACCOUNT
+    // 3. Deletes everything else using the primary key (account_id, block_num, vault_key)
+    // Note: The latest entry (is_latest_update=true) is never ranked and never deleted
+
+    diesel::sql_query(
+        r#"
+        DELETE FROM account_vault_assets
+        WHERE (account_id, block_num, vault_key) IN (
+            SELECT account_id, block_num, vault_key FROM (
+                SELECT
+                    account_id,
+                    block_num,
+                    vault_key,
+                    ROW_NUMBER() OVER (
+                        PARTITION BY vault_key
+                        ORDER BY block_num DESC
+                    ) as row_num
+                FROM account_vault_assets
+                WHERE account_id = ?1 AND is_latest_update = 0
+            )
+            WHERE row_num > ?2
+        )
+        "#,
+    )
+    .bind::<diesel::sql_types::Binary, _>(&account_id_bytes)
+    .bind::<diesel::sql_types::BigInt, _>(limit)
+    .execute(conn)
+    .map_err(DatabaseError::Diesel)
+}
+
+/// Clean up old storage map value entries for a specific account, keeping only the latest entry
+/// (is_latest_update=true) and up to MAX_HISTORICAL_ENTRIES_PER_ACCOUNT older entries per
+/// (account_id, slot, key) combination.
+///
+/// # Parameters
+/// * `conn`: Database connection
+/// * `account_id`: The account to clean up
+///
+/// # Returns
+/// The number of rows deleted
+///
+/// # Notes
+/// This function ensures we don't accumulate unbounded historical data while preserving:
+/// - The latest state for each (slot, key) pair (is_latest_update=true)
+/// - Up to MAX_HISTORICAL_ENTRIES_PER_ACCOUNT historical entries per (slot, key) pair
+pub(crate) fn cleanup_old_account_storage_map_values(
+    conn: &mut SqliteConnection,
+    account_id: AccountId,
+) -> Result<usize, DatabaseError> {
+    // Strategy: Similar to vault assets cleanup, but partition by (slot, key) pairs
+    //
+    // Unfortunately, Diesel doesn't support window functions (ROW_NUMBER OVER) in its type-safe
+    // API, so we must use raw SQL for this complex deletion pattern. This is necessary because:
+    // 1. We need to partition by (slot, key) and order by block_num
+    // 2. We need to delete based on row ranking within each partition
+    // 3. The table is created WITHOUT ROWID, so we use the primary key (account_id, block_num,
+    //    slot, key)
+
+    let account_id_bytes = account_id.to_bytes();
+    let limit = i64::try_from(MAX_HISTORICAL_ENTRIES_PER_ACCOUNT)
+        .expect("MAX_HISTORICAL_ENTRIES_PER_ACCOUNT should fit in i64");
+
+    // This query:
+    // 1. For each (slot, key) combination, ranks only non-latest entries by block_num descending
+    //    (newest first)
+    // 2. Keeps entries where rank <= MAX_HISTORICAL_ENTRIES_PER_ACCOUNT
+    // 3. Deletes everything else using the primary key (account_id, block_num, slot, key)
+    // Note: The latest entry (is_latest_update=true) is never ranked and never deleted
+
+    diesel::sql_query(
+        r#"
+        DELETE FROM account_storage_map_values
+        WHERE (account_id, block_num, slot, key) IN (
+            SELECT account_id, block_num, slot, key FROM (
+                SELECT
+                    account_id,
+                    block_num,
+                    slot,
+                    key,
+                    ROW_NUMBER() OVER (
+                        PARTITION BY slot, key
+                        ORDER BY block_num DESC
+                    ) as row_num
+                FROM account_storage_map_values
+                WHERE account_id = ?1 AND is_latest_update = 0
+            )
+            WHERE row_num > ?2
+        )
+        "#,
+    )
+    .bind::<diesel::sql_types::Binary, _>(&account_id_bytes)
+    .bind::<diesel::sql_types::BigInt, _>(limit)
+    .execute(conn)
+    .map_err(DatabaseError::Diesel)
+}
+
+/// Clean up old entries for all accounts in the database.
+///
+/// This is a maintenance function that should be called periodically to prevent
+/// unbounded growth of historical data.
+///
+/// # Parameters
+/// * `conn`: Database connection
+///
+/// # Returns
+/// A tuple of (vault_assets_deleted, storage_map_values_deleted)
+pub(crate) fn cleanup_all_accounts(
+    conn: &mut SqliteConnection,
+) -> Result<(usize, usize), DatabaseError> {
+    use schema::accounts;
+
+    // Get all distinct account IDs using Diesel's query builder API
+    let account_ids: Vec<Vec<u8>> =
+        QueryDsl::distinct(SelectDsl::select(accounts::table, accounts::account_id)).load(conn)?;
+
+    let mut total_vault_deleted = 0;
+    let mut total_storage_deleted = 0;
+
+    for account_id_bytes in account_ids {
+        let account_id = AccountId::read_from_bytes(&account_id_bytes)?;
+
+        total_vault_deleted += cleanup_old_account_vault_assets(conn, account_id)?;
+        total_storage_deleted += cleanup_old_account_storage_map_values(conn, account_id)?;
+    }
+
+    Ok((total_vault_deleted, total_storage_deleted))
+}
diff --git a/crates/store/src/db/tests.rs b/crates/store/src/db/tests.rs
index f8aa34eef..05fa06b18 100644
--- a/crates/store/src/db/tests.rs
+++ b/crates/store/src/db/tests.rs
@@ -4,10 +4,11 @@ use std::num::NonZeroUsize;
 use std::sync::{Arc, Mutex};
 
-use diesel::{Connection, SqliteConnection};
+use diesel::{Connection, ExpressionMethods, QueryDsl, RunQueryDsl, SqliteConnection};
 use miden_lib::account::auth::AuthRpoFalcon512;
 use miden_lib::note::create_p2id_note;
 use miden_lib::transaction::TransactionKernel;
+use miden_lib::utils::Serializable;
 use miden_node_proto::domain::account::AccountSummary;
 use miden_node_utils::fee::test_fee_params;
 use miden_objects::account::delta::AccountUpdateDetails;
@@ -1501,3 +1502,322 @@ fn mock_account_code_and_storage(
     .build_existing()
     .unwrap()
 }
+
+#[test]
+#[miden_node_test_macro::enable_logging]
+fn test_cleanup_old_account_vault_assets() {
+    use crate::db::queries::{
+        MAX_HISTORICAL_ENTRIES_PER_ACCOUNT,
+        cleanup_old_account_vault_assets,
+    };
+    use crate::db::schema::account_vault_assets::dsl;
+
+    let mut conn = create_db();
+    let account_id = AccountId::try_from(ACCOUNT_ID_PUBLIC_FUNGIBLE_FAUCET).unwrap();
+
+    // Create blocks
+    for i in 1..=60 {
+        create_block(&mut conn, BlockNumber::from(i));
+    }
+
+    // Create account
+    queries::upsert_accounts(
+        &mut conn,
+        &[mock_block_account_update(account_id, 0)],
+        BlockNumber::from(1),
+    )
+    .unwrap();
+
+    let vault_key = num_to_word(100);
+    let asset_base = Asset::Fungible(FungibleAsset::new(account_id, 1000).unwrap());
+
+    // Insert 60 vault asset entries for the same vault_key
+    for block_num in 1..=60 {
+        queries::insert_account_vault_asset(
+            &mut conn,
+            account_id,
+            BlockNumber::from(block_num),
+            vault_key,
+            Some(asset_base),
+        )
+        .unwrap();
+    }
+
+    // Verify we have 60 entries using Diesel API
+    let count = dsl::account_vault_assets
+        .filter(dsl::account_id.eq(account_id.to_bytes()))
+        .count()
+        .get_result::<i64>(&mut conn)
+        .unwrap();
+    assert_eq!(count, 60, "Should have 60 entries before cleanup");
+
+    // Run cleanup
+    let deleted = cleanup_old_account_vault_assets(&mut conn, account_id).unwrap();
+
+    // We should have deleted entries beyond MAX_HISTORICAL_ENTRIES_PER_ACCOUNT
+    // The latest entry (is_latest_update=true) is always kept
+    // Plus up to MAX_HISTORICAL_ENTRIES_PER_ACCOUNT non-latest entries
+    let expected_remaining = MAX_HISTORICAL_ENTRIES_PER_ACCOUNT + 1; // +1 for the latest
+    let expected_deleted = 60 - expected_remaining;
+
+    assert_eq!(
+        deleted, expected_deleted,
+        "Should have deleted {} old entries",
+        expected_deleted
+    );
+
+    // Verify remaining count using Diesel API
+    let remaining = dsl::account_vault_assets
+        .filter(dsl::account_id.eq(account_id.to_bytes()))
+        .count()
+        .get_result::<i64>(&mut conn)
+        .unwrap();
+    assert_eq!(
+        remaining as usize, expected_remaining,
+        "Should have {} entries remaining",
+        expected_remaining
+    );
+
+    // Verify the latest entry is still marked as latest using Diesel API
+    let latest_count = dsl::account_vault_assets
+        .filter(dsl::account_id.eq(account_id.to_bytes()))
+        .filter(dsl::is_latest_update.eq(true))
+        .count()
+        .get_result::<i64>(&mut conn)
+        .unwrap();
+    assert_eq!(latest_count, 1, "Should have exactly one latest entry");
+
+    // Verify the latest entry is from block 60 using Diesel API
+    let latest_block = dsl::account_vault_assets
+        .filter(dsl::account_id.eq(account_id.to_bytes()))
+        .filter(dsl::is_latest_update.eq(true))
+        .select(dsl::block_num)
+        .first::<i64>(&mut conn)
+        .unwrap();
+    assert_eq!(latest_block, 60, "Latest entry should be from block 60");
+}
+
+#[test]
+#[miden_node_test_macro::enable_logging]
+fn test_cleanup_old_account_storage_map_values() {
+    use crate::db::queries::{
+        MAX_HISTORICAL_ENTRIES_PER_ACCOUNT,
+        cleanup_old_account_storage_map_values,
+    };
+    use crate::db::schema::account_storage_map_values::dsl;
+
+    let mut conn = create_db();
+    let account_id = AccountId::try_from(ACCOUNT_ID_REGULAR_PUBLIC_ACCOUNT_IMMUTABLE_CODE).unwrap();
+
+    // Create blocks
+    for i in 1..=70 {
+        create_block(&mut conn, BlockNumber::from(i));
+    }
+
+    let slot = 5u8;
+    let key = num_to_word(123);
+    let value_base = num_to_word(456);
+
+    // Insert 70 storage map value entries for the same (slot, key) combination
+    for block_num in 1..=70 {
+        queries::insert_account_storage_map_value(
+            &mut conn,
+            account_id,
+            BlockNumber::from(block_num),
+            slot,
+            key,
+            value_base,
+        )
+        .unwrap();
+    }
+
+    // Verify we have 70 entries using Diesel API
+    let count = dsl::account_storage_map_values
+        .filter(dsl::account_id.eq(account_id.to_bytes()))
+        .count()
+        .get_result::<i64>(&mut conn)
+        .unwrap();
+    assert_eq!(count, 70, "Should have 70 entries before cleanup");
+
+    // Run cleanup
+    let deleted = cleanup_old_account_storage_map_values(&mut conn, account_id).unwrap();
+
+    // We should have deleted entries beyond MAX_HISTORICAL_ENTRIES_PER_ACCOUNT
+    let expected_remaining = MAX_HISTORICAL_ENTRIES_PER_ACCOUNT + 1; // +1 for the latest
+    let expected_deleted = 70 - expected_remaining;
+
+    assert_eq!(
+        deleted, expected_deleted,
+        "Should have deleted {} old entries",
+        expected_deleted
+    );
+
+    // Verify remaining count using Diesel API
+    let remaining = dsl::account_storage_map_values
+        .filter(dsl::account_id.eq(account_id.to_bytes()))
+        .count()
+        .get_result::<i64>(&mut conn)
+        .unwrap();
+    assert_eq!(
+        remaining as usize, expected_remaining,
+        "Should have {} entries remaining",
+        expected_remaining
+    );
+
+    // Verify the latest entry is still marked as latest using Diesel API
+    let latest_count = dsl::account_storage_map_values
+        .filter(dsl::account_id.eq(account_id.to_bytes()))
+        .filter(dsl::is_latest_update.eq(true))
+        .count()
+        .get_result::<i64>(&mut conn)
+        .unwrap();
+    assert_eq!(latest_count, 1, "Should have exactly one latest entry");
+}
+
+#[test]
+#[miden_node_test_macro::enable_logging]
+fn test_cleanup_preserves_latest_state() {
+    use crate::db::queries::cleanup_old_account_vault_assets;
+    use crate::db::schema::account_vault_assets::dsl;
+
+    let mut conn = create_db();
+    let account_id = AccountId::try_from(ACCOUNT_ID_PUBLIC_FUNGIBLE_FAUCET).unwrap();
+
+    // Create blocks
+    for i in 1..=10 {
+        create_block(&mut conn, BlockNumber::from(i));
+    }
+
+    // Create account
+    queries::upsert_accounts(
+        &mut conn,
+        &[mock_block_account_update(account_id, 0)],
+        BlockNumber::from(1),
+    )
+    .unwrap();
+
+    // Test with multiple vault keys to ensure per-key cleanup
+    let vault_key_1 = num_to_word(100);
+    let vault_key_2 = num_to_word(200);
+    let asset = Asset::Fungible(FungibleAsset::new(account_id, 1000).unwrap());
+
+    // Insert entries for both keys
+    for block_num in 1..=10 {
+        queries::insert_account_vault_asset(
+            &mut conn,
+            account_id,
+            BlockNumber::from(block_num),
+            vault_key_1,
+            Some(asset),
+        )
+        .unwrap();
+
+        queries::insert_account_vault_asset(
+            &mut conn,
+            account_id,
+            BlockNumber::from(block_num),
+            vault_key_2,
+            Some(asset),
+        )
+        .unwrap();
+    }
+
+    // Run cleanup (should not delete anything since we're under the limit)
+    let deleted = cleanup_old_account_vault_assets(&mut conn, account_id).unwrap();
+    assert_eq!(deleted, 0, "Should not delete anything when under limit");
+
+    // Verify both latest entries exist using Diesel API
+    let latest_entries: Vec<(Vec<u8>, i64)> = dsl::account_vault_assets
+        .filter(dsl::account_id.eq(account_id.to_bytes()))
+        .filter(dsl::is_latest_update.eq(true))
+        .select((dsl::vault_key, dsl::block_num))
+        .order(dsl::vault_key.asc())
+        .load(&mut conn)
+        .unwrap();
+
+    assert_eq!(latest_entries.len(), 2, "Should have two latest entries");
+    assert_eq!(latest_entries[0].1, 10, "Latest for key 1 should be block 10");
+    assert_eq!(latest_entries[1].1, 10, "Latest for key 2 should be block 10");
+}
+
+#[test]
+#[miden_node_test_macro::enable_logging]
+fn test_cleanup_all_accounts() {
+    use crate::db::queries::cleanup_all_accounts;
+    use crate::db::schema::account_vault_assets::dsl;
+
+    let mut conn = create_db();
+
+    // Create two accounts
+    let account1 = AccountId::try_from(ACCOUNT_ID_REGULAR_PUBLIC_ACCOUNT_IMMUTABLE_CODE).unwrap();
+    let account2 = AccountId::try_from(ACCOUNT_ID_REGULAR_PUBLIC_ACCOUNT_IMMUTABLE_CODE_2).unwrap();
+    let faucet_id = AccountId::try_from(ACCOUNT_ID_PUBLIC_FUNGIBLE_FAUCET).unwrap();
+
+    // Create blocks
+    for i in 1..=60 {
+        create_block(&mut conn, BlockNumber::from(i));
+    }
+
+    // Create accounts
+    queries::upsert_accounts(
+        &mut conn,
+        &[mock_block_account_update(account1, 0)],
+        BlockNumber::from(1),
+    )
+    .unwrap();
+    queries::upsert_accounts(
+        &mut conn,
+        &[mock_block_account_update(account2, 0)],
+        BlockNumber::from(1),
+    )
+    .unwrap();
+
+    // Insert many entries for both accounts
+    // Use the same faucet for both accounts since fungible assets need a faucet account ID
+    let vault_key = num_to_word(100);
+    let asset = Asset::Fungible(FungibleAsset::new(faucet_id, 1000).unwrap());
+
+    for block_num in 1..=60 {
+        queries::insert_account_vault_asset(
+            &mut conn,
+            account1,
+            BlockNumber::from(block_num),
+            vault_key,
+            Some(asset),
+        )
+        .unwrap();
+
+        let asset2 = Asset::Fungible(FungibleAsset::new(faucet_id, 2000).unwrap());
+        queries::insert_account_vault_asset(
+            &mut conn,
+            account2,
+            BlockNumber::from(block_num),
+            vault_key,
+            Some(asset2),
+        )
+        .unwrap();
+    }
+
+    // Run cleanup for all accounts
+    let (vault_deleted, _) = cleanup_all_accounts(&mut conn).unwrap();
+
+    // We should have deleted entries from both accounts
+    assert!(vault_deleted > 0, "Should have deleted some entries");
+
+    // Each account should have MAX_HISTORICAL_ENTRIES_PER_ACCOUNT + 1 entries remaining using
+    // Diesel API
+    let count1 = dsl::account_vault_assets
+        .filter(dsl::account_id.eq(account1.to_bytes()))
+        .count()
+        .get_result::<i64>(&mut conn)
+        .unwrap();
+
+    let count2 = dsl::account_vault_assets
+        .filter(dsl::account_id.eq(account2.to_bytes()))
+        .count()
+        .get_result::<i64>(&mut conn)
+        .unwrap();
+
+    assert_eq!(count1 as usize, crate::db::queries::MAX_HISTORICAL_ENTRIES_PER_ACCOUNT + 1);
+    assert_eq!(count2 as usize, crate::db::queries::MAX_HISTORICAL_ENTRIES_PER_ACCOUNT + 1);
+}
diff --git a/crates/store/src/errors.rs b/crates/store/src/errors.rs
index 353cb215a..4e67105d1 100644
--- a/crates/store/src/errors.rs
+++ b/crates/store/src/errors.rs
@@ -115,6 +115,8 @@ pub enum DatabaseError {
     ConnectionManager(#[from] ConnectionManagerError),
     #[error(transparent)]
     SqlValueConversion(#[from] DatabaseTypeConversionError),
+    #[error("query timeout: {0}")]
+    QueryTimeout(String),
 }
 
 impl DatabaseError {

From 5b3693ec088c494bd34ecc2b7423f843ad8999e5 Mon Sep 17 00:00:00 2001
From: Bernhard Schuster
Date: Thu, 16 Oct 2025 12:02:35 +0200
Subject: [PATCH 2/3] add changelog

---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 602fd9bd5..98a166ce8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -15,6 +15,7 @@
 - [BREAKING] Renamed `RemoteProverProxy` to `RemoteProverClient` ([#1236](https://github.com/0xMiden/miden-node/pull/1236)).
 - Added pagination to `SyncNotes` endpoint ([#1257](https://github.com/0xMiden/miden-node/pull/1257)).
 - [BREAKING] Response type nuances of `GetAccountProof` in the public store API (#[1277](https://github.com/0xMiden/miden-node/pull/1277)).
+- Add a cyclic database cleanup task for account related records ([#1296](https://github.com/0xMiden/miden-node/pull/1296)).
 
 ## v0.11.2 (2025-09-10)

From 8d2ab2ac769b9052275b71ee837d9631f95bd4f4 Mon Sep 17 00:00:00 2001
From: Bernhard Schuster
Date: Thu, 16 Oct 2025 13:26:56 +0200
Subject: [PATCH 3/3] changelog

---
 CHANGELOG.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 98a166ce8..9cdeb0b2d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -15,7 +15,7 @@
 - [BREAKING] Renamed `RemoteProverProxy` to `RemoteProverClient` ([#1236](https://github.com/0xMiden/miden-node/pull/1236)).
- Added pagination to `SyncNotes` endpoint ([#1257](https://github.com/0xMiden/miden-node/pull/1257)). - [BREAKING] Response type nuances of `GetAccountProof` in the public store API (#[1277](https://github.com/0xMiden/miden-node/pull/1277)). -- Add a cyclic database cleanup task for account related records ([#1296](https://github.com/0xMiden/miden-node/pull/1296)). +- Add a database cleanup task for account related records ([#1296](https://github.com/0xMiden/miden-node/pull/1296)). ## v0.11.2 (2025-09-10)
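
For readers skimming the series, here is a minimal, standalone sketch (not part of the patches above) of the notification-coalescing pattern that `periodic_cleanup_task` relies on: `recv_many` drains whatever block numbers have piled up, and only the newest one is acted on. The channel size, the plain `u32` block numbers, and the printout are illustrative assumptions, not code from the repository.

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u32>(2);

    // Simulate `apply_block` notifying the cleanup task: `try_send` silently drops
    // notifications once the small buffer is full, which is acceptable because only
    // the newest block number matters for the next cleanup pass.
    for block_num in 1..=5u32 {
        let _ = tx.try_send(block_num);
    }
    drop(tx);

    let mut buf = Vec::with_capacity(128);
    loop {
        // Drain up to 128 pending notifications in one call; 0 means the channel closed.
        let received = rx.recv_many(&mut buf, 128).await;
        if received == 0 {
            break;
        }
        // Coalesce: only the most recent block number would trigger a cleanup run.
        if let Some(latest) = buf.pop() {
            println!("would run cleanup as of block {latest}");
        }
        buf.clear();
    }
}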
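Likewise, the retention rule behind the raw SQL in `cleanup_old_account_vault_assets` and `cleanup_old_account_storage_map_values` can be exercised against a toy table. The following is a hedged sketch using Diesel's `sql_query` against an in-memory SQLite database; the table name, column names, and the limit of 3 are made up for illustration and do not mirror the real store schema.

use diesel::prelude::*;
use diesel::sql_query;

fn main() -> QueryResult<()> {
    let mut conn = SqliteConnection::establish(":memory:").expect("in-memory SQLite");

    // Toy history table: one latest row per key plus older historical rows.
    sql_query(
        "CREATE TABLE history (entry_key TEXT NOT NULL, block_num INTEGER NOT NULL, is_latest INTEGER NOT NULL)",
    )
    .execute(&mut conn)?;

    // One key, ten rows; only block 10 is marked as the latest entry.
    for block in 1..=10 {
        sql_query(format!(
            "INSERT INTO history VALUES ('k', {block}, {})",
            i32::from(block == 10)
        ))
        .execute(&mut conn)?;
    }

    // Keep the latest row plus the 3 newest historical rows per key; delete the rest.
    let deleted = sql_query(
        r#"
        DELETE FROM history
        WHERE (entry_key, block_num) IN (
            SELECT entry_key, block_num FROM (
                SELECT
                    entry_key,
                    block_num,
                    ROW_NUMBER() OVER (PARTITION BY entry_key ORDER BY block_num DESC) AS row_num
                FROM history
                WHERE is_latest = 0
            )
            WHERE row_num > ?1
        )
        "#,
    )
    .bind::<diesel::sql_types::BigInt, _>(3i64)
    .execute(&mut conn)?;

    // 9 historical rows existed (blocks 1-9); 3 are kept, so 6 are deleted.
    assert_eq!(deleted, 6);
    Ok(())
}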