Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions crates/walrus-service/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,10 @@ impl StorageNode {
?error,
"failed to check and start garbage collection on startup"
);
// Fallback: mark up to the current epoch as done so that the first
// epoch change's wait_for_blob_info_cleanup doesn't block indefinitely.
self.garbage_collector
.mark_blob_info_cleanup_done_at_least(self.inner.committee_service.get_epoch());
}

select! {
Expand Down Expand Up @@ -1840,6 +1844,10 @@ impl StorageNode {
c.sync_node_params().await?;
}

// Note: blob info cleanup for the previous epoch is guaranteed to have finished running
// (successfully or not; failures are logged rather than propagated) because it runs
// inline in start_garbage_collection_task (called at the end of the previous epoch's
// process_epoch_change_start_event). No explicit wait needed.

// Start storage node consistency check if
// - consistency check is enabled
// - node is not reprocessing events (blob info table should not be affected by future
Expand Down Expand Up @@ -1934,6 +1942,10 @@ impl StorageNode {
last_completed_epoch,
"previous garbage-collection task already completed; skipping restart"
);
// No GC restart needed. Mark up to the current epoch as done so that the
// first epoch change's wait_for_blob_info_cleanup doesn't block.
self.garbage_collector
.mark_blob_info_cleanup_done_at_least(self.inner.committee_service.get_epoch());
return Ok(());
}

Expand All @@ -1951,6 +1963,10 @@ impl StorageNode {
"the Walrus epoch has changed since the last garbage-collection task was started; \
skipping restart"
);
// GC for previous epochs is stale (epoch changed). Mark up to the last
// started epoch as done so the next epoch change can proceed.
self.garbage_collector
.mark_blob_info_cleanup_done_at_least(last_started_epoch);
return Ok(());
}

Expand Down
121 changes: 76 additions & 45 deletions crates/walrus-service/src/node/garbage_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use chrono::{DateTime, Utc};
use rand::{Rng, SeedableRng, rngs::StdRng};
use serde::{Deserialize, Serialize};
use serde_with::{DurationSeconds, serde_as};
use tokio::sync::Mutex;
use tokio::sync::{Mutex, watch};
use walrus_core::Epoch;
use walrus_sui::types::GENESIS_EPOCH;

Expand All @@ -21,6 +21,8 @@ use crate::node::{StorageNodeInner, metrics::NodeMetricSet};
#[serde(default)]
pub struct GarbageCollectionConfig {
/// Whether to enable the blob info cleanup at the beginning of each epoch.
///
/// The configured value is ignored by the garbage collector, which always enables it.
pub enable_blob_info_cleanup: bool,
/// Whether to delete metadata and slivers of expired or deleted blobs.
pub enable_data_deletion: bool,
Expand Down Expand Up @@ -93,19 +95,31 @@ pub(super) struct GarbageCollector {
metrics: Arc<NodeMetricSet>,
/// Handle to the background task performing database cleanup.
task_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
/// Tracks the last epoch for which blob info cleanup completed.
blob_info_cleanup_done: watch::Sender<Epoch>,
}

impl GarbageCollector {
pub fn new(
config: GarbageCollectionConfig,
mut config: GarbageCollectionConfig,
node: Arc<StorageNodeInner>,
metrics: Arc<NodeMetricSet>,
) -> Self {
if !config.enable_blob_info_cleanup {
tracing::warn!(
"ignoring `garbage_collection.enable_blob_info_cleanup: false`; \
blob info cleanup is always enabled"
);
config.enable_blob_info_cleanup = true;
}

let (blob_info_cleanup_done, _) = watch::channel(GENESIS_EPOCH);
Self {
config,
node,
metrics,
task_handle: Arc::new(Mutex::new(None)),
blob_info_cleanup_done,
}
}

Expand All @@ -131,62 +145,63 @@ impl GarbageCollector {
) -> anyhow::Result<()> {
if epoch == GENESIS_EPOCH {
tracing::info!("garbage collection is not relevant in the genesis epoch");
return Ok(());
}

let garbage_collection_config = self.config;

if !garbage_collection_config.enable_blob_info_cleanup {
if garbage_collection_config.enable_data_deletion {
tracing::warn!(
"data deletion is enabled, but requires blob info cleanup to be enabled; \
skipping data deletion",
);
} else {
tracing::info!("garbage collection is disabled, skipping cleanup");
}
self.blob_info_cleanup_done.send_replace(epoch);
return Ok(());
}

let mut task_handle = self.task_handle.lock().await;
let garbage_collector = self.clone();

// If there is an existing task, we need to abort it first before starting a new one.
// If there is an existing task (data deletion), abort it before starting a new one.
if let Some(old_task) = task_handle.take() {
tracing::info!("aborting existing garbage-collection task before starting a new one");
tracing::info!(
"aborting existing garbage-collection task (data deletion phase) \
before starting a new one"
);
old_task.abort();
let _ = old_task.await;
}

// Store the current epoch in the DB before spawning the background task.
// Store the current epoch in the DB before running cleanup.
self.node
.storage
.set_garbage_collector_last_started_epoch(epoch)?;
self.metrics
.set_garbage_collection_last_started_epoch(epoch);

// Calculate target time and update metric before spawning the background task.
// Phase 1: Run blob info cleanup immediately (no random delay).
// This updates aggregate blob info counts for expired blob objects and must
// complete before the next epoch change for consistency. Running it inline
// (rather than in a spawned task) guarantees completion before this function
// returns.
if let Err(error) = self.perform_blob_info_cleanup(epoch).await {
tracing::error!(?error, epoch, "blob info cleanup failed");
}
self.blob_info_cleanup_done.send_replace(epoch);

// Phase 2: Spawn data deletion as a background task with random delay to
// spread I/O load across nodes. This phase can be safely aborted by the
// next epoch's GC without affecting blob info consistency.
let target_time = self.cleanup_target_time(epoch, epoch_start);
self.metrics
.garbage_collection_task_start_time
.set(target_time.timestamp().try_into().unwrap_or_default());

let new_task = tokio::spawn(async move {
// Sleep until the target time.
let sleep_duration = (target_time - Utc::now())
.to_std()
.unwrap_or(Duration::ZERO);
if !sleep_duration.is_zero() {
tracing::info!(
target_time = target_time.to_rfc3339(),
?sleep_duration,
"sleeping before performing garbage collection",
"sleeping before performing data deletion",
);
tokio::time::sleep(sleep_duration).await;
}

if let Err(error) = garbage_collector.perform_db_cleanup_task(epoch).await {
tracing::error!(?error, epoch, "garbage-collection task failed");
if let Err(error) = garbage_collector.perform_data_deletion(epoch).await {
tracing::error!(?error, epoch, "data deletion failed");
}
});

Expand All @@ -195,6 +210,30 @@ impl GarbageCollector {
Ok(())
}

/// Suspends the caller until blob info cleanup has been recorded as done for `epoch`.
///
/// The completion marker is the epoch stored in the `blob_info_cleanup_done` watch channel;
/// any stored epoch greater than or equal to `epoch` satisfies the wait, so a cleanup that
/// was already recorded for this epoch (or a later one) does not block the caller.
pub(super) async fn wait_for_blob_info_cleanup(&self, epoch: Epoch) {
    let mut receiver = self.blob_info_cleanup_done.subscribe();
    // The predicate is checked against the current value first, so an already-recorded
    // completion resolves without waiting for a new notification. The result is
    // intentionally discarded.
    let _ = receiver
        .wait_for(|completed| *completed >= epoch)
        .await;
}

/// Raises the recorded "blob info cleanup done" epoch to `epoch` if it is currently lower.
///
/// The stored value never moves backwards: when `epoch` is not strictly greater than the
/// value already held by the watch channel, nothing is written and receivers are not
/// notified. Called on startup paths for epochs that need no further cleanup run.
pub(super) fn mark_blob_info_cleanup_done_at_least(&self, epoch: Epoch) {
    self.blob_info_cleanup_done.send_if_modified(|current| {
        // Only a strictly newer epoch counts as a modification worth notifying about.
        let advanced = epoch > *current;
        if advanced {
            *current = epoch;
        }
        advanced
    });
}

/// Aborts any running garbage-collection task.
pub(crate) async fn abort(&self) {
if let Some(task_handle) = self.task_handle.lock().await.take() {
Expand Down Expand Up @@ -240,33 +279,26 @@ impl GarbageCollector {
)
}

/// Performs database cleanup operations including blob info cleanup and data deletion.
///
/// Must only be run if blob info cleanup is enabled.
/// Processes expired blob objects to update aggregate blob info counts.
///
/// # Errors
///
/// Returns an error if a DB operation fails or one of the cleanup tasks fails.
///
/// # Panics
///
/// Panics if blob info cleanup is not enabled.
async fn perform_db_cleanup_task(&self, epoch: Epoch) -> anyhow::Result<()> {
assert!(
self.config.enable_blob_info_cleanup,
"garbage-collection task must only be run if blob info cleanup is enabled"
);

// Disable DB compactions during cleanup to improve performance. DB compactions are
// automatically re-enabled when the guard is dropped.
/// This is the consistency-critical phase of GC: it ensures that aggregate blob info
/// reflects all expirations for the epoch. It runs inline during epoch change (no
/// random delay) so that it completes before the next epoch starts.
async fn perform_blob_info_cleanup(&self, epoch: Epoch) -> anyhow::Result<()> {
let _guard = self.node.storage.temporarily_disable_auto_compactions()?;
tracing::info!("starting garbage collection");
tracing::info!("starting blob info cleanup");

self.node
.storage
.process_expired_blob_objects(epoch, &self.metrics, self.config.blob_objects_batch_size)
.await?;
.await
}

/// Deletes slivers and metadata for expired blobs from disk.
///
/// This is the I/O-heavy phase of GC. It runs in a background task after a random
/// delay to avoid thundering herd across nodes.
async fn perform_data_deletion(&self, epoch: Epoch) -> anyhow::Result<()> {
if self.config.enable_data_deletion
&& self
.node
Expand All @@ -278,7 +310,6 @@ impl GarbageCollector {
)
.await?
{
// Update the last completed epoch after successful cleanup.
self.node
.storage
.set_garbage_collector_last_completed_epoch(epoch)?;
Expand Down
Loading