diff --git a/crates/walrus-service/node_config_example.yaml b/crates/walrus-service/node_config_example.yaml index faf3ff065b..2a9513e203 100644 --- a/crates/walrus-service/node_config_example.yaml +++ b/crates/walrus-service/node_config_example.yaml @@ -240,6 +240,7 @@ blob_event_processor_config: garbage_collection: enable_blob_info_cleanup: true enable_data_deletion: true + enable_immediate_data_deletion: false enable_random_delay: true randomization_time_window_secs: null blob_objects_batch_size: 5000 diff --git a/crates/walrus-service/src/node.rs b/crates/walrus-service/src/node.rs index 76b99f0697..6fd48ff14c 100644 --- a/crates/walrus-service/src/node.rs +++ b/crates/walrus-service/src/node.rs @@ -4922,6 +4922,71 @@ mod tests { Ok(()) } + async_param_test! { + immediate_data_deletion_on_blob_deleted_event -> TestResult: [ + enabled: (true, false), + disabled: (false, true), + ] + } + async fn immediate_data_deletion_on_blob_deleted_event( + enable_immediate_data_deletion: bool, + expect_data_stored: bool, + ) -> TestResult { + walrus_test_utils::init_tracing(); + let events = Sender::new(48); + let gc_config = GarbageCollectionConfig { + enable_immediate_data_deletion, + ..GarbageCollectionConfig::default_for_test() + }; + let node = StorageNodeHandle::builder() + .with_storage( + populated_storage(&[ + (SHARD_INDEX, vec![(BLOB_ID, WhichSlivers::Both)]), + (OTHER_SHARD_INDEX, vec![(BLOB_ID, WhichSlivers::Both)]), + ]) + .await?, + ) + .with_system_event_provider(events.clone()) + .with_garbage_collection_config(gc_config) + .with_node_started(true) + .build() + .await?; + let inner = node.as_ref().inner.clone(); + + tokio::time::sleep(Duration::from_millis(50)).await; + + assert!( + inner + .is_stored_at_all_shards_at_latest_epoch(&BLOB_ID) + .await?, + ); + events.send( + BlobRegistered { + deletable: true, + ..BlobRegistered::for_testing(BLOB_ID) + } + .into(), + )?; + events.send( + BlobCertified { + deletable: true, + ..BlobCertified::for_testing(BLOB_ID) + } + .into(), + )?; + events.send(BlobDeleted::for_testing(BLOB_ID).into())?; + + tokio::time::sleep(Duration::from_millis(100)).await; + + assert_eq!( + inner + .is_stored_at_all_shards_at_latest_epoch(&BLOB_ID) + .await?, + expect_data_stored, + ); + Ok(()) + } + #[tokio::test] async fn returns_correct_blob_status() -> TestResult { let blob_event = BlobRegistered::for_testing(BLOB_ID); diff --git a/crates/walrus-service/src/node/blob_event_processor.rs b/crates/walrus-service/src/node/blob_event_processor.rs index cb4f3f455a..42a8f1bc2d 100644 --- a/crates/walrus-service/src/node/blob_event_processor.rs +++ b/crates/walrus-service/src/node/blob_event_processor.rs @@ -248,12 +248,18 @@ impl BackgroundEventProcessor { // cannot remove it even if it is no longer valid in the *current* epoch if blob_info.can_data_be_deleted(event.epoch) && self.node.garbage_collection_config.enable_data_deletion + && self + .node + .garbage_collection_config + .enable_immediate_data_deletion { tracing::debug!("deleting data for deleted blob"); self.node .storage .attempt_to_delete_blob_data(&blob_id, event.epoch, &self.node.metrics) .await?; + } else { + tracing::debug!("not deleting data for deleted blob"); } } else if self .node @@ -266,7 +272,10 @@ impl BackgroundEventProcessor { incomplete history; not deleting blob data" ); } else { - tracing::warn!("handling a `BlobDeleted` event for an untracked blob"); + tracing::debug!( + "handling a `BlobDeleted` event for an untracked blob; this can happen when \ + re-processing events after a node restart" + ); } event_handle.mark_as_complete(); diff --git a/crates/walrus-service/src/node/garbage_collector.rs b/crates/walrus-service/src/node/garbage_collector.rs index db75ca32f6..2856e51f8e 100644 --- a/crates/walrus-service/src/node/garbage_collector.rs +++ b/crates/walrus-service/src/node/garbage_collector.rs @@ -24,6 +24,11 @@ pub struct GarbageCollectionConfig { pub enable_blob_info_cleanup: bool, /// Whether to delete metadata and slivers of expired or deleted blobs. pub enable_data_deletion: bool, + /// Whether to immediately delete blob data when a `BlobDeleted` event is processed. + /// + /// When disabled, data is only deleted during periodic garbage collection. Only relevant if + /// `enable_data_deletion` is `true`. + pub enable_immediate_data_deletion: bool, /// Whether to add a random delay before starting garbage collection. /// The delay is deterministically computed based on the node's public key and epoch, /// uniformly distributed between 0 and half the epoch duration. @@ -52,6 +57,7 @@ impl Default for GarbageCollectionConfig { Self { enable_blob_info_cleanup: true, enable_data_deletion: true, + enable_immediate_data_deletion: false, enable_random_delay: true, randomization_time_window: None, blob_objects_batch_size: 5000, @@ -67,6 +73,7 @@ impl GarbageCollectionConfig { Self { enable_blob_info_cleanup: true, enable_data_deletion: true, + enable_immediate_data_deletion: true, enable_random_delay: true, randomization_time_window: Some(Duration::from_secs(1)), blob_objects_batch_size: 10, diff --git a/crates/walrus-service/src/test_utils.rs b/crates/walrus-service/src/test_utils.rs index 363189e866..d5a5d8ab82 100644 --- a/crates/walrus-service/src/test_utils.rs +++ b/crates/walrus-service/src/test_utils.rs @@ -868,6 +868,7 @@ pub struct StorageNodeHandleBuilder { num_checkpoints_per_blob: Option, enable_node_config_synchronizer: bool, node_recovery_config: Option, + garbage_collection_config: Option, event_stream_catchup_min_checkpoint_lag: Option, max_epochs_ahead: Option, } @@ -962,6 +963,12 @@ impl StorageNodeHandleBuilder { self } + /// Sets the garbage collection config for the node. + pub fn with_garbage_collection_config(mut self, config: GarbageCollectionConfig) -> Self { + self.garbage_collection_config = Some(config); + self + } + /// Specify the shard assignment for this node. /// /// If specified, it will determine the the shard assignment for the node in the committee. If @@ -1123,7 +1130,9 @@ impl StorageNodeHandleBuilder { Default::default() }, blob_recovery: BlobRecoveryConfig::default_for_test(), - garbage_collection: GarbageCollectionConfig::default_for_test(), + garbage_collection: self + .garbage_collection_config + .unwrap_or_else(GarbageCollectionConfig::default_for_test), ..storage_node_config().inner }; @@ -1309,7 +1318,9 @@ impl StorageNodeHandleBuilder { storage_node_cap: node_capability.map(|cap| cap.id), node_recovery_config: self.node_recovery_config.clone().unwrap_or_default(), blob_recovery: BlobRecoveryConfig::default_for_test(), - garbage_collection: GarbageCollectionConfig::default_for_test(), + garbage_collection: self + .garbage_collection_config + .unwrap_or_else(|| GarbageCollectionConfig::default_for_test()), ..storage_node_config().inner }; @@ -1391,6 +1402,7 @@ impl Default for StorageNodeHandleBuilder { num_checkpoints_per_blob: None, enable_node_config_synchronizer: false, node_recovery_config: None, + garbage_collection_config: None, event_stream_catchup_min_checkpoint_lag: None, max_epochs_ahead: None, }