feat: blob info V2 for tracking pooled blobs#3104
Conversation
|
@codex review |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: f3b70fa587
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
halfprice
left a comment
There was a problem hiding this comment.
To make reviewing easier, please take a look at the notes first.
| return Err(anyhow::anyhow!( | ||
| "error encountered while iterating over per-object blob info: {result:?}" | ||
| )); | ||
| let (object_id, per_object_blob_info) = match result { |
There was a problem hiding this comment.
To simplify the review: the change in this function is less important and doesn't need much attention in the first pass.
| pub(crate) struct ValidBlobInfoV2 { | ||
| // Common fields for both regular and storage pool blobs. | ||
| pub is_metadata_stored: bool, | ||
| pub initial_certified_epoch: Option<Epoch>, | ||
|
|
||
| // Regular blob fields (same as V1). | ||
| pub count_deletable_total: u32, | ||
| pub count_deletable_certified: u32, | ||
| pub permanent_total: Option<PermanentBlobInfoV1>, | ||
| pub permanent_certified: Option<PermanentBlobInfoV1>, | ||
|
|
||
| // Storage pool references counters. | ||
| pub count_pooled_refs_total: u32, | ||
| pub count_pooled_refs_certified: u32, | ||
| } |
There was a problem hiding this comment.
Important!!!
This defines the new aggregated blob info. Compared to V1, we add count_pooled_refs_total and count_pooled_refs_certified, two counters that track the number of references to a pooled blob.
I removed latest_seen_deletable_registered_end_epoch and latest_seen_deletable_certified_end_epoch, since these two fields are best effort anyway and do not provide any guarantee. I want V2 to be simpler to reason about.
| use BlobCertificationStatus::*; | ||
| impl ValidBlobInfoV2 { | ||
| /// Handles regular blob status changes (same logic as V1). | ||
| fn update_status( |
There was a problem hiding this comment.
Note for reviewers: this function is the same as in V1, with only minor differences (e.g., it no longer updates latest_seen_deletable_registered_end_epoch). I had a version where I created a common function and used it in both places, but the diff looked very confusing. Also, I want to make sure that the V1 merge op always stays the same as before (so it is better not to touch it).
| } | ||
| } | ||
|
|
||
| /// Trait defining methods for retrieving information about a blob object. |
There was a problem hiding this comment.
I hate this diff... This is not removed, nor changed. Newly added code made this diff ugly.
| // Note that at the beginning of an epoch, before GC runs, the certified counter of a newly | ||
| // expired deletable or pooled blob may still be non-zero even though the blob has expired. | ||
| // This makes the blob appear to be certified until GC finishes. | ||
| self.initial_certified_epoch | ||
| .is_some_and(|epoch| epoch <= current_epoch) | ||
| && (exists_certified_permanent_blob | ||
| || self.count_deletable_certified > 0 | ||
| || self.count_pooled_refs_certified > 0) |
| mod per_object_blob_info { | ||
| use super::*; | ||
|
|
||
| #[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Clone)] | ||
| pub(crate) struct PerObjectBlobInfoMergeOperand { | ||
| pub change_type: BlobStatusChangeType, | ||
| pub change_info: BlobStatusChangeInfo, | ||
| } | ||
|
|
||
| impl ToBytes for PerObjectBlobInfoMergeOperand {} | ||
|
|
||
| impl PerObjectBlobInfoMergeOperand { | ||
| pub fn from_blob_info_merge_operand( | ||
| blob_info_merge_operand: BlobInfoMergeOperand, | ||
| ) -> Option<Self> { | ||
| let BlobInfoMergeOperand::ChangeStatus { | ||
| change_type, | ||
| change_info, | ||
| } = blob_info_merge_operand | ||
| else { | ||
| return None; | ||
| }; | ||
| Some(Self { | ||
| change_type, | ||
| change_info, | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| impl<T: ChangeTypeAndInfo> From<&T> for PerObjectBlobInfoMergeOperand { | ||
| fn from(value: &T) -> Self { | ||
| Self { | ||
| change_type: value.change_type(), | ||
| change_info: value.change_info(), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Trait defining methods for retrieving information about a blob object. | ||
| // NB: Before adding functions to this trait, think twice if you really need it as it needs to | ||
| // be implementable by future internal representations of the per-object blob status as well. | ||
| #[enum_dispatch] | ||
| #[allow(dead_code)] | ||
| pub(crate) trait PerObjectBlobInfoApi: CertifiedBlobInfoApi { | ||
| /// Returns the blob ID associated with this object. | ||
| fn blob_id(&self) -> BlobId; | ||
| /// Returns true iff the object is deletable. | ||
| fn is_deletable(&self) -> bool; | ||
| /// Returns true iff the object is not expired and not deleted. | ||
| fn is_registered(&self, current_epoch: Epoch) -> bool; | ||
| /// Returns true iff the object is already deleted. | ||
| fn is_deleted(&self) -> bool; | ||
| /// Returns the storage pool ID if this is a storage pool blob. | ||
| fn storage_pool_id(&self) -> Option<ObjectID>; | ||
| } | ||
|
|
||
| #[enum_dispatch(CertifiedBlobInfoApi)] | ||
| #[enum_dispatch(PerObjectBlobInfoApi)] | ||
| #[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Clone)] | ||
| pub(crate) enum PerObjectBlobInfo { | ||
| V1(PerObjectBlobInfoV1), | ||
| V2(PerObjectBlobInfoV2), | ||
| } | ||
|
|
||
| impl PerObjectBlobInfo { | ||
| #[cfg(test)] | ||
| pub(crate) fn new_for_testing( | ||
| blob_id: BlobId, | ||
| registered_epoch: Epoch, | ||
| certified_epoch: Option<Epoch>, | ||
| end_epoch: Epoch, | ||
| deletable: bool, | ||
| event: EventID, | ||
| deleted: bool, | ||
| ) -> Self { | ||
| Self::V1(PerObjectBlobInfoV1 { | ||
| blob_id, | ||
| registered_epoch, | ||
| certified_epoch, | ||
| end_epoch, | ||
| deletable, | ||
| event, | ||
| deleted, | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| impl ToBytes for PerObjectBlobInfo {} |
| fn merge_new(operand: Self::MergeOperand) -> Option<Self> { | ||
| // We never create PerObjectBlobInfoV2 via the merge operator. This is because the | ||
| // PerObjectBlobInfoMergeOperand struct can only be used for V1 for registration. | ||
| // So any newly created PerObjectBlobInfoV2 is directly inserted into the table. | ||
| // | ||
| // The certify and delete operations can be used in both V1 and V2, so merge_with above | ||
| // works for both. | ||
| PerObjectBlobInfoV1::merge_new(operand).map(Self::from) | ||
| } |
There was a problem hiding this comment.
Important: there is a complication in this PR: we cannot easily extend PerObjectBlobInfoMergeOperand without playing some deserialization tricks. This is because PerObjectBlobInfoMergeOperand is not an enum.
| #[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Clone)] | ||
| pub(crate) struct PerObjectBlobInfoV1 { | ||
| /// The blob ID. | ||
| pub blob_id: BlobId, | ||
| /// The epoch in which the blob has been registered. | ||
| pub registered_epoch: Epoch, | ||
| /// The epoch in which the blob was first certified, `None` if the blob is uncertified. | ||
| pub certified_epoch: Option<Epoch>, | ||
| /// The epoch in which the blob expires. | ||
| pub end_epoch: Epoch, | ||
| /// Whether the blob is deletable. | ||
| pub deletable: bool, | ||
| /// The ID of the last blob event related to this object. | ||
| pub event: EventID, | ||
| /// Whether the blob has been deleted. | ||
| pub deleted: bool, | ||
| } | ||
|
|
||
| impl CertifiedBlobInfoApi for PerObjectBlobInfoV1 { | ||
| fn is_certified(&self, current_epoch: Epoch) -> bool { | ||
| self.is_registered(current_epoch) | ||
| && self | ||
| .certified_epoch | ||
| .is_some_and(|epoch| epoch <= current_epoch) | ||
| } | ||
|
|
||
| fn initial_certified_epoch(&self) -> Option<Epoch> { | ||
| self.certified_epoch | ||
| } | ||
| } | ||
|
|
||
| impl PerObjectBlobInfoApi for PerObjectBlobInfoV1 { | ||
| fn blob_id(&self) -> BlobId { | ||
| self.blob_id | ||
| } | ||
|
|
||
| fn is_deletable(&self) -> bool { | ||
| self.deletable | ||
| } | ||
|
|
||
| fn is_registered(&self, current_epoch: Epoch) -> bool { | ||
| self.end_epoch > current_epoch && !self.deleted | ||
| } | ||
|
|
||
| fn is_deleted(&self) -> bool { | ||
| self.deleted | ||
| } | ||
|
|
||
| fn storage_pool_id(&self) -> Option<ObjectID> { | ||
| None | ||
| } | ||
| } | ||
|
|
||
| impl ToBytes for PerObjectBlobInfoV1 {} |
There was a problem hiding this comment.
All of this is existing code.
| pub(crate) struct PerObjectBlobInfoV2 { | ||
| /// The blob ID. | ||
| pub blob_id: BlobId, | ||
| /// The epoch in which the blob has been registered. | ||
| pub registered_epoch: Epoch, | ||
| /// The epoch in which the blob was first certified, `None` if the blob is uncertified. | ||
| pub certified_epoch: Option<Epoch>, | ||
| /// How the blob's end epoch is determined. | ||
| pub end_epoch_info: EndEpochInfo, | ||
| /// Whether the blob is deletable. | ||
| pub deletable: bool, | ||
| /// The ID of the last blob event related to this object. | ||
| pub event: EventID, | ||
| /// Whether the blob has been deleted. | ||
| pub deleted: bool, | ||
| } |
There was a problem hiding this comment.
Important: this defines the per-object blob info V2. The main difference from V1 is the end_epoch_info field, which can be either an end epoch or the object ID of the storage pool.
| impl Mergeable for PerObjectBlobInfoV1 { | ||
| type MergeOperand = PerObjectBlobInfoMergeOperand; | ||
| type Key = ObjectID; | ||
|
|
||
| fn merge_with( | ||
| mut self, | ||
| PerObjectBlobInfoMergeOperand { | ||
| change_type, | ||
| change_info, | ||
| }: PerObjectBlobInfoMergeOperand, | ||
| ) -> Self { | ||
| assert_eq!( | ||
| self.blob_id, change_info.blob_id, | ||
| "blob ID mismatch in merge operand" | ||
| ); | ||
| assert_eq!( | ||
| self.deletable, change_info.deletable, | ||
| "deletable mismatch in merge operand" | ||
| ); | ||
| assert!( | ||
| !self.deleted, | ||
| "attempt to update an already deleted blob {}", | ||
| self.blob_id | ||
| ); | ||
| self.event = change_info.status_event; | ||
| match change_type { | ||
| // We ensure that the blob info is only updated a single time for each event. So if | ||
| // we see a duplicated registered or certified event for the same object, this is a | ||
| // serious bug somewhere. | ||
| BlobStatusChangeType::Register => { | ||
| panic!( | ||
| "cannot register an already registered blob {}", | ||
| self.blob_id | ||
| ); | ||
| } | ||
| BlobStatusChangeType::Certify => { | ||
| assert!( | ||
| self.certified_epoch.is_none(), | ||
| "cannot certify an already certified blob {}", | ||
| self.blob_id | ||
| ); | ||
| self.certified_epoch = Some(change_info.epoch); | ||
| } | ||
| BlobStatusChangeType::Extend => { | ||
| assert!( | ||
| self.certified_epoch.is_some(), | ||
| "cannot extend an uncertified blob {}", | ||
| self.blob_id | ||
| ); | ||
| self.end_epoch = change_info.end_epoch; | ||
| } | ||
| BlobStatusChangeType::Delete { was_certified } => { | ||
| assert_eq!(self.certified_epoch.is_some(), was_certified); | ||
| self.deleted = true; | ||
| } | ||
| } | ||
| self | ||
| } | ||
|
|
||
| fn merge_new(operand: Self::MergeOperand) -> Option<Self> { | ||
| let PerObjectBlobInfoMergeOperand { | ||
| change_type: BlobStatusChangeType::Register, | ||
| change_info: | ||
| BlobStatusChangeInfo { | ||
| blob_id, | ||
| deletable, | ||
| epoch, | ||
| end_epoch, | ||
| status_event, | ||
| }, | ||
| } = operand | ||
| else { | ||
| tracing::error!( | ||
| ?operand, | ||
| "encountered an update other than 'register' for an untracked blob object" |
There was a problem hiding this comment.
All of this is existing code.
Description
Contribute to WAL-1162
Test plan
Release notes
Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required.
For each box you select, include information after the relevant heading that describes the impact of your changes that
a user might notice and any actions they must take to implement updates. (Add release notes after the colon for each item)