Skip to content

Commit d7f2d7a

Browse files
committed
feat(node): disable immediate data deletion after BlobDeleted events by default
1 parent b84970f commit d7f2d7a

5 files changed

Lines changed: 97 additions & 3 deletions

File tree

crates/walrus-service/node_config_example.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ blob_event_processor_config:
241241
garbage_collection:
242242
enable_blob_info_cleanup: true
243243
enable_data_deletion: true
244+
enable_immediate_data_deletion: false
244245
enable_random_delay: true
245246
randomization_time_window_secs: null
246247
blob_objects_batch_size: 5000

crates/walrus-service/src/node.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4947,6 +4947,71 @@ mod tests {
49474947
Ok(())
49484948
}
49494949

4950+
// Parameterized test: checks whether processing a `BlobDeleted` event removes the blob's stored
// data, depending on `GarbageCollectionConfig::enable_immediate_data_deletion`.
//
// Each case below is a tuple `(enable_immediate_data_deletion, expect_data_stored)`:
// - `enabled`:  flag on  -> the blob is expected to no longer be stored at all shards.
// - `disabled`: flag off -> the blob is expected to still be stored at all shards.
async_param_test! {
4951+
immediate_data_deletion_on_blob_deleted_event -> TestResult: [
4952+
enabled: (true, false),
4953+
disabled: (false, true),
4954+
]
4955+
}
4956+
async fn immediate_data_deletion_on_blob_deleted_event(
4957+
enable_immediate_data_deletion: bool,
4958+
expect_data_stored: bool,
4959+
) -> TestResult {
4960+
walrus_test_utils::init_tracing();
4961+
let events = Sender::new(48);
4962+
// Start from the test defaults and override only the flag under test.
let gc_config = GarbageCollectionConfig {
4963+
enable_immediate_data_deletion,
4964+
..GarbageCollectionConfig::default_for_test()
4965+
};
4966+
// Build a started node whose storage already holds both slivers of `BLOB_ID` on two shards,
// and which receives its system events from the `events` sender created above.
let node = StorageNodeHandle::builder()
4967+
.with_storage(
4968+
populated_storage(&[
4969+
(SHARD_INDEX, vec![(BLOB_ID, WhichSlivers::Both)]),
4970+
(OTHER_SHARD_INDEX, vec![(BLOB_ID, WhichSlivers::Both)]),
4971+
])
4972+
.await?,
4973+
)
4974+
.with_system_event_provider(events.clone())
4975+
.with_garbage_collection_config(gc_config)
4976+
.with_node_started(true)
4977+
.build()
4978+
.await?;
4979+
let inner = node.as_ref().inner.clone();
4980+
4981+
// Presumably gives the node time to finish starting up before it is queried -- TODO confirm.
tokio::time::sleep(Duration::from_millis(50)).await;
4982+
4983+
// Precondition: the blob data is present on every shard before any event is sent.
assert!(
4984+
inner
4985+
.is_stored_at_all_shards_at_latest_epoch(&BLOB_ID)
4986+
.await?,
4987+
);
4988+
// Send the lifecycle of a deletable blob: registered, then certified, then deleted.
events.send(
4989+
BlobRegistered {
4990+
deletable: true,
4991+
..BlobRegistered::for_testing(BLOB_ID)
4992+
}
4993+
.into(),
4994+
)?;
4995+
events.send(
4996+
BlobCertified {
4997+
deletable: true,
4998+
..BlobCertified::for_testing(BLOB_ID)
4999+
}
5000+
.into(),
5001+
)?;
5002+
events.send(BlobDeleted::for_testing(BLOB_ID).into())?;
5003+
5004+
// Presumably waits for the node to process the three events sent above.
// NOTE(review): this is a fixed-duration wait, so the final assertion is timing-dependent.
tokio::time::sleep(Duration::from_millis(100)).await;
5005+
5006+
// The data must remain stored exactly when the caller expects it to.
assert_eq!(
5007+
inner
5008+
.is_stored_at_all_shards_at_latest_epoch(&BLOB_ID)
5009+
.await?,
5010+
expect_data_stored,
5011+
);
5012+
Ok(())
5013+
}
5014+
49505015
#[tokio::test]
49515016
async fn returns_correct_blob_status() -> TestResult {
49525017
let blob_event = BlobRegistered::for_testing(BLOB_ID);

crates/walrus-service/src/node/blob_event_processor.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,12 +272,18 @@ impl BackgroundEventProcessor {
272272
// cannot remove it even if it is no longer valid in the *current* epoch
273273
if blob_info.can_data_be_deleted(event.epoch)
274274
&& self.node.garbage_collection_config.enable_data_deletion
275+
&& self
276+
.node
277+
.garbage_collection_config
278+
.enable_immediate_data_deletion
275279
{
276280
tracing::debug!("deleting data for deleted blob");
277281
self.node
278282
.storage
279283
.attempt_to_delete_blob_data(&blob_id, event.epoch, &self.node.metrics)
280284
.await?;
285+
} else {
286+
tracing::debug!("not deleting data for deleted blob");
281287
}
282288
} else if self
283289
.node
@@ -290,7 +296,10 @@ impl BackgroundEventProcessor {
290296
incomplete history; not deleting blob data"
291297
);
292298
} else {
293-
tracing::warn!("handling a `BlobDeleted` event for an untracked blob");
299+
tracing::debug!(
300+
"handling a `BlobDeleted` event for an untracked blob; this can happen when \
301+
re-processing events after a node restart"
302+
);
294303
}
295304

296305
event_handle.mark_as_complete();

crates/walrus-service/src/node/garbage_collector.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ pub struct GarbageCollectionConfig {
2424
pub enable_blob_info_cleanup: bool,
2525
/// Whether to delete metadata and slivers of expired or deleted blobs.
2626
pub enable_data_deletion: bool,
27+
/// Whether to immediately delete blob data when a `BlobDeleted` event is processed.
28+
///
29+
/// When disabled, data is only deleted during periodic garbage collection. Only relevant if
30+
/// `enable_data_deletion` is `true`.
31+
pub enable_immediate_data_deletion: bool,
2732
/// Whether to add a random delay before starting garbage collection.
2833
/// The delay is deterministically computed based on the node's public key and epoch,
2934
/// uniformly distributed between 0 and half the epoch duration.
@@ -52,6 +57,7 @@ impl Default for GarbageCollectionConfig {
5257
Self {
5358
enable_blob_info_cleanup: true,
5459
enable_data_deletion: true,
60+
enable_immediate_data_deletion: false,
5561
enable_random_delay: true,
5662
randomization_time_window: None,
5763
blob_objects_batch_size: 5000,
@@ -67,6 +73,7 @@ impl GarbageCollectionConfig {
6773
Self {
6874
enable_blob_info_cleanup: true,
6975
enable_data_deletion: true,
76+
enable_immediate_data_deletion: true,
7077
enable_random_delay: true,
7178
randomization_time_window: Some(Duration::from_secs(1)),
7279
blob_objects_batch_size: 10,

crates/walrus-service/src/test_utils.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -868,6 +868,7 @@ pub struct StorageNodeHandleBuilder {
868868
num_checkpoints_per_blob: Option<u32>,
869869
enable_node_config_synchronizer: bool,
870870
node_recovery_config: Option<NodeRecoveryConfig>,
871+
garbage_collection_config: Option<GarbageCollectionConfig>,
871872
event_stream_catchup_min_checkpoint_lag: Option<u64>,
872873
max_epochs_ahead: Option<u32>,
873874
}
@@ -962,6 +963,12 @@ impl StorageNodeHandleBuilder {
962963
self
963964
}
964965

966+
/// Sets the garbage collection config for the node.
967+
pub fn with_garbage_collection_config(mut self, config: GarbageCollectionConfig) -> Self {
968+
self.garbage_collection_config = Some(config);
969+
self
970+
}
971+
965972
/// Specify the shard assignment for this node.
966973
///
967974
/// If specified, it will determine the shard assignment for the node in the committee. If
@@ -1123,7 +1130,9 @@ impl StorageNodeHandleBuilder {
11231130
Default::default()
11241131
},
11251132
blob_recovery: BlobRecoveryConfig::default_for_test(),
1126-
garbage_collection: GarbageCollectionConfig::default_for_test(),
1133+
garbage_collection: self
1134+
.garbage_collection_config
1135+
.unwrap_or_else(|| GarbageCollectionConfig::default_for_test()),
11271136
..storage_node_config().inner
11281137
};
11291138

@@ -1309,7 +1318,9 @@ impl StorageNodeHandleBuilder {
13091318
storage_node_cap: node_capability.map(|cap| cap.id),
13101319
node_recovery_config: self.node_recovery_config.clone().unwrap_or_default(),
13111320
blob_recovery: BlobRecoveryConfig::default_for_test(),
1312-
garbage_collection: GarbageCollectionConfig::default_for_test(),
1321+
garbage_collection: self
1322+
.garbage_collection_config
1323+
.unwrap_or_else(|| GarbageCollectionConfig::default_for_test()),
13131324
..storage_node_config().inner
13141325
};
13151326

@@ -1391,6 +1402,7 @@ impl Default for StorageNodeHandleBuilder {
13911402
num_checkpoints_per_blob: None,
13921403
enable_node_config_synchronizer: false,
13931404
node_recovery_config: None,
1405+
garbage_collection_config: None,
13941406
event_stream_catchup_min_checkpoint_lag: None,
13951407
max_epochs_ahead: None,
13961408
}

0 commit comments

Comments
 (0)