Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/walrus-service/node_config_example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ blob_event_processor_config:
garbage_collection:
enable_blob_info_cleanup: true
enable_data_deletion: true
enable_immediate_data_deletion: false
enable_random_delay: true
randomization_time_window_secs: null
blob_objects_batch_size: 5000
Expand Down
65 changes: 65 additions & 0 deletions crates/walrus-service/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4922,6 +4922,71 @@ mod tests {
Ok(())
}

async_param_test! {
immediate_data_deletion_on_blob_deleted_event -> TestResult: [
enabled: (true, false),
disabled: (false, true),
]
}
async fn immediate_data_deletion_on_blob_deleted_event(
enable_immediate_data_deletion: bool,
expect_data_stored: bool,
) -> TestResult {
walrus_test_utils::init_tracing();
let events = Sender::new(48);
let gc_config = GarbageCollectionConfig {
enable_immediate_data_deletion,
..GarbageCollectionConfig::default_for_test()
};
let node = StorageNodeHandle::builder()
.with_storage(
populated_storage(&[
(SHARD_INDEX, vec![(BLOB_ID, WhichSlivers::Both)]),
(OTHER_SHARD_INDEX, vec![(BLOB_ID, WhichSlivers::Both)]),
])
.await?,
)
.with_system_event_provider(events.clone())
.with_garbage_collection_config(gc_config)
.with_node_started(true)
.build()
.await?;
let inner = node.as_ref().inner.clone();

tokio::time::sleep(Duration::from_millis(50)).await;

assert!(
inner
.is_stored_at_all_shards_at_latest_epoch(&BLOB_ID)
.await?,
);
events.send(
BlobRegistered {
deletable: true,
..BlobRegistered::for_testing(BLOB_ID)
}
.into(),
)?;
events.send(
BlobCertified {
deletable: true,
..BlobCertified::for_testing(BLOB_ID)
}
.into(),
)?;
events.send(BlobDeleted::for_testing(BLOB_ID).into())?;

tokio::time::sleep(Duration::from_millis(100)).await;

assert_eq!(
inner
.is_stored_at_all_shards_at_latest_epoch(&BLOB_ID)
.await?,
expect_data_stored,
);
Ok(())
}

#[tokio::test]
async fn returns_correct_blob_status() -> TestResult {
let blob_event = BlobRegistered::for_testing(BLOB_ID);
Expand Down
11 changes: 10 additions & 1 deletion crates/walrus-service/src/node/blob_event_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,12 +248,18 @@ impl BackgroundEventProcessor {
// cannot remove it even if it is no longer valid in the *current* epoch
if blob_info.can_data_be_deleted(event.epoch)
&& self.node.garbage_collection_config.enable_data_deletion
&& self
.node
.garbage_collection_config
.enable_immediate_data_deletion
{
tracing::debug!("deleting data for deleted blob");
self.node
.storage
.attempt_to_delete_blob_data(&blob_id, event.epoch, &self.node.metrics)
.await?;
} else {
tracing::debug!("not deleting data for deleted blob");
}
} else if self
.node
Expand All @@ -266,7 +272,10 @@ impl BackgroundEventProcessor {
incomplete history; not deleting blob data"
);
} else {
tracing::warn!("handling a `BlobDeleted` event for an untracked blob");
tracing::debug!(
"handling a `BlobDeleted` event for an untracked blob; this can happen when \
re-processing events after a node restart"
);
}

event_handle.mark_as_complete();
Expand Down
7 changes: 7 additions & 0 deletions crates/walrus-service/src/node/garbage_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ pub struct GarbageCollectionConfig {
pub enable_blob_info_cleanup: bool,
/// Whether to delete metadata and slivers of expired or deleted blobs.
pub enable_data_deletion: bool,
/// Whether to immediately delete blob data when a `BlobDeleted` event is processed.
///
/// When disabled, data is only deleted during periodic garbage collection. Only relevant if
/// `enable_data_deletion` is `true`.
pub enable_immediate_data_deletion: bool,
/// Whether to add a random delay before starting garbage collection.
/// The delay is deterministically computed based on the node's public key and epoch,
/// uniformly distributed between 0 and half the epoch duration.
Expand Down Expand Up @@ -52,6 +57,7 @@ impl Default for GarbageCollectionConfig {
Self {
enable_blob_info_cleanup: true,
enable_data_deletion: true,
enable_immediate_data_deletion: false,
enable_random_delay: true,
randomization_time_window: None,
blob_objects_batch_size: 5000,
Expand All @@ -67,6 +73,7 @@ impl GarbageCollectionConfig {
Self {
enable_blob_info_cleanup: true,
enable_data_deletion: true,
enable_immediate_data_deletion: true,
enable_random_delay: true,
randomization_time_window: Some(Duration::from_secs(1)),
blob_objects_batch_size: 10,
Expand Down
16 changes: 14 additions & 2 deletions crates/walrus-service/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,7 @@ pub struct StorageNodeHandleBuilder {
num_checkpoints_per_blob: Option<u32>,
enable_node_config_synchronizer: bool,
node_recovery_config: Option<NodeRecoveryConfig>,
garbage_collection_config: Option<GarbageCollectionConfig>,
event_stream_catchup_min_checkpoint_lag: Option<u64>,
max_epochs_ahead: Option<u32>,
}
Expand Down Expand Up @@ -962,6 +963,12 @@ impl StorageNodeHandleBuilder {
self
}

/// Sets the garbage collection config for the node.
pub fn with_garbage_collection_config(mut self, config: GarbageCollectionConfig) -> Self {
self.garbage_collection_config = Some(config);
self
}

/// Specify the shard assignment for this node.
///
/// If specified, it will determine the the shard assignment for the node in the committee. If
Expand Down Expand Up @@ -1123,7 +1130,9 @@ impl StorageNodeHandleBuilder {
Default::default()
},
blob_recovery: BlobRecoveryConfig::default_for_test(),
garbage_collection: GarbageCollectionConfig::default_for_test(),
garbage_collection: self
.garbage_collection_config
.unwrap_or_else(GarbageCollectionConfig::default_for_test),
..storage_node_config().inner
};

Expand Down Expand Up @@ -1309,7 +1318,9 @@ impl StorageNodeHandleBuilder {
storage_node_cap: node_capability.map(|cap| cap.id),
node_recovery_config: self.node_recovery_config.clone().unwrap_or_default(),
blob_recovery: BlobRecoveryConfig::default_for_test(),
garbage_collection: GarbageCollectionConfig::default_for_test(),
garbage_collection: self
.garbage_collection_config
.unwrap_or_else(|| GarbageCollectionConfig::default_for_test()),
..storage_node_config().inner
};

Expand Down Expand Up @@ -1391,6 +1402,7 @@ impl Default for StorageNodeHandleBuilder {
num_checkpoints_per_blob: None,
enable_node_config_synchronizer: false,
node_recovery_config: None,
garbage_collection_config: None,
event_stream_catchup_min_checkpoint_lag: None,
max_epochs_ahead: None,
}
Expand Down
Loading