Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 29 additions & 6 deletions crates/walrus-service/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1871,19 +1871,41 @@ impl StorageNode {
self.execute_epoch_change(event_handle, event, shard_map_lock)
.await?;

// Abort any running data deletion task from the previous epoch before starting blob info
// cleanup, so that the two phases don't run concurrently on aggregate_blob_info.
self.garbage_collector.abort().await;

// Phase 1 of garbage collection: clean up per-object blob info before allowing new-epoch
// work to begin. This must run before updating the latest event epoch so that blob syncs
// for the new epoch don't interfere with the cleanup, producing a deterministic and
// consistent snapshot of per-object blob info across nodes at the epoch boundary.
self.perform_blob_info_cleanup(event.epoch).await?;

// Update the latest event epoch to the new epoch. Now, blob syncs will use this epoch to
// check for shard ownership.
self.inner
.latest_event_epoch_sender
.send(Some(event.epoch))?;

self.start_garbage_collection_task(event.epoch).await?;
self.start_data_deletion_task(event.epoch).await?;

Ok(())
}

/// Runs garbage-collection phase 1 (blob info cleanup) for `epoch` via the garbage collector.
///
/// This is best-effort: an error reported by the garbage collector is logged and not
/// propagated, so this method currently always returns `Ok(())`.
async fn perform_blob_info_cleanup(&self, epoch: Epoch) -> anyhow::Result<()> {
    match self.garbage_collector.perform_blob_info_cleanup(epoch).await {
        Ok(()) => {}
        Err(error) => {
            // Log only; the caller continues even if the cleanup failed.
            tracing::error!(?error, epoch, "blob info cleanup failed");
        }
    }
    Ok(())
}

/// Starts a background task to perform database cleanup operations if the node is active.
async fn start_garbage_collection_task(&self, epoch: Epoch) -> anyhow::Result<()> {
/// Starts a background task to delete expired blob data (GC phase 2).
async fn start_data_deletion_task(&self, epoch: Epoch) -> anyhow::Result<()> {
// Try to get the epoch start time from the contract service. If the epoch state is not
// available (e.g., in tests), use the current time as the epoch start.
let epoch_start = self
Expand All @@ -1902,10 +1924,10 @@ impl StorageNode {
.unwrap_or_else(Utc::now);
if let Err(error) = self
.garbage_collector
.start_garbage_collection_task(epoch, epoch_start)
.start_data_deletion_task(epoch, epoch_start)
.await
{
tracing::error!(?error, epoch, "failed to start garbage-collection task");
tracing::error!(?error, epoch, "failed to start data-deletion task");
}
Ok(())
}
Expand Down Expand Up @@ -1960,7 +1982,8 @@ impl StorageNode {
current_epoch,
"restarting unfinished garbage-collection task on startup"
);
self.start_garbage_collection_task(current_epoch).await
self.perform_blob_info_cleanup(current_epoch).await?;
self.start_data_deletion_task(current_epoch).await
}

/// Storage node execution of the epoch change start event, to bring the node state to the next
Expand Down
153 changes: 89 additions & 64 deletions crates/walrus-service/src/node/garbage_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub struct GarbageCollectionConfig {
/// When disabled, data is only deleted during periodic garbage collection. Only relevant if
/// `enable_data_deletion` is `true`.
pub enable_immediate_data_deletion: bool,
/// Whether to add a random delay before starting garbage collection.
/// Whether to add a random delay before the data deletion phase of garbage collection.
/// The delay is deterministically computed based on the node's public key and epoch,
/// uniformly distributed between 0 and half the epoch duration.
pub enable_random_delay: bool,
Expand Down Expand Up @@ -109,84 +109,126 @@ impl GarbageCollector {
}
}

/// Schedules database cleanup operations to run in a background task.
/// Phase 1: Cleans up per-object blob info for expired blobs.
///
/// If a cleanup task is already running, it will be aborted and replaced with a new one.
/// This phase processes the `per_object_blob_info` table (and in the future,
/// `storage_pool_info`) to remove entries for blobs that are no longer registered. These
/// tables are critical for node recovery, so this phase is designed to run promptly at the
/// epoch boundary (without random delay) to produce a consistent snapshot across nodes.
///
/// Must only be called *after* the epoch change for the provided epoch is complete.
///
/// The actual cleanup work starts after a deterministic delay computed by
/// [`Self::cleanup_target_time`] based on the node's public key and epoch. This is to avoid
/// multiple nodes performing cleanup operations at the same time, which could impact the
/// performance of the network.
/// # Errors
///
/// Returns an error if a DB operation fails.
#[tracing::instrument(skip_all)]
pub(crate) async fn perform_blob_info_cleanup(&self, epoch: Epoch) -> anyhow::Result<()> {
    // Guard clauses: skip the genesis epoch and honor the configuration switch.
    if epoch == GENESIS_EPOCH {
        tracing::info!("garbage collection is not relevant in the genesis epoch");
        return Ok(());
    }
    if !self.config.enable_blob_info_cleanup {
        tracing::info!("blob info cleanup is disabled, skipping");
        return Ok(());
    }

    let storage = &self.node.storage;

    // Mark garbage collection as started for this epoch, both in storage and in the metrics,
    // before any cleanup work is done.
    storage.set_garbage_collector_last_started_epoch(epoch)?;
    self.metrics
        .set_garbage_collection_last_started_epoch(epoch);

    // Auto compactions are turned off for the duration of the cleanup to improve performance;
    // they are re-enabled automatically once the guard goes out of scope.
    let _compaction_guard = storage.temporarily_disable_auto_compactions()?;
    tracing::info!("starting garbage collection phase 1: blob info cleanup");

    let batch_size = self.config.blob_objects_batch_size;
    storage
        .process_expired_blob_objects(epoch, &self.metrics, batch_size)
        .await?;

    Ok(())
}

/// Schedules the data deletion phase to run in a background task.
///
/// If a data deletion task is already running, it will be aborted and replaced with a new one.
///
/// Must only be called *after* [`Self::perform_blob_info_cleanup`] for the same epoch.
///
/// The actual deletion starts after a deterministic delay computed by
/// [`Self::cleanup_target_time`] based on the node's public key and epoch. This is to avoid
/// multiple nodes performing heavy I/O at the same time.
///
/// # Errors
///
/// Returns an error if the garbage-collection task cannot be started.
/// Returns an error if the data deletion task cannot be started.
#[tracing::instrument(skip_all)]
pub async fn start_garbage_collection_task(
pub(crate) async fn start_data_deletion_task(
&self,
epoch: Epoch,
epoch_start: DateTime<Utc>,
) -> anyhow::Result<()> {
if epoch == GENESIS_EPOCH {
tracing::info!("garbage collection is not relevant in the genesis epoch");
return Ok(());
}

let garbage_collection_config = self.config;

if !garbage_collection_config.enable_blob_info_cleanup {
if garbage_collection_config.enable_data_deletion {
if !self.config.enable_blob_info_cleanup {
if self.config.enable_data_deletion {
tracing::warn!(
"data deletion is enabled, but requires blob info cleanup to be enabled; \
skipping data deletion",
);
} else {
tracing::info!("garbage collection is disabled, skipping cleanup");
}
return Ok(());
}

if !self.config.enable_data_deletion {
return Ok(());
}

let mut task_handle = self.task_handle.lock().await;
let garbage_collector = self.clone();

// If there is an existing task, we need to abort it first before starting a new one.
if let Some(old_task) = task_handle.take() {
tracing::info!("aborting existing garbage-collection task before starting a new one");
tracing::info!("aborting existing data-deletion task before starting a new one");
old_task.abort();
let _ = old_task.await;
}

// Store the current epoch in the DB before spawning the background task.
self.node
.storage
.set_garbage_collector_last_started_epoch(epoch)?;
self.metrics
.set_garbage_collection_last_started_epoch(epoch);

// Calculate target time and update metric before spawning the background task.
let target_time = self.cleanup_target_time(epoch, epoch_start);
self.metrics
.garbage_collection_task_start_time
.set(target_time.timestamp().try_into().unwrap_or_default());
let data_deletion_target_time = self.cleanup_target_time(epoch, epoch_start);
self.metrics.garbage_collection_task_start_time.set(
data_deletion_target_time
.timestamp()
.try_into()
.unwrap_or_default(),
);

let new_task = tokio::spawn(async move {
// Sleep until the target time.
let sleep_duration = (target_time - Utc::now())
let sleep_duration = (data_deletion_target_time - Utc::now())
.to_std()
.unwrap_or(Duration::ZERO);
if !sleep_duration.is_zero() {
tracing::info!(
target_time = target_time.to_rfc3339(),
target_time = data_deletion_target_time.to_rfc3339(),
?sleep_duration,
"sleeping before performing garbage collection",
"sleeping before performing data deletion",
);
tokio::time::sleep(sleep_duration).await;
}

if let Err(error) = garbage_collector.perform_db_cleanup_task(epoch).await {
tracing::error!(?error, epoch, "garbage-collection task failed");
if let Err(error) = garbage_collector.perform_data_deletion(epoch).await {
tracing::error!(
?error,
epoch,
"garbage collection phase 2 (data deletion) failed"
);
}
});

Expand Down Expand Up @@ -240,45 +282,28 @@ impl GarbageCollector {
)
}

/// Performs database cleanup operations including blob info cleanup and data deletion.
/// Phase 2: Deletes metadata and slivers for expired/deleted blobs.
///
/// Must only be run if blob info cleanup is enabled.
/// This phase performs the heavy I/O work of deleting actual blob data (metadata and slivers)
/// from storage. It is intended to run after a random delay to avoid multiple nodes performing
/// heavy I/O simultaneously.
///
/// # Errors
///
/// Returns an error if a DB operation fails or one of the cleanup tasks fails.
///
/// # Panics
///
/// Panics if blob info cleanup is not enabled.
async fn perform_db_cleanup_task(&self, epoch: Epoch) -> anyhow::Result<()> {
assert!(
self.config.enable_blob_info_cleanup,
"garbage-collection task must only be run if blob info cleanup is enabled"
);

// Disable DB compactions during cleanup to improve performance. DB compactions are
/// Returns an error if a DB operation fails.
async fn perform_data_deletion(&self, epoch: Epoch) -> anyhow::Result<()> {
// Disable DB compactions during data deletion to improve performance. DB compactions are
// automatically re-enabled when the guard is dropped.
let _guard = self.node.storage.temporarily_disable_auto_compactions()?;
tracing::info!("starting garbage collection");
tracing::info!("starting garbage collection phase 2: data deletion");

self.node
if self
.node
.storage
.process_expired_blob_objects(epoch, &self.metrics, self.config.blob_objects_batch_size)
.await?;

if self.config.enable_data_deletion
&& self
.node
.storage
.delete_expired_blob_data(
epoch,
&self.metrics,
self.config.data_deletion_batch_size,
)
.await?
.delete_expired_blob_data(epoch, &self.metrics, self.config.data_deletion_batch_size)
.await?
{
// Update the last completed epoch after successful cleanup.
// Update the last completed epoch after successful data deletion.
self.node
.storage
.set_garbage_collector_last_completed_epoch(epoch)?;
Expand Down
Loading