diff --git a/crates/walrus-service/src/node.rs b/crates/walrus-service/src/node.rs index c8695b3f22..c5375b1a06 100644 --- a/crates/walrus-service/src/node.rs +++ b/crates/walrus-service/src/node.rs @@ -959,6 +959,10 @@ impl StorageNode { ?error, "failed to check and start garbage collection on startup" ); + // Fallback: mark up to the current epoch as done so that the first + // epoch change's wait_for_blob_info_cleanup doesn't block indefinitely. + self.garbage_collector + .mark_blob_info_cleanup_done_at_least(self.inner.committee_service.get_epoch()); } select! { @@ -1840,6 +1844,10 @@ impl StorageNode { c.sync_node_params().await?; } + // Note: blob info cleanup for the previous epoch is guaranteed to be complete + // because it runs inline in start_garbage_collection_task (called at the end of + // the previous epoch's process_epoch_change_start_event). No explicit wait needed. + // Start storage node consistency check if // - consistency check is enabled // - node is not reprocessing events (blob info table should not be affected by future @@ -1934,6 +1942,10 @@ impl StorageNode { last_completed_epoch, "previous garbage-collection task already completed; skipping restart" ); + // No GC restart needed. Mark up to the current epoch as done so that the + // first epoch change's wait_for_blob_info_cleanup doesn't block. + self.garbage_collector + .mark_blob_info_cleanup_done_at_least(self.inner.committee_service.get_epoch()); return Ok(()); } @@ -1951,6 +1963,10 @@ impl StorageNode { "the Walrus epoch has changed since the last garbage-collection task was started; \ skipping restart" ); + // GC for previous epochs is stale (epoch changed). Mark up to the last + // started epoch as done so the next epoch change can proceed. + self.garbage_collector + .mark_blob_info_cleanup_done_at_least(last_started_epoch); return Ok(()); } diff --git a/crates/walrus-service/src/node/garbage_collector.rs b/crates/walrus-service/src/node/garbage_collector.rs index 2856e51f8e..841385abb7 100644 --- a/crates/walrus-service/src/node/garbage_collector.rs +++ b/crates/walrus-service/src/node/garbage_collector.rs @@ -9,7 +9,7 @@ use chrono::{DateTime, Utc}; use rand::{Rng, SeedableRng, rngs::StdRng}; use serde::{Deserialize, Serialize}; use serde_with::{DurationSeconds, serde_as}; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, watch}; use walrus_core::Epoch; use walrus_sui::types::GENESIS_EPOCH; @@ -21,6 +21,8 @@ use crate::node::{StorageNodeInner, metrics::NodeMetricSet}; #[serde(default)] pub struct GarbageCollectionConfig { /// Whether to enable the blob info cleanup at the beginning of each epoch. + /// + /// The configured value is ignored by the garbage collector, which always enables it. pub enable_blob_info_cleanup: bool, /// Whether to delete metadata and slivers of expired or deleted blobs. pub enable_data_deletion: bool, @@ -93,19 +95,31 @@ pub(super) struct GarbageCollector { metrics: Arc, /// Handle to the background task performing database cleanup. task_handle: Arc>>>, + /// Tracks the last epoch for which blob info cleanup completed. + blob_info_cleanup_done: watch::Sender, } impl GarbageCollector { pub fn new( - config: GarbageCollectionConfig, + mut config: GarbageCollectionConfig, node: Arc, metrics: Arc, ) -> Self { + if !config.enable_blob_info_cleanup { + tracing::warn!( + "ignoring `garbage_collection.enable_blob_info_cleanup: false`; \ + blob info cleanup is always enabled" + ); + config.enable_blob_info_cleanup = true; + } + + let (blob_info_cleanup_done, _) = watch::channel(GENESIS_EPOCH); Self { config, node, metrics, task_handle: Arc::new(Mutex::new(None)), + blob_info_cleanup_done, } } @@ -131,48 +145,49 @@ impl GarbageCollector { ) -> anyhow::Result<()> { if epoch == GENESIS_EPOCH { tracing::info!("garbage collection is not relevant in the genesis epoch"); - return Ok(()); - } - - let garbage_collection_config = self.config; - - if !garbage_collection_config.enable_blob_info_cleanup { - if garbage_collection_config.enable_data_deletion { - tracing::warn!( - "data deletion is enabled, but requires blob info cleanup to be enabled; \ - skipping data deletion", - ); - } else { - tracing::info!("garbage collection is disabled, skipping cleanup"); - } + self.blob_info_cleanup_done.send_replace(epoch); return Ok(()); } let mut task_handle = self.task_handle.lock().await; let garbage_collector = self.clone(); - // If there is an existing task, we need to abort it first before starting a new one. + // If there is an existing task (data deletion), abort it before starting a new one. if let Some(old_task) = task_handle.take() { - tracing::info!("aborting existing garbage-collection task before starting a new one"); + tracing::info!( + "aborting existing garbage-collection task (data deletion phase) \ + before starting a new one" + ); old_task.abort(); let _ = old_task.await; } - // Store the current epoch in the DB before spawning the background task. + // Store the current epoch in the DB before running cleanup. self.node .storage .set_garbage_collector_last_started_epoch(epoch)?; self.metrics .set_garbage_collection_last_started_epoch(epoch); - // Calculate target time and update metric before spawning the background task. + // Phase 1: Run blob info cleanup immediately (no random delay). + // This updates aggregate blob info counts for expired blob objects and must + // complete before the next epoch change for consistency. Running it inline + // (rather than in a spawned task) guarantees completion before this function + // returns. + if let Err(error) = self.perform_blob_info_cleanup(epoch).await { + tracing::error!(?error, epoch, "blob info cleanup failed"); + } + self.blob_info_cleanup_done.send_replace(epoch); + + // Phase 2: Spawn data deletion as a background task with random delay to + // spread I/O load across nodes. This phase can be safely aborted by the + // next epoch's GC without affecting blob info consistency. let target_time = self.cleanup_target_time(epoch, epoch_start); self.metrics .garbage_collection_task_start_time .set(target_time.timestamp().try_into().unwrap_or_default()); let new_task = tokio::spawn(async move { - // Sleep until the target time. let sleep_duration = (target_time - Utc::now()) .to_std() .unwrap_or(Duration::ZERO); @@ -180,13 +195,13 @@ impl GarbageCollector { tracing::info!( target_time = target_time.to_rfc3339(), ?sleep_duration, - "sleeping before performing garbage collection", + "sleeping before performing data deletion", ); tokio::time::sleep(sleep_duration).await; } - if let Err(error) = garbage_collector.perform_db_cleanup_task(epoch).await { - tracing::error!(?error, epoch, "garbage-collection task failed"); + if let Err(error) = garbage_collector.perform_data_deletion(epoch).await { + tracing::error!(?error, epoch, "data deletion failed"); } }); @@ -195,6 +210,30 @@ impl GarbageCollector { Ok(()) } + /// Waits for blob info cleanup (process_expired_blob_objects) to complete for the given epoch. + /// Returns immediately if the cleanup for that epoch (or a later one) already completed. + pub(super) async fn wait_for_blob_info_cleanup(&self, epoch: Epoch) { + let mut rx = self.blob_info_cleanup_done.subscribe(); + // wait_for returns immediately if the predicate is already satisfied. + let _ = rx.wait_for(|&completed| completed >= epoch).await; + } + + /// Marks blob info cleanup as done for at least the given epoch. + /// + /// Only updates the watch channel if the given epoch is greater than the current value. + /// Used on startup to mark epochs that don't need GC (either already completed in a previous + /// run or not relevant for a fresh node). + pub(super) fn mark_blob_info_cleanup_done_at_least(&self, epoch: Epoch) { + self.blob_info_cleanup_done.send_if_modified(|current| { + if epoch > *current { + *current = epoch; + true + } else { + false + } + }); + } + /// Aborts any running garbage-collection task. pub(crate) async fn abort(&self) { if let Some(task_handle) = self.task_handle.lock().await.take() { @@ -240,33 +279,26 @@ impl GarbageCollector { ) } - /// Performs database cleanup operations including blob info cleanup and data deletion. - /// - /// Must only be run if blob info cleanup is enabled. + /// Processes expired blob objects to update aggregate blob info counts. /// - /// # Errors - /// - /// Returns an error if a DB operation fails or one of the cleanup tasks fails. - /// - /// # Panics - /// - /// Panics if blob info cleanup is not enabled. - async fn perform_db_cleanup_task(&self, epoch: Epoch) -> anyhow::Result<()> { - assert!( - self.config.enable_blob_info_cleanup, - "garbage-collection task must only be run if blob info cleanup is enabled" - ); - - // Disable DB compactions during cleanup to improve performance. DB compactions are - // automatically re-enabled when the guard is dropped. + /// This is the consistency-critical phase of GC: it ensures that aggregate blob info + /// reflects all expirations for the epoch. It runs inline during epoch change (no + /// random delay) so that it completes before the next epoch starts. + async fn perform_blob_info_cleanup(&self, epoch: Epoch) -> anyhow::Result<()> { let _guard = self.node.storage.temporarily_disable_auto_compactions()?; - tracing::info!("starting garbage collection"); + tracing::info!("starting blob info cleanup"); self.node .storage .process_expired_blob_objects(epoch, &self.metrics, self.config.blob_objects_batch_size) - .await?; + .await + } + /// Deletes slivers and metadata for expired blobs from disk. + /// + /// This is the I/O-heavy phase of GC. It runs in a background task after a random + /// delay to avoid thundering herd across nodes. + async fn perform_data_deletion(&self, epoch: Epoch) -> anyhow::Result<()> { if self.config.enable_data_deletion && self .node @@ -278,7 +310,6 @@ impl GarbageCollector { ) .await? { - // Update the last completed epoch after successful cleanup. self.node .storage .set_garbage_collector_last_completed_epoch(epoch)?;