diff --git a/crates/walrus-service/src/node.rs b/crates/walrus-service/src/node.rs index cf78184592..1546f5703f 100644 --- a/crates/walrus-service/src/node.rs +++ b/crates/walrus-service/src/node.rs @@ -118,6 +118,7 @@ use walrus_sdk::{ GENESIS_EPOCH, PackageEvent, ProtocolEvent, + StoragePoolEvent, }, }, }; @@ -1574,6 +1575,9 @@ impl StorageNode { self.process_package_event(event_handle, package_event) .await?; } + EventStreamElement::ContractEvent(ContractEvent::StoragePoolEvent(pool_event)) => { + self.process_storage_pool_event(event_handle, pool_event)?; + } EventStreamElement::ContractEvent(ContractEvent::DenyListEvent(_event)) => { // TODO: Implement DenyListEvent handling (WAL-424) event_handle.mark_as_complete(); @@ -1583,13 +1587,6 @@ impl StorageNode { )) => { event_handle.mark_as_complete(); } - EventStreamElement::ContractEvent(ContractEvent::StoragePoolEvent(event)) => { - // TODO(WAL-1162): implement storage pool event processing on storage nodes. - panic!( - "storage pool event processing is not yet implemented: {:?}", - event.name() - ); - } EventStreamElement::ContractEvent(ContractEvent::ProtocolEvent(event)) => { panic!( "unexpected protocol version update: {:?}", @@ -1620,6 +1617,33 @@ impl StorageNode { Ok(()) } + #[tracing::instrument(skip_all)] + fn process_storage_pool_event( + &self, + event_handle: EventHandle, + event: StoragePoolEvent, + ) -> anyhow::Result<()> { + let _scope = monitored_scope::monitored_scope("ProcessEvent::StoragePoolEvent"); + + tracing::debug!(?event, "{} event received", event.name()); + match event { + StoragePoolEvent::StoragePoolCreated(ref created) => { + self.inner + .storage + .set_storage_pool_end_epoch(&created.storage_pool_id, created.end_epoch) + .context("failed to set storage pool end epoch")?; + } + StoragePoolEvent::StoragePoolExtended(ref extended) => { + self.inner + .storage + .set_storage_pool_end_epoch(&extended.storage_pool_id, extended.new_end_epoch) + .context("failed to update storage pool 
end epoch")?; + } + } + event_handle.mark_as_complete(); + Ok(()) + } + #[tracing::instrument(skip_all)] async fn process_epoch_change_event( &self, @@ -3273,11 +3297,16 @@ impl StorageNodeInner { } fn is_blob_registered(&self, blob_id: &BlobId) -> Result { - Ok(self + let epoch = self.current_committee_epoch(); + if self .storage .get_blob_info(blob_id) .context("could not retrieve blob info")? - .is_some_and(|blob_info| blob_info.is_registered(self.current_committee_epoch()))) + .is_some_and(|blob_info| blob_info.is_registered(epoch)) + { + return Ok(true); + } + Ok(false) } fn notify_registration(&self, blob_id: &BlobId) { @@ -3321,11 +3350,16 @@ impl StorageNodeInner { } fn is_blob_certified(&self, blob_id: &BlobId) -> Result { - Ok(self + let epoch = self.current_committee_epoch(); + if self .storage .get_blob_info(blob_id) .context("could not retrieve blob info")? - .is_some_and(|blob_info| blob_info.is_certified(self.current_committee_epoch()))) + .is_some_and(|blob_info| blob_info.is_certified(epoch)) + { + return Ok(true); + } + Ok(false) } /// Returns true if the blob is currently not certified. 
diff --git a/crates/walrus-service/src/node/blob_event_processor.rs b/crates/walrus-service/src/node/blob_event_processor.rs index cb4f3f455a..8cf9696e9e 100644 --- a/crates/walrus-service/src/node/blob_event_processor.rs +++ b/crates/walrus-service/src/node/blob_event_processor.rs @@ -5,8 +5,8 @@ use std::{num::NonZeroUsize, sync::Arc}; use sui_macros::fail_point_async; use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; -use walrus_core::BlobId; -use walrus_sui::types::{BlobCertified, BlobDeleted, BlobEvent, BlobRegistered, InvalidBlobId}; +use walrus_core::{BlobId, Epoch}; +use walrus_sui::types::{BlobEvent, InvalidBlobId}; use walrus_utils::metrics::monitored_scope; use self::pending_events::{PendingEventCounter, PendingEventGuard}; @@ -119,12 +119,19 @@ impl BackgroundEventProcessor { match blob_event { BlobEvent::Certified(event) => { let _scope = monitored_scope::monitored_scope("ProcessEvent::BlobEvent::Certified"); - self.process_blob_certified_event(event_handle, event, checkpoint_position) - .await?; + self.process_blob_certified_event( + event_handle, + event.blob_id, + event.epoch, + event.is_extension, + checkpoint_position, + ) + .await?; } BlobEvent::Deleted(event) => { let _scope = monitored_scope::monitored_scope("ProcessEvent::BlobEvent::Deleted"); - self.process_blob_deleted_event(event_handle, event).await?; + self.process_blob_deleted_event(event_handle, event.blob_id, event.epoch) + .await?; } BlobEvent::InvalidBlobID(event) => { let _scope = @@ -135,7 +142,26 @@ impl BackgroundEventProcessor { // TODO (WAL-424): Implement DenyListBlobDeleted event handling. 
todo!("DenyListBlobDeleted event handling is not yet implemented"); } - BlobEvent::Registered(_) => { + BlobEvent::PooledBlobCertified(event) => { + let _scope = monitored_scope::monitored_scope( + "ProcessEvent::BlobEvent::PooledBlobCertified", + ); + self.process_blob_certified_event( + event_handle, + event.blob_id, + event.epoch, + false, + checkpoint_position, + ) + .await?; + } + BlobEvent::PooledBlobDeleted(event) => { + let _scope = + monitored_scope::monitored_scope("ProcessEvent::BlobEvent::PooledBlobDeleted"); + self.process_blob_deleted_event(event_handle, event.blob_id, event.epoch) + .await?; + } + BlobEvent::Registered(_) | BlobEvent::PooledBlobRegistered(_) => { unreachable!("registered event should be processed immediately"); } } @@ -148,7 +174,9 @@ impl BackgroundEventProcessor { async fn process_blob_certified_event( &self, event_handle: EventHandle, - event: BlobCertified, + blob_id: BlobId, + epoch: Epoch, + is_extension: bool, checkpoint_position: CheckpointEventPosition, ) -> anyhow::Result<()> { let start = tokio::time::Instant::now(); @@ -166,18 +194,18 @@ impl BackgroundEventProcessor { sui_macros::fail_point_if!( "skip_non_extension_certified_event_triggered_blob_sync", || { - skip_blob_sync_in_test = !event.is_extension; + skip_blob_sync_in_test = !is_extension; } ); if skip_blob_sync_in_test - || !self.node.is_blob_certified(&event.blob_id)? + || !self.node.is_blob_certified(&blob_id)? || self.node.storage.node_status()?.is_catching_up() || (current_event_epoch.is_some() && self .node .is_stored_at_all_shards_at_epoch( - &event.blob_id, + &blob_id, current_event_epoch.expect("just checked that current event epoch is set"), ) .await?) 
@@ -185,9 +213,9 @@ impl BackgroundEventProcessor { event_handle.mark_as_complete(); tracing::debug!( - %event.blob_id, - %event.epoch, - %event.is_extension, + %blob_id, + %epoch, + %is_extension, "skipping syncing blob during certified event processing", ); @@ -201,14 +229,14 @@ impl BackgroundEventProcessor { self.node .maybe_apply_live_upload_deferral( - event.blob_id, + blob_id, checkpoint_position.checkpoint_sequence_number, ) .await; // Slivers and (possibly) metadata are not stored, so initiate blob sync. self.blob_sync_handler - .start_sync(event.blob_id, event.epoch, Some(event_handle)) + .start_sync(blob_id, epoch, Some(event_handle)) .await?; walrus_utils::with_label!(histogram_set, metrics::STATUS_QUEUED) @@ -220,14 +248,14 @@ impl BackgroundEventProcessor { /// Processes a blob deleted event. #[tracing::instrument( skip_all, - fields(walrus.blob_id = %event.blob_id, walrus.epoch = tracing::field::Empty), + fields(walrus.blob_id = %blob_id, walrus.epoch = tracing::field::Empty), )] async fn process_blob_deleted_event( &self, event_handle: EventHandle, - event: BlobDeleted, + blob_id: BlobId, + epoch: Epoch, ) -> anyhow::Result<()> { - let blob_id = event.blob_id; let current_committee_epoch = self.node.current_committee_epoch(); tracing::Span::current().record("walrus.epoch", current_committee_epoch); @@ -246,13 +274,13 @@ impl BackgroundEventProcessor { // *Important*: We use the event's epoch for this check (as opposed to the current // epoch) as subsequent certify or delete events may update the `blob_info`; so we // cannot remove it even if it is no longer valid in the *current* epoch - if blob_info.can_data_be_deleted(event.epoch) + if blob_info.can_data_be_deleted(epoch) && self.node.garbage_collection_config.enable_data_deletion { tracing::debug!("deleting data for deleted blob"); self.node .storage - .attempt_to_delete_blob_data(&blob_id, event.epoch, &self.node.metrics) + .attempt_to_delete_blob_data(&blob_id, epoch, &self.node.metrics) 
.await?; } } else if self @@ -387,8 +415,12 @@ impl BlobEventProcessor { .storage .update_blob_info(event_handle.index(), &blob_event)?; - if let BlobEvent::Registered(event) = &blob_event { - self.handle_registered_event(event_handle, event).await?; + if matches!( + &blob_event, + BlobEvent::Registered(_) | BlobEvent::PooledBlobRegistered(_) + ) { + self.handle_registered_event(event_handle, blob_event.blob_id()) + .await?; return Ok(()); } @@ -406,7 +438,7 @@ impl BlobEventProcessor { async fn handle_registered_event( &self, event_handle: EventHandle, - event: &BlobRegistered, + blob_id: BlobId, ) -> anyhow::Result<()> { // Registered event is marked as complete immediately. We need to process registered events // as fast as possible to catch up to the latest event in order to not miss blob sliver @@ -417,22 +449,17 @@ impl BlobEventProcessor { // registered events. let _scope = monitored_scope::monitored_scope("ProcessEvent::BlobEvent::Registered"); - self.node.notify_registration(&event.blob_id); + self.node.notify_registration(&blob_id); - if let Err(error) = self.dispatch_task( - event.blob_id, - BackgroundTask::FlushPendingCaches { - blob_id: event.blob_id, - }, - ) { + if let Err(error) = + self.dispatch_task(blob_id, BackgroundTask::FlushPendingCaches { blob_id }) + { tracing::error!( - blob_id = %event.blob_id, + %blob_id, ?error, "failed to enqueue cache flush after registration, flushing inline", ); - self.node - .flush_pending_caches_with_logging(event.blob_id) - .await; + self.node.flush_pending_caches_with_logging(blob_id).await; } event_handle.mark_as_complete(); diff --git a/crates/walrus-service/src/node/metrics.rs b/crates/walrus-service/src/node/metrics.rs index 2e37b8df11..813da9c59d 100644 --- a/crates/walrus-service/src/node/metrics.rs +++ b/crates/walrus-service/src/node/metrics.rs @@ -325,6 +325,9 @@ impl TelemetryLabel for BlobEvent { BlobEvent::Deleted(_) => "deleted", BlobEvent::InvalidBlobID(_) => "invalid-blob", 
BlobEvent::DenyListBlobDeleted(_) => "deny-list-deleted", + BlobEvent::PooledBlobRegistered(_) => "pooled-blob-registered", + BlobEvent::PooledBlobCertified(_) => "pooled-blob-certified", + BlobEvent::PooledBlobDeleted(_) => "pooled-blob-deleted", } } } @@ -374,9 +377,6 @@ impl TelemetryLabel for StoragePoolEvent { fn label(&self) -> &'static str { match self { StoragePoolEvent::StoragePoolCreated(_) => "storage-pool-created", - StoragePoolEvent::PooledBlobRegistered(_) => "pooled-blob-registered", - StoragePoolEvent::PooledBlobCertified(_) => "pooled-blob-certified", - StoragePoolEvent::PooledBlobDeleted(_) => "pooled-blob-deleted", StoragePoolEvent::StoragePoolExtended(_) => "storage-pool-extended", } } diff --git a/crates/walrus-service/src/node/storage.rs b/crates/walrus-service/src/node/storage.rs index eaa0e445c7..6001b1dcf8 100644 --- a/crates/walrus-service/src/node/storage.rs +++ b/crates/walrus-service/src/node/storage.rs @@ -432,6 +432,7 @@ impl Storage { } /// Returns the highest epoch for which garbage collection was completed. + #[allow(dead_code)] pub(crate) fn garbage_collector_last_completed_epoch(&self) -> Result { Ok(self .garbage_collector_table @@ -670,6 +671,18 @@ impl Storage { self.blob_info.get_per_object_info(object_id) } + // === Storage Pool Accessors === + + /// Sets the end epoch for a storage pool. + pub(crate) fn set_storage_pool_end_epoch( + &self, + storage_pool_id: &ObjectID, + end_epoch: Epoch, + ) -> Result<(), TypedStoreError> { + self.blob_info + .set_storage_pool_end_epoch(storage_pool_id, end_epoch) + } + /// Returns the current event cursor and the next event index. 
#[tracing::instrument(skip_all)] pub fn get_event_cursor_and_next_index( diff --git a/crates/walrus-service/src/node/storage/blob_info.rs b/crates/walrus-service/src/node/storage/blob_info.rs index 64348dc80d..37b82b5146 100644 --- a/crates/walrus-service/src/node/storage/blob_info.rs +++ b/crates/walrus-service/src/node/storage/blob_info.rs @@ -25,7 +25,16 @@ use typed_store::{ }; use walrus_core::{BlobId, Epoch}; use walrus_storage_node_client::api::{BlobStatus, DeletableCounts}; -use walrus_sui::types::{BlobCertified, BlobDeleted, BlobEvent, BlobRegistered, InvalidBlobId}; +use walrus_sui::types::{ + BlobCertified, + BlobDeleted, + BlobEvent, + BlobRegistered, + InvalidBlobId, + PooledBlobCertified, + PooledBlobDeleted, + PooledBlobRegistered, +}; use self::per_object_blob_info::PerObjectBlobInfoMergeOperand; pub(crate) use self::per_object_blob_info::{PerObjectBlobInfo, PerObjectBlobInfoApi}; @@ -52,6 +61,8 @@ pub(super) struct BlobInfoTable { aggregate_blob_info: DBMap, per_object_blob_info: DBMap, latest_handled_event_index: Arc>>, + /// Maps storage_pool_id -> current end_epoch. + storage_pool_end_epochs: DBMap, } /// Returns the options for the aggregate blob info column family. @@ -96,11 +107,17 @@ impl BlobInfoTable { &ReadWriteOptions::default(), false, )?)); - + let storage_pool_end_epochs = DBMap::reopen( + database, + Some(constants::storage_pool_end_epochs_cf_name()), + &ReadWriteOptions::default(), + false, + )?; Ok(Self { aggregate_blob_info, per_object_blob_info, latest_handled_event_index, + storage_pool_end_epochs, }) } @@ -111,6 +128,7 @@ impl BlobInfoTable { .lock() .expect("mutex should not be poisoned") .schedule_delete_all()?; + self.storage_pool_end_epochs.schedule_delete_all()?; Ok(()) } @@ -133,6 +151,10 @@ impl BlobInfoTable { // value. 
db_table_opts_factory.standard(), ), + ( + constants::storage_pool_end_epochs_cf_name(), + db_table_opts_factory.standard(), + ), ] } @@ -190,6 +212,50 @@ impl BlobInfoTable { BlobEvent::InvalidBlobID(_) | BlobEvent::DenyListBlobDeleted(_) => { return Ok(()); } + BlobEvent::PooledBlobRegistered(event) => { + // Direct insert PerObjectBlobInfoV2 for storage pool blobs. + let per_object_v2 = + PerObjectBlobInfo::V2(per_object_blob_info::PerObjectBlobInfoV2 { + blob_id: event.blob_id, + registered_epoch: event.epoch, + certified_epoch: None, + end_epoch_info: per_object_blob_info::EndEpochInfo::StoragePool( + event.storage_pool_id, + ), + deletable: event.deletable, + event: event.event_id, + deleted: false, + }); + batch.insert_batch( + &self.per_object_blob_info, + [(&event.object_id, &per_object_v2)], + )?; + return Ok(()); + } + BlobEvent::PooledBlobCertified(event) => { + // Emit a per-object merge operand (end_epoch is unused by the Certify handler but + // BlobStatusChangeInfo requires it, so we set it to 0). + let per_object_change_info = BlobStatusChangeInfo { + blob_id: event.blob_id, + deletable: event.deletable, + epoch: event.epoch, + end_epoch: 0, + status_event: event.event_id, + }; + let operand = PerObjectBlobInfoMergeOperand { + change_type: BlobStatusChangeType::Certify, + change_info: per_object_change_info, + }; + batch.partial_merge_batch( + &self.per_object_blob_info, + [(&event.object_id, operand.to_bytes())], + )?; + return Ok(()); + } + BlobEvent::PooledBlobDeleted(event) => { + batch.delete_batch(&self.per_object_blob_info, [event.object_id])?; + return Ok(()); + } }; batch.partial_merge_batch( @@ -240,6 +306,11 @@ impl BlobInfoTable { // Extensions need special handling. 
event.clone() } + BlobEvent::PooledBlobRegistered(_) + | BlobEvent::PooledBlobCertified(_) + | BlobEvent::PooledBlobDeleted(_) => { + return self.update_blob_info(event_index, event); + } }; debug_assert!( @@ -534,13 +605,30 @@ impl BlobInfoTable { current_epoch: Epoch, node_metrics: &NodeMetricSet, ) -> anyhow::Result { - if per_object_blob_info.is_registered(current_epoch) { - tracing::trace!( - %object_id, - ?per_object_blob_info, - "skipping blob-info update for blob that is still active" - ); - return Ok(false); + // For pool blobs, check the pool's end_epoch (source of truth for lifetime). + if let Some(pool_id) = per_object_blob_info.storage_pool_id() { + if self + .storage_pool_end_epochs + .get(&pool_id)? + .is_some_and(|e| e > current_epoch) + { + tracing::trace!( + %object_id, + "skipping blob-info update for blob in active storage pool" + ); + return Ok(false); + } + // Pool expired or unknown — fall through to GC. + } else { + // Regular blob: check per-object end_epoch. + if per_object_blob_info.is_registered(current_epoch) { + tracing::trace!( + %object_id, + ?per_object_blob_info, + "skipping blob-info update for blob that is still active" + ); + return Ok(false); + } } let blob_id = per_object_blob_info.blob_id(); @@ -560,7 +648,12 @@ impl BlobInfoTable { %deletable, "updating blob info for expired blob object" ); - let operand = if deletable { + let operand = if let Some(pool_id) = per_object_blob_info.storage_pool_id() { + BlobInfoMergeOperand::PoolExpired { + storage_pool_id: pool_id, + was_certified, + } + } else if deletable { BlobInfoMergeOperand::DeletableExpired { was_certified } } else { BlobInfoMergeOperand::PermanentExpired { was_certified } @@ -584,6 +677,18 @@ impl BlobInfoTable { Ok(true) } + // === Storage Pool Methods === + + /// Sets the end epoch for a storage pool. 
+ pub(crate) fn set_storage_pool_end_epoch( + &self, + storage_pool_id: &ObjectID, + end_epoch: Epoch, + ) -> Result<(), TypedStoreError> { + self.storage_pool_end_epochs + .insert(storage_pool_id, &end_epoch) + } + /// Checks some internal invariants of the blob info table. /// /// The checks are not exhaustive yet. @@ -597,10 +702,13 @@ impl BlobInfoTable { .safe_iter_with_snapshot(&snapshot) .context("failed to create per-object blob info snapshot iterator")? { - let Ok((object_id, PerObjectBlobInfo::V1(per_object_blob_info))) = result else { - return Err(anyhow::anyhow!( - "error encountered while iterating over per-object blob info: {result:?}" - )); + let (object_id, per_object_blob_info) = match result { + Ok(v) => v, + Err(e) => { + return Err(anyhow::anyhow!( + "error encountered while iterating over per-object blob info: {e:?}" + )); + } }; let blob_id = per_object_blob_info.blob_id(); per_object_table_blob_ids.insert(blob_id); @@ -613,44 +721,66 @@ impl BlobInfoTable { per-object blob info entry exists (object ID: {object_id})" )); }; - let BlobInfo::V1(BlobInfoV1::Valid(ValidBlobInfoV1 { - count_deletable_total, - count_deletable_certified, - latest_seen_deletable_registered_end_epoch, - latest_seen_deletable_certified_end_epoch, - .. - })) = blob_info - else { - continue; - }; - if per_object_blob_info.is_deletable() { - let per_object_end_epoch = per_object_blob_info.end_epoch; + + // Extract deletable counts for cross-checking (works for both V1 and V2). + let (count_deletable_total, count_deletable_certified, latest_reg, latest_cert) = + match &blob_info { + BlobInfo::V1(BlobInfoV1::Valid(v)) => ( + v.count_deletable_total, + v.count_deletable_certified, + v.latest_seen_deletable_registered_end_epoch, + v.latest_seen_deletable_certified_end_epoch, + ), + BlobInfo::V2(BlobInfoV2::Valid(v)) => ( + v.count_deletable_total, + v.count_deletable_certified, + // V2 doesn't track end epochs; skip epoch-based assertions. 
+ None, + None, + ), + _ => continue, + }; + + // Only check regular deletable blob invariants for V1 per-object entries. + // V2 per-object entries are storage pool blobs tracked differently. + if per_object_blob_info.storage_pool_id().is_none() + && per_object_blob_info.is_deletable() + { + let per_object_end_epoch = match &per_object_blob_info { + PerObjectBlobInfo::V1(v) => v.end_epoch, + PerObjectBlobInfo::V2(v) => match v.end_epoch_info { + per_object_blob_info::EndEpochInfo::Individual(e) => e, + per_object_blob_info::EndEpochInfo::StoragePool(_) => { + unreachable!("storage_pool_id() returned None above") + } + }, + }; anyhow::ensure!( count_deletable_total > 0, "count_deletable_total is 0 for blob ID {blob_id}, even though a deletable \ blob object exists (object ID: {object_id})" ); anyhow::ensure!( - latest_seen_deletable_registered_end_epoch - .is_some_and(|e| e >= per_object_end_epoch), + latest_reg.is_some_and(|e| e >= per_object_end_epoch), "latest_seen_deletable_registered_end_epoch for blob ID {blob_id} is \ - {latest_seen_deletable_registered_end_epoch:?}, which is inconsistent with the \ - end epoch of a deletable blob object: {per_object_end_epoch} (object ID: \ - {object_id})" + {latest_reg:?}, which is inconsistent with the end epoch of a deletable blob \ + object: {per_object_end_epoch} (object ID: {object_id})" ); - if per_object_blob_info.certified_epoch.is_some() { + let certified_epoch = match &per_object_blob_info { + PerObjectBlobInfo::V1(v) => v.certified_epoch, + PerObjectBlobInfo::V2(v) => v.certified_epoch, + }; + if certified_epoch.is_some() { anyhow::ensure!( count_deletable_certified > 0, "count_deletable_certified is 0 for blob ID {blob_id}, even though a \ deletable certified blob object exists (object ID: {object_id})" ); anyhow::ensure!( - latest_seen_deletable_certified_end_epoch - .is_some_and(|e| e >= per_object_end_epoch), + latest_cert.is_some_and(|e| e >= per_object_end_epoch), "latest_seen_deletable_certified_end_epoch 
for blob ID {blob_id} is \ - {latest_seen_deletable_certified_end_epoch:?}, which is inconsistent with \ - the end epoch of a deletable certified blob object: {per_object_end_epoch} \ - (object ID: {object_id})" + {latest_cert:?}, which is inconsistent with the end epoch of a deletable \ + certified blob object: {per_object_end_epoch} (object ID: {object_id})" ); } } @@ -666,18 +796,33 @@ impl BlobInfoTable { "error encountered while iterating over aggregate blob info: {result:?}" )); }; - let BlobInfo::V1(BlobInfoV1::Valid(blob_info)) = blob_info else { - continue; - }; - if !blob_info.has_no_objects() && !per_object_table_blob_ids.contains(&blob_id) { - return Err(anyhow::anyhow!( - "per-object blob info not found for blob ID {blob_id}, even though a \ - valid aggregate blob info entry referencing objects exists: {blob_info:?}" - )); + match &blob_info { + BlobInfo::V1(BlobInfoV1::Valid(v1_info)) => { + if !v1_info.has_no_objects() && !per_object_table_blob_ids.contains(&blob_id) { + return Err(anyhow::anyhow!( + "per-object blob info not found for blob ID {blob_id}, even though a \ + valid V1 aggregate blob info entry referencing objects exists: \ + {v1_info:?}" + )); + } + v1_info.check_invariants().context(format!( + "aggregate blob info invariants violated for blob ID {blob_id}" + ))?; + } + BlobInfo::V2(BlobInfoV2::Valid(v2_info)) => { + if !v2_info.has_no_objects() && !per_object_table_blob_ids.contains(&blob_id) { + return Err(anyhow::anyhow!( + "per-object blob info not found for blob ID {blob_id}, even though a \ + valid V2 aggregate blob info entry referencing objects exists: \ + {v2_info:?}" + )); + } + v2_info.check_invariants().context(format!( + "aggregate blob info V2 invariants violated for blob ID {blob_id}" + ))?; + } + _ => continue, } - blob_info.check_invariants().context(format!( - "aggregate blob info invariants violated for blob ID {blob_id}" - ))?; } Ok(()) } @@ -883,6 +1028,17 @@ pub(super) struct BlobStatusChangeInfo { pub(super) 
status_event: EventID, } +/// Change info for storage pool blob events. +/// +/// Unlike `BlobStatusChangeInfo`, this does not carry `end_epoch` because the lifetime of pool +/// blobs is determined by the pool's end_epoch (tracked in `storage_pool_end_epochs`), not by +/// the individual blob's end_epoch at registration time. +#[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Clone)] +pub(super) struct PooledBlobChangeInfo { + pub(super) epoch: Epoch, + pub(super) storage_pool_id: ObjectID, +} + #[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Clone, Copy)] pub(super) enum BlobStatusChangeType { Register, @@ -970,6 +1126,16 @@ pub(super) enum BlobInfoMergeOperand { PermanentExpired { was_certified: bool, }, + /// A status change for a blob in a storage pool. + PooledBlobChangeStatus { + change_type: BlobStatusChangeType, + change_info: PooledBlobChangeInfo, + }, + /// A blob in a storage pool has expired (pool lifetime ended). + PoolExpired { + storage_pool_id: ObjectID, + was_certified: bool, + }, } impl ToBytes for BlobInfoMergeOperand {} @@ -1019,6 +1185,44 @@ impl From<&InvalidBlobId> for BlobInfoMergeOperand { } } +impl From<&PooledBlobRegistered> for BlobInfoMergeOperand { + fn from(value: &PooledBlobRegistered) -> Self { + Self::PooledBlobChangeStatus { + change_type: BlobStatusChangeType::Register, + change_info: PooledBlobChangeInfo { + epoch: value.epoch, + storage_pool_id: value.storage_pool_id, + }, + } + } +} + +impl From<&PooledBlobCertified> for BlobInfoMergeOperand { + fn from(value: &PooledBlobCertified) -> Self { + Self::PooledBlobChangeStatus { + change_type: BlobStatusChangeType::Certify, + change_info: PooledBlobChangeInfo { + epoch: value.epoch, + storage_pool_id: value.storage_pool_id, + }, + } + } +} + +impl From<&PooledBlobDeleted> for BlobInfoMergeOperand { + fn from(value: &PooledBlobDeleted) -> Self { + Self::PooledBlobChangeStatus { + change_type: BlobStatusChangeType::Delete { + was_certified: value.was_certified, + }, + 
change_info: PooledBlobChangeInfo { + epoch: value.epoch, + storage_pool_id: value.storage_pool_id, + }, + } + } +} + impl From<&BlobEvent> for BlobInfoMergeOperand { fn from(value: &BlobEvent) -> Self { match value { @@ -1026,6 +1230,9 @@ impl From<&BlobEvent> for BlobInfoMergeOperand { BlobEvent::Certified(event) => event.into(), BlobEvent::Deleted(event) => event.into(), BlobEvent::InvalidBlobID(event) => event.into(), + BlobEvent::PooledBlobRegistered(event) => event.into(), + BlobEvent::PooledBlobCertified(event) => event.into(), + BlobEvent::PooledBlobDeleted(event) => event.into(), BlobEvent::DenyListBlobDeleted(_) => { // TODO (WAL-424): Implement DenyListBlobDeleted event handling. // Note: It's fine to panic here with a todo!, because in order to trigger this @@ -1548,111 +1755,492 @@ impl PermanentBlobInfoV1 { } } -impl CertifiedBlobInfoApi for BlobInfoV1 { - fn is_certified(&self, current_epoch: Epoch) -> bool { - if let Self::Valid(valid_blob_info) = self { - valid_blob_info.is_certified(current_epoch) - } else { - false +/// V2 aggregate blob info that supports both regular blobs and storage pool blobs. +/// +/// Regular blob fields are identical to V1. Storage pool blobs are tracked via flat counters. +/// A blob_id with only regular references has zero pool counters. A blob_id with only pool +/// references has zero regular counts. Both can coexist. +// INV: same invariants as V1 for regular fields, plus: +// INV: count_pooled_refs_total >= count_pooled_refs_certified +// INV: initial_certified_epoch considers both regular AND pool certified counts +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)] +pub(crate) struct ValidBlobInfoV2 { + // Regular blob fields (same as V1). + pub is_metadata_stored: bool, + pub count_deletable_total: u32, + pub count_deletable_certified: u32, + pub permanent_total: Option, + pub permanent_certified: Option, + pub initial_certified_epoch: Option, + // Storage pool references (flat counters). 
+ pub count_pooled_refs_total: u32, + pub count_pooled_refs_certified: u32, +} + +impl From for ValidBlobInfoV2 { + fn from(v1: ValidBlobInfoV1) -> Self { + Self { + is_metadata_stored: v1.is_metadata_stored, + count_deletable_total: v1.count_deletable_total, + count_deletable_certified: v1.count_deletable_certified, + permanent_total: v1.permanent_total, + permanent_certified: v1.permanent_certified, + initial_certified_epoch: v1.initial_certified_epoch, + count_pooled_refs_total: 0, + count_pooled_refs_certified: 0, } } +} - fn initial_certified_epoch(&self) -> Option { - if let Self::Valid(ValidBlobInfoV1 { - initial_certified_epoch, - .. - }) = self - { - *initial_certified_epoch +impl ValidBlobInfoV2 { + /// Handles regular blob status changes (same logic as V1). + fn update_status( + &mut self, + change_type: BlobStatusChangeType, + change_info: BlobStatusChangeInfo, + ) { + let was_certified = self.is_certified(change_info.epoch); + if change_info.deletable { + match change_type { + BlobStatusChangeType::Register => { + self.count_deletable_total += 1; + } + BlobStatusChangeType::Certify => { + if self.count_deletable_total <= self.count_deletable_certified { + tracing::error!( + "attempt to certify a deletable blob before corresponding register" + ); + return; + } + self.count_deletable_certified += 1; + } + BlobStatusChangeType::Extend => { + // No-op for V2 deletable blobs (no end epoch tracking). 
+ } + BlobStatusChangeType::Delete { was_certified } => { + self.update_deletable_counters(was_certified); + } + } } else { - None + match change_type { + BlobStatusChangeType::Register => { + ValidBlobInfoV1::register_permanent(&mut self.permanent_total, &change_info); + } + BlobStatusChangeType::Certify => { + if !ValidBlobInfoV1::certify_permanent( + &self.permanent_total, + &mut self.permanent_certified, + &change_info, + ) { + return; + } + } + BlobStatusChangeType::Extend => { + ValidBlobInfoV1::extend_permanent(&mut self.permanent_total, &change_info); + ValidBlobInfoV1::extend_permanent(&mut self.permanent_certified, &change_info); + } + BlobStatusChangeType::Delete { .. } => { + tracing::error!("attempt to delete a permanent blob"); + return; + } + } + } + + // Update initial certified epoch. + match change_type { + BlobStatusChangeType::Certify => { + self.update_initial_certified_epoch(change_info.epoch, !was_certified); + } + BlobStatusChangeType::Delete { .. } => { + self.maybe_unset_initial_certified_epoch(); + } + BlobStatusChangeType::Register | BlobStatusChangeType::Extend => (), } } -} -impl BlobInfoApi for BlobInfoV1 { - fn is_metadata_stored(&self) -> bool { - matches!( - self, - Self::Valid(ValidBlobInfoV1 { - is_metadata_stored: true, - .. - }) - ) + fn deletable_expired(&mut self, was_certified: bool) { + self.update_deletable_counters(was_certified); + self.maybe_unset_initial_certified_epoch(); } - // Note: See the `is_certified` method for an explanation of the use of the - // `latest_seen_deletable_registered_end_epoch` field. - fn is_registered(&self, current_epoch: Epoch) -> bool { - let Self::Valid(ValidBlobInfoV1 { - count_deletable_total, - permanent_total, - latest_seen_deletable_registered_end_epoch, - .. 
- }) = self - else { - return false; - }; + fn permanent_expired(&mut self, was_certified: bool) { + ValidBlobInfoV1::decrement_blob_info_inner(&mut self.permanent_total); + if was_certified { + ValidBlobInfoV1::decrement_blob_info_inner(&mut self.permanent_certified); + } + self.maybe_unset_initial_certified_epoch(); + } - let exists_registered_permanent_blob = permanent_total - .as_ref() - .is_some_and(|p| p.end_epoch > current_epoch); - let probably_exists_registered_deletable_blob = *count_deletable_total > 0 - && latest_seen_deletable_registered_end_epoch.is_some_and(|e| e > current_epoch); + fn update_deletable_counters(&mut self, was_certified: bool) { + self.count_deletable_total = self.count_deletable_total.saturating_sub(1); + if was_certified { + self.count_deletable_certified = self.count_deletable_certified.saturating_sub(1); + } + } - exists_registered_permanent_blob || probably_exists_registered_deletable_blob + // --- Storage pool operations --- + + /// Registers a blob in a storage pool. + fn pool_register(&mut self) { + self.count_pooled_refs_total += 1; } - fn can_blob_info_be_deleted(&self, current_epoch: Epoch) -> bool { - match self { - Self::Invalid { .. } => { - // We don't know whether there are any deletable blob objects for this blob ID. - false - } - Self::Valid(ValidBlobInfoV1 { - count_deletable_total, - permanent_total, - .. - }) => { - *count_deletable_total == 0 - && permanent_total.is_none() - && self.can_data_be_deleted(current_epoch) - } + /// Certifies a blob in a storage pool. + fn pool_certify(&mut self, epoch: Epoch) { + let was_certified = self.has_any_certified(); + if self.count_pooled_refs_total <= self.count_pooled_refs_certified { + tracing::error!("attempt to certify a pool blob before corresponding register"); + return; } + self.count_pooled_refs_certified += 1; + self.update_initial_certified_epoch(epoch, !was_certified); } - fn invalidation_event(&self) -> Option { - if let Self::Invalid { event, .. 
} = self { - Some(*event) - } else { - None + /// Deletes a blob from a storage pool. + fn pool_delete(&mut self, was_certified: bool) { + self.count_pooled_refs_total = self.count_pooled_refs_total.saturating_sub(1); + if was_certified { + self.count_pooled_refs_certified = self.count_pooled_refs_certified.saturating_sub(1); } + self.maybe_unset_initial_certified_epoch(); } - fn to_blob_status(&self, current_epoch: Epoch) -> BlobStatus { - match self { - BlobInfoV1::Invalid { event, .. } => BlobStatus::Invalid { event: *event }, - BlobInfoV1::Valid(valid_blob_info) => valid_blob_info.to_blob_status(current_epoch), + /// Handles a pool blob expiring (pool lifetime ended). + fn pool_expired(&mut self, was_certified: bool) { + self.pool_delete(was_certified); + } + + // --- Helper methods --- + + /// Returns true if any certified references exist (regular or pool). + fn has_any_certified(&self) -> bool { + if self.count_deletable_certified > 0 || self.permanent_certified.is_some() { + return true; } + self.count_pooled_refs_certified > 0 } -} -impl Mergeable for BlobInfoV1 { - type MergeOperand = BlobInfoMergeOperand; - type Key = BlobId; + fn update_initial_certified_epoch(&mut self, new_certified_epoch: Epoch, force: bool) { + if force + || self + .initial_certified_epoch + .is_none_or(|existing_epoch| existing_epoch > new_certified_epoch) + { + self.initial_certified_epoch = Some(new_certified_epoch); + } + } - fn merge_with(mut self, operand: Self::MergeOperand) -> Self { - match (&mut self, operand) { - // If the blob is already marked as invalid, do not update the status. - (Self::Invalid { .. 
}, _) => (), - ( - _, - BlobInfoMergeOperand::MarkInvalid { - epoch, - status_event, - }, - ) => { - return Self::Invalid { + fn maybe_unset_initial_certified_epoch(&mut self) { + if !self.has_any_certified() { + self.initial_certified_epoch = None; + } + } + + fn has_no_objects(&self) -> bool { + self.count_deletable_total == 0 + && self.count_deletable_certified == 0 + && self.permanent_total.is_none() + && self.permanent_certified.is_none() + && self.initial_certified_epoch.is_none() + && self.count_pooled_refs_total == 0 + } + + /// Checks the invariants of this V2 blob info. + fn check_invariants(&self) -> anyhow::Result<()> { + // Regular field invariants (same as V1). + let has_regular_certified = + self.count_deletable_certified > 0 || self.permanent_certified.is_some(); + let has_pool_certified = self.count_pooled_refs_certified > 0; + + anyhow::ensure!(self.count_deletable_total >= self.count_deletable_certified); + match self.initial_certified_epoch { + None => { + anyhow::ensure!(!has_regular_certified && !has_pool_certified) + } + Some(_) => { + anyhow::ensure!(has_regular_certified || has_pool_certified) + } + } + + match (&self.permanent_total, &self.permanent_certified) { + (None, Some(_)) => { + anyhow::bail!("permanent_total.is_none() => permanent_certified.is_none()") + } + (Some(total_inner), Some(certified_inner)) => { + anyhow::ensure!(total_inner.end_epoch >= certified_inner.end_epoch); + anyhow::ensure!(total_inner.count >= certified_inner.count); + } + _ => (), + } + + // Pool ref invariants. 
+ anyhow::ensure!( + self.count_pooled_refs_total >= self.count_pooled_refs_certified, + "pool ref count_pooled_refs_total < count_pooled_refs_certified" + ); + + Ok(()) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub(crate) enum BlobInfoV2 { + Invalid { epoch: Epoch, event: EventID }, + Valid(ValidBlobInfoV2), +} + +impl ToBytes for BlobInfoV2 {} + +impl From for BlobInfoV2 { + fn from(v1: BlobInfoV1) -> Self { + match v1 { + BlobInfoV1::Invalid { epoch, event } => BlobInfoV2::Invalid { epoch, event }, + BlobInfoV1::Valid(v) => BlobInfoV2::Valid(v.into()), + } + } +} + +impl CertifiedBlobInfoApi for ValidBlobInfoV2 { + fn is_certified(&self, current_epoch: Epoch) -> bool { + let exists_certified_permanent_blob = self + .permanent_certified + .as_ref() + .is_some_and(|p| p.end_epoch > current_epoch); + self.initial_certified_epoch + .is_some_and(|epoch| epoch <= current_epoch) + && (exists_certified_permanent_blob + || self.count_deletable_certified > 0 + || self.count_pooled_refs_certified > 0) + } + + fn initial_certified_epoch(&self) -> Option { + self.initial_certified_epoch + } +} + +impl CertifiedBlobInfoApi for BlobInfoV2 { + fn is_certified(&self, current_epoch: Epoch) -> bool { + match self { + Self::Valid(v) => v.is_certified(current_epoch), + Self::Invalid { .. } => false, + } + } + + fn initial_certified_epoch(&self) -> Option { + match self { + Self::Valid(v) => v.initial_certified_epoch, + Self::Invalid { .. } => None, + } + } +} + +impl BlobInfoApi for BlobInfoV2 { + fn is_metadata_stored(&self) -> bool { + matches!( + self, + Self::Valid(ValidBlobInfoV2 { + is_metadata_stored: true, + .. 
+ }) + ) + } + + fn is_registered(&self, current_epoch: Epoch) -> bool { + let Self::Valid(v) = self else { + return false; + }; + + let exists_registered_permanent_blob = v + .permanent_total + .as_ref() + .is_some_and(|p| p.end_epoch > current_epoch); + + exists_registered_permanent_blob + || v.count_deletable_total > 0 + || v.count_pooled_refs_total > 0 + } + + fn can_blob_info_be_deleted(&self, current_epoch: Epoch) -> bool { + match self { + Self::Invalid { .. } => false, + Self::Valid(v) => { + v.count_deletable_total == 0 + && v.permanent_total.is_none() + && v.count_pooled_refs_total == 0 + && self.can_data_be_deleted(current_epoch) + } + } + } + + fn invalidation_event(&self) -> Option { + if let Self::Invalid { event, .. } = self { + Some(*event) + } else { + None + } + } + + fn to_blob_status(&self, current_epoch: Epoch) -> BlobStatus { + match self { + BlobInfoV2::Invalid { event, .. } => BlobStatus::Invalid { event: *event }, + BlobInfoV2::Valid(v) => { + let initial_certified_epoch = v.initial_certified_epoch; + + // Deletable counts include both regular and pool refs. + let deletable_counts = DeletableCounts { + count_deletable_total: v + .count_deletable_total + .saturating_add(v.count_pooled_refs_total), + count_deletable_certified: v + .count_deletable_certified + .saturating_add(v.count_pooled_refs_certified), + }; + + if let Some(PermanentBlobInfoV1 { + end_epoch, event, .. + }) = v.permanent_certified.as_ref() + && *end_epoch > current_epoch + { + return BlobStatus::Permanent { + end_epoch: *end_epoch, + is_certified: true, + status_event: *event, + deletable_counts, + initial_certified_epoch, + }; + } + if let Some(PermanentBlobInfoV1 { + end_epoch, event, .. 
+ }) = v.permanent_total.as_ref() + && *end_epoch > current_epoch + { + return BlobStatus::Permanent { + end_epoch: *end_epoch, + is_certified: false, + status_event: *event, + deletable_counts, + initial_certified_epoch, + }; + } + + if deletable_counts != Default::default() { + BlobStatus::Deletable { + initial_certified_epoch, + deletable_counts, + } + } else { + BlobStatus::Nonexistent + } + } + } + } +} + +impl CertifiedBlobInfoApi for BlobInfoV1 { + fn is_certified(&self, current_epoch: Epoch) -> bool { + if let Self::Valid(valid_blob_info) = self { + valid_blob_info.is_certified(current_epoch) + } else { + false + } + } + + fn initial_certified_epoch(&self) -> Option { + if let Self::Valid(ValidBlobInfoV1 { + initial_certified_epoch, + .. + }) = self + { + *initial_certified_epoch + } else { + None + } + } +} + +impl BlobInfoApi for BlobInfoV1 { + fn is_metadata_stored(&self) -> bool { + matches!( + self, + Self::Valid(ValidBlobInfoV1 { + is_metadata_stored: true, + .. + }) + ) + } + + // Note: See the `is_certified` method for an explanation of the use of the + // `latest_seen_deletable_registered_end_epoch` field. + fn is_registered(&self, current_epoch: Epoch) -> bool { + let Self::Valid(ValidBlobInfoV1 { + count_deletable_total, + permanent_total, + latest_seen_deletable_registered_end_epoch, + .. + }) = self + else { + return false; + }; + + let exists_registered_permanent_blob = permanent_total + .as_ref() + .is_some_and(|p| p.end_epoch > current_epoch); + let probably_exists_registered_deletable_blob = *count_deletable_total > 0 + && latest_seen_deletable_registered_end_epoch.is_some_and(|e| e > current_epoch); + + exists_registered_permanent_blob || probably_exists_registered_deletable_blob + } + + fn can_blob_info_be_deleted(&self, current_epoch: Epoch) -> bool { + match self { + Self::Invalid { .. } => { + // We don't know whether there are any deletable blob objects for this blob ID. 
+ false + } + Self::Valid(ValidBlobInfoV1 { + count_deletable_total, + permanent_total, + .. + }) => { + *count_deletable_total == 0 + && permanent_total.is_none() + && self.can_data_be_deleted(current_epoch) + } + } + } + + fn invalidation_event(&self) -> Option { + if let Self::Invalid { event, .. } = self { + Some(*event) + } else { + None + } + } + + fn to_blob_status(&self, current_epoch: Epoch) -> BlobStatus { + match self { + BlobInfoV1::Invalid { event, .. } => BlobStatus::Invalid { event: *event }, + BlobInfoV1::Valid(valid_blob_info) => valid_blob_info.to_blob_status(current_epoch), + } + } +} + +impl Mergeable for BlobInfoV1 { + type MergeOperand = BlobInfoMergeOperand; + type Key = BlobId; + + fn merge_with(mut self, operand: Self::MergeOperand) -> Self { + match (&mut self, operand) { + // If the blob is already marked as invalid, do not update the status. + (Self::Invalid { .. }, _) => (), + ( + _, + BlobInfoMergeOperand::MarkInvalid { + epoch, + status_event, + }, + ) => { + return Self::Invalid { epoch, event: status_event, }; @@ -1680,6 +2268,11 @@ impl Mergeable for BlobInfoV1 { Self::Valid(valid_blob_info), BlobInfoMergeOperand::PermanentExpired { was_certified }, ) => valid_blob_info.permanent_expired(was_certified), + // Pool operands should never reach V1 — they are intercepted at the BlobInfo level. + (_, BlobInfoMergeOperand::PooledBlobChangeStatus { .. }) + | (_, BlobInfoMergeOperand::PoolExpired { .. }) => { + unreachable!("pool operands should be handled at BlobInfo level, not BlobInfoV1") + } } self } @@ -1748,6 +2341,149 @@ impl Mergeable for BlobInfoV1 { ); None } + // Pool operands should never reach V1 — they are intercepted at the BlobInfo level. + BlobInfoMergeOperand::PooledBlobChangeStatus { .. } + | BlobInfoMergeOperand::PoolExpired { .. 
} => { + unreachable!("pool operands should be handled at BlobInfo level, not BlobInfoV1") + } + } + } +} + +impl Mergeable for BlobInfoV2 { + type MergeOperand = BlobInfoMergeOperand; + type Key = BlobId; + + fn merge_with(mut self, operand: Self::MergeOperand) -> Self { + match (&mut self, operand) { + // If the blob is already marked as invalid, do not update the status. + (Self::Invalid { .. }, _) => (), + ( + _, + BlobInfoMergeOperand::MarkInvalid { + epoch, + status_event, + }, + ) => { + return Self::Invalid { + epoch, + event: status_event, + }; + } + ( + Self::Valid(ValidBlobInfoV2 { + is_metadata_stored, .. + }), + BlobInfoMergeOperand::MarkMetadataStored(new_is_metadata_stored), + ) => { + *is_metadata_stored = new_is_metadata_stored; + } + // Regular blob status changes. + ( + Self::Valid(valid_blob_info), + BlobInfoMergeOperand::ChangeStatus { + change_type, + change_info, + }, + ) => valid_blob_info.update_status(change_type, change_info), + ( + Self::Valid(valid_blob_info), + BlobInfoMergeOperand::DeletableExpired { was_certified }, + ) => valid_blob_info.deletable_expired(was_certified), + ( + Self::Valid(valid_blob_info), + BlobInfoMergeOperand::PermanentExpired { was_certified }, + ) => valid_blob_info.permanent_expired(was_certified), + // Storage pool operations. + ( + Self::Valid(valid_blob_info), + BlobInfoMergeOperand::PooledBlobChangeStatus { + change_type, + change_info, + .. + }, + ) => match change_type { + BlobStatusChangeType::Register => { + valid_blob_info.pool_register(); + } + BlobStatusChangeType::Certify => { + valid_blob_info.pool_certify(change_info.epoch); + } + BlobStatusChangeType::Extend => { + // Extensions don't change ref counts for storage pool. + // The pool's end_epoch is tracked separately. + } + BlobStatusChangeType::Delete { was_certified } => { + valid_blob_info.pool_delete(was_certified); + } + }, + ( + Self::Valid(valid_blob_info), + BlobInfoMergeOperand::PoolExpired { was_certified, .. 
}, + ) => valid_blob_info.pool_expired(was_certified), + } + self + } + + fn merge_new(operand: Self::MergeOperand) -> Option { + match operand { + BlobInfoMergeOperand::PooledBlobChangeStatus { + change_type: BlobStatusChangeType::Register, + .. + } => { + let mut v2 = ValidBlobInfoV2::default(); + v2.pool_register(); + Some(BlobInfoV2::Valid(v2)) + } + BlobInfoMergeOperand::MarkInvalid { + epoch, + status_event, + } => Some(BlobInfoV2::Invalid { + epoch, + event: status_event, + }), + BlobInfoMergeOperand::MarkMetadataStored(is_metadata_stored) => { + tracing::info!( + is_metadata_stored, + "marking metadata stored for an untracked blob ID; blob info will be removed \ + during the next garbage collection" + ); + Some(BlobInfoV2::Valid(ValidBlobInfoV2 { + is_metadata_stored, + ..Default::default() + })) + } + BlobInfoMergeOperand::ChangeStatus { + change_type: BlobStatusChangeType::Register, + change_info: + BlobStatusChangeInfo { + deletable, + end_epoch, + status_event, + .. + }, + } => Some(BlobInfoV2::Valid(if deletable { + ValidBlobInfoV2 { + count_deletable_total: 1, + ..Default::default() + } + } else { + ValidBlobInfoV2 { + permanent_total: Some(PermanentBlobInfoV1::new_first(end_epoch, status_event)), + ..Default::default() + } + })), + _ => { + tracing::error!( + ?operand, + "encountered an unexpected update for an untracked blob ID (V2)" + ); + debug_assert!( + false, + "encountered an unexpected update for an untracked blob ID (V2): {operand:?}", + ); + None + } } } } @@ -1794,6 +2530,7 @@ impl Ord for BlobCertificationStatus { #[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Clone)] pub(crate) enum BlobInfo { V1(BlobInfoV1), + V2(BlobInfoV2), } impl BlobInfo { @@ -1840,12 +2577,29 @@ impl Mergeable for BlobInfo { fn merge_with(self, operand: Self::MergeOperand) -> Self { match self { - Self::V1(value) => Self::V1(value.merge_with(operand)), + Self::V1(v1) => match &operand { + // Only upgrade to V2 when a pool operand hits a V1 entry. 
+ BlobInfoMergeOperand::PooledBlobChangeStatus { .. } + | BlobInfoMergeOperand::PoolExpired { .. } => { + Self::V2(BlobInfoV2::from(v1).merge_with(operand)) + } + // Regular operands keep V1 as V1. + _ => Self::V1(v1.merge_with(operand)), + }, + // V2 handles all operand types (regular AND pool). + Self::V2(v2) => Self::V2(v2.merge_with(operand)), } } fn merge_new(operand: Self::MergeOperand) -> Option { - BlobInfoV1::merge_new(operand).map(Self::from) + match &operand { + // First event for a blob_id is a pool event → create V2. + BlobInfoMergeOperand::PooledBlobChangeStatus { .. } => { + BlobInfoV2::merge_new(operand).map(Self::V2) + } + // First event is a regular event → create V1 (as before). + _ => BlobInfoV1::merge_new(operand).map(Self::V1), + } } } @@ -1901,6 +2655,8 @@ mod per_object_blob_info { fn is_registered(&self, current_epoch: Epoch) -> bool; /// Returns true iff the object is already deleted. fn is_deleted(&self) -> bool; + /// Returns the storage pool ID if this is a storage pool blob. + fn storage_pool_id(&self) -> Option; } #[enum_dispatch(CertifiedBlobInfoApi)] @@ -1908,6 +2664,7 @@ mod per_object_blob_info { #[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Clone)] pub(crate) enum PerObjectBlobInfo { V1(PerObjectBlobInfoV1), + V2(PerObjectBlobInfoV2), } impl PerObjectBlobInfo { @@ -1942,6 +2699,7 @@ mod per_object_blob_info { fn merge_with(self, operand: Self::MergeOperand) -> Self { match self { Self::V1(value) => Self::V1(value.merge_with(operand)), + Self::V2(value) => Self::V2(value.merge_with(operand)), } } @@ -1997,10 +2755,164 @@ mod per_object_blob_info { fn is_deleted(&self) -> bool { self.deleted } + + fn storage_pool_id(&self) -> Option { + None + } } impl ToBytes for PerObjectBlobInfoV1 {} + /// How the end epoch of a per-object blob is determined. + #[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Clone)] + pub(crate) enum EndEpochInfo { + /// Regular blob with an individual end epoch. 
+ Individual(Epoch), + /// Pooled blob whose lifetime is determined by the storage pool's end epoch. + StoragePool(ObjectID), + } + + /// V2 per-object blob info that supports both regular blobs and storage pool blobs. + /// + /// Unlike V1, the end epoch and storage pool ID are combined into a single `EndEpochInfo` + /// enum: regular blobs have `EndEpochInfo::Individual(end_epoch)`, while pool blobs have + /// `EndEpochInfo::StoragePool(pool_id)`. + #[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Clone)] + pub(crate) struct PerObjectBlobInfoV2 { + /// The blob ID. + pub blob_id: BlobId, + /// The epoch in which the blob has been registered. + pub registered_epoch: Epoch, + /// The epoch in which the blob was first certified, `None` if the blob is uncertified. + pub certified_epoch: Option, + /// How the blob's end epoch is determined. + pub end_epoch_info: EndEpochInfo, + /// Whether the blob is deletable. + pub deletable: bool, + /// The ID of the last blob event related to this object. + pub event: EventID, + /// Whether the blob has been deleted. + pub deleted: bool, + } + + impl CertifiedBlobInfoApi for PerObjectBlobInfoV2 { + fn is_certified(&self, current_epoch: Epoch) -> bool { + self.is_registered(current_epoch) + && self + .certified_epoch + .is_some_and(|epoch| epoch <= current_epoch) + } + + fn initial_certified_epoch(&self) -> Option { + self.certified_epoch + } + } + + impl PerObjectBlobInfoApi for PerObjectBlobInfoV2 { + fn blob_id(&self) -> BlobId { + self.blob_id + } + + fn is_deletable(&self) -> bool { + self.deletable + } + + fn is_registered(&self, current_epoch: Epoch) -> bool { + if self.deleted { + return false; + } + match self.end_epoch_info { + EndEpochInfo::Individual(end_epoch) => end_epoch > current_epoch, + // Pool blob liveness depends on the pool, not the blob. 
+ EndEpochInfo::StoragePool(_) => true, + } + } + + fn is_deleted(&self) -> bool { + self.deleted + } + + fn storage_pool_id(&self) -> Option { + match &self.end_epoch_info { + EndEpochInfo::StoragePool(id) => Some(*id), + EndEpochInfo::Individual(_) => None, + } + } + } + + impl ToBytes for PerObjectBlobInfoV2 {} + + impl Mergeable for PerObjectBlobInfoV2 { + type MergeOperand = PerObjectBlobInfoMergeOperand; + type Key = ObjectID; + + fn merge_with( + mut self, + PerObjectBlobInfoMergeOperand { + change_type, + change_info, + }: PerObjectBlobInfoMergeOperand, + ) -> Self { + assert_eq!( + self.blob_id, change_info.blob_id, + "blob ID mismatch in merge operand" + ); + assert_eq!( + self.deletable, change_info.deletable, + "deletable mismatch in merge operand" + ); + assert!( + !self.deleted, + "attempt to update an already deleted blob {}", + self.blob_id + ); + self.event = change_info.status_event; + match change_type { + BlobStatusChangeType::Register => { + panic!( + "cannot register an already registered blob {}", + self.blob_id + ); + } + BlobStatusChangeType::Certify => { + assert!( + self.certified_epoch.is_none(), + "cannot certify an already certified blob {}", + self.blob_id + ); + self.certified_epoch = Some(change_info.epoch); + } + BlobStatusChangeType::Extend => { + assert!( + self.certified_epoch.is_some(), + "cannot extend an uncertified blob {}", + self.blob_id + ); + self.end_epoch_info = EndEpochInfo::Individual(change_info.end_epoch); + } + BlobStatusChangeType::Delete { was_certified } => { + assert_eq!(self.certified_epoch.is_some(), was_certified); + self.deleted = true; + } + } + self + } + + fn merge_new(operand: Self::MergeOperand) -> Option { + // V2 entries are created via direct insert, not merge_new. 
+ tracing::error!( + ?operand, + "PerObjectBlobInfoV2::merge_new should not be called; V2 entries are created via \ + direct insert" + ); + debug_assert!( + false, + "PerObjectBlobInfoV2::merge_new should not be called: {operand:?}" + ); + None + } + } + impl Mergeable for PerObjectBlobInfoV1 { type MergeOperand = PerObjectBlobInfoMergeOperand; type Key = ObjectID; diff --git a/crates/walrus-service/src/node/storage/constants.rs b/crates/walrus-service/src/node/storage/constants.rs index 29dd0656f1..0c6153eab3 100644 --- a/crates/walrus-service/src/node/storage/constants.rs +++ b/crates/walrus-service/src/node/storage/constants.rs @@ -11,6 +11,7 @@ const METADATA_COLUMN_FAMILY_NAME: &str = "metadata"; const EVENT_INDEX_COLUMN_FAMILY_NAME: &str = "latest_handled_event_index"; const EVENT_CURSOR_COLUMN_FAMILY_NAME: &str = "event_cursor"; const EVENT_CURSOR_KEY: [u8; 6] = *b"cursor"; +const STORAGE_POOL_END_EPOCHS_COLUMN_FAMILY_NAME: &str = "storage_pool_end_epochs"; const GARBAGE_COLLECTOR_TABLE_COLUMN_FAMILY_NAME: &str = "garbage_collector_last_completed_epoch"; const GARBAGE_COLLECTOR_LAST_STARTED_EPOCH_KEY: &str = "started"; const GARBAGE_COLLECTOR_LAST_COMPLETED_EPOCH_KEY: &str = "completed"; @@ -62,6 +63,11 @@ pub fn event_cursor_key() -> &'static [u8; 6] { &EVENT_CURSOR_KEY } +/// Returns the name of the storage pool end epochs column family. +pub fn storage_pool_end_epochs_cf_name() -> &'static str { + STORAGE_POOL_END_EPOCHS_COLUMN_FAMILY_NAME +} + /// Returns the name of the garbage collector last completed epoch column family. 
pub fn garbage_collector_table_cf_name() -> &'static str { GARBAGE_COLLECTOR_TABLE_COLUMN_FAMILY_NAME diff --git a/crates/walrus-simtest/tests/simtest_storage_pool.rs b/crates/walrus-simtest/tests/simtest_storage_pool.rs new file mode 100644 index 0000000000..36aef7ed04 --- /dev/null +++ b/crates/walrus-simtest/tests/simtest_storage_pool.rs @@ -0,0 +1,618 @@ +// Copyright (c) Walrus Foundation +// SPDX-License-Identifier: Apache-2.0 + +//! Contains simtests for storage pool (bucket) operations. + +#![recursion_limit = "256"] + +#[cfg(msim)] +mod tests { + use std::{ + collections::{HashMap, HashSet}, + sync::Arc, + time::Duration, + }; + + use sui_types::base_types::ObjectID; + use walrus_core::{ + BlobId, + EncodingType, + encoding::{EncodingFactory as _, Primary, Secondary}, + messages::{BlobPersistenceType, ConfirmationCertificate}, + }; + use walrus_proc_macros::walrus_simtest; + use walrus_sdk::{node_client::WalrusNodeClient, uploader::TailHandling}; + use walrus_service::{ + client::ClientCommunicationConfig, + test_utils::{SimStorageNodeHandle, TestNodesConfig, test_cluster}, + }; + use walrus_simtest::test_utils::simtest_utils::BlobInfoConsistencyCheck; + use walrus_sui::{ + client::{BlobObjectMetadata, BlobPersistence, SuiContractClient}, + types::move_structs::StoragePoolResource, + }; + use walrus_test_utils::WithTempDir; + + /// Helper: store a blob in a storage pool bucket. + /// + /// Encodes the blob, registers it in the bucket, uploads slivers to storage nodes, + /// certifies it on-chain, and returns the blob ID and PooledBlob object ID. + async fn store_blob_in_bucket( + client: &WithTempDir>, + bucket_id: ObjectID, + data: Vec, + deletable: bool, + ) -> (BlobId, ObjectID) { + let sui_client = client.inner.sui_client(); + + // Encode the blob. 
+ let encoding_config = client.as_ref().encoding_config(); + let encoder = encoding_config.get_for_type(EncodingType::RS2); + let (sliver_pairs, metadata) = encoder + .encode_with_metadata(data) + .expect("encoding should succeed"); + + let blob_id: BlobId = *metadata.blob_id(); + let blob_metadata = + BlobObjectMetadata::try_from(&metadata).expect("metadata conversion should succeed"); + + let persistence = if deletable { + BlobPersistence::Deletable + } else { + BlobPersistence::Permanent + }; + + // Register blob in the bucket on-chain. + let blob_obj_id: ObjectID = sui_client + .register_pooled_blob(bucket_id, blob_metadata, persistence) + .await + .expect("register blob in bucket should succeed"); + + tracing::info!(%blob_id, %blob_obj_id, "registered blob in bucket"); + + // Construct the persistence type for slivers upload. + let blob_persistence_type = if deletable { + BlobPersistenceType::Deletable { + object_id: blob_obj_id.into(), + } + } else { + BlobPersistenceType::Permanent + }; + + // Upload slivers to storage nodes and collect the certificate. + let certificate: ConfirmationCertificate = client + .as_ref() + .send_blob_data_and_get_certificate( + &metadata, + Arc::new(sliver_pairs), + &blob_persistence_type, + None, + TailHandling::Blocking, + None, + None, + None, + None, + ) + .await + .expect("upload slivers and get certificate should succeed"); + + tracing::info!(%blob_id, "got confirmation certificate"); + + // Certify the blob on-chain. + sui_client + .certify_pooled_blob(bucket_id, blob_id, &certificate) + .await + .expect("certify blob in bucket should succeed"); + + tracing::info!(%blob_id, %blob_obj_id, "certified blob in bucket"); + + (blob_id, blob_obj_id) + } + + /// Helper: fetch a bucket's end_epoch and log each blob's effective end epoch. + /// + /// Blobs in storage pool inherit their end_epoch from the parent bucket. 
+ async fn print_blob_end_epochs( + sui_client: &SuiContractClient, + label: &str, + blobs: &[(&str, &BlobId, ObjectID)], + ) { + tracing::info!("--- blob end epochs ({label}) ---"); + // Collect unique bucket IDs from the blob entries. + let mut bucket_ids: Vec = blobs.iter().map(|(_, _, bucket)| *bucket).collect(); + bucket_ids.sort(); + bucket_ids.dedup(); + + for bucket_id in &bucket_ids { + let pool: StoragePoolResource = sui_client + .retriable_sui_client() + .get_sui_object(*bucket_id) + .await + .expect("should be able to fetch bucket"); + tracing::info!( + %bucket_id, + end_epoch = pool.end_epoch, + "bucket end_epoch" + ); + } + + for (name, blob_id, bucket_id) in blobs { + let pool: StoragePoolResource = sui_client + .retriable_sui_client() + .get_sui_object(*bucket_id) + .await + .expect("should be able to fetch bucket"); + tracing::info!( + blob = %name, + %blob_id, + end_epoch = pool.end_epoch, + %bucket_id, + "blob end_epoch (from parent bucket)" + ); + } + tracing::info!("--- end blob end epochs ({label}) ---"); + } + + /// Helper: read a blob and verify it matches the expected data. + async fn read_and_verify_blob( + client: &WithTempDir>, + blob_id: &BlobId, + expected_data: &[u8], + ) { + // Read using primary slivers. + let mut read_result = client.as_ref().read_blob::(blob_id).await; + let mut retries = 0; + while read_result.is_err() && retries < 10 { + retries += 1; + tracing::info!( + "read attempt {} failed, retrying: {:?}", + retries, + read_result.unwrap_err() + ); + tokio::time::sleep(Duration::from_secs(1)).await; + read_result = client.as_ref().read_blob::(blob_id).await; + } + let read_data = read_result.expect("should be able to read blob"); + assert_eq!( + read_data, expected_data, + "blob data mismatch on primary read" + ); + + // Read using secondary slivers. 
+ let read_data = client + .as_ref() + .read_blob::(blob_id) + .await + .expect("should be able to read blob via secondary"); + assert_eq!( + read_data, expected_data, + "blob data mismatch on secondary read" + ); + } + + /// Parses a decimal u256 string into a BlobId. + /// + /// In Move, blob IDs are stored as `u256`. BCS encodes `u256` as 32 little-endian bytes, + /// which is the same layout as `BlobId([u8; 32])`. The Sui JSON-RPC represents `u256` + /// values as decimal strings. + fn blob_id_from_u256_decimal(s: &str) -> BlobId { + let mut le_bytes = [0u8; 32]; + for ch in s.bytes() { + assert!(ch >= b'0' && ch <= b'9', "expected decimal digit"); + let digit = ch - b'0'; + let mut carry = u16::from(digit); + for byte in le_bytes.iter_mut() { + let v = u16::from(*byte) * 10 + carry; + *byte = v as u8; + carry = v >> 8; + } + assert_eq!(carry, 0, "u256 overflow"); + } + BlobId::try_from(le_bytes.as_slice()).expect("32 bytes should produce a valid BlobId") + } + + /// Helper: list all blob entries stored in a bucket by paginating its ObjectTable + /// via the Sui `get_dynamic_fields` RPC. + /// + /// Returns `(blob_id, object_id)` pairs, where `blob_id` is the table key (u256) + /// and `object_id` is the PooledBlob's Sui object ID. + async fn list_blob_ids_in_bucket( + sui_client: &SuiContractClient, + bucket_id: ObjectID, + ) -> Vec<(BlobId, ObjectID)> { + let pool: StoragePoolResource = sui_client + .retriable_sui_client() + .get_sui_object(bucket_id) + .await + .expect("should be able to fetch bucket"); + + let table_id = pool.blobs; + let mut all_entries = Vec::new(); + let mut cursor = None; + + loop { + let page = sui_client + .retriable_sui_client() + .get_dynamic_fields(table_id, cursor, None) + .await + .expect("get_dynamic_fields should succeed"); + + for entry in &page.data { + // The key is a u256 (blob ID). DynamicFieldInfo.name.value is a JSON + // representation; parse the decimal string back into a BlobId. 
+ let key_str = entry + .name + .value + .as_str() + .expect("ObjectTable key should be a JSON string"); + let blob_id = blob_id_from_u256_decimal(key_str); + // For ObjectTable, entry.object_id is the PooledBlob's Sui object ID. + all_entries.push((blob_id, entry.object_id)); + } + + if page.has_next_page { + cursor = page.next_cursor; + } else { + break; + } + } + + all_entries + } + + /// End-to-end test for storage pool (bucket) operations: + /// - Create buckets + /// - Register, upload, and certify blobs through the storage pool path + /// - Read blobs back + /// - Extend bucket lifetime + /// - Delete blobs from a bucket + #[ignore = "ignore integration simtests by default"] + #[walrus_simtest] + async fn test_storage_pool_operations() { + let blob_info_consistency_check = BlobInfoConsistencyCheck::new(); + + let (_sui_cluster, _walrus_cluster, client, _, _) = + test_cluster::E2eTestSetupBuilder::new() + .with_epoch_duration(Duration::from_secs(30)) + .with_test_nodes_config( + TestNodesConfig::builder() + .with_node_weights(&[2, 2, 3, 3, 3]) + .build(), + ) + .with_communication_config( + ClientCommunicationConfig::default_for_test_with_reqwest_timeout( + Duration::from_secs(2), + ), + ) + .with_default_num_checkpoints_per_blob() + .build_generic::() + .await + .unwrap(); + + let sui_client = client.inner.sui_client(); + + // --- Step 1: Create bucket 1 --- + tracing::info!("creating storage pool (bucket 1)"); + let storage_amount: u64 = 10 * 1024 * 1024; // 10 MiB + let epochs_ahead: u32 = 5; + let bucket1_id = sui_client + .create_storage_pool(storage_amount, epochs_ahead) + .await + .expect("create storage pool should succeed"); + tracing::info!(%bucket1_id, "created bucket 1"); + + // Print bucket 1 end epoch after creation. 
+ print_blob_end_epochs(sui_client, "after creating bucket 1", &[]).await; + { + let pool: StoragePoolResource = sui_client + .retriable_sui_client() + .get_sui_object(bucket1_id) + .await + .expect("fetch bucket 1"); + tracing::info!( + %bucket1_id, + end_epoch = pool.end_epoch, + "bucket 1 created" + ); + } + + // --- Step 2: Store blob 1 in bucket 1 (deletable) --- + tracing::info!("storing blob 1 in bucket 1 (deletable)"); + let blob1_data = walrus_test_utils::random_data(1024); + let (blob1_id, blob1_obj_id) = + store_blob_in_bucket(&client, bucket1_id, blob1_data.clone(), true).await; + tracing::info!(%blob1_id, %blob1_obj_id, "stored blob 1"); + + // Print end epochs after blob 1. + print_blob_end_epochs( + sui_client, + "after storing blob 1", + &[("blob_1", &blob1_id, bucket1_id)], + ) + .await; + + // Read blob 1 back. + read_and_verify_blob(&client, &blob1_id, &blob1_data).await; + tracing::info!("blob 1 read successfully"); + + // --- Step 3: Store blob 2 in bucket 1 (deletable) --- + tracing::info!("storing blob 2 in bucket 1 (deletable)"); + let blob2_data = walrus_test_utils::random_data(2048); + let (blob2_id, _blob2_obj_id) = + store_blob_in_bucket(&client, bucket1_id, blob2_data.clone(), true).await; + tracing::info!(%blob2_id, "stored blob 2"); + + // Print end epochs after blob 2. + print_blob_end_epochs( + sui_client, + "after storing blob 2", + &[ + ("blob_1", &blob1_id, bucket1_id), + ("blob_2", &blob2_id, bucket1_id), + ], + ) + .await; + + // Read blob 2 back. 
+ read_and_verify_blob(&client, &blob2_id, &blob2_data).await; + tracing::info!("blob 2 read successfully"); + + // --- Step 4: Create bucket 2 --- + tracing::info!("creating storage pool (bucket 2)"); + let bucket2_id = sui_client + .create_storage_pool(storage_amount, epochs_ahead) + .await + .expect("create storage pool 2 should succeed"); + tracing::info!(%bucket2_id, "created bucket 2"); + + { + let pool: StoragePoolResource = sui_client + .retriable_sui_client() + .get_sui_object(bucket2_id) + .await + .expect("fetch bucket 2"); + tracing::info!( + %bucket2_id, + end_epoch = pool.end_epoch, + "bucket 2 created" + ); + } + + // --- Step 5: Store blob 3 in bucket 2 (deletable) --- + tracing::info!("storing blob 3 in bucket 2 (deletable)"); + let blob3_data = walrus_test_utils::random_data(512); + let (blob3_id, _blob3_obj_id) = + store_blob_in_bucket(&client, bucket2_id, blob3_data.clone(), true).await; + tracing::info!(%blob3_id, "stored blob 3"); + + // Read blob 3 back. + read_and_verify_blob(&client, &blob3_id, &blob3_data).await; + tracing::info!("blob 3 read successfully"); + + // --- Step 6: Store blob 4 in bucket 2 (permanent) --- + tracing::info!("storing blob 4 in bucket 2 (permanent)"); + let blob4_data = walrus_test_utils::random_data(768); + let (blob4_id, _blob4_obj_id) = + store_blob_in_bucket(&client, bucket2_id, blob4_data.clone(), false).await; + tracing::info!(%blob4_id, "stored permanent blob 4"); + + // Print end epochs after all blobs stored. + print_blob_end_epochs( + sui_client, + "after storing all blobs", + &[ + ("blob_1", &blob1_id, bucket1_id), + ("blob_2", &blob2_id, bucket1_id), + ("blob_3", &blob3_id, bucket2_id), + ("blob_4", &blob4_id, bucket2_id), + ], + ) + .await; + + // Read blob 4 back. 
+ read_and_verify_blob(&client, &blob4_id, &blob4_data).await; + tracing::info!("permanent blob 4 read successfully"); + + // --- Step 7: Extend bucket 1 by 2 epochs --- + tracing::info!("extending bucket 1 by 2 epochs"); + sui_client + .extend_storage_pool(bucket1_id, 2, storage_amount) + .await + .expect("extend storage pool should succeed"); + tracing::info!("bucket 1 extended successfully"); + + // Print end epochs after extending bucket 1. + print_blob_end_epochs( + sui_client, + "after extending bucket 1", + &[ + ("blob_1", &blob1_id, bucket1_id), + ("blob_2", &blob2_id, bucket1_id), + ("blob_3", &blob3_id, bucket2_id), + ("blob_4", &blob4_id, bucket2_id), + ], + ) + .await; + + // --- Step 8: Verify all 4 blobs still readable --- + tracing::info!("re-reading all blobs after extend"); + read_and_verify_blob(&client, &blob1_id, &blob1_data).await; + read_and_verify_blob(&client, &blob2_id, &blob2_data).await; + read_and_verify_blob(&client, &blob3_id, &blob3_data).await; + read_and_verify_blob(&client, &blob4_id, &blob4_data).await; + tracing::info!("all blobs still readable after extend"); + + // --- Step 9: Delete blob 1 from bucket 1 --- + tracing::info!("deleting blob 1 from bucket 1"); + sui_client + .delete_pooled_blob(bucket1_id, blob1_id) + .await + .expect("delete blob from bucket should succeed"); + tracing::info!("blob 1 deleted from bucket 1"); + + // Print end epochs after deleting blob 1. + print_blob_end_epochs( + sui_client, + "after deleting blob 1", + &[ + ("blob_2", &blob2_id, bucket1_id), + ("blob_3", &blob3_id, bucket2_id), + ("blob_4", &blob4_id, bucket2_id), + ], + ) + .await; + + // --- Step 10: Verify blobs 2, 3, 4 still readable --- + read_and_verify_blob(&client, &blob2_id, &blob2_data).await; + read_and_verify_blob(&client, &blob3_id, &blob3_data).await; + read_and_verify_blob(&client, &blob4_id, &blob4_data).await; + tracing::info!("remaining blobs still readable after deletion"); + + // Wait for event processing to settle. 
+ tokio::time::sleep(Duration::from_secs(5)).await; + + blob_info_consistency_check.check_storage_node_consistency(); + } + + /// Test listing blobs in a bucket via the Sui `get_dynamic_fields` RPC. + /// + /// Creates a bucket, stores several blobs, lists the ObjectTable keys, verifies the + /// returned set matches expectations, then deletes a blob and verifies the listing + /// updates accordingly. + #[ignore = "ignore integration simtests by default"] + #[walrus_simtest] + async fn test_storage_pool_list_blobs() { + let blob_info_consistency_check = BlobInfoConsistencyCheck::new(); + + let (_sui_cluster, _walrus_cluster, client, _, _) = + test_cluster::E2eTestSetupBuilder::new() + .with_epoch_duration(Duration::from_secs(30)) + .with_test_nodes_config( + TestNodesConfig::builder() + .with_node_weights(&[2, 2, 3, 3, 3]) + .build(), + ) + .with_communication_config( + ClientCommunicationConfig::default_for_test_with_reqwest_timeout( + Duration::from_secs(2), + ), + ) + .with_default_num_checkpoints_per_blob() + .build_generic::() + .await + .unwrap(); + + let sui_client = client.inner.sui_client(); + + // Create a bucket. + let storage_amount: u64 = 10 * 1024 * 1024; + let epochs_ahead: u32 = 5; + let bucket_id = sui_client + .create_storage_pool(storage_amount, epochs_ahead) + .await + .expect("create bucket should succeed"); + tracing::info!(%bucket_id, "created bucket"); + + // Empty bucket should have no blobs. + let listed = list_blob_ids_in_bucket(sui_client, bucket_id).await; + assert!(listed.is_empty(), "new bucket should have no blobs"); + tracing::info!("empty bucket listing verified"); + + // Store 3 deletable blobs. 
+ let blob1_data = walrus_test_utils::random_data(512);
+ let (blob1_id, blob1_obj_id) =
+ store_blob_in_bucket(&client, bucket_id, blob1_data.clone(), true).await;
+ tracing::info!(%blob1_id, %blob1_obj_id, "stored blob 1");
+
+ let blob2_data = walrus_test_utils::random_data(1024);
+ let (blob2_id, blob2_obj_id) =
+ store_blob_in_bucket(&client, bucket_id, blob2_data.clone(), true).await;
+ tracing::info!(%blob2_id, %blob2_obj_id, "stored blob 2");
+
+ let blob3_data = walrus_test_utils::random_data(768);
+ let (blob3_id, blob3_obj_id) =
+ store_blob_in_bucket(&client, bucket_id, blob3_data.clone(), true).await;
+ tracing::info!(%blob3_id, %blob3_obj_id, "stored blob 3");
+
+ // List blobs — should contain exactly the 3 blob IDs with correct object IDs.
+ let listed = list_blob_ids_in_bucket(sui_client, bucket_id).await;
+ let listed_map: HashMap<BlobId, ObjectID> = listed.into_iter().collect();
+ let listed_set: HashSet<BlobId> = listed_map.keys().copied().collect();
+ let expected_set: HashSet<BlobId> = [blob1_id, blob2_id, blob3_id].into_iter().collect();
+ tracing::info!(?listed_set, ?expected_set, "bucket listing after 3 stores");
+ assert_eq!(
+ listed_set, expected_set,
+ "bucket should contain exactly the 3 stored blobs"
+ );
+ // Verify that the object IDs in the ObjectTable match those returned at registration.
+ assert_eq!(
+ listed_map[&blob1_id], blob1_obj_id,
+ "blob 1 object ID mismatch"
+ );
+ assert_eq!(
+ listed_map[&blob2_id], blob2_obj_id,
+ "blob 2 object ID mismatch"
+ );
+ assert_eq!(
+ listed_map[&blob3_id], blob3_obj_id,
+ "blob 3 object ID mismatch"
+ );
+
+ // Verify blob_count on the pool object matches.
+ let pool: StoragePoolResource = sui_client
+ .retriable_sui_client()
+ .get_sui_object(bucket_id)
+ .await
+ .expect("fetch bucket");
+ assert_eq!(pool.blob_count, 3, "blob_count should be 3");
+
+ // Delete blob 2.
+ tracing::info!("deleting blob 2 from bucket");
+ sui_client
+ .delete_pooled_blob(bucket_id, blob2_id)
+ .await
+ .expect("delete blob 2 should succeed");
+
+ // List blobs — should now contain only blob 1 and blob 3.
+ let listed = list_blob_ids_in_bucket(sui_client, bucket_id).await;
+ let listed_map: HashMap<BlobId, ObjectID> = listed.into_iter().collect();
+ let listed_set: HashSet<BlobId> = listed_map.keys().copied().collect();
+ let expected_after_delete: HashSet<BlobId> = [blob1_id, blob3_id].into_iter().collect();
+ tracing::info!(
+ ?listed_set,
+ ?expected_after_delete,
+ "bucket listing after deleting blob 2"
+ );
+ assert_eq!(
+ listed_set, expected_after_delete,
+ "bucket should contain only blobs 1 and 3 after deletion"
+ );
+ // Verify object IDs still match after deletion of blob 2.
+ assert_eq!(
+ listed_map[&blob1_id], blob1_obj_id,
+ "blob 1 object ID mismatch after delete"
+ );
+ assert_eq!(
+ listed_map[&blob3_id], blob3_obj_id,
+ "blob 3 object ID mismatch after delete"
+ );
+
+ // Verify blob_count updated.
+ let pool: StoragePoolResource = sui_client
+ .retriable_sui_client()
+ .get_sui_object(bucket_id)
+ .await
+ .expect("fetch bucket");
+ assert_eq!(pool.blob_count, 2, "blob_count should be 2 after deletion");
+
+ // Verify remaining blobs are still readable.
+ read_and_verify_blob(&client, &blob1_id, &blob1_data).await;
+ read_and_verify_blob(&client, &blob3_id, &blob3_data).await;
+ tracing::info!("remaining blobs still readable");
+
+ // Wait for event processing to settle.
+ tokio::time::sleep(Duration::from_secs(5)).await;
+
+ blob_info_consistency_check.check_storage_node_consistency();
+ }
+}
diff --git a/crates/walrus-sui/src/client.rs b/crates/walrus-sui/src/client.rs
index 504ee074a8..359be7daaa 100644
--- a/crates/walrus-sui/src/client.rs
+++ b/crates/walrus-sui/src/client.rs
@@ -1200,6 +1200,93 @@ impl SuiContractClient {
 .await
 }
+ // === Storage Pool (bucket) operations ===
+
+ /// Creates a new storage pool (bucket).
+ pub async fn create_storage_pool(
+ &self,
+ storage_amount: u64,
+ epochs_ahead: EpochCount,
+ ) -> SuiClientResult<ObjectID> {
+ self.retry_on_wrong_version(|| async {
+ self.inner
+ .lock()
+ .await
+ .create_storage_pool(storage_amount, epochs_ahead)
+ .await
+ })
+ .await
+ }
+
+ /// Registers a blob against a storage pool.
+ ///
+ /// Returns the object ID of the created `PooledBlob` object.
+ pub async fn register_pooled_blob(
+ &self,
+ storage_pool_id: ObjectID,
+ blob_metadata: BlobObjectMetadata,
+ persistence: BlobPersistence,
+ ) -> SuiClientResult<ObjectID> {
+ self.retry_on_wrong_version(|| async {
+ self.inner
+ .lock()
+ .await
+ .register_pooled_blob(storage_pool_id, blob_metadata.clone(), persistence)
+ .await
+ })
+ .await
+ }
+
+ /// Certifies a blob in a storage pool.
+ pub async fn certify_pooled_blob(
+ &self,
+ storage_pool_id: ObjectID,
+ blob_id: BlobId,
+ certificate: &ConfirmationCertificate,
+ ) -> SuiClientResult<()> {
+ self.retry_on_wrong_version(|| async {
+ self.inner
+ .lock()
+ .await
+ .certify_pooled_blob(storage_pool_id, blob_id, certificate)
+ .await
+ })
+ .await
+ }
+
+ /// Deletes a blob from a storage pool.
+ pub async fn delete_pooled_blob(
+ &self,
+ storage_pool_id: ObjectID,
+ blob_id: BlobId,
+ ) -> SuiClientResult<()> {
+ self.retry_on_wrong_version(|| async {
+ self.inner
+ .lock()
+ .await
+ .delete_pooled_blob(storage_pool_id, blob_id)
+ .await
+ })
+ .await
+ }
+
+ /// Extends the lifetime of a storage pool.
+ pub async fn extend_storage_pool(
+ &self,
+ storage_pool_id: ObjectID,
+ extended_epochs: EpochCount,
+ storage_size: u64,
+ ) -> SuiClientResult<()> {
+ self.retry_on_wrong_version(|| async {
+ self.inner
+ .lock()
+ .await
+ .extend_storage_pool(storage_pool_id, extended_epochs, storage_size)
+ .await
+ })
+ .await
+ }
+
 /// Updates the parameters for a storage node.
 pub async fn update_node_params(
 &self,
@@ -2728,6 +2815,119 @@ impl SuiContractClientInner {
 Ok(())
 }
+ // === Storage Pool (bucket) operations ===
+
+ /// Creates a new storage pool (bucket).
+ pub async fn create_storage_pool(
+ &mut self,
+ storage_amount: u64,
+ epochs_ahead: EpochCount,
+ ) -> SuiClientResult<ObjectID> {
+ let mut pt_builder = self.transaction_builder();
+ pt_builder
+ .create_storage_pool(storage_amount, epochs_ahead)
+ .await?;
+ let response = self
+ .sign_and_send_transaction(
+ pt_builder.build_transaction_data(self.gas_budget).await?,
+ "create_storage_pool",
+ )
+ .await?;
+
+ // Extract the created object ID from the response.
+ let created_objects = response
+ .effects
+ .as_ref()
+ .ok_or_else(|| anyhow!("no transaction effects"))?
+ .created()
+ .iter()
+ .map(|o| o.object_id())
+ .collect::<Vec<_>>();
+ ensure!(!created_objects.is_empty(), "no objects created");
+ Ok(created_objects[0])
+ }
+
+ /// Registers a blob against a storage pool.
+ ///
+ /// Returns the object ID of the created `PooledBlob` object.
+ pub async fn register_pooled_blob(
+ &mut self,
+ storage_pool_id: ObjectID,
+ blob_metadata: BlobObjectMetadata,
+ persistence: BlobPersistence,
+ ) -> SuiClientResult<ObjectID> {
+ let mut pt_builder = self.transaction_builder();
+ pt_builder
+ .register_pooled_blob(storage_pool_id.into(), blob_metadata, persistence)
+ .await?;
+ let transaction = pt_builder.build_transaction_data(self.gas_budget).await?;
+ let response = self
+ .sign_and_send_transaction(transaction, "register_pooled_blob")
+ .await?;
+
+ let blob_obj_ids = get_created_sui_object_ids_by_type(
+ &response,
+ &contracts::storage_pool::PooledBlob
+ .to_move_struct_tag_with_type_map(&self.read_client.type_origin_map(), &[])?,
+ )?;
+ ensure!(
+ blob_obj_ids.len() == 1,
+ "expected 1 PooledBlob object, got {}",
+ blob_obj_ids.len()
+ );
+ Ok(blob_obj_ids[0])
+ }
+
+ /// Certifies a blob in a storage pool.
+ pub async fn certify_pooled_blob( + &mut self, + storage_pool_id: ObjectID, + blob_id: BlobId, + certificate: &ConfirmationCertificate, + ) -> SuiClientResult<()> { + let mut pt_builder = self.transaction_builder(); + pt_builder + .certify_pooled_blob(storage_pool_id.into(), blob_id, certificate) + .await?; + let transaction = pt_builder.build_transaction_data(self.gas_budget).await?; + self.sign_and_send_transaction(transaction, "certify_pooled_blob") + .await?; + Ok(()) + } + + /// Deletes a blob from a storage pool. + pub async fn delete_pooled_blob( + &mut self, + storage_pool_id: ObjectID, + blob_id: BlobId, + ) -> SuiClientResult<()> { + let mut pt_builder = self.transaction_builder(); + pt_builder + .delete_pooled_blob(storage_pool_id.into(), blob_id) + .await?; + let transaction = pt_builder.build_transaction_data(self.gas_budget).await?; + self.sign_and_send_transaction(transaction, "delete_pooled_blob") + .await?; + Ok(()) + } + + /// Extends the lifetime of a storage pool. + pub async fn extend_storage_pool( + &mut self, + storage_pool_id: ObjectID, + extended_epochs: EpochCount, + storage_size: u64, + ) -> SuiClientResult<()> { + let mut pt_builder = self.transaction_builder(); + pt_builder + .extend_storage_pool(storage_pool_id.into(), extended_epochs, storage_size) + .await?; + let transaction = pt_builder.build_transaction_data(self.gas_budget).await?; + self.sign_and_send_transaction(transaction, "extend_storage_pool") + .await?; + Ok(()) + } + /// Updates the parameters for a storage node. 
 pub async fn update_node_params(
 &mut self,
diff --git a/crates/walrus-sui/src/client/transaction_builder.rs b/crates/walrus-sui/src/client/transaction_builder.rs
index 6633524106..f40e5d4e14 100644
--- a/crates/walrus-sui/src/client/transaction_builder.rs
+++ b/crates/walrus-sui/src/client/transaction_builder.rs
@@ -32,6 +32,7 @@ use sui_types::{
 };
 use tracing::Level;
 use walrus_core::{
+ BlobId,
 Epoch,
 EpochCount,
 NetworkPublicKey,
@@ -752,6 +753,121 @@ impl WalrusPtbBuilder {
 Ok(())
 }
+ // === Storage Pool (bucket) operations ===
+
+ /// Adds a call to `create_storage_pool` to the `pt_builder` and returns the result
+ /// [`Argument`] representing the new `StoragePool` object.
+ pub async fn create_storage_pool(
+ &mut self,
+ storage_amount: u64,
+ epochs_ahead: EpochCount,
+ ) -> SuiClientResult<Argument> {
+ let price = self
+ .storage_price_for_encoded_length(storage_amount, epochs_ahead, false)
+ .await?;
+ self.fill_wal_balance(price).await?;
+
+ let args = vec![
+ self.system_arg(SharedObjectMutability::Mutable)?,
+ self.pt_builder.pure(storage_amount)?,
+ self.pt_builder.pure(epochs_ahead)?,
+ self.wal_coin_arg()?,
+ ];
+ let result_arg = self.walrus_move_call(contracts::system::create_storage_pool, args)?;
+ self.reduce_wal_balance(price)?;
+ self.add_result_to_be_consumed(result_arg);
+ Ok(result_arg)
+ }
+
+ /// Adds a call to `register_pooled_blob` to the `pt_builder`.
+ pub async fn register_pooled_blob( + &mut self, + storage_pool: ArgumentOrOwnedObject, + blob_metadata: BlobObjectMetadata, + persistence: BlobPersistence, + ) -> SuiClientResult<()> { + let price = self + .write_price_for_encoded_length(blob_metadata.encoded_size, false) + .await?; + self.fill_wal_balance(price).await?; + + let pool_arg = self.argument_from_arg_or_obj(storage_pool).await?; + + let args = vec![ + self.system_arg(SharedObjectMutability::Mutable)?, + pool_arg, + self.pt_builder.pure(blob_metadata.blob_id)?, + self.pt_builder.pure(blob_metadata.root_hash.bytes())?, + self.pt_builder.pure(blob_metadata.unencoded_size)?, + self.pt_builder + .pure(u8::from(blob_metadata.encoding_type))?, + self.pt_builder.pure(persistence.is_deletable())?, + self.wal_coin_arg()?, + ]; + self.walrus_move_call(contracts::system::register_pooled_blob, args)?; + self.reduce_wal_balance(price)?; + Ok(()) + } + + /// Adds a call to `certify_pooled_blob` to the `pt_builder`. + pub async fn certify_pooled_blob( + &mut self, + storage_pool: ArgumentOrOwnedObject, + blob_id: BlobId, + certificate: &ConfirmationCertificate, + ) -> SuiClientResult<()> { + let signers = self.signers_to_bitmap(&certificate.signers).await?; + + let args = vec![ + self.system_arg(SharedObjectMutability::Immutable)?, + self.argument_from_arg_or_obj(storage_pool).await?, + self.pt_builder.pure(blob_id)?, + self.pt_builder.pure(certificate.signature.as_bytes())?, + self.pt_builder.pure(&signers)?, + self.pt_builder.pure(&certificate.serialized_message)?, + ]; + self.walrus_move_call(contracts::system::certify_pooled_blob, args)?; + Ok(()) + } + + /// Adds a call to `delete_pooled_blob` to the `pt_builder`. 
+ pub async fn delete_pooled_blob( + &mut self, + storage_pool: ArgumentOrOwnedObject, + blob_id: BlobId, + ) -> SuiClientResult<()> { + let args = vec![ + self.system_arg(SharedObjectMutability::Immutable)?, + self.argument_from_arg_or_obj(storage_pool).await?, + self.pt_builder.pure(blob_id)?, + ]; + self.walrus_move_call(contracts::system::delete_pooled_blob, args)?; + Ok(()) + } + + /// Adds a call to `extend_storage_pool` to the `pt_builder`. + pub async fn extend_storage_pool( + &mut self, + storage_pool: ArgumentOrOwnedObject, + extended_epochs: EpochCount, + storage_size: u64, + ) -> SuiClientResult<()> { + let price = self + .storage_price_for_encoded_length(storage_size, extended_epochs, false) + .await?; + self.fill_wal_balance(price).await?; + + let args = vec![ + self.system_arg(SharedObjectMutability::Mutable)?, + self.argument_from_arg_or_obj(storage_pool).await?, + self.pt_builder.pure(extended_epochs)?, + self.wal_coin_arg()?, + ]; + self.walrus_move_call(contracts::system::extend_storage_pool, args)?; + self.reduce_wal_balance(price)?; + Ok(()) + } + /// Adds a transfer to the PTB. If the recipient is `None`, the sender address is used. #[tracing::instrument(level = Level::DEBUG, skip_all)] pub async fn transfer>( diff --git a/crates/walrus-sui/src/types/events.rs b/crates/walrus-sui/src/types/events.rs index e2d6a38c25..61c8a25055 100644 --- a/crates/walrus-sui/src/types/events.rs +++ b/crates/walrus-sui/src/types/events.rs @@ -233,6 +233,12 @@ pub enum BlobEvent { InvalidBlobID(InvalidBlobId), /// A deny list blob deleted event. DenyListBlobDeleted(DenyListBlobDeleted), + /// A pooled blob registration event. + PooledBlobRegistered(PooledBlobRegistered), + /// A pooled blob certification event. + PooledBlobCertified(PooledBlobCertified), + /// A pooled blob deletion event. 
+ PooledBlobDeleted(PooledBlobDeleted), } impl From for BlobEvent { @@ -304,6 +310,9 @@ impl BlobEvent { BlobEvent::Deleted(event) => event.blob_id, BlobEvent::InvalidBlobID(event) => event.blob_id, BlobEvent::DenyListBlobDeleted(event) => event.blob_id, + BlobEvent::PooledBlobRegistered(event) => event.blob_id, + BlobEvent::PooledBlobCertified(event) => event.blob_id, + BlobEvent::PooledBlobDeleted(event) => event.blob_id, } } @@ -315,6 +324,9 @@ impl BlobEvent { BlobEvent::Deleted(event) => Some(event.object_id), BlobEvent::InvalidBlobID(_) => None, BlobEvent::DenyListBlobDeleted(_) => None, + BlobEvent::PooledBlobRegistered(event) => Some(event.object_id), + BlobEvent::PooledBlobCertified(event) => Some(event.object_id), + BlobEvent::PooledBlobDeleted(event) => Some(event.object_id), } } @@ -326,6 +338,9 @@ impl BlobEvent { BlobEvent::Deleted(event) => event.event_id, BlobEvent::InvalidBlobID(event) => event.event_id, BlobEvent::DenyListBlobDeleted(event) => event.event_id, + BlobEvent::PooledBlobRegistered(event) => event.event_id, + BlobEvent::PooledBlobCertified(event) => event.event_id, + BlobEvent::PooledBlobDeleted(event) => event.event_id, } } @@ -346,6 +361,9 @@ impl BlobEvent { BlobEvent::Deleted(event) => Some(event.epoch), BlobEvent::InvalidBlobID(event) => Some(event.epoch), BlobEvent::DenyListBlobDeleted(event) => Some(event.epoch), + BlobEvent::PooledBlobRegistered(event) => Some(event.epoch), + BlobEvent::PooledBlobCertified(event) => Some(event.epoch), + BlobEvent::PooledBlobDeleted(event) => Some(event.epoch), } } @@ -357,6 +375,9 @@ impl BlobEvent { BlobEvent::Deleted(_) => "BlobDeleted", BlobEvent::InvalidBlobID(_) => "InvalidBlobID", BlobEvent::DenyListBlobDeleted(_) => "DenyListBlobDeleted", + BlobEvent::PooledBlobRegistered(_) => "PooledBlobRegistered", + BlobEvent::PooledBlobCertified(_) => "PooledBlobCertified", + BlobEvent::PooledBlobDeleted(_) => "PooledBlobDeleted", } } } @@ -931,12 +952,6 @@ impl TryFrom for StoragePoolExtendedEvent { 
 pub enum StoragePoolEvent {
 /// A storage pool was created.
 StoragePoolCreated(StoragePoolCreatedEvent),
- /// A blob was registered against a storage pool.
- PooledBlobRegistered(PooledBlobRegistered),
- /// A blob in a storage pool was certified.
- PooledBlobCertified(PooledBlobCertified),
- /// A blob was deleted from a storage pool.
- PooledBlobDeleted(PooledBlobDeleted),
 /// A storage pool's lifetime was extended.
 StoragePoolExtended(StoragePoolExtendedEvent),
 }
@@ -946,9 +961,6 @@ impl StoragePoolEvent {
 pub fn event_id(&self) -> EventID {
 match self {
 StoragePoolEvent::StoragePoolCreated(event) => event.event_id,
- StoragePoolEvent::PooledBlobRegistered(event) => event.event_id,
- StoragePoolEvent::PooledBlobCertified(event) => event.event_id,
- StoragePoolEvent::PooledBlobDeleted(event) => event.event_id,
 StoragePoolEvent::StoragePoolExtended(event) => event.event_id,
 }
 }
@@ -957,9 +969,6 @@ impl StoragePoolEvent {
 pub fn event_epoch(&self) -> Option<Epoch> {
 match self {
 StoragePoolEvent::StoragePoolCreated(event) => Some(event.epoch),
- StoragePoolEvent::PooledBlobRegistered(event) => Some(event.epoch),
- StoragePoolEvent::PooledBlobCertified(event) => Some(event.epoch),
- StoragePoolEvent::PooledBlobDeleted(event) => Some(event.epoch),
 StoragePoolEvent::StoragePoolExtended(event) => Some(event.epoch),
 }
 }
@@ -968,31 +977,14 @@ impl StoragePoolEvent {
 pub fn name(&self) -> &'static str {
 match self {
 StoragePoolEvent::StoragePoolCreated(_) => "StoragePoolCreated",
- StoragePoolEvent::PooledBlobRegistered(_) => "PooledBlobRegistered",
- StoragePoolEvent::PooledBlobCertified(_) => "PooledBlobCertified",
- StoragePoolEvent::PooledBlobDeleted(_) => "PooledBlobDeleted",
 StoragePoolEvent::StoragePoolExtended(_) => "StoragePoolExtended",
 }
 }
- /// Returns the blob ID if the event is a blob event.
- pub fn blob_id(&self) -> Option<BlobId> {
- match self {
- StoragePoolEvent::StoragePoolCreated(_) => None,
- StoragePoolEvent::PooledBlobRegistered(event) => Some(event.blob_id),
- StoragePoolEvent::PooledBlobCertified(event) => Some(event.blob_id),
- StoragePoolEvent::PooledBlobDeleted(event) => Some(event.blob_id),
- StoragePoolEvent::StoragePoolExtended(_) => None,
- }
- }
-
 /// Returns the storage pool ID.
 pub fn storage_pool_id(&self) -> ObjectID {
 match self {
 StoragePoolEvent::StoragePoolCreated(event) => event.storage_pool_id,
- StoragePoolEvent::PooledBlobRegistered(event) => event.storage_pool_id,
- StoragePoolEvent::PooledBlobCertified(event) => event.storage_pool_id,
- StoragePoolEvent::PooledBlobDeleted(event) => event.storage_pool_id,
 StoragePoolEvent::StoragePoolExtended(event) => event.storage_pool_id,
 }
 }
@@ -1235,7 +1227,7 @@ impl ContractEvent {
 ContractEvent::PackageEvent(_) => None,
 ContractEvent::DenyListEvent(_) => None,
 ContractEvent::ProtocolEvent(_) => None,
- ContractEvent::StoragePoolEvent(event) => event.blob_id(),
+ ContractEvent::StoragePoolEvent(_) => None,
 }
 }
@@ -1315,14 +1307,14 @@ impl TryFrom<SuiEvent> for ContractEvent {
 contracts::events::StoragePoolCreated => Ok(ContractEvent::StoragePoolEvent(
 StoragePoolEvent::StoragePoolCreated(value.try_into()?),
 )),
- contracts::events::PooledBlobRegistered => Ok(ContractEvent::StoragePoolEvent(
- StoragePoolEvent::PooledBlobRegistered(value.try_into()?),
+ contracts::events::PooledBlobRegistered => Ok(ContractEvent::BlobEvent(
+ BlobEvent::PooledBlobRegistered(value.try_into()?),
 )),
- contracts::events::PooledBlobCertified => Ok(ContractEvent::StoragePoolEvent(
- StoragePoolEvent::PooledBlobCertified(value.try_into()?),
+ contracts::events::PooledBlobCertified => Ok(ContractEvent::BlobEvent(
+ BlobEvent::PooledBlobCertified(value.try_into()?),
 )),
- contracts::events::PooledBlobDeleted => Ok(ContractEvent::StoragePoolEvent(
- StoragePoolEvent::PooledBlobDeleted(value.try_into()?),
+ 
contracts::events::PooledBlobDeleted => Ok(ContractEvent::BlobEvent( + BlobEvent::PooledBlobDeleted(value.try_into()?), )), contracts::events::StoragePoolExtended => Ok(ContractEvent::StoragePoolEvent( StoragePoolEvent::StoragePoolExtended(value.try_into()?),