diff --git a/crates/walrus-service/src/node.rs b/crates/walrus-service/src/node.rs index cf78184592..0a57a40342 100644 --- a/crates/walrus-service/src/node.rs +++ b/crates/walrus-service/src/node.rs @@ -950,6 +950,10 @@ impl StorageNode { ?error, "failed to check and start garbage collection on startup" ); + // Fallback: mark up to the current epoch as done so that the first + // epoch change's wait_for_blob_info_cleanup doesn't block indefinitely. + self.garbage_collector + .mark_blob_info_cleanup_done_at_least(self.inner.committee_service.get_epoch()); } select! { @@ -1832,6 +1836,13 @@ impl StorageNode { c.sync_node_params().await?; } + // Wait for the previous epoch's blob info cleanup (process_expired_blob_objects) + // to complete. This ensures the consistency check snapshot reflects fully + // updated blob info counts, so all nodes produce the same certified blob hash. + self.garbage_collector + .wait_for_blob_info_cleanup(event.epoch.saturating_sub(1)) + .await; + // Start storage node consistency check if // - consistency check is enabled // - node is not reprocessing events (blob info table should not be affected by future @@ -1926,6 +1937,10 @@ impl StorageNode { last_completed_epoch, "previous garbage-collection task already completed; skipping restart" ); + // No GC restart needed. Mark up to the current epoch as done so that the + // first epoch change's wait_for_blob_info_cleanup doesn't block. + self.garbage_collector + .mark_blob_info_cleanup_done_at_least(self.inner.committee_service.get_epoch()); return Ok(()); } @@ -1943,6 +1958,10 @@ impl StorageNode { "the Walrus epoch has changed since the last garbage-collection task was started; \ skipping restart" ); + // GC for previous epochs is stale (epoch changed). Mark up to the last + // started epoch as done so the next epoch change can proceed. 
+ self.garbage_collector + .mark_blob_info_cleanup_done_at_least(last_started_epoch); return Ok(()); } diff --git a/crates/walrus-service/src/node/garbage_collector.rs b/crates/walrus-service/src/node/garbage_collector.rs index db75ca32f6..3ce654903d 100644 --- a/crates/walrus-service/src/node/garbage_collector.rs +++ b/crates/walrus-service/src/node/garbage_collector.rs @@ -9,7 +9,7 @@ use chrono::{DateTime, Utc}; use rand::{Rng, SeedableRng, rngs::StdRng}; use serde::{Deserialize, Serialize}; use serde_with::{DurationSeconds, serde_as}; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, watch}; use walrus_core::Epoch; use walrus_sui::types::GENESIS_EPOCH; @@ -21,6 +21,8 @@ use crate::node::{StorageNodeInner, metrics::NodeMetricSet}; #[serde(default)] pub struct GarbageCollectionConfig { /// Whether to enable the blob info cleanup at the beginning of each epoch. + /// + /// The configured value is ignored by the garbage collector, which always enables it. pub enable_blob_info_cleanup: bool, /// Whether to delete metadata and slivers of expired or deleted blobs. pub enable_data_deletion: bool, @@ -86,19 +88,31 @@ pub(super) struct GarbageCollector { metrics: Arc, /// Handle to the background task performing database cleanup. task_handle: Arc>>>, + /// Tracks the last epoch for which blob info cleanup completed. 
+ blob_info_cleanup_done: watch::Sender, } impl GarbageCollector { pub fn new( - config: GarbageCollectionConfig, + mut config: GarbageCollectionConfig, node: Arc, metrics: Arc, ) -> Self { + if !config.enable_blob_info_cleanup { + tracing::warn!( + "ignoring `garbage_collection.enable_blob_info_cleanup: false`; \ + blob info cleanup is always enabled" + ); + config.enable_blob_info_cleanup = true; + } + // No receiver is kept: waiters subscribe on demand, so publish via `send_if_modified`. + let (blob_info_cleanup_done, _) = watch::channel(GENESIS_EPOCH); Self { config, node, metrics, task_handle: Arc::new(Mutex::new(None)), + blob_info_cleanup_done, } } @@ -124,20 +138,7 @@ impl GarbageCollector { ) -> anyhow::Result<()> { if epoch == GENESIS_EPOCH { tracing::info!("garbage collection is not relevant in the genesis epoch"); - return Ok(()); - } - - let garbage_collection_config = self.config; - - if !garbage_collection_config.enable_blob_info_cleanup { - if garbage_collection_config.enable_data_deletion { - tracing::warn!( - "data deletion is enabled, but requires blob info cleanup to be enabled; \ - skipping data deletion", - ); - } else { - tracing::info!("garbage collection is disabled, skipping cleanup"); - } + self.mark_blob_info_cleanup_done_at_least(epoch); return Ok(()); } @@ -146,7 +147,10 @@ impl GarbageCollector { // If there is an existing task, we need to abort it first before starting a new one. if let Some(old_task) = task_handle.take() { - tracing::info!("aborting existing garbage-collection task before starting a new one"); + tracing::info!( + "aborting existing garbage-collection task (data deletion phase) \ + before starting a new one" + ); old_task.abort(); let _ = old_task.await; } @@ -181,6 +185,9 @@ impl GarbageCollector { if let Err(error) = garbage_collector.perform_db_cleanup_task(epoch).await { tracing::error!(?error, epoch, "garbage-collection task failed"); } + // Safety net: ensure notification is sent even if perform_db_cleanup_task + // errored before reaching the notification point.
+ garbage_collector.mark_blob_info_cleanup_done_at_least(epoch); }); *task_handle = Some(new_task); @@ -188,6 +195,30 @@ impl GarbageCollector { Ok(()) } + /// Waits for blob info cleanup (process_expired_blob_objects) to complete for the given epoch. + /// Returns immediately if the cleanup for that epoch (or a later one) already completed. + pub(super) async fn wait_for_blob_info_cleanup(&self, epoch: Epoch) { + let mut rx = self.blob_info_cleanup_done.subscribe(); + // wait_for returns immediately if the predicate is already satisfied. + let _ = rx.wait_for(|&completed| completed >= epoch).await; + } + + /// Marks blob info cleanup as done for at least the given epoch. + /// + /// Never lowers the stored epoch, and unlike `watch::Sender::send` it stores the value even + /// when no receiver is subscribed. Used both on startup (epochs that need no GC) and to + /// publish cleanup completion from the GC task. + pub(super) fn mark_blob_info_cleanup_done_at_least(&self, epoch: Epoch) { + self.blob_info_cleanup_done.send_if_modified(|current| { + if epoch > *current { + *current = epoch; + true + } else { + false + } + }); + } + /// Aborts any running garbage-collection task. pub(crate) async fn abort(&self) { if let Some(task_handle) = self.task_handle.lock().await.take() { @@ -255,10 +286,16 @@ impl GarbageCollector { let _guard = self.node.storage.temporarily_disable_auto_compactions()?; tracing::info!("starting garbage collection"); - self.node + let blob_info_result = self + .node .storage .process_expired_blob_objects(epoch, &self.metrics, self.config.blob_objects_batch_size) - .await?; + .await; + + // Notify that blob info cleanup is done (even on error, to unblock waiters). + self.mark_blob_info_cleanup_done_at_least(epoch); + + blob_info_result?; if self.config.enable_data_deletion && self