From f3b70fa58770109732325467a9e55cef43f56a22 Mon Sep 17 00:00:00 2001 From: Zhe Wu Date: Tue, 10 Mar 2026 23:30:44 -0700 Subject: [PATCH] Introduce blob info V2 --- .../src/node/storage/blob_info.rs | 2532 +++++++++++++---- 1 file changed, 2004 insertions(+), 528 deletions(-) diff --git a/crates/walrus-service/src/node/storage/blob_info.rs b/crates/walrus-service/src/node/storage/blob_info.rs index 64348dc80d..da79fd38ba 100644 --- a/crates/walrus-service/src/node/storage/blob_info.rs +++ b/crates/walrus-service/src/node/storage/blob_info.rs @@ -597,10 +597,13 @@ impl BlobInfoTable { .safe_iter_with_snapshot(&snapshot) .context("failed to create per-object blob info snapshot iterator")? { - let Ok((object_id, PerObjectBlobInfo::V1(per_object_blob_info))) = result else { - return Err(anyhow::anyhow!( - "error encountered while iterating over per-object blob info: {result:?}" - )); + let (object_id, per_object_blob_info) = match result { + Ok(v) => v, + Err(e) => { + return Err(anyhow::anyhow!( + "error encountered while iterating over per-object blob info: {e:?}" + )); + } }; let blob_id = per_object_blob_info.blob_id(); per_object_table_blob_ids.insert(blob_id); @@ -613,44 +616,77 @@ impl BlobInfoTable { per-object blob info entry exists (object ID: {object_id})" )); }; - let BlobInfo::V1(BlobInfoV1::Valid(ValidBlobInfoV1 { + + // Extract deletable counts for cross-checking (works for both V1 and V2). + let ( + v1_blob, count_deletable_total, count_deletable_certified, - latest_seen_deletable_registered_end_epoch, - latest_seen_deletable_certified_end_epoch, - .. - })) = blob_info - else { - continue; + latest_reg, + latest_cert, + ) = match &blob_info { + BlobInfo::V1(BlobInfoV1::Valid(v)) => ( + true, + v.count_deletable_total, + v.count_deletable_certified, + v.latest_seen_deletable_registered_end_epoch, + v.latest_seen_deletable_certified_end_epoch, + ), + BlobInfo::V2(BlobInfoV2::Valid(v)) => { + anyhow::ensure!( + !matches!(blob_info, BlobInfo::V1(_)), + "V2 per-object blob info requires V2 aggregate blob info, but found V1" + ); + ( + false, + v.count_deletable_total, + v.count_deletable_certified, + // V2 doesn't track end epochs; skip epoch-based assertions. + None, + None, + ) + } + _ => continue, }; - if per_object_blob_info.is_deletable() { - let per_object_end_epoch = per_object_blob_info.end_epoch; + + // Below checks the invariants of last seen epochs on deletable blobs, which is only + // relevant for V1 per-object entries. + if v1_blob && per_object_blob_info.is_deletable() { + let per_object_end_epoch = match &per_object_blob_info { + PerObjectBlobInfo::V1(v) => v.end_epoch, + PerObjectBlobInfo::V2(v) => match v.end_epoch_info { + per_object_blob_info::EndEpochInfo::Individual(e) => e, + per_object_blob_info::EndEpochInfo::StoragePool(_) => { + unreachable!("storage_pool_id() returned None above") + } + }, + }; anyhow::ensure!( count_deletable_total > 0, "count_deletable_total is 0 for blob ID {blob_id}, even though a deletable \ blob object exists (object ID: {object_id})" ); anyhow::ensure!( - latest_seen_deletable_registered_end_epoch - .is_some_and(|e| e >= per_object_end_epoch), + latest_reg.is_some_and(|e| e >= per_object_end_epoch), "latest_seen_deletable_registered_end_epoch for blob ID {blob_id} is \ - {latest_seen_deletable_registered_end_epoch:?}, which is inconsistent with the \ - end epoch of a deletable blob object: {per_object_end_epoch} (object ID: \ - {object_id})" + {latest_reg:?}, which is inconsistent with the end epoch of a deletable blob \ + object: {per_object_end_epoch} (object ID: {object_id})" ); - if per_object_blob_info.certified_epoch.is_some() { + let certified_epoch = match &per_object_blob_info { + PerObjectBlobInfo::V1(v) => v.certified_epoch, + PerObjectBlobInfo::V2(v) => v.certified_epoch, + }; + if certified_epoch.is_some() { anyhow::ensure!( count_deletable_certified > 0, "count_deletable_certified is 0 for blob ID {blob_id}, even though a \ deletable certified blob object exists (object ID: {object_id})" ); anyhow::ensure!( - latest_seen_deletable_certified_end_epoch - .is_some_and(|e| e >= per_object_end_epoch), + latest_cert.is_some_and(|e| e >= per_object_end_epoch), "latest_seen_deletable_certified_end_epoch for blob ID {blob_id} is \ - {latest_seen_deletable_certified_end_epoch:?}, which is inconsistent with \ - the end epoch of a deletable certified blob object: {per_object_end_epoch} \ - (object ID: {object_id})" + {latest_cert:?}, which is inconsistent with the end epoch of a deletable \ + certified blob object: {per_object_end_epoch} (object ID: {object_id})" ); } } @@ -666,18 +702,33 @@ impl BlobInfoTable { "error encountered while iterating over aggregate blob info: {result:?}" )); }; - let BlobInfo::V1(BlobInfoV1::Valid(blob_info)) = blob_info else { - continue; - }; - if !blob_info.has_no_objects() && !per_object_table_blob_ids.contains(&blob_id) { - return Err(anyhow::anyhow!( - "per-object blob info not found for blob ID {blob_id}, even though a \ - valid aggregate blob info entry referencing objects exists: {blob_info:?}" - )); + match &blob_info { + BlobInfo::V1(BlobInfoV1::Valid(v1_info)) => { + if !v1_info.has_no_objects() && !per_object_table_blob_ids.contains(&blob_id) { + return Err(anyhow::anyhow!( + "per-object blob info not found for blob ID {blob_id}, even though a \ + valid V1 aggregate blob info entry referencing objects exists: \ + {v1_info:?}" + )); + } + v1_info.check_invariants().context(format!( + "aggregate blob info invariants violated for blob ID {blob_id}" + ))?; + } + BlobInfo::V2(BlobInfoV2::Valid(v2_info)) => { + if !v2_info.has_no_objects() && !per_object_table_blob_ids.contains(&blob_id) { + return Err(anyhow::anyhow!( + "per-object blob info not found for blob ID {blob_id}, even though a \ + valid V2 aggregate blob info entry referencing objects exists: \ + {v2_info:?}" + )); + } + v2_info.check_invariants().context(format!( + "aggregate blob info V2 invariants violated for blob ID {blob_id}" + ))?; + } + _ => continue, } - blob_info.check_invariants().context(format!( - "aggregate blob info invariants violated for blob ID {blob_id}" - ))?; } Ok(()) } @@ -892,6 +943,17 @@ pub(super) enum BlobStatusChangeType { Delete { was_certified: bool }, } +/// Change info for storage pool blob events. +/// +/// Unlike `BlobStatusChangeInfo`, this does not carry `end_epoch` because the lifetime of pool +/// blobs is determined by the pool's end_epoch (tracked in `storage_pool_end_epochs`), not by +/// the individual blob's end_epoch at registration time. +#[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Clone)] +pub(super) struct PooledBlobChangeInfo { + pub(super) epoch: Epoch, + pub(super) storage_pool_id: ObjectID, +} + trait ChangeTypeAndInfo { fn change_type(&self) -> BlobStatusChangeType; fn change_info(&self) -> BlobStatusChangeInfo; @@ -970,6 +1032,16 @@ pub(super) enum BlobInfoMergeOperand { PermanentExpired { was_certified: bool, }, + /// A status change for a blob in a storage pool. + PooledBlobChangeStatus { + change_type: BlobStatusChangeType, + change_info: PooledBlobChangeInfo, + }, + /// A blob in a storage pool has expired (pool lifetime ended). + PoolExpired { + storage_pool_id: ObjectID, + was_certified: bool, + }, } impl ToBytes for BlobInfoMergeOperand {} @@ -1680,6 +1752,11 @@ impl Mergeable for BlobInfoV1 { Self::Valid(valid_blob_info), BlobInfoMergeOperand::PermanentExpired { was_certified }, ) => valid_blob_info.permanent_expired(was_certified), + // Pool operands should never reach V1 — they are intercepted at the BlobInfo level. + (_, BlobInfoMergeOperand::PooledBlobChangeStatus { .. }) + | (_, BlobInfoMergeOperand::PoolExpired { .. }) => { + unreachable!("pool operands should be handled in BlobInfoV2") + } } self } @@ -1748,334 +1825,1068 @@ impl Mergeable for BlobInfoV1 { ); None } + // Pool operands should never reach V1 — they are intercepted at the BlobInfo level. + BlobInfoMergeOperand::PooledBlobChangeStatus { .. } + | BlobInfoMergeOperand::PoolExpired { .. } => { + unreachable!("pool operands should be handled in BlobInfoV2") + } } } } -/// Represents the status of a blob. +/// V2 aggregate blob info that supports both regular blobs and storage pool blobs. /// -/// Currently only used for testing. -#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Copy)] -#[repr(u8)] -#[cfg(test)] -pub(super) enum BlobCertificationStatus { - Registered, - Certified, - Invalid, +/// Regular blob fields are almost identical to V1, except that we no longer track +/// highest seen end epochs for deletable blobs. Highest seen end epoch is an optimization to give +/// better hint about whether a deletable blob may be alive or not. However, it may not give +/// accurate answer given ongoing GC. So we removed it from V2 and rely on GC to keep blob +/// epoch status up to date. +/// +/// Storage pool blobs are tracked via flat counters, similar to deletable blobs. +// +// INV: same invariants as V1 for regular fields, plus: +// INV: count_pooled_refs_total >= count_pooled_refs_certified +// INV: initial_certified_epoch considers both regular AND pool certified counts +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)] +pub(crate) struct ValidBlobInfoV2 { + // Common fields for both regular and storage pool blobs. + pub is_metadata_stored: bool, + pub initial_certified_epoch: Option, + + // Regular blob fields (same as V1). + pub count_deletable_total: u32, + pub count_deletable_certified: u32, + pub permanent_total: Option, + pub permanent_certified: Option, + + // Storage pool references counters. + pub count_pooled_refs_total: u32, + pub count_pooled_refs_certified: u32, } -#[cfg(test)] -impl PartialOrd for BlobCertificationStatus { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) +// Conversion from V1 to V2. This is needed because even when storage pool is supported, we still +// create V1 blob info if the blob is a regular blob. Only after seen a pooled blob event, we then +// upgrade to V2. This makes all the storage nodes to have consistent behavior despite of when +// they upgrade their nodes. +impl From for ValidBlobInfoV2 { + fn from(v1: ValidBlobInfoV1) -> Self { + Self { + is_metadata_stored: v1.is_metadata_stored, + count_deletable_total: v1.count_deletable_total, + count_deletable_certified: v1.count_deletable_certified, + permanent_total: v1.permanent_total, + permanent_certified: v1.permanent_certified, + initial_certified_epoch: v1.initial_certified_epoch, + count_pooled_refs_total: 0, + count_pooled_refs_certified: 0, + } } } -#[cfg(test)] -impl Ord for BlobCertificationStatus { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - use std::cmp::Ordering::*; - - use BlobCertificationStatus::*; +impl ValidBlobInfoV2 { + /// Handles regular blob status changes (same logic as V1). + fn update_status( + &mut self, + change_type: BlobStatusChangeType, + change_info: BlobStatusChangeInfo, + ) { + let was_certified = self.is_certified(change_info.epoch); + // This should be the same as V1, except that we no longer track end epochs for deletable + // blobs. + if change_info.deletable { + match change_type { + BlobStatusChangeType::Register => { + self.count_deletable_total += 1; + } + BlobStatusChangeType::Certify => { + if self.count_deletable_total <= self.count_deletable_certified { + tracing::error!( + "attempt to certify a deletable blob before corresponding register" + ); + return; + } + self.count_deletable_certified += 1; + } + BlobStatusChangeType::Extend => { + // No-op for V2 deletable blobs (no end epoch tracking). + } + BlobStatusChangeType::Delete { was_certified } => { + self.update_deletable_counters(was_certified); + } + } + } else { + // These should be the same as V1. + match change_type { + BlobStatusChangeType::Register => { + ValidBlobInfoV1::register_permanent(&mut self.permanent_total, &change_info); + } + BlobStatusChangeType::Certify => { + if !ValidBlobInfoV1::certify_permanent( + &self.permanent_total, + &mut self.permanent_certified, + &change_info, + ) { + return; + } + } + BlobStatusChangeType::Extend => { + ValidBlobInfoV1::extend_permanent(&mut self.permanent_total, &change_info); + ValidBlobInfoV1::extend_permanent(&mut self.permanent_certified, &change_info); + } + BlobStatusChangeType::Delete { .. } => { + tracing::error!("attempt to delete a permanent blob"); + return; + } + } + } - match (self, other) { - (Registered, Certified) | (Registered, Invalid) | (Certified, Invalid) => Less, - (left, right) if left == right => Equal, - _ => Greater, + // Update initial certified epoch. + match change_type { + BlobStatusChangeType::Certify => { + self.update_initial_certified_epoch(change_info.epoch, !was_certified); + } + BlobStatusChangeType::Delete { .. } => { + self.maybe_unset_initial_certified_epoch(); + } + BlobStatusChangeType::Register | BlobStatusChangeType::Extend => (), } } -} -/// Internal representation of the aggregate blob information for use in the database etc. Use -/// [`walrus_storage_node_client::api::BlobStatus`] for anything public facing (e.g., communication -/// to the client). -#[enum_dispatch(CertifiedBlobInfoApi)] -#[enum_dispatch(BlobInfoApi)] -#[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Clone)] -pub(crate) enum BlobInfo { - V1(BlobInfoV1), -} + fn deletable_expired(&mut self, was_certified: bool) { + self.update_deletable_counters(was_certified); + self.maybe_unset_initial_certified_epoch(); + } -impl BlobInfo { - /// Creates a new (permanent) blob for testing purposes. - #[cfg(test)] - pub(super) fn new_for_testing( - end_epoch: Epoch, - status: BlobCertificationStatus, - current_status_event: EventID, - _registered_epoch: Option, - certified_epoch: Option, - invalidated_epoch: Option, - ) -> Self { - let blob_info = match status { - BlobCertificationStatus::Invalid => BlobInfoV1::Invalid { - epoch: invalidated_epoch - .expect("invalidated_epoch must be provided for Invalid status"), - event: current_status_event, - }, + fn permanent_expired(&mut self, was_certified: bool) { + ValidBlobInfoV1::decrement_blob_info_inner(&mut self.permanent_total); + if was_certified { + ValidBlobInfoV1::decrement_blob_info_inner(&mut self.permanent_certified); + } + self.maybe_unset_initial_certified_epoch(); + } - BlobCertificationStatus::Registered | BlobCertificationStatus::Certified => { - let permanent_total = - PermanentBlobInfoV1::new_first(end_epoch, current_status_event); - let permanent_certified = matches!(status, BlobCertificationStatus::Certified) - .then(|| permanent_total.clone()); - ValidBlobInfoV1 { - permanent_total: Some(permanent_total), - permanent_certified, - initial_certified_epoch: certified_epoch, - ..Default::default() - } - .into() - } - }; - Self::V1(blob_info) + fn update_deletable_counters(&mut self, was_certified: bool) { + self.count_deletable_total = self.count_deletable_total.saturating_sub(1); + if was_certified { + self.count_deletable_certified = self.count_deletable_certified.saturating_sub(1); + } } -} -impl ToBytes for BlobInfo {} + // --- Storage pool operations --- -impl Mergeable for BlobInfo { - type MergeOperand = BlobInfoMergeOperand; - type Key = BlobId; + /// Registers a blob in a storage pool. + fn pool_register(&mut self) { + self.count_pooled_refs_total += 1; + } - fn merge_with(self, operand: Self::MergeOperand) -> Self { - match self { - Self::V1(value) => Self::V1(value.merge_with(operand)), + /// Certifies a blob in a storage pool. + fn pool_certify(&mut self, epoch: Epoch) { + let was_certified = self.has_any_certified(); + if self.count_pooled_refs_total <= self.count_pooled_refs_certified { + tracing::error!("attempt to certify a pool blob before corresponding register"); + return; } + self.count_pooled_refs_certified += 1; + self.update_initial_certified_epoch(epoch, !was_certified); } - fn merge_new(operand: Self::MergeOperand) -> Option { - BlobInfoV1::merge_new(operand).map(Self::from) + /// Deletes a blob from a storage pool. + fn pool_delete(&mut self, was_certified: bool) { + if self.count_pooled_refs_total == 0 { + tracing::error!("attempt to delete a pool blob that is not registered"); + return; + } + self.count_pooled_refs_total = self.count_pooled_refs_total.saturating_sub(1); + if was_certified { + self.count_pooled_refs_certified = self.count_pooled_refs_certified.saturating_sub(1); + } + self.maybe_unset_initial_certified_epoch(); } -} - -mod per_object_blob_info { - use super::*; - #[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Clone)] - pub(crate) struct PerObjectBlobInfoMergeOperand { - pub change_type: BlobStatusChangeType, - pub change_info: BlobStatusChangeInfo, + /// Handles a pool blob expiring (pool lifetime ended). + fn pool_expired(&mut self, was_certified: bool) { + self.pool_delete(was_certified); } - impl ToBytes for PerObjectBlobInfoMergeOperand {} + // --- Helper methods --- - impl PerObjectBlobInfoMergeOperand { - pub fn from_blob_info_merge_operand( - blob_info_merge_operand: BlobInfoMergeOperand, - ) -> Option { - let BlobInfoMergeOperand::ChangeStatus { - change_type, - change_info, - } = blob_info_merge_operand - else { - return None; - }; - Some(Self { - change_type, - change_info, - }) + /// Returns true if any certified references exist (regular or pool). + fn has_any_certified(&self) -> bool { + if self.count_deletable_certified > 0 || self.permanent_certified.is_some() { + return true; } + self.count_pooled_refs_certified > 0 } - impl From<&T> for PerObjectBlobInfoMergeOperand { - fn from(value: &T) -> Self { - Self { - change_type: value.change_type(), - change_info: value.change_info(), - } + fn update_initial_certified_epoch(&mut self, new_certified_epoch: Epoch, force: bool) { + if force + || self + .initial_certified_epoch + .is_none_or(|existing_epoch| existing_epoch > new_certified_epoch) + { + self.initial_certified_epoch = Some(new_certified_epoch); } } - /// Trait defining methods for retrieving information about a blob object. - // NB: Before adding functions to this trait, think twice if you really need it as it needs to - // be implementable by future internal representations of the per-object blob status as well. - #[enum_dispatch] - #[allow(dead_code)] - pub(crate) trait PerObjectBlobInfoApi: CertifiedBlobInfoApi { - /// Returns the blob ID associated with this object. - fn blob_id(&self) -> BlobId; - /// Returns true iff the object is deletable. - fn is_deletable(&self) -> bool; - /// Returns true iff the object is not expired and not deleted. - fn is_registered(&self, current_epoch: Epoch) -> bool; - /// Returns true iff the object is already deleted. - fn is_deleted(&self) -> bool; + fn maybe_unset_initial_certified_epoch(&mut self) { + if !self.has_any_certified() { + self.initial_certified_epoch = None; + } } - #[enum_dispatch(CertifiedBlobInfoApi)] - #[enum_dispatch(PerObjectBlobInfoApi)] - #[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Clone)] - pub(crate) enum PerObjectBlobInfo { - V1(PerObjectBlobInfoV1), + fn has_no_objects(&self) -> bool { + self.count_deletable_total == 0 + && self.count_deletable_certified == 0 + && self.permanent_total.is_none() + && self.permanent_certified.is_none() + && self.initial_certified_epoch.is_none() + && self.count_pooled_refs_total == 0 } - impl PerObjectBlobInfo { - #[cfg(test)] - pub(crate) fn new_for_testing( - blob_id: BlobId, - registered_epoch: Epoch, - certified_epoch: Option, - end_epoch: Epoch, - deletable: bool, - event: EventID, - deleted: bool, - ) -> Self { - Self::V1(PerObjectBlobInfoV1 { - blob_id, - registered_epoch, - certified_epoch, - end_epoch, - deletable, - event, - deleted, - }) + /// Checks the invariants of this V2 blob info. + fn check_invariants(&self) -> anyhow::Result<()> { + let has_regular_certified = + self.count_deletable_certified > 0 || self.permanent_certified.is_some(); + let has_pool_certified = self.count_pooled_refs_certified > 0; + + anyhow::ensure!(self.count_deletable_total >= self.count_deletable_certified); + match self.initial_certified_epoch { + None => { + anyhow::ensure!(!has_regular_certified && !has_pool_certified) + } + Some(_) => { + anyhow::ensure!(has_regular_certified || has_pool_certified) + } + } + + match (&self.permanent_total, &self.permanent_certified) { + (None, Some(_)) => { + anyhow::bail!("permanent_total.is_none() => permanent_certified.is_none()") + } + (Some(total_inner), Some(certified_inner)) => { + anyhow::ensure!(total_inner.end_epoch >= certified_inner.end_epoch); + anyhow::ensure!(total_inner.count >= certified_inner.count); + } + _ => (), } + + anyhow::ensure!( + self.count_pooled_refs_total >= self.count_pooled_refs_certified, + "pool ref count_pooled_refs_total < count_pooled_refs_certified" + ); + + Ok(()) } +} - impl ToBytes for PerObjectBlobInfo {} +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub(crate) enum BlobInfoV2 { + Invalid { epoch: Epoch, event: EventID }, + Valid(ValidBlobInfoV2), +} - impl Mergeable for PerObjectBlobInfo { - type MergeOperand = PerObjectBlobInfoMergeOperand; - type Key = ObjectID; +impl ToBytes for BlobInfoV2 {} - fn merge_with(self, operand: Self::MergeOperand) -> Self { - match self { - Self::V1(value) => Self::V1(value.merge_with(operand)), - } +impl From for BlobInfoV2 { + fn from(v1: BlobInfoV1) -> Self { + match v1 { + BlobInfoV1::Invalid { epoch, event } => BlobInfoV2::Invalid { epoch, event }, + BlobInfoV1::Valid(v) => BlobInfoV2::Valid(v.into()), } + } +} - fn merge_new(operand: Self::MergeOperand) -> Option { - PerObjectBlobInfoV1::merge_new(operand).map(Self::from) - } +impl CertifiedBlobInfoApi for ValidBlobInfoV2 { + fn is_certified(&self, current_epoch: Epoch) -> bool { + let exists_certified_permanent_blob = self + .permanent_certified + .as_ref() + .is_some_and(|p| p.end_epoch > current_epoch); + + // Note that at the beginning of the epoch before GC runs, newly expired deletable blob or + // pooled blob's certified counter may still be non-zero, but the blob is already expired. + // This will make the blob appears to be certified until GC finishes. + self.initial_certified_epoch + .is_some_and(|epoch| epoch <= current_epoch) + && (exists_certified_permanent_blob + || self.count_deletable_certified > 0 + || self.count_pooled_refs_certified > 0) } - #[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Clone)] - pub(crate) struct PerObjectBlobInfoV1 { - /// The blob ID. - pub blob_id: BlobId, - /// The epoch in which the blob has been registered. - pub registered_epoch: Epoch, - /// The epoch in which the blob was first certified, `None` if the blob is uncertified. - pub certified_epoch: Option, - /// The epoch in which the blob expires. - pub end_epoch: Epoch, - /// Whether the blob is deletable. - pub deletable: bool, - /// The ID of the last blob event related to this object. - pub event: EventID, - /// Whether the blob has been deleted. - pub deleted: bool, + fn initial_certified_epoch(&self) -> Option { + self.initial_certified_epoch } +} - impl CertifiedBlobInfoApi for PerObjectBlobInfoV1 { - fn is_certified(&self, current_epoch: Epoch) -> bool { - self.is_registered(current_epoch) - && self - .certified_epoch - .is_some_and(|epoch| epoch <= current_epoch) +impl CertifiedBlobInfoApi for BlobInfoV2 { + fn is_certified(&self, current_epoch: Epoch) -> bool { + match self { + Self::Valid(v) => v.is_certified(current_epoch), + Self::Invalid { .. } => false, } + } - fn initial_certified_epoch(&self) -> Option { - self.certified_epoch + fn initial_certified_epoch(&self) -> Option { + match self { + Self::Valid(v) => v.initial_certified_epoch, + Self::Invalid { .. } => None, } } +} - impl PerObjectBlobInfoApi for PerObjectBlobInfoV1 { - fn blob_id(&self) -> BlobId { - self.blob_id - } +impl BlobInfoApi for BlobInfoV2 { + fn is_metadata_stored(&self) -> bool { + matches!( + self, + Self::Valid(ValidBlobInfoV2 { + is_metadata_stored: true, + .. + }) + ) + } - fn is_deletable(&self) -> bool { - self.deletable - } + fn is_registered(&self, _current_epoch: Epoch) -> bool { + let Self::Valid(v) = self else { + return false; + }; - fn is_registered(&self, current_epoch: Epoch) -> bool { - self.end_epoch > current_epoch && !self.deleted + let exists_registered_permanent_blob = v + .permanent_total + .as_ref() + .is_some_and(|p| p.end_epoch > _current_epoch); + + // Note that at the beginning of the epoch before GC runs, newly expired deletable blob or + // pooled blob's registered counter may still be non-zero, but the blob is already expired. + // This will make the blob appears to be registered until GC finishes. + exists_registered_permanent_blob + || v.count_deletable_total > 0 + || v.count_pooled_refs_total > 0 + } + + fn can_blob_info_be_deleted(&self, current_epoch: Epoch) -> bool { + match self { + Self::Invalid { .. } => false, + Self::Valid(v) => { + v.count_deletable_total == 0 + && v.permanent_total.is_none() + && v.count_pooled_refs_total == 0 + && self.can_data_be_deleted(current_epoch) + } } + } - fn is_deleted(&self) -> bool { - self.deleted + fn invalidation_event(&self) -> Option { + if let Self::Invalid { event, .. } = self { + Some(*event) + } else { + None } } - impl ToBytes for PerObjectBlobInfoV1 {} + fn to_blob_status(&self, current_epoch: Epoch) -> BlobStatus { + match self { + BlobInfoV2::Invalid { event, .. } => BlobStatus::Invalid { event: *event }, + BlobInfoV2::Valid(v) => { + let initial_certified_epoch = v.initial_certified_epoch; + + // Deletable counts include both regular and pool refs. + let deletable_counts = DeletableCounts { + count_deletable_total: v + .count_deletable_total + .saturating_add(v.count_pooled_refs_total), + count_deletable_certified: v + .count_deletable_certified + .saturating_add(v.count_pooled_refs_certified), + }; - impl Mergeable for PerObjectBlobInfoV1 { - type MergeOperand = PerObjectBlobInfoMergeOperand; - type Key = ObjectID; + if let Some(PermanentBlobInfoV1 { + end_epoch, event, .. + }) = v.permanent_certified.as_ref() + && *end_epoch > current_epoch + { + return BlobStatus::Permanent { + end_epoch: *end_epoch, + is_certified: true, + status_event: *event, + deletable_counts, + initial_certified_epoch, + }; + } + if let Some(PermanentBlobInfoV1 { + end_epoch, event, .. + }) = v.permanent_total.as_ref() + && *end_epoch > current_epoch + { + return BlobStatus::Permanent { + end_epoch: *end_epoch, + is_certified: false, + status_event: *event, + deletable_counts, + initial_certified_epoch, + }; + } - fn merge_with( - mut self, - PerObjectBlobInfoMergeOperand { - change_type, - change_info, - }: PerObjectBlobInfoMergeOperand, - ) -> Self { - assert_eq!( - self.blob_id, change_info.blob_id, - "blob ID mismatch in merge operand" - ); - assert_eq!( - self.deletable, change_info.deletable, - "deletable mismatch in merge operand" - ); - assert!( - !self.deleted, - "attempt to update an already deleted blob {}", - self.blob_id - ); - self.event = change_info.status_event; - match change_type { - // We ensure that the blob info is only updated a single time for each event. So if - // we see a duplicated registered or certified event for the some object, this is a - // serious bug somewhere. + if deletable_counts != Default::default() { + BlobStatus::Deletable { + initial_certified_epoch, + deletable_counts, + } + } else { + BlobStatus::Nonexistent + } + } + } + } +} + +impl Mergeable for BlobInfoV2 { + type MergeOperand = BlobInfoMergeOperand; + type Key = BlobId; + + fn merge_with(mut self, operand: Self::MergeOperand) -> Self { + match (&mut self, operand) { + // If the blob is already marked as invalid, do not update the status. + (Self::Invalid { .. }, _) => (), + ( + _, + BlobInfoMergeOperand::MarkInvalid { + epoch, + status_event, + }, + ) => { + return Self::Invalid { + epoch, + event: status_event, + }; + } + ( + Self::Valid(ValidBlobInfoV2 { + is_metadata_stored, .. + }), + BlobInfoMergeOperand::MarkMetadataStored(new_is_metadata_stored), + ) => { + *is_metadata_stored = new_is_metadata_stored; + } + // Regular blob status changes. + ( + Self::Valid(valid_blob_info), + BlobInfoMergeOperand::ChangeStatus { + change_type, + change_info, + }, + ) => valid_blob_info.update_status(change_type, change_info), + ( + Self::Valid(valid_blob_info), + BlobInfoMergeOperand::DeletableExpired { was_certified }, + ) => valid_blob_info.deletable_expired(was_certified), + ( + Self::Valid(valid_blob_info), + BlobInfoMergeOperand::PermanentExpired { was_certified }, + ) => valid_blob_info.permanent_expired(was_certified), + // Storage pool operations. + ( + Self::Valid(valid_blob_info), + BlobInfoMergeOperand::PooledBlobChangeStatus { + change_type, + change_info, + .. + }, + ) => match change_type { BlobStatusChangeType::Register => { - panic!( - "cannot register an already registered blob {}", - self.blob_id - ); + valid_blob_info.pool_register(); } BlobStatusChangeType::Certify => { - assert!( - self.certified_epoch.is_none(), - "cannot certify an already certified blob {}", - self.blob_id - ); - self.certified_epoch = Some(change_info.epoch); + valid_blob_info.pool_certify(change_info.epoch); } BlobStatusChangeType::Extend => { - assert!( - self.certified_epoch.is_some(), - "cannot extend an uncertified blob {}", - self.blob_id - ); - self.end_epoch = change_info.end_epoch; + // Extensions don't change ref counts for storage pool. + // The pool's end_epoch is tracked separately. } BlobStatusChangeType::Delete { was_certified } => { - assert_eq!(self.certified_epoch.is_some(), was_certified); - self.deleted = true; + valid_blob_info.pool_delete(was_certified); } - } - self + }, + ( + Self::Valid(valid_blob_info), + BlobInfoMergeOperand::PoolExpired { was_certified, .. }, + ) => valid_blob_info.pool_expired(was_certified), } + self + } - fn merge_new(operand: Self::MergeOperand) -> Option { - let PerObjectBlobInfoMergeOperand { + fn merge_new(operand: Self::MergeOperand) -> Option { + match operand { + BlobInfoMergeOperand::PooledBlobChangeStatus { change_type: BlobStatusChangeType::Register, - change_info: - BlobStatusChangeInfo { - blob_id, - deletable, - epoch, - end_epoch, - status_event, - }, - } = operand - else { - tracing::error!( - ?operand, - "encountered an update other than 'register' for an untracked blob object" + .. + } => { + let mut v2 = ValidBlobInfoV2::default(); + v2.pool_register(); + Some(BlobInfoV2::Valid(v2)) + } + BlobInfoMergeOperand::MarkInvalid { + epoch, + status_event, + } => Some(BlobInfoV2::Invalid { + epoch, + event: status_event, + }), + BlobInfoMergeOperand::MarkMetadataStored(is_metadata_stored) => { + tracing::info!( + is_metadata_stored, + "marking metadata stored for an untracked blob ID; blob info will be removed \ + during the next garbage collection" + ); + Some(BlobInfoV2::Valid(ValidBlobInfoV2 { + is_metadata_stored, + ..Default::default() + })) + } + BlobInfoMergeOperand::ChangeStatus { + change_type: BlobStatusChangeType::Register, + change_info: + BlobStatusChangeInfo { + deletable, + end_epoch, + status_event, + .. + }, + } => Some(BlobInfoV2::Valid(if deletable { + ValidBlobInfoV2 { + count_deletable_total: 1, + ..Default::default() + } + } else { + ValidBlobInfoV2 { + permanent_total: Some(PermanentBlobInfoV1::new_first(end_epoch, status_event)), + ..Default::default() + } + })), + _ => { + tracing::error!( + ?operand, + "encountered an unexpected update for an untracked blob ID (V2)" + ); + debug_assert!( + false, + "encountered an unexpected update for an untracked blob ID (V2): {operand:?}", + ); + None + } + } + } +} + +/// Represents the status of a blob. +/// +/// Currently only used for testing. +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Copy)] +#[repr(u8)] +#[cfg(test)] +pub(super) enum BlobCertificationStatus { + Registered, + Certified, + Invalid, +} + +#[cfg(test)] +impl PartialOrd for BlobCertificationStatus { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +#[cfg(test)] +impl Ord for BlobCertificationStatus { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + use std::cmp::Ordering::*; + + use BlobCertificationStatus::*; + + match (self, other) { + (Registered, Certified) | (Registered, Invalid) | (Certified, Invalid) => Less, + (left, right) if left == right => Equal, + _ => Greater, + } + } +} + +/// Internal representation of the aggregate blob information for use in the database etc. Use +/// [`walrus_storage_node_client::api::BlobStatus`] for anything public facing (e.g., communication +/// to the client). +#[enum_dispatch(CertifiedBlobInfoApi)] +#[enum_dispatch(BlobInfoApi)] +#[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Clone)] +pub(crate) enum BlobInfo { + V1(BlobInfoV1), + V2(BlobInfoV2), +} + +impl BlobInfo { + /// Creates a new (permanent) blob for testing purposes. + #[cfg(test)] + pub(super) fn new_for_testing( + end_epoch: Epoch, + status: BlobCertificationStatus, + current_status_event: EventID, + _registered_epoch: Option, + certified_epoch: Option, + invalidated_epoch: Option, + ) -> Self { + let blob_info = match status { + BlobCertificationStatus::Invalid => BlobInfoV1::Invalid { + epoch: invalidated_epoch + .expect("invalidated_epoch must be provided for Invalid status"), + event: current_status_event, + }, + + BlobCertificationStatus::Registered | BlobCertificationStatus::Certified => { + let permanent_total = + PermanentBlobInfoV1::new_first(end_epoch, current_status_event); + let permanent_certified = matches!(status, BlobCertificationStatus::Certified) + .then(|| permanent_total.clone()); + ValidBlobInfoV1 { + permanent_total: Some(permanent_total), + permanent_certified, + initial_certified_epoch: certified_epoch, + ..Default::default() + } + .into() + } + }; + Self::V1(blob_info) + } +} + +impl ToBytes for BlobInfo {} + +impl Mergeable for BlobInfo { + type MergeOperand = BlobInfoMergeOperand; + type Key = BlobId; + + fn merge_with(self, operand: Self::MergeOperand) -> Self { + match self { + Self::V1(v1) => match &operand { + // Only upgrade to V2 when a pool operand hits a V1 entry. + // This makes sure that all the storage nodes have consistent behavior despite of + // when they upgrade their nodes. + BlobInfoMergeOperand::PooledBlobChangeStatus { .. } + // Although it's impossible to have pool expired as the first pool event for a + // blob, the internal processing of pool expired will return error. + | BlobInfoMergeOperand::PoolExpired { .. } => { + Self::V2(BlobInfoV2::from(v1).merge_with(operand)) + } + // Regular operands keep V1 as V1. + _ => Self::V1(v1.merge_with(operand)), + }, + // V2 handles all operand types (regular AND pool). + Self::V2(v2) => Self::V2(v2.merge_with(operand)), + } + } + + fn merge_new(operand: Self::MergeOperand) -> Option { + match &operand { + // First event for a blob_id is a pool event → create V2. + BlobInfoMergeOperand::PooledBlobChangeStatus { .. } => { + BlobInfoV2::merge_new(operand).map(Self::V2) + } + // First event is a regular event → create V1 (as before). + _ => BlobInfoV1::merge_new(operand).map(Self::V1), + } + } +} + +mod per_object_blob_info { + use super::*; + + #[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Clone)] + pub(crate) struct PerObjectBlobInfoMergeOperand { + pub change_type: BlobStatusChangeType, + pub change_info: BlobStatusChangeInfo, + } + + impl ToBytes for PerObjectBlobInfoMergeOperand {} + + impl PerObjectBlobInfoMergeOperand { + pub fn from_blob_info_merge_operand( + blob_info_merge_operand: BlobInfoMergeOperand, + ) -> Option { + let BlobInfoMergeOperand::ChangeStatus { + change_type, + change_info, + } = blob_info_merge_operand + else { + return None; + }; + Some(Self { + change_type, + change_info, + }) + } + } + + impl From<&T> for PerObjectBlobInfoMergeOperand { + fn from(value: &T) -> Self { + Self { + change_type: value.change_type(), + change_info: value.change_info(), + } + } + } + + /// Trait defining methods for retrieving information about a blob object. + // NB: Before adding functions to this trait, think twice if you really need it as it needs to + // be implementable by future internal representations of the per-object blob status as well. + #[enum_dispatch] + #[allow(dead_code)] + pub(crate) trait PerObjectBlobInfoApi: CertifiedBlobInfoApi { + /// Returns the blob ID associated with this object. + fn blob_id(&self) -> BlobId; + /// Returns true iff the object is deletable. + fn is_deletable(&self) -> bool; + /// Returns true iff the object is not expired and not deleted. + fn is_registered(&self, current_epoch: Epoch) -> bool; + /// Returns true iff the object is already deleted. + fn is_deleted(&self) -> bool; + /// Returns the storage pool ID if this is a storage pool blob. + fn storage_pool_id(&self) -> Option; + } + + #[enum_dispatch(CertifiedBlobInfoApi)] + #[enum_dispatch(PerObjectBlobInfoApi)] + #[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Clone)] + pub(crate) enum PerObjectBlobInfo { + V1(PerObjectBlobInfoV1), + V2(PerObjectBlobInfoV2), + } + + impl PerObjectBlobInfo { + #[cfg(test)] + pub(crate) fn new_for_testing( + blob_id: BlobId, + registered_epoch: Epoch, + certified_epoch: Option, + end_epoch: Epoch, + deletable: bool, + event: EventID, + deleted: bool, + ) -> Self { + Self::V1(PerObjectBlobInfoV1 { + blob_id, + registered_epoch, + certified_epoch, + end_epoch, + deletable, + event, + deleted, + }) + } + } + + impl ToBytes for PerObjectBlobInfo {} + + impl Mergeable for PerObjectBlobInfo { + type MergeOperand = PerObjectBlobInfoMergeOperand; + type Key = ObjectID; + + fn merge_with(self, operand: Self::MergeOperand) -> Self { + match self { + Self::V1(value) => Self::V1(value.merge_with(operand)), + Self::V2(value) => Self::V2(value.merge_with(operand)), + } + } + + fn merge_new(operand: Self::MergeOperand) -> Option { + // We never create PerObjectBlobInfoV2 via merge operator. This is because the + // PerObjectBlobInfoMergeOperand struct can only used for V1 for registration. + // So any newly created PerObjectBlobInfoV2 is directly inserted into the table. + // + // The certify and delete operation can be used in both V1 and V2, so merge_with above + // works for both. + PerObjectBlobInfoV1::merge_new(operand).map(Self::from) + } + } + + #[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Clone)] + pub(crate) struct PerObjectBlobInfoV1 { + /// The blob ID. + pub blob_id: BlobId, + /// The epoch in which the blob has been registered. + pub registered_epoch: Epoch, + /// The epoch in which the blob was first certified, `None` if the blob is uncertified. + pub certified_epoch: Option, + /// The epoch in which the blob expires. + pub end_epoch: Epoch, + /// Whether the blob is deletable. + pub deletable: bool, + /// The ID of the last blob event related to this object. + pub event: EventID, + /// Whether the blob has been deleted. + pub deleted: bool, + } + + impl CertifiedBlobInfoApi for PerObjectBlobInfoV1 { + fn is_certified(&self, current_epoch: Epoch) -> bool { + self.is_registered(current_epoch) + && self + .certified_epoch + .is_some_and(|epoch| epoch <= current_epoch) + } + + fn initial_certified_epoch(&self) -> Option { + self.certified_epoch + } + } + + impl PerObjectBlobInfoApi for PerObjectBlobInfoV1 { + fn blob_id(&self) -> BlobId { + self.blob_id + } + + fn is_deletable(&self) -> bool { + self.deletable + } + + fn is_registered(&self, current_epoch: Epoch) -> bool { + self.end_epoch > current_epoch && !self.deleted + } + + fn is_deleted(&self) -> bool { + self.deleted + } + + fn storage_pool_id(&self) -> Option { + None + } + } + + impl ToBytes for PerObjectBlobInfoV1 {} + + /// How the end epoch of a per-object blob is determined. + #[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Clone)] + pub(crate) enum EndEpochInfo { + /// Regular blob with an individual end epoch. + Individual(Epoch), + /// Pooled blob whose lifetime is determined by the storage pool's end epoch. + StoragePool(ObjectID), + } + + /// V2 per-object blob info that supports both regular blobs and storage pool blobs. + /// + /// Unlike V1, the end epoch and storage pool ID are combined into a single `EndEpochInfo` + /// enum: regular blobs have `EndEpochInfo::Individual(end_epoch)`, while pool blobs have + /// `EndEpochInfo::StoragePool(pool_id)`. + #[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Clone)] + pub(crate) struct PerObjectBlobInfoV2 { + /// The blob ID. + pub blob_id: BlobId, + /// The epoch in which the blob has been registered. + pub registered_epoch: Epoch, + /// The epoch in which the blob was first certified, `None` if the blob is uncertified. + pub certified_epoch: Option, + /// How the blob's end epoch is determined. + pub end_epoch_info: EndEpochInfo, + /// Whether the blob is deletable. + pub deletable: bool, + /// The ID of the last blob event related to this object. + pub event: EventID, + /// Whether the blob has been deleted. + pub deleted: bool, + } + + impl CertifiedBlobInfoApi for PerObjectBlobInfoV2 { + fn is_certified(&self, current_epoch: Epoch) -> bool { + self.is_registered(current_epoch) + && self + .certified_epoch + .is_some_and(|epoch| epoch <= current_epoch) + } + + fn initial_certified_epoch(&self) -> Option { + self.certified_epoch + } + } + + impl PerObjectBlobInfoApi for PerObjectBlobInfoV2 { + fn blob_id(&self) -> BlobId { + self.blob_id + } + + fn is_deletable(&self) -> bool { + self.deletable + } + + fn is_registered(&self, current_epoch: Epoch) -> bool { + if self.deleted { + return false; + } + match self.end_epoch_info { + EndEpochInfo::Individual(end_epoch) => end_epoch > current_epoch, + // Pool blob liveness depends on the pool, not the blob. + EndEpochInfo::StoragePool(_) => true, + } + } + + fn is_deleted(&self) -> bool { + self.deleted + } + + fn storage_pool_id(&self) -> Option { + match &self.end_epoch_info { + EndEpochInfo::StoragePool(id) => Some(*id), + EndEpochInfo::Individual(_) => None, + } + } + } + + impl ToBytes for PerObjectBlobInfoV2 {} + + impl Mergeable for PerObjectBlobInfoV2 { + type MergeOperand = PerObjectBlobInfoMergeOperand; + type Key = ObjectID; + + fn merge_with( + mut self, + PerObjectBlobInfoMergeOperand { + change_type, + change_info, + }: PerObjectBlobInfoMergeOperand, + ) -> Self { + assert_eq!( + self.blob_id, change_info.blob_id, + "blob ID mismatch in merge operand" + ); + assert_eq!( + self.deletable, change_info.deletable, + "deletable mismatch in merge operand" + ); + assert!( + !self.deleted, + "attempt to update an already deleted blob {}", + self.blob_id + ); + self.event = change_info.status_event; + match change_type { + BlobStatusChangeType::Register => { + panic!( + "cannot register an already registered blob {}", + self.blob_id + ); + } + BlobStatusChangeType::Certify => { + assert!( + self.certified_epoch.is_none(), + "cannot certify an already certified blob {}", + self.blob_id + ); + self.certified_epoch = Some(change_info.epoch); + } + BlobStatusChangeType::Extend => { + assert!( + self.certified_epoch.is_some(), + "cannot extend an uncertified blob {}", + self.blob_id + ); + self.end_epoch_info = EndEpochInfo::Individual(change_info.end_epoch); + } + BlobStatusChangeType::Delete { was_certified } => { + assert_eq!(self.certified_epoch.is_some(), was_certified); + self.deleted = true; + } + } + self + } + + fn merge_new(operand: Self::MergeOperand) -> Option { + // V2 entries are created via direct insert, not merge_new. + tracing::error!( + ?operand, + "PerObjectBlobInfoV2::merge_new should not be called; V2 entries are created via \ + direct insert" + ); + debug_assert!( + false, + "PerObjectBlobInfoV2::merge_new should not be called: {operand:?}" + ); + None + } + } + + impl Mergeable for PerObjectBlobInfoV1 { + type MergeOperand = PerObjectBlobInfoMergeOperand; + type Key = ObjectID; + + fn merge_with( + mut self, + PerObjectBlobInfoMergeOperand { + change_type, + change_info, + }: PerObjectBlobInfoMergeOperand, + ) -> Self { + assert_eq!( + self.blob_id, change_info.blob_id, + "blob ID mismatch in merge operand" + ); + assert_eq!( + self.deletable, change_info.deletable, + "deletable mismatch in merge operand" + ); + assert!( + !self.deleted, + "attempt to update an already deleted blob {}", + self.blob_id + ); + self.event = change_info.status_event; + match change_type { + // We ensure that the blob info is only updated a single time for each event. So if + // we see a duplicated registered or certified event for the some object, this is a + // serious bug somewhere. + BlobStatusChangeType::Register => { + panic!( + "cannot register an already registered blob {}", + self.blob_id + ); + } + BlobStatusChangeType::Certify => { + assert!( + self.certified_epoch.is_none(), + "cannot certify an already certified blob {}", + self.blob_id + ); + self.certified_epoch = Some(change_info.epoch); + } + BlobStatusChangeType::Extend => { + assert!( + self.certified_epoch.is_some(), + "cannot extend an uncertified blob {}", + self.blob_id + ); + self.end_epoch = change_info.end_epoch; + } + BlobStatusChangeType::Delete { was_certified } => { + assert_eq!(self.certified_epoch.is_some(), was_certified); + self.deleted = true; + } + } + self + } + + fn merge_new(operand: Self::MergeOperand) -> Option { + let PerObjectBlobInfoMergeOperand { + change_type: BlobStatusChangeType::Register, + change_info: + BlobStatusChangeInfo { + blob_id, + deletable, + epoch, + end_epoch, + status_event, + }, + } = operand + else { + tracing::error!( + ?operand, + "encountered an update other than 'register' for an untracked blob object" ); debug_assert!( false, @@ -2162,6 +2973,8 @@ mod tests { } } + // ==================== V1 BlobInfo Tests ==================== + param_test! { test_merge_new_expected_failure_cases: [ #[should_panic] certify_permanent: (BlobInfoMergeOperand::new_change_for_testing( @@ -2532,318 +3345,981 @@ mod tests { permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(1, 2, 0)), ..Default::default() }, - BlobInfoMergeOperand::new_change_for_testing( - BlobStatusChangeType::Register, false, 2, 3, fixed_event_id_for_testing(1) - ), - ValidBlobInfoV1{ - permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(2, 3, 1)), - ..Default::default() - }, + BlobInfoMergeOperand::new_change_for_testing( + BlobStatusChangeType::Register, false, 2, 3, fixed_event_id_for_testing(1) + ), + ValidBlobInfoV1{ + permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(2, 3, 1)), + ..Default::default() + }, + ), + expire_permanent_blob: ( + ValidBlobInfoV1{ + permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(3, 5, 0)), + permanent_certified: Some(PermanentBlobInfoV1::new_fixed_for_testing(2, 5, 1)), + initial_certified_epoch: Some(1), + ..Default::default() + }, + BlobInfoMergeOperand::PermanentExpired { + was_certified: true, + }, + ValidBlobInfoV1{ + permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(2, 5, 0)), + permanent_certified: Some(PermanentBlobInfoV1::new_fixed_for_testing(1, 5, 1)), + initial_certified_epoch: Some(1), + ..Default::default() + }, + ), + expire_last_permanent_blob: ( + ValidBlobInfoV1{ + permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(2, 5, 0)), + permanent_certified: Some(PermanentBlobInfoV1::new_fixed_for_testing(1, 5, 1)), + initial_certified_epoch: Some(1), + ..Default::default() + }, + BlobInfoMergeOperand::PermanentExpired { + was_certified: true, + }, + ValidBlobInfoV1{ + permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(1, 5, 0)), + permanent_certified: None, + initial_certified_epoch: None, + ..Default::default() + }, + ), + delete_deletable_blob: ( + ValidBlobInfoV1{ + count_deletable_total: 3, + count_deletable_certified: 2, + initial_certified_epoch: Some(1), + latest_seen_deletable_registered_end_epoch: Some(5), + latest_seen_deletable_certified_end_epoch: Some(4), + ..Default::default() + }, + BlobInfoMergeOperand::new_change_for_testing( + BlobStatusChangeType::Delete { was_certified: true }, + true, + 1, + 6, + event_id_for_testing(), + ), + ValidBlobInfoV1{ + count_deletable_total: 2, + count_deletable_certified: 1, + initial_certified_epoch: Some(1), + latest_seen_deletable_registered_end_epoch: Some(5), + latest_seen_deletable_certified_end_epoch: Some(4), + ..Default::default() + }, + ), + expire_deletable_blob: ( + ValidBlobInfoV1{ + count_deletable_total: 3, + count_deletable_certified: 2, + initial_certified_epoch: Some(1), + latest_seen_deletable_registered_end_epoch: Some(5), + latest_seen_deletable_certified_end_epoch: Some(4), + ..Default::default() + }, + BlobInfoMergeOperand::DeletableExpired { + was_certified: true, + }, + ValidBlobInfoV1{ + count_deletable_total: 2, + count_deletable_certified: 1, + initial_certified_epoch: Some(1), + latest_seen_deletable_registered_end_epoch: Some(5), + latest_seen_deletable_certified_end_epoch: Some(4), + ..Default::default() + }, + ), + delete_last_deletable_blob: ( + ValidBlobInfoV1{ + count_deletable_total: 2, + count_deletable_certified: 1, + initial_certified_epoch: Some(1), + latest_seen_deletable_registered_end_epoch: Some(5), + latest_seen_deletable_certified_end_epoch: Some(4), + ..Default::default() + }, + BlobInfoMergeOperand::new_change_for_testing( + BlobStatusChangeType::Delete { was_certified: true }, + true, + 1, + 4, + event_id_for_testing(), + ), + ValidBlobInfoV1{ + count_deletable_total: 1, + count_deletable_certified: 0, + initial_certified_epoch: None, + latest_seen_deletable_registered_end_epoch: Some(5), + latest_seen_deletable_certified_end_epoch: None, + ..Default::default() + }, + ), + expire_last_deletable_blob: ( + ValidBlobInfoV1{ + count_deletable_total: 2, + count_deletable_certified: 1, + initial_certified_epoch: Some(1), + latest_seen_deletable_registered_end_epoch: Some(5), + latest_seen_deletable_certified_end_epoch: Some(4), + ..Default::default() + }, + BlobInfoMergeOperand::DeletableExpired { + was_certified: true, + }, + ValidBlobInfoV1{ + count_deletable_total: 1, + count_deletable_certified: 0, + initial_certified_epoch: None, + latest_seen_deletable_registered_end_epoch: Some(5), + latest_seen_deletable_certified_end_epoch: None, + ..Default::default() + }, + ), + expire_uncertified_permanent_blob: ( + ValidBlobInfoV1{ + permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(3, 5, 0)), + ..Default::default() + }, + BlobInfoMergeOperand::PermanentExpired { + was_certified: false, + }, + ValidBlobInfoV1{ + permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(2, 5, 0)), + ..Default::default() + }, + ), + expire_last_uncertified_permanent_blob: ( + ValidBlobInfoV1{ + permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(1, 5, 0)), + ..Default::default() + }, + BlobInfoMergeOperand::PermanentExpired { + was_certified: false, + }, + ValidBlobInfoV1{ + permanent_total: None, + ..Default::default() + }, + ), + delete_uncertified_deletable_blob: ( + ValidBlobInfoV1{ + count_deletable_total: 3, + latest_seen_deletable_registered_end_epoch: Some(5), + ..Default::default() + }, + BlobInfoMergeOperand::new_change_for_testing( + BlobStatusChangeType::Delete { was_certified: false }, + true, + 1, + 6, + event_id_for_testing(), + ), + ValidBlobInfoV1{ + count_deletable_total: 2, + latest_seen_deletable_registered_end_epoch: Some(5), + ..Default::default() + }, + ), + delete_last_uncertified_deletable_blob: ( + ValidBlobInfoV1{ + count_deletable_total: 1, + latest_seen_deletable_registered_end_epoch: Some(5), + ..Default::default() + }, + BlobInfoMergeOperand::new_change_for_testing( + BlobStatusChangeType::Delete { was_certified: false }, + true, + 2, + 6, + event_id_for_testing(), + ), + ValidBlobInfoV1{ + count_deletable_total: 0, + latest_seen_deletable_registered_end_epoch: None, + ..Default::default() + }, + ), + ] + } + fn test_merge_preexisting_expected_successes( + preexisting_info: ValidBlobInfoV1, + operand: BlobInfoMergeOperand, + expected_info: ValidBlobInfoV1, + ) { + preexisting_info + .check_invariants() + .expect("preexisting blob info invariants violated"); + expected_info + .check_invariants() + .expect("expected blob info invariants violated"); + + let updated_info = BlobInfoV1::Valid(preexisting_info).merge_with(operand); + + assert_eq!(updated_info, expected_info.into()); + } + + param_test! { + test_merge_preexisting_expected_failures: [ + certify_permanent_without_register: ( + Default::default(), + BlobInfoMergeOperand::new_change_for_testing( + BlobStatusChangeType::Certify, false, 42, 314, event_id_for_testing() + ), + ), + extend_permanent_without_certify: ( + Default::default(), + BlobInfoMergeOperand::new_change_for_testing( + BlobStatusChangeType::Extend, false, 42, 314, event_id_for_testing() + ), + ), + certify_deletable_without_register: ( + Default::default(), + BlobInfoMergeOperand::new_change_for_testing( + BlobStatusChangeType::Certify, true, 42, 314, event_id_for_testing() + ), + ), + ] + } + fn test_merge_preexisting_expected_failures( + preexisting_info: ValidBlobInfoV1, + operand: BlobInfoMergeOperand, + ) { + preexisting_info + .check_invariants() + .expect("preexisting blob info invariants violated"); + let preexisting_info = BlobInfoV1::Valid(preexisting_info); + let blob_info = preexisting_info.clone().merge_with(operand); + assert_eq!(preexisting_info, blob_info); + } + + param_test! { + test_blob_status_is_inexistent_for_expired_blobs: [ + expired_permanent_registered_0: ( + ValidBlobInfoV1 { + permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(1, 2, 0)), + ..Default::default() + }, + 1, + 2, ), - expire_permanent_blob: ( - ValidBlobInfoV1{ - permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(3, 5, 0)), - permanent_certified: Some(PermanentBlobInfoV1::new_fixed_for_testing(2, 5, 1)), - initial_certified_epoch: Some(1), - ..Default::default() - }, - BlobInfoMergeOperand::PermanentExpired { - was_certified: true, - }, - ValidBlobInfoV1{ - permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(2, 5, 0)), - permanent_certified: Some(PermanentBlobInfoV1::new_fixed_for_testing(1, 5, 1)), - initial_certified_epoch: Some(1), + expired_permanent_registered_1: ( + ValidBlobInfoV1 { + permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(2, 3, 0)), ..Default::default() }, + 2, + 4, ), - expire_last_permanent_blob: ( - ValidBlobInfoV1{ - permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(2, 5, 0)), - permanent_certified: Some(PermanentBlobInfoV1::new_fixed_for_testing(1, 5, 1)), - initial_certified_epoch: Some(1), + expired_permanent_certified: ( + ValidBlobInfoV1 { + permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(2, 2, 0)), + permanent_certified: Some(PermanentBlobInfoV1::new_fixed_for_testing(1, 2, 0)), ..Default::default() }, - BlobInfoMergeOperand::PermanentExpired { - was_certified: true, - }, - ValidBlobInfoV1{ - permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(1, 5, 0)), - permanent_certified: None, - initial_certified_epoch: None, + 1, + 2, + ), + expired_deletable_registered: ( + ValidBlobInfoV1 { + count_deletable_total: 1, + latest_seen_deletable_registered_end_epoch: Some(2), ..Default::default() }, + 1, + 2, ), - delete_deletable_blob: ( - ValidBlobInfoV1{ - count_deletable_total: 3, - count_deletable_certified: 2, - initial_certified_epoch: Some(1), - latest_seen_deletable_registered_end_epoch: Some(5), - latest_seen_deletable_certified_end_epoch: Some(4), + expired_deletable_certified: ( + ValidBlobInfoV1 { + count_deletable_total: 1, + latest_seen_deletable_registered_end_epoch: Some(2), + count_deletable_certified: 1, + latest_seen_deletable_certified_end_epoch: Some(2), ..Default::default() }, + 1, + 2, + ), + ] + } + fn test_blob_status_is_inexistent_for_expired_blobs( + blob_info: ValidBlobInfoV1, + epoch_not_expired: Epoch, + epoch_expired: Epoch, + ) { + assert_ne!( + BlobInfoV1::Valid(blob_info.clone()).to_blob_status(epoch_not_expired), + BlobStatus::Nonexistent, + ); + assert_eq!( + BlobInfoV1::Valid(blob_info).to_blob_status(epoch_expired), + BlobStatus::Nonexistent, + ); + } + + // ==================== V2 BlobInfo Tests ==================== + + use per_object_blob_info::{EndEpochInfo, PerObjectBlobInfoV2}; + + fn pool_id() -> ObjectID { + walrus_sui::test_utils::object_id_for_testing() + } + + /// Shorthand: build a pool register/certify/delete operand. + fn pool_op(change_type: BlobStatusChangeType, epoch: Epoch) -> BlobInfoMergeOperand { + BlobInfoMergeOperand::PooledBlobChangeStatus { + change_type, + change_info: PooledBlobChangeInfo { + epoch, + storage_pool_id: pool_id(), + }, + } + } + + fn check_v2_invariants(info: &BlobInfoV2) { + if let BlobInfoV2::Valid(v) = info { + v.check_invariants() + .expect("V2 blob info invariants violated") + } + } + + // --- V2 merge_new success cases --- + + param_test! { + test_v2_merge_new_expected_success_cases_invariants: [ + register_permanent: (BlobInfoMergeOperand::new_change_for_testing( + BlobStatusChangeType::Register, false, 42, 314, event_id_for_testing() + )), + register_deletable: (BlobInfoMergeOperand::new_change_for_testing( + BlobStatusChangeType::Register, true, 42, 314, event_id_for_testing() + )), + invalidate: (BlobInfoMergeOperand::MarkInvalid { + epoch: 0, status_event: event_id_for_testing() + }), + metadata_true: (BlobInfoMergeOperand::MarkMetadataStored(true)), + metadata_false: (BlobInfoMergeOperand::MarkMetadataStored(false)), + pool_register: (pool_op(BlobStatusChangeType::Register, 1)), + ] + } + fn test_v2_merge_new_expected_success_cases_invariants(operand: BlobInfoMergeOperand) { + let blob_info = BlobInfoV2::merge_new(operand).expect("should be some"); + check_v2_invariants(&blob_info); + } + + // --- V2 mark metadata stored keeps everything else unchanged --- + + param_test! { + test_v2_mark_metadata_stored_keeps_everything_else_unchanged: [ + default: (ValidBlobInfoV2::default()), + deletable: (ValidBlobInfoV2 { + count_deletable_total: 2, ..Default::default() + }), + deletable_certified: (ValidBlobInfoV2 { + count_deletable_total: 2, count_deletable_certified: 1, + initial_certified_epoch: Some(0), ..Default::default() + }), + permanent_certified: (ValidBlobInfoV2 { + permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(2, 3, 0)), + permanent_certified: Some(PermanentBlobInfoV1::new_fixed_for_testing(1, 2, 0)), + initial_certified_epoch: Some(1), ..Default::default() + }), + pool_refs: (ValidBlobInfoV2 { + count_pooled_refs_total: 3, count_pooled_refs_certified: 1, + initial_certified_epoch: Some(2), ..Default::default() + }), + mixed_regular_and_pool: (ValidBlobInfoV2 { + count_deletable_total: 1, count_deletable_certified: 1, + count_pooled_refs_total: 2, count_pooled_refs_certified: 1, + initial_certified_epoch: Some(1), ..Default::default() + }), + ] + } + fn test_v2_mark_metadata_stored_keeps_everything_else_unchanged( + preexisting_info: ValidBlobInfoV2, + ) { + preexisting_info + .check_invariants() + .expect("preexisting invariants violated"); + let expected = ValidBlobInfoV2 { + is_metadata_stored: true, + ..preexisting_info.clone() + }; + expected + .check_invariants() + .expect("expected invariants violated"); + + let updated = BlobInfoV2::Valid(preexisting_info) + .merge_with(BlobInfoMergeOperand::MarkMetadataStored(true)); + assert_eq!(updated, BlobInfoV2::Valid(expected)); + } + + // --- V2 mark invalid marks everything invalid --- + + param_test! { + test_v2_mark_invalid_marks_everything_invalid: [ + default: (ValidBlobInfoV2::default()), + deletable: (ValidBlobInfoV2 { + count_deletable_total: 2, ..Default::default() + }), + deletable_certified: (ValidBlobInfoV2 { + count_deletable_total: 2, count_deletable_certified: 1, + initial_certified_epoch: Some(0), ..Default::default() + }), + permanent_certified: (ValidBlobInfoV2 { + permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(2, 3, 0)), + permanent_certified: Some(PermanentBlobInfoV1::new_fixed_for_testing(1, 2, 0)), + initial_certified_epoch: Some(1), ..Default::default() + }), + pool_refs: (ValidBlobInfoV2 { + count_pooled_refs_total: 3, count_pooled_refs_certified: 1, + initial_certified_epoch: Some(2), ..Default::default() + }), + mixed_regular_and_pool: (ValidBlobInfoV2 { + count_deletable_total: 1, count_deletable_certified: 1, + count_pooled_refs_total: 2, count_pooled_refs_certified: 1, + initial_certified_epoch: Some(1), ..Default::default() + }), + ] + } + fn test_v2_mark_invalid_marks_everything_invalid(preexisting_info: ValidBlobInfoV2) { + preexisting_info + .check_invariants() + .expect("preexisting invariants violated"); + let event = event_id_for_testing(); + let updated = + BlobInfoV2::Valid(preexisting_info).merge_with(BlobInfoMergeOperand::MarkInvalid { + epoch: 2, + status_event: event, + }); + assert_eq!(BlobInfoV2::Invalid { epoch: 2, event }, updated); + } + + // --- V2 merge preexisting expected successes --- + // Covers regular ops (deletable, permanent, expire) and pool ops on ValidBlobInfoV2. + // V2 does not track latest_seen_deletable_*_end_epoch, so those cases are omitted. + + param_test! { + test_v2_merge_preexisting_expected_successes: [ + // --- regular deletable --- + register_first_deletable: ( + ValidBlobInfoV2::default(), BlobInfoMergeOperand::new_change_for_testing( - BlobStatusChangeType::Delete { was_certified: true }, - true, - 1, - 6, - event_id_for_testing(), + BlobStatusChangeType::Register, true, 1, 2, event_id_for_testing() ), - ValidBlobInfoV1{ - count_deletable_total: 2, - count_deletable_certified: 1, - initial_certified_epoch: Some(1), - latest_seen_deletable_registered_end_epoch: Some(5), - latest_seen_deletable_certified_end_epoch: Some(4), - ..Default::default() - }, + ValidBlobInfoV2 { count_deletable_total: 1, ..Default::default() }, ), - expire_deletable_blob: ( - ValidBlobInfoV1{ - count_deletable_total: 3, - count_deletable_certified: 2, - initial_certified_epoch: Some(1), - latest_seen_deletable_registered_end_epoch: Some(5), - latest_seen_deletable_certified_end_epoch: Some(4), - ..Default::default() - }, - BlobInfoMergeOperand::DeletableExpired { - was_certified: true, - }, - ValidBlobInfoV1{ - count_deletable_total: 2, - count_deletable_certified: 1, - initial_certified_epoch: Some(1), - latest_seen_deletable_registered_end_epoch: Some(5), - latest_seen_deletable_certified_end_epoch: Some(4), - ..Default::default() + register_additional_deletable: ( + ValidBlobInfoV2 { count_deletable_total: 3, ..Default::default() }, + BlobInfoMergeOperand::new_change_for_testing( + BlobStatusChangeType::Register, true, 1, 5, event_id_for_testing() + ), + ValidBlobInfoV2 { count_deletable_total: 4, ..Default::default() }, + ), + certify_first_deletable: ( + ValidBlobInfoV2 { count_deletable_total: 3, ..Default::default() }, + BlobInfoMergeOperand::new_change_for_testing( + BlobStatusChangeType::Certify, true, 1, 4, event_id_for_testing() + ), + ValidBlobInfoV2 { + count_deletable_total: 3, count_deletable_certified: 1, + initial_certified_epoch: Some(1), ..Default::default() }, ), - delete_last_deletable_blob: ( - ValidBlobInfoV1{ - count_deletable_total: 2, - count_deletable_certified: 1, - initial_certified_epoch: Some(1), - latest_seen_deletable_registered_end_epoch: Some(5), - latest_seen_deletable_certified_end_epoch: Some(4), - ..Default::default() + certify_additional_deletable_keeps_earlier_epoch: ( + ValidBlobInfoV2 { + count_deletable_total: 3, count_deletable_certified: 1, + initial_certified_epoch: Some(0), ..Default::default() }, BlobInfoMergeOperand::new_change_for_testing( - BlobStatusChangeType::Delete { was_certified: true }, - true, - 1, - 4, - event_id_for_testing(), + BlobStatusChangeType::Certify, true, 1, 2, event_id_for_testing() ), - ValidBlobInfoV1{ - count_deletable_total: 1, - count_deletable_certified: 0, - initial_certified_epoch: None, - latest_seen_deletable_registered_end_epoch: Some(5), - latest_seen_deletable_certified_end_epoch: None, - ..Default::default() + ValidBlobInfoV2 { + count_deletable_total: 3, count_deletable_certified: 2, + initial_certified_epoch: Some(0), ..Default::default() }, ), - expire_last_deletable_blob: ( - ValidBlobInfoV1{ - count_deletable_total: 2, - count_deletable_certified: 1, - initial_certified_epoch: Some(1), - latest_seen_deletable_registered_end_epoch: Some(5), - latest_seen_deletable_certified_end_epoch: Some(4), - ..Default::default() - }, - BlobInfoMergeOperand::DeletableExpired { - was_certified: true, + delete_deletable_blob: ( + ValidBlobInfoV2 { + count_deletable_total: 3, count_deletable_certified: 2, + initial_certified_epoch: Some(1), ..Default::default() }, - ValidBlobInfoV1{ - count_deletable_total: 1, - count_deletable_certified: 0, - initial_certified_epoch: None, - latest_seen_deletable_registered_end_epoch: Some(5), - latest_seen_deletable_certified_end_epoch: None, - ..Default::default() + BlobInfoMergeOperand::new_change_for_testing( + BlobStatusChangeType::Delete { was_certified: true }, true, 1, 6, + event_id_for_testing(), + ), + ValidBlobInfoV2 { + count_deletable_total: 2, count_deletable_certified: 1, + initial_certified_epoch: Some(1), ..Default::default() }, ), - expire_uncertified_permanent_blob: ( - ValidBlobInfoV1{ - permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(3, 5, 0)), - ..Default::default() + delete_last_certified_deletable_clears_epoch: ( + ValidBlobInfoV2 { + count_deletable_total: 2, count_deletable_certified: 1, + initial_certified_epoch: Some(1), ..Default::default() }, - BlobInfoMergeOperand::PermanentExpired { - was_certified: false, + BlobInfoMergeOperand::new_change_for_testing( + BlobStatusChangeType::Delete { was_certified: true }, true, 1, 4, + event_id_for_testing(), + ), + ValidBlobInfoV2 { + count_deletable_total: 1, count_deletable_certified: 0, + initial_certified_epoch: None, ..Default::default() }, - ValidBlobInfoV1{ - permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(2, 5, 0)), - ..Default::default() + ), + expire_deletable_blob: ( + ValidBlobInfoV2 { + count_deletable_total: 3, count_deletable_certified: 2, + initial_certified_epoch: Some(1), ..Default::default() + }, + BlobInfoMergeOperand::DeletableExpired { was_certified: true }, + ValidBlobInfoV2 { + count_deletable_total: 2, count_deletable_certified: 1, + initial_certified_epoch: Some(1), ..Default::default() }, ), - expire_last_uncertified_permanent_blob: ( - ValidBlobInfoV1{ - permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(1, 5, 0)), - ..Default::default() + expire_last_certified_deletable_clears_epoch: ( + ValidBlobInfoV2 { + count_deletable_total: 2, count_deletable_certified: 1, + initial_certified_epoch: Some(1), ..Default::default() }, - BlobInfoMergeOperand::PermanentExpired { - was_certified: false, + BlobInfoMergeOperand::DeletableExpired { was_certified: true }, + ValidBlobInfoV2 { + count_deletable_total: 1, count_deletable_certified: 0, + initial_certified_epoch: None, ..Default::default() }, - ValidBlobInfoV1{ - permanent_total: None, + ), + // --- regular permanent --- + register_first_permanent: ( + ValidBlobInfoV2::default(), + BlobInfoMergeOperand::new_change_for_testing( + BlobStatusChangeType::Register, false, 1, 2, + fixed_event_id_for_testing(0) + ), + ValidBlobInfoV2 { + permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(1, 2, 0)), ..Default::default() }, ), - delete_uncertified_deletable_blob: ( - ValidBlobInfoV1{ - count_deletable_total: 3, - latest_seen_deletable_registered_end_epoch: Some(5), + register_additional_permanent: ( + ValidBlobInfoV2 { + permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(1, 2, 0)), ..Default::default() }, BlobInfoMergeOperand::new_change_for_testing( - BlobStatusChangeType::Delete { was_certified: false }, - true, - 1, - 6, - event_id_for_testing(), + BlobStatusChangeType::Register, false, 2, 3, + fixed_event_id_for_testing(1) ), - ValidBlobInfoV1{ - count_deletable_total: 2, - latest_seen_deletable_registered_end_epoch: Some(5), + ValidBlobInfoV2 { + permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(2, 3, 1)), ..Default::default() }, ), - delete_last_uncertified_deletable_blob: ( - ValidBlobInfoV1{ - count_deletable_total: 1, - latest_seen_deletable_registered_end_epoch: Some(5), + extend_permanent: ( + ValidBlobInfoV2 { + initial_certified_epoch: Some(0), + permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(2, 4, 0)), + permanent_certified: Some(PermanentBlobInfoV1::new_fixed_for_testing(1, 4, 1)), ..Default::default() }, BlobInfoMergeOperand::new_change_for_testing( - BlobStatusChangeType::Delete { was_certified: false }, - true, - 2, - 6, - event_id_for_testing(), + BlobStatusChangeType::Extend, false, 3, 42, + fixed_event_id_for_testing(2) ), - ValidBlobInfoV1{ - count_deletable_total: 0, - latest_seen_deletable_registered_end_epoch: None, + ValidBlobInfoV2 { + initial_certified_epoch: Some(0), + permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(2, 42, 2)), + permanent_certified: Some(PermanentBlobInfoV1::new_fixed_for_testing(1, 42, 2)), ..Default::default() }, ), + expire_permanent_blob: ( + ValidBlobInfoV2 { + permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(3, 5, 0)), + permanent_certified: Some(PermanentBlobInfoV1::new_fixed_for_testing(2, 5, 1)), + initial_certified_epoch: Some(1), ..Default::default() + }, + BlobInfoMergeOperand::PermanentExpired { was_certified: true }, + ValidBlobInfoV2 { + permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(2, 5, 0)), + permanent_certified: Some(PermanentBlobInfoV1::new_fixed_for_testing(1, 5, 1)), + initial_certified_epoch: Some(1), ..Default::default() + }, + ), + expire_last_certified_permanent_clears_epoch: ( + ValidBlobInfoV2 { + permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(2, 5, 0)), + permanent_certified: Some(PermanentBlobInfoV1::new_fixed_for_testing(1, 5, 1)), + initial_certified_epoch: Some(1), ..Default::default() + }, + BlobInfoMergeOperand::PermanentExpired { was_certified: true }, + ValidBlobInfoV2 { + permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(1, 5, 0)), + permanent_certified: None, + initial_certified_epoch: None, ..Default::default() + }, + ), + // --- pool operations --- + pool_register: ( + ValidBlobInfoV2::default(), + pool_op(BlobStatusChangeType::Register, 1), + ValidBlobInfoV2 { count_pooled_refs_total: 1, ..Default::default() }, + ), + pool_certify: ( + ValidBlobInfoV2 { count_pooled_refs_total: 1, ..Default::default() }, + pool_op(BlobStatusChangeType::Certify, 3), + ValidBlobInfoV2 { + count_pooled_refs_total: 1, count_pooled_refs_certified: 1, + initial_certified_epoch: Some(3), ..Default::default() + }, + ), + pool_delete_certified: ( + ValidBlobInfoV2 { + count_pooled_refs_total: 2, count_pooled_refs_certified: 2, + initial_certified_epoch: Some(1), ..Default::default() + }, + pool_op(BlobStatusChangeType::Delete { was_certified: true }, 5), + ValidBlobInfoV2 { + count_pooled_refs_total: 1, count_pooled_refs_certified: 1, + initial_certified_epoch: Some(1), ..Default::default() + }, + ), + pool_delete_last_certified_clears_epoch: ( + ValidBlobInfoV2 { + count_pooled_refs_total: 2, count_pooled_refs_certified: 1, + initial_certified_epoch: Some(1), ..Default::default() + }, + pool_op(BlobStatusChangeType::Delete { was_certified: true }, 5), + ValidBlobInfoV2 { + count_pooled_refs_total: 1, count_pooled_refs_certified: 0, + initial_certified_epoch: None, ..Default::default() + }, + ), + pool_expired: ( + ValidBlobInfoV2 { + count_pooled_refs_total: 2, count_pooled_refs_certified: 1, + initial_certified_epoch: Some(1), ..Default::default() + }, + BlobInfoMergeOperand::PoolExpired { + storage_pool_id: pool_id(), was_certified: true, + }, + ValidBlobInfoV2 { + count_pooled_refs_total: 1, count_pooled_refs_certified: 0, + initial_certified_epoch: None, ..Default::default() + }, + ), + // --- mixed: pool cert keeps epoch alive after regular deletion --- + mixed_regular_delete_pool_cert_keeps_epoch: ( + ValidBlobInfoV2 { + count_deletable_total: 1, count_deletable_certified: 1, + count_pooled_refs_total: 1, count_pooled_refs_certified: 1, + initial_certified_epoch: Some(3), ..Default::default() + }, + BlobInfoMergeOperand::DeletableExpired { was_certified: true }, + ValidBlobInfoV2 { + count_deletable_total: 0, count_deletable_certified: 0, + count_pooled_refs_total: 1, count_pooled_refs_certified: 1, + initial_certified_epoch: Some(3), ..Default::default() + }, + ), ] } - fn test_merge_preexisting_expected_successes( - preexisting_info: ValidBlobInfoV1, + fn test_v2_merge_preexisting_expected_successes( + preexisting: ValidBlobInfoV2, operand: BlobInfoMergeOperand, - expected_info: ValidBlobInfoV1, + expected: ValidBlobInfoV2, ) { - preexisting_info + preexisting .check_invariants() - .expect("preexisting blob info invariants violated"); - expected_info + .expect("preexisting invariants violated"); + expected .check_invariants() - .expect("expected blob info invariants violated"); + .expect("expected invariants violated"); + let updated = BlobInfoV2::Valid(preexisting).merge_with(operand); + assert_eq!(updated, BlobInfoV2::Valid(expected)); + } - let updated_info = BlobInfoV1::Valid(preexisting_info).merge_with(operand); + // --- V1→V2 conversion preserves fields and zeroes pool counters --- + #[test] + fn v1_to_v2_conversion() { + let v1 = ValidBlobInfoV1 { + is_metadata_stored: true, + count_deletable_total: 3, + count_deletable_certified: 1, + permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(2, 10, 0)), + permanent_certified: Some(PermanentBlobInfoV1::new_fixed_for_testing(1, 10, 1)), + initial_certified_epoch: Some(5), + latest_seen_deletable_registered_end_epoch: Some(8), + latest_seen_deletable_certified_end_epoch: Some(7), + }; + let v2: ValidBlobInfoV2 = v1.clone().into(); + assert_eq!(v2.count_deletable_total, v1.count_deletable_total); + assert_eq!(v2.permanent_total, v1.permanent_total); + assert_eq!(v2.initial_certified_epoch, v1.initial_certified_epoch); + assert_eq!( + (v2.count_pooled_refs_total, v2.count_pooled_refs_certified), + (0, 0) + ); + v2.check_invariants() + .expect("v2 blob info invariants violated"); - assert_eq!(updated_info, expected_info.into()); + // Invalid converts too. + let inv = BlobInfoV1::Invalid { + epoch: 5, + event: event_id_for_testing(), + }; + assert!(matches!( + BlobInfoV2::from(inv), + BlobInfoV2::Invalid { epoch: 5, .. } + )); } + // --- Invalid state --- param_test! { - test_merge_preexisting_expected_failures: [ - certify_permanent_without_register: ( - Default::default(), - BlobInfoMergeOperand::new_change_for_testing( - BlobStatusChangeType::Certify, false, 42, 314, event_id_for_testing() - ), + test_v2_invalid_status_is_not_changed: [ + invalidate: (BlobInfoMergeOperand::MarkInvalid { + epoch: 0, status_event: event_id_for_testing() + }), + metadata_true: (BlobInfoMergeOperand::MarkMetadataStored(true)), + metadata_false: (BlobInfoMergeOperand::MarkMetadataStored(false)), + register_permanent: (BlobInfoMergeOperand::new_change_for_testing( + BlobStatusChangeType::Register, false, 42, 314, event_id_for_testing() + )), + register_deletable: (BlobInfoMergeOperand::new_change_for_testing( + BlobStatusChangeType::Register, true, 42, 314, event_id_for_testing() + )), + certify_permanent: (BlobInfoMergeOperand::new_change_for_testing( + BlobStatusChangeType::Certify, false, 42, 314, event_id_for_testing() + )), + certify_deletable: (BlobInfoMergeOperand::new_change_for_testing( + BlobStatusChangeType::Certify, true, 42, 314, event_id_for_testing() + )), + extend: (BlobInfoMergeOperand::new_change_for_testing( + BlobStatusChangeType::Extend, false, 42, 314, event_id_for_testing() + )), + delete_true: (BlobInfoMergeOperand::new_change_for_testing( + BlobStatusChangeType::Delete { was_certified: true }, + false, 42, 314, event_id_for_testing(), + )), + delete_false: (BlobInfoMergeOperand::new_change_for_testing( + BlobStatusChangeType::Delete { was_certified: false }, + false, 42, 314, event_id_for_testing(), + )), + pool_register: (pool_op(BlobStatusChangeType::Register, 1)), + pool_certify: (pool_op(BlobStatusChangeType::Certify, 1)), + pool_delete: (pool_op(BlobStatusChangeType::Delete { was_certified: true }, 1)), + pool_expired: (BlobInfoMergeOperand::PoolExpired { + storage_pool_id: pool_id(), was_certified: false, + }), + ] + } + fn test_v2_invalid_status_is_not_changed(operand: BlobInfoMergeOperand) { + let blob_info = BlobInfoV2::Invalid { + epoch: 42, + event: event_id_for_testing(), + }; + assert_eq!(blob_info, blob_info.clone().merge_with(operand)); + } + + // --- BlobInfo enum: V1→V2 upgrade routing --- + #[test] + fn blob_info_upgrades_v1_to_v2_on_pool_operand() { + let v1 = BlobInfo::V1(BlobInfoV1::Valid(ValidBlobInfoV1 { + count_deletable_total: 2, + count_deletable_certified: 1, + initial_certified_epoch: Some(3), + latest_seen_deletable_registered_end_epoch: Some(10), + latest_seen_deletable_certified_end_epoch: Some(10), + ..Default::default() + })); + let result = v1.merge_with(pool_op(BlobStatusChangeType::Register, 1)); + let BlobInfo::V2(BlobInfoV2::Valid(v)) = &result else { + panic!("expected V2, got {result:?}") + }; + assert_eq!(v.count_deletable_total, 2); + assert_eq!(v.count_pooled_refs_total, 1); + v.check_invariants().unwrap(); + } + + #[test] + fn blob_info_stays_v1_for_regular_operand() { + let v1 = BlobInfo::V1(BlobInfoV1::Valid(Default::default())); + let result = v1.merge_with(BlobInfoMergeOperand::new_change_for_testing( + BlobStatusChangeType::Register, + true, + 1, + 10, + event_id_for_testing(), + )); + assert!(matches!(result, BlobInfo::V1(_))); + } + + #[test] + fn blob_info_merge_new_creates_v2_for_pool_v1_for_regular() { + let pool = BlobInfo::merge_new(pool_op(BlobStatusChangeType::Register, 1)).unwrap(); + assert!(matches!(pool, BlobInfo::V2(_))); + + let regular = BlobInfo::merge_new(BlobInfoMergeOperand::new_change_for_testing( + BlobStatusChangeType::Register, + true, + 1, + 10, + event_id_for_testing(), + )) + .unwrap(); + assert!(matches!(regular, BlobInfo::V1(_))); + } + + // --- CertifiedBlobInfoApi on V2 --- + param_test! { + test_v2_is_certified: [ + before_certified_epoch: ( + ValidBlobInfoV2 { + count_pooled_refs_total: 1, count_pooled_refs_certified: 1, + initial_certified_epoch: Some(5), ..Default::default() + }, 4, false, ), - extend_permanent_without_certify: ( - Default::default(), - BlobInfoMergeOperand::new_change_for_testing( - BlobStatusChangeType::Extend, false, 42, 314, event_id_for_testing() - ), + at_certified_epoch: ( + ValidBlobInfoV2 { + count_pooled_refs_total: 1, count_pooled_refs_certified: 1, + initial_certified_epoch: Some(5), ..Default::default() + }, 5, true, ), - certify_deletable_without_register: ( - Default::default(), - BlobInfoMergeOperand::new_change_for_testing( - BlobStatusChangeType::Certify, true, 42, 314, event_id_for_testing() - ), + no_certified_refs: ( + ValidBlobInfoV2 { + count_pooled_refs_total: 1, ..Default::default() + }, 0, false, + ), + regular_deletable_certified: ( + ValidBlobInfoV2 { + count_deletable_total: 1, count_deletable_certified: 1, + initial_certified_epoch: Some(2), ..Default::default() + }, 3, true, ), ] } - fn test_merge_preexisting_expected_failures( - preexisting_info: ValidBlobInfoV1, - operand: BlobInfoMergeOperand, - ) { - preexisting_info - .check_invariants() - .expect("preexisting blob info invariants violated"); - let preexisting_info = BlobInfoV1::Valid(preexisting_info); - let blob_info = preexisting_info.clone().merge_with(operand); - assert_eq!(preexisting_info, blob_info); + fn test_v2_is_certified(info: ValidBlobInfoV2, epoch: Epoch, expected: bool) { + info.check_invariants().unwrap(); + assert_eq!(BlobInfoV2::Valid(info).is_certified(epoch), expected); } + // --- BlobInfoApi on V2 --- param_test! { - test_blob_status_is_inexistent_for_expired_blobs: [ - expired_permanent_registered_0: ( - ValidBlobInfoV1 { - permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(1, 2, 0)), - ..Default::default() - }, - 1, - 2, - ), - expired_permanent_registered_1: ( - ValidBlobInfoV1 { - permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(2, 3, 0)), - ..Default::default() - }, - 2, - 4, + test_v2_is_registered: [ + pool_refs: ( + ValidBlobInfoV2 { count_pooled_refs_total: 1, ..Default::default() }, + 9999, true, ), - expired_permanent_certified: ( - ValidBlobInfoV1 { - permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(2, 2, 0)), - permanent_certified: Some(PermanentBlobInfoV1::new_fixed_for_testing(1, 2, 0)), - ..Default::default() - }, - 1, - 2, + deletable_refs: ( + ValidBlobInfoV2 { count_deletable_total: 1, ..Default::default() }, + 9999, true, ), - expired_deletable_registered: ( - ValidBlobInfoV1 { - count_deletable_total: 1, - latest_seen_deletable_registered_end_epoch: Some(2), + permanent_before_expiry: ( + ValidBlobInfoV2 { + permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(1, 10, 0)), ..Default::default() - }, - 1, - 2, + }, 9, true, ), - expired_deletable_certified: ( - ValidBlobInfoV1 { - count_deletable_total: 1, - latest_seen_deletable_registered_end_epoch: Some(2), - count_deletable_certified: 1, - latest_seen_deletable_certified_end_epoch: Some(2), + permanent_at_expiry: ( + ValidBlobInfoV2 { + permanent_total: Some(PermanentBlobInfoV1::new_fixed_for_testing(1, 10, 0)), ..Default::default() - }, - 1, - 2, + }, 10, false, ), + empty: (ValidBlobInfoV2::default(), 0, false), ] } - fn test_blob_status_is_inexistent_for_expired_blobs( - blob_info: ValidBlobInfoV1, - epoch_not_expired: Epoch, - epoch_expired: Epoch, - ) { - assert_ne!( - BlobInfoV1::Valid(blob_info.clone()).to_blob_status(epoch_not_expired), - BlobStatus::Nonexistent, - ); - assert_eq!( - BlobInfoV1::Valid(blob_info).to_blob_status(epoch_expired), - BlobStatus::Nonexistent, - ); + fn test_v2_is_registered(info: ValidBlobInfoV2, epoch: Epoch, expected: bool) { + info.check_invariants().unwrap(); + assert_eq!(BlobInfoV2::Valid(info).is_registered(epoch), expected); + } + + #[test] + fn v2_to_blob_status_combines_regular_and_pool_counts() { + let info = BlobInfoV2::Valid(ValidBlobInfoV2 { + count_deletable_total: 2, + count_deletable_certified: 1, + count_pooled_refs_total: 3, + count_pooled_refs_certified: 2, + initial_certified_epoch: Some(1), + ..Default::default() + }); + let BlobStatus::Deletable { + deletable_counts, .. + } = info.to_blob_status(0) + else { + panic!("expected Deletable") + }; + assert_eq!(deletable_counts.count_deletable_total, 5); + assert_eq!(deletable_counts.count_deletable_certified, 3); + } + + // --- V2 invariant violation detection --- + param_test! { + test_v2_invariant_violations: [ + pool_total_lt_certified: (ValidBlobInfoV2 { + count_pooled_refs_certified: 1, + initial_certified_epoch: Some(1), ..Default::default() + }), + certified_epoch_without_refs: (ValidBlobInfoV2 { + initial_certified_epoch: Some(1), ..Default::default() + }), + certified_refs_without_epoch: (ValidBlobInfoV2 { + count_pooled_refs_total: 1, count_pooled_refs_certified: 1, + ..Default::default() + }), + ] + } + fn test_v2_invariant_violations(info: ValidBlobInfoV2) { + assert!(info.check_invariants().is_err()); + } + + // --- PerObjectBlobInfoV2 --- + fn make_per_object_v2(end_epoch_info: EndEpochInfo) -> PerObjectBlobInfoV2 { + PerObjectBlobInfoV2 { + blob_id: walrus_core::test_utils::blob_id_from_u64(42), + registered_epoch: 1, + certified_epoch: None, + end_epoch_info, + deletable: true, + event: event_id_for_testing(), + deleted: false, + } + } + + #[test] + fn per_object_v2_certify_and_delete_merge() { + let info = make_per_object_v2(EndEpochInfo::StoragePool(pool_id())); + let cert_operand = PerObjectBlobInfoMergeOperand { + change_type: BlobStatusChangeType::Certify, + change_info: BlobStatusChangeInfo { + blob_id: walrus_core::test_utils::blob_id_from_u64(42), + deletable: true, + epoch: 3, + end_epoch: 0, + status_event: event_id_for_testing(), + }, + }; + let certified = info.merge_with(cert_operand); + assert_eq!(certified.certified_epoch, Some(3)); + assert!(!certified.deleted); + + let del_operand = PerObjectBlobInfoMergeOperand { + change_type: BlobStatusChangeType::Delete { + was_certified: true, + }, + change_info: BlobStatusChangeInfo { + blob_id: walrus_core::test_utils::blob_id_from_u64(42), + deletable: true, + epoch: 5, + end_epoch: 0, + status_event: event_id_for_testing(), + }, + }; + let deleted = certified.merge_with(del_operand); + assert!(deleted.deleted); + assert!(!deleted.is_registered(0)); } }