Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions crates/walrus-service/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,10 @@ impl StorageNode {
?error,
"failed to check and start garbage collection on startup"
);
// Fallback: mark up to the current epoch as done so that the first
// epoch change's wait_for_blob_info_cleanup doesn't block indefinitely.
self.garbage_collector
.mark_blob_info_cleanup_done_at_least(self.inner.committee_service.get_epoch());
}

select! {
Expand Down Expand Up @@ -1832,6 +1836,13 @@ impl StorageNode {
c.sync_node_params().await?;
}

// Wait for the previous epoch's blob info cleanup (process_expired_blob_objects)
// to complete. This ensures the consistency check snapshot reflects fully
// updated blob info counts, so all nodes produce the same certified blob hash.
self.garbage_collector
.wait_for_blob_info_cleanup(event.epoch.saturating_sub(1))
.await;

// Start storage node consistency check if
// - consistency check is enabled
// - node is not reprocessing events (blob info table should not be affected by future
Expand Down Expand Up @@ -1926,6 +1937,10 @@ impl StorageNode {
last_completed_epoch,
"previous garbage-collection task already completed; skipping restart"
);
// No GC restart needed. Mark up to the current epoch as done so that the
// first epoch change's wait_for_blob_info_cleanup doesn't block.
self.garbage_collector
.mark_blob_info_cleanup_done_at_least(self.inner.committee_service.get_epoch());
return Ok(());
}

Expand All @@ -1943,6 +1958,10 @@ impl StorageNode {
"the Walrus epoch has changed since the last garbage-collection task was started; \
skipping restart"
);
// GC for previous epochs is stale (epoch changed). Mark up to the last
// started epoch as done so the next epoch change can proceed.
self.garbage_collector
.mark_blob_info_cleanup_done_at_least(last_started_epoch);
return Ok(());
}

Expand Down
75 changes: 56 additions & 19 deletions crates/walrus-service/src/node/garbage_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use chrono::{DateTime, Utc};
use rand::{Rng, SeedableRng, rngs::StdRng};
use serde::{Deserialize, Serialize};
use serde_with::{DurationSeconds, serde_as};
use tokio::sync::Mutex;
use tokio::sync::{Mutex, watch};
use walrus_core::Epoch;
use walrus_sui::types::GENESIS_EPOCH;

Expand All @@ -21,6 +21,8 @@ use crate::node::{StorageNodeInner, metrics::NodeMetricSet};
#[serde(default)]
pub struct GarbageCollectionConfig {
/// Whether to enable the blob info cleanup at the beginning of each epoch.
///
/// The configured value is ignored by the garbage collector, which always enables it.
pub enable_blob_info_cleanup: bool,
/// Whether to delete metadata and slivers of expired or deleted blobs.
pub enable_data_deletion: bool,
Expand Down Expand Up @@ -86,19 +88,31 @@ pub(super) struct GarbageCollector {
metrics: Arc<NodeMetricSet>,
/// Handle to the background task performing database cleanup.
task_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
/// Tracks the last epoch for which blob info cleanup completed.
blob_info_cleanup_done: watch::Sender<Epoch>,
}

impl GarbageCollector {
/// Creates a new [`GarbageCollector`].
///
/// Blob info cleanup is mandatory — epoch-change processing waits on its
/// completion — so a configured `enable_blob_info_cleanup: false` is
/// overridden (with a warning) to avoid blocking epoch changes.
pub fn new(
    mut config: GarbageCollectionConfig,
    node: Arc<StorageNodeInner>,
    metrics: Arc<NodeMetricSet>,
) -> Self {
    if !config.enable_blob_info_cleanup {
        tracing::warn!(
            "ignoring `garbage_collection.enable_blob_info_cleanup: false`; \
            blob info cleanup is always enabled"
        );
        config.enable_blob_info_cleanup = true;
    }

    let (blob_info_cleanup_done, receiver) = watch::channel(GENESIS_EPOCH);
    // Keep one receiver alive for the lifetime of the process: with zero
    // receivers, `watch::Sender::send` fails and discards the value, so a
    // cleanup completion recorded while no task is waiting would be lost and a
    // later `wait_for_blob_info_cleanup` could block indefinitely on a stale
    // epoch. Leaking a single receiver is negligible for this long-lived
    // per-node singleton.
    std::mem::forget(receiver);

    Self {
        config,
        node,
        metrics,
        task_handle: Arc::new(Mutex::new(None)),
        blob_info_cleanup_done,
    }
}

Expand All @@ -124,20 +138,7 @@ impl GarbageCollector {
) -> anyhow::Result<()> {
if epoch == GENESIS_EPOCH {
tracing::info!("garbage collection is not relevant in the genesis epoch");
return Ok(());
}

let garbage_collection_config = self.config;

if !garbage_collection_config.enable_blob_info_cleanup {
if garbage_collection_config.enable_data_deletion {
tracing::warn!(
"data deletion is enabled, but requires blob info cleanup to be enabled; \
skipping data deletion",
);
} else {
tracing::info!("garbage collection is disabled, skipping cleanup");
}
let _ = self.blob_info_cleanup_done.send(epoch);
return Ok(());
}

Expand All @@ -146,7 +147,10 @@ impl GarbageCollector {

// If there is an existing task, we need to abort it first before starting a new one.
if let Some(old_task) = task_handle.take() {
tracing::info!("aborting existing garbage-collection task before starting a new one");
tracing::info!(
"aborting existing garbage-collection task (data deletion phase) \
before starting a new one"
);
old_task.abort();
let _ = old_task.await;
}
Expand Down Expand Up @@ -181,13 +185,40 @@ impl GarbageCollector {
if let Err(error) = garbage_collector.perform_db_cleanup_task(epoch).await {
tracing::error!(?error, epoch, "garbage-collection task failed");
}
// Safety net: ensure notification is sent even if perform_db_cleanup_task
// errored before reaching the notification point.
let _ = garbage_collector.blob_info_cleanup_done.send(epoch);
});

*task_handle = Some(new_task);

Ok(())
}

/// Waits until blob info cleanup (`process_expired_blob_objects`) has completed
/// for the given epoch.
///
/// Returns immediately if cleanup for `epoch` — or any later epoch — has
/// already finished.
pub(super) async fn wait_for_blob_info_cleanup(&self, epoch: Epoch) {
    let mut receiver = self.blob_info_cleanup_done.subscribe();
    // `wait_for` evaluates the predicate against the current value first, so
    // this does not block when the target epoch has already been reached.
    let _ = receiver
        .wait_for(|&done_epoch| done_epoch >= epoch)
        .await;
}

/// Records that blob info cleanup has finished for at least the given epoch.
///
/// The watch channel is only advanced when `epoch` exceeds the stored value,
/// so the completion marker never moves backwards. Used on startup to mark
/// epochs that don't need GC (either already completed in a previous run or
/// not relevant for a fresh node).
pub(super) fn mark_blob_info_cleanup_done_at_least(&self, epoch: Epoch) {
    self.blob_info_cleanup_done.send_if_modified(|done| {
        let advance = epoch > *done;
        if advance {
            *done = epoch;
        }
        // Returning `false` skips notifying waiters when nothing changed.
        advance
    });
}

/// Aborts any running garbage-collection task.
pub(crate) async fn abort(&self) {
if let Some(task_handle) = self.task_handle.lock().await.take() {
Expand Down Expand Up @@ -255,10 +286,16 @@ impl GarbageCollector {
let _guard = self.node.storage.temporarily_disable_auto_compactions()?;
tracing::info!("starting garbage collection");

self.node
let blob_info_result = self
.node
.storage
.process_expired_blob_objects(epoch, &self.metrics, self.config.blob_objects_batch_size)
.await?;
.await;

// Notify that blob info cleanup is done (even on error, to unblock waiters).
let _ = self.blob_info_cleanup_done.send(epoch);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Persist cleanup epoch even when no waiters are subscribed

blob_info_cleanup_done is updated with watch::Sender::send, but this channel has no long-lived receiver (the initial receiver is dropped), so send can fail when no task is currently waiting and the epoch value is not retained; in that case a later wait_for_blob_info_cleanup subscriber will still see an older epoch and can block indefinitely (for example, waiting for epoch N-1 after its GC already finished). Use an update method that always mutates the stored value (e.g. send_replace/send_if_modified) or keep a receiver alive.

Useful? React with 👍 / 👎.


blob_info_result?;

if self.config.enable_data_deletion
&& self
Expand Down
Loading