Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/walrus-service/src/node/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1299,7 +1299,7 @@ pub(crate) mod tests {
BlobInfoMergeOperand,
BlobInfoV1,
BlobStatusChangeType,
PermanentBlobInfoV1,
PermanentBlobInfo,
ValidBlobInfoV1,
};
use constants::{
Expand Down Expand Up @@ -1508,7 +1508,7 @@ pub(crate) mod tests {

// Set correct registered event.
let BlobInfo::V1(BlobInfoV1::Valid(ValidBlobInfoV1 {
permanent_total: Some(PermanentBlobInfoV1 { event, .. }),
permanent_total: Some(PermanentBlobInfo { event, .. }),
..
})) = &mut state1
else {
Expand Down
151 changes: 126 additions & 25 deletions crates/walrus-service/src/node/storage/blob_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
//! Keeping track of the status of blob IDs and on-chain `Blob` objects.

mod blob_info_v1;
mod blob_info_v2;
mod perm_blob_info;

use std::{
collections::HashSet,
Expand All @@ -29,11 +31,13 @@ use walrus_storage_node_client::api::BlobStatus;
use walrus_sui::types::{BlobCertified, BlobDeleted, BlobEvent, BlobRegistered, InvalidBlobId};

#[cfg(test)]
pub(crate) use self::blob_info_v1::PermanentBlobInfoV1;
pub(crate) use self::blob_info_v1::ValidBlobInfoV1;
use self::per_object_blob_info::PerObjectBlobInfoMergeOperand;
pub(crate) use self::{
blob_info_v1::{BlobInfoV1, ValidBlobInfoV1},
blob_info_v1::BlobInfoV1,
blob_info_v2::BlobInfoV2,
per_object_blob_info::{PerObjectBlobInfo, PerObjectBlobInfoApi},
perm_blob_info::PermanentBlobInfo,
};
use super::{DatabaseTableOptionsFactory, constants};
use crate::{
Expand Down Expand Up @@ -619,17 +623,42 @@ impl BlobInfoTable {
per-object blob info entry exists (object ID: {object_id})"
));
};
let BlobInfo::V1(BlobInfoV1::Valid(ValidBlobInfoV1 {

// Extract deletable counts for cross-checking (works for both V1 and V2).
let (
v1_blob,
count_deletable_total,
count_deletable_certified,
latest_seen_deletable_registered_end_epoch,
latest_seen_deletable_certified_end_epoch,
..
})) = blob_info
else {
continue;
) = match &blob_info {
BlobInfo::V1(BlobInfoV1::Valid(v)) => (
true,
v.count_deletable_total,
v.count_deletable_certified,
v.latest_seen_deletable_registered_end_epoch,
v.latest_seen_deletable_certified_end_epoch,
),
BlobInfo::V2(BlobInfoV2::Valid(v)) => {
anyhow::ensure!(
!matches!(blob_info, BlobInfo::V1(_)),
"V2 per-object blob info requires V2 aggregate blob info, but found V1"
);
(
false,
v.count_deletable_total,
v.count_deletable_certified,
// V2 doesn't track end epochs; skip epoch-based assertions.
None,
None,
)
}
_ => continue,
};
if per_object_blob_info.is_deletable() {

// Below checks the invariants of last seen epochs on deletable blobs, which is only
// relevant for V1 per-object entries.
if v1_blob && per_object_blob_info.is_deletable() {
let per_object_end_epoch = per_object_blob_info.end_epoch;
anyhow::ensure!(
count_deletable_total > 0,
Expand Down Expand Up @@ -672,18 +701,34 @@ impl BlobInfoTable {
"error encountered while iterating over aggregate blob info: {result:?}"
));
};
let BlobInfo::V1(BlobInfoV1::Valid(blob_info)) = blob_info else {
continue;
};
if !blob_info.has_no_objects() && !per_object_table_blob_ids.contains(&blob_id) {
return Err(anyhow::anyhow!(
"per-object blob info not found for blob ID {blob_id}, even though a \
valid aggregate blob info entry referencing objects exists: {blob_info:?}"
));
match &blob_info {
BlobInfo::V1(BlobInfoV1::Valid(v1_info)) => {
if !v1_info.has_no_objects() && !per_object_table_blob_ids.contains(&blob_id) {
return Err(anyhow::anyhow!(
"per-object blob info not found for blob ID {blob_id}, even though a \
valid V1 aggregate blob info entry referencing objects exists: \
{v1_info:?}"
));
}
v1_info.check_invariants().context(format!(
"aggregate blob info invariants violated for blob ID {blob_id}"
))?;
}
BlobInfo::V2(BlobInfoV2::Valid(v2_info)) => {
if !v2_info.has_no_objects() && !per_object_table_blob_ids.contains(&blob_id) {
return Err(anyhow::anyhow!(
"per-object blob info not found for blob ID {blob_id}, even though a \
valid V2 aggregate blob info entry referencing objects exists: \
{v2_info:?}"
));
}
v2_info.check_invariants().context(format!(
"aggregate blob info V2 invariants violated for blob ID {blob_id}"
))?;
}
// Invalid blob info.
_ => continue,
}
blob_info.check_invariants().context(format!(
"aggregate blob info invariants violated for blob ID {blob_id}"
))?;
}
Ok(())
}
Expand Down Expand Up @@ -898,6 +943,17 @@ pub(super) enum BlobStatusChangeType {
Delete { was_certified: bool },
}

/// Change info for storage pool blob events.
///
/// Unlike `BlobStatusChangeInfo`, this does not carry `end_epoch` because the lifetime of pool
/// blobs is determined by the pool's end_epoch (tracked in `storage_pool_end_epochs`), not by
/// the individual blob's end_epoch at registration time.
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Clone)]
pub(super) struct PooledBlobChangeInfo {
pub(super) epoch: Epoch,
pub(super) storage_pool_id: ObjectID,
}

trait ChangeTypeAndInfo {
fn change_type(&self) -> BlobStatusChangeType;
fn change_info(&self) -> BlobStatusChangeInfo;
Expand Down Expand Up @@ -976,6 +1032,16 @@ pub(super) enum BlobInfoMergeOperand {
PermanentExpired {
was_certified: bool,
},
/// A status change for a blob in a storage pool.
PooledBlobChangeStatus {
change_type: BlobStatusChangeType,
change_info: PooledBlobChangeInfo,
},
/// A blob in a storage pool has expired (pool lifetime ended).
PoolExpired {
storage_pool_id: ObjectID,
was_certified: bool,
},
}

impl ToBytes for BlobInfoMergeOperand {}
Expand Down Expand Up @@ -1085,6 +1151,7 @@ impl Ord for BlobCertificationStatus {
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Clone)]
pub(crate) enum BlobInfo {
V1(BlobInfoV1),
V2(BlobInfoV2),
}

impl BlobInfo {
Expand All @@ -1098,8 +1165,6 @@ impl BlobInfo {
certified_epoch: Option<Epoch>,
invalidated_epoch: Option<Epoch>,
) -> Self {
use blob_info_v1::PermanentBlobInfoV1;

let blob_info = match status {
BlobCertificationStatus::Invalid => BlobInfoV1::Invalid {
epoch: invalidated_epoch
Expand All @@ -1108,8 +1173,7 @@ impl BlobInfo {
},

BlobCertificationStatus::Registered | BlobCertificationStatus::Certified => {
let permanent_total =
PermanentBlobInfoV1::new_first(end_epoch, current_status_event);
let permanent_total = PermanentBlobInfo::new_first(end_epoch, current_status_event);
let permanent_certified = matches!(status, BlobCertificationStatus::Certified)
.then(|| permanent_total.clone());
ValidBlobInfoV1 {
Expand All @@ -1133,12 +1197,49 @@ impl Mergeable for BlobInfo {

fn merge_with(self, operand: Self::MergeOperand) -> Self {
match self {
Self::V1(value) => Self::V1(value.merge_with(operand)),
Self::V1(v1) => match &operand {
// Only upgrade to V2 when a pool operand hits a V1 entry.
// This makes sure that all the storage nodes have consistent behavior despite of
// when they upgrade their nodes.
BlobInfoMergeOperand::PooledBlobChangeStatus { .. }
// Although it's impossible to have pool expired as the first pool event for a
// blob, the internal processing of pool expired will return error.
| BlobInfoMergeOperand::PoolExpired { .. } => {
Self::V2(BlobInfoV2::from(v1).merge_with(operand))
}
// Regular operands keep V1 as V1.
BlobInfoMergeOperand::MarkMetadataStored(_)
| BlobInfoMergeOperand::MarkInvalid { .. }
| BlobInfoMergeOperand::ChangeStatus { .. }
| BlobInfoMergeOperand::DeletableExpired { .. }
| BlobInfoMergeOperand::PermanentExpired { .. } => {
Self::V1(v1.merge_with(operand))
}
},
// V2 handles all operand types (regular AND pool).
Self::V2(v2) => Self::V2(v2.merge_with(operand)),
}
}

fn merge_new(operand: Self::MergeOperand) -> Option<Self> {
BlobInfoV1::merge_new(operand).map(Self::from)
match &operand {
// First event for a blob_id is a pool event → create V2.
BlobInfoMergeOperand::PooledBlobChangeStatus { .. } => {
BlobInfoV2::merge_new(operand).map(Self::V2)
}
// Pool expired is not a valid first event for a blob_id. Adding a branch here to
// account for possible race condition where pool expire event comes after an explicit
// blob delete event.
BlobInfoMergeOperand::PoolExpired { .. } => None,
// First event is a regular event → create V1 (as before).
BlobInfoMergeOperand::MarkMetadataStored(_)
| BlobInfoMergeOperand::MarkInvalid { .. }
| BlobInfoMergeOperand::ChangeStatus { .. }
| BlobInfoMergeOperand::DeletableExpired { .. }
| BlobInfoMergeOperand::PermanentExpired { .. } => {
BlobInfoV1::merge_new(operand).map(Self::V1)
}
}
}
}

Expand Down
Loading
Loading