Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
- [BREAKING] Renamed `RemoteProverProxy` to `RemoteProverClient` ([#1236](https://github.com/0xMiden/miden-node/pull/1236)).
- Added pagination to `SyncNotes` endpoint ([#1257](https://github.com/0xMiden/miden-node/pull/1257)).
- [BREAKING] Response type nuances of `GetAccountProof` in the public store API (#[1277](https://github.com/0xMiden/miden-node/pull/1277)).
- Add a database cleanup task for account related records ([#1296](https://github.com/0xMiden/miden-node/pull/1296)).

## v0.11.2 (2025-09-10)

Expand Down
195 changes: 176 additions & 19 deletions crates/store/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ pub(crate) mod schema;

pub type Result<T, E = DatabaseError> = std::result::Result<T, E>;

#[derive(Clone)]
pub struct Db {
pool: deadpool_diesel::Pool<ConnectionManager, deadpool::managed::Object<ConnectionManager>>,
notify_cleanup_task: tokio::sync::mpsc::Sender<BlockNumber>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Depending on what we do with the validator database (whether we re-use this or not), it would be good to make this task optional/disable-able

}

/// Describes the value of an asset for an account ID at `block_num` specifically.
Expand Down Expand Up @@ -303,6 +305,8 @@ impl Db {
}

/// Open a connection to the DB and apply any pending migrations.
///
/// This also spawns a background task that handles periodic cleanup of old account data.
#[instrument(target = COMPONENT, skip_all)]
pub async fn load(database_filepath: PathBuf) -> Result<Self, DatabaseSetupError> {
let manager = ConnectionManager::new(database_filepath.to_str().unwrap());
Expand All @@ -314,8 +318,19 @@ impl Db {
"Connected to the database"
);

let me = Db { pool };
// Create channel for cleanup notifications
// Buffer size of 2 is sufficient since cleanup only happens every 10 blocks
let (notify_cleanup_task, rx) = tokio::sync::mpsc::channel(2);

let me = Db { pool, notify_cleanup_task };
let me2 = me.clone();

// Spawn background cleanup task
let _cleanup_task_handle =
tokio::spawn(async move { Self::periodic_cleanup_task(me2, rx).await });

me.query("migrations", apply_migrations).await?;

Ok(me)
}

Expand Down Expand Up @@ -477,6 +492,10 @@ impl Db {
///
/// `allow_acquire` and `acquire_done` are used to synchronize writes to the DB with writes to
/// the in-memory trees. Further details available on [`super::state::State::apply_block`].
///
/// After successfully applying the block, this function notifies the background cleanup task
/// about the new block number. The cleanup task will decide whether to trigger a cleanup
/// based on block number divisibility.
// TODO: This span is logged in a root span, we should connect it to the parent one.
#[instrument(target = COMPONENT, skip_all, err)]
pub async fn apply_block(
Expand All @@ -486,28 +505,166 @@ impl Db {
block: ProvenBlock,
notes: Vec<(NoteRecord, Option<Nullifier>)>,
) -> Result<()> {
self.transact("apply block", move |conn| -> Result<()> {
// TODO: This span is logged in a root span, we should connect it to the parent one.
let _span = info_span!(target: COMPONENT, "write_block_to_db").entered();
let block_num = block.header().block_num();

models::queries::apply_block(
conn,
block.header(),
&notes,
block.created_nullifiers(),
block.updated_accounts(),
block.transactions(),
)?;
let result = self
.transact("apply block", move |conn| -> Result<()> {
// TODO: This span is logged in a root span, we should connect it to the parent one.
let _span = info_span!(target: COMPONENT, "write_block_to_db").entered();

// XXX FIXME TODO free floating mutex MUST NOT exist
// it doesn't bind it properly to the data locked!
let _ = allow_acquire.send(());
models::queries::apply_block(
conn,
block.header(),
&notes,
block.created_nullifiers(),
block.updated_accounts(),
block.transactions(),
)?;

acquire_done.blocking_recv()?;
// XXX FIXME TODO free floating mutex MUST NOT exist
// it doesn't bind it properly to the data locked!
let _ = allow_acquire.send(());

Ok(())
})
.await
acquire_done.blocking_recv()?;

Ok(())
})
.await;

// Notify the cleanup task of the latest applied block
// Ignore errors since cleanup is non-critical and shouldn't block block application
let _res = self.notify_cleanup_task.try_send(block_num);

result
}

/// Background task that handles periodic cleanup of old account data.
///
/// This task runs indefinitely, receiving block numbers from the `apply_block` method
/// and triggering cleanup whenever new blocks are available. The cleanup process:
///
/// 1. Batches incoming notifications using `recv_many` to avoid excessive cleanup operations
/// 2. Only processes the most recent block number from the batch (coalescing multiple updates)
/// 3. Runs cleanup with a 30-second timeout to prevent blocking
/// 4. Logs success or failure but continues running regardless of cleanup outcome
///
/// # Batching Strategy
///
/// The batching approach ensures that if multiple blocks are applied quickly (e.g., during
/// initial sync), only the latest block number triggers cleanup. This prevents redundant
/// cleanup operations while ensuring cleanup runs on the most recent state.
///
/// # Error Handling
///
/// This task never exits on cleanup errors. Cleanup failures are logged but the task
/// continues to process future blocks. This ensures that temporary issues (like database
/// locks or high load) don't permanently disable the cleanup mechanism.
///
/// The task only exits if the channel is closed (i.e., all `Db` instances are dropped),
/// which typically happens during application shutdown.
async fn periodic_cleanup_task(db: Self, mut notify: tokio::sync::mpsc::Receiver<BlockNumber>) {
let mut buf = Vec::with_capacity(128);

loop {
// Receive many notifications at once to batch them
// If the channel is closed (returns 0), exit the task
let received = notify.recv_many(&mut buf, 128).await;
if received == 0 {
tracing::info!(target: COMPONENT, "Cleanup task shutting down: channel closed");
break;
}

// Only process the most recent block number from the batch
// This coalesces multiple cleanup requests during fast block processing
if let Some(block_num) = buf.pop() {
match db.run_periodic_cleanup(block_num).await {
Ok((vault_deleted, storage_deleted)) => {
tracing::info!(
target: COMPONENT,
block_num = block_num.as_u32(),
vault_assets_deleted = vault_deleted,
storage_map_values_deleted = storage_deleted,
"Periodic cleanup completed successfully"
);
},
Err(e) => {
tracing::warn!(
target: COMPONENT,
block_num = block_num.as_u32(),
error = %e,
"Periodic cleanup failed, will retry on next block"
);
},
}
}

// Clear the buffer for the next batch
buf.clear();
}
}

/// Runs periodic cleanup of old account data with a timeout.
///
/// This function cleans up old vault asset and storage map value entries for all accounts,
/// keeping only the latest entry and up to MAX_HISTORICAL_ENTRIES_PER_ACCOUNT historical
/// entries per key.
///
/// The cleanup operation has a 30-second timeout to prevent it from blocking for too long.
/// If the timeout is reached, the cleanup is aborted and returns an error.
///
/// # Parameters
/// * `block_num` - The block number at which cleanup was triggered (used for logging)
///
/// # Returns
/// A tuple of (vault_assets_deleted, storage_map_values_deleted) on success, or an error
/// if the operation fails or times out.
#[instrument(level = "debug", target = COMPONENT, skip(self), fields(block_num = %block_num.as_u32()))]
async fn run_periodic_cleanup(&self, block_num: BlockNumber) -> Result<(usize, usize)> {
use std::time::Duration;

let cleanup_timeout = Duration::from_secs(30);
let start = std::time::Instant::now();

let cleanup_task = self
.transact("periodic cleanup", move |conn| models::queries::cleanup_all_accounts(conn));

// Run cleanup with timeout
let result = tokio::time::timeout(cleanup_timeout, cleanup_task).await;

let duration = start.elapsed();

match result {
Ok(Ok((vault_deleted, storage_deleted))) => {
tracing::info!(
target: COMPONENT,
block_num = block_num.as_u32(),
vault_assets_deleted = vault_deleted,
storage_map_values_deleted = storage_deleted,
duration_ms = duration.as_millis(),
"Cleanup completed within timeout"
);
Ok((vault_deleted, storage_deleted))
},
Ok(Err(e)) => {
tracing::error!(
target: COMPONENT,
block_num = block_num.as_u32(),
duration_ms = duration.as_millis(),
error = %e,
"Cleanup failed"
);
Err(e)
},
Err(_timeout_err) => {
tracing::warn!(
target: COMPONENT,
block_num = block_num.as_u32(),
timeout_ms = cleanup_timeout.as_millis(),
"Cleanup timed out - operation was aborted"
);
Err(DatabaseError::QueryTimeout("periodic cleanup".to_string()))
},
}
}

/// Selects storage map values for syncing storage maps for a specific account ID.
Expand Down
Loading
Loading