Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 45 additions & 11 deletions crates/walrus-service/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ use walrus_sdk::{
GENESIS_EPOCH,
PackageEvent,
ProtocolEvent,
StoragePoolEvent,
},
},
};
Expand Down Expand Up @@ -1574,6 +1575,9 @@ impl StorageNode {
self.process_package_event(event_handle, package_event)
.await?;
}
EventStreamElement::ContractEvent(ContractEvent::StoragePoolEvent(pool_event)) => {
self.process_storage_pool_event(event_handle, pool_event)?;
}
EventStreamElement::ContractEvent(ContractEvent::DenyListEvent(_event)) => {
// TODO: Implement DenyListEvent handling (WAL-424)
event_handle.mark_as_complete();
Expand All @@ -1583,13 +1587,6 @@ impl StorageNode {
)) => {
event_handle.mark_as_complete();
}
EventStreamElement::ContractEvent(ContractEvent::StoragePoolEvent(event)) => {
// TODO(WAL-1162): implement storage pool event processing on storage nodes.
panic!(
"storage pool event processing is not yet implemented: {:?}",
event.name()
);
}
EventStreamElement::ContractEvent(ContractEvent::ProtocolEvent(event)) => {
panic!(
"unexpected protocol version update: {:?}",
Expand Down Expand Up @@ -1620,6 +1617,33 @@ impl StorageNode {
Ok(())
}

/// Applies a storage-pool contract event to the node's local store.
///
/// Both event variants carry a pool id and an end epoch; the handler persists
/// that end epoch via `Storage::set_storage_pool_end_epoch` and marks the
/// event handle complete only after the write succeeds.
#[tracing::instrument(skip_all)]
fn process_storage_pool_event(
    &self,
    event_handle: EventHandle,
    event: StoragePoolEvent,
) -> anyhow::Result<()> {
    let _scope = monitored_scope::monitored_scope("ProcessEvent::StoragePoolEvent");

    tracing::debug!(?event, "{} event received", event.name());

    // Reduce both variants to (pool id, end epoch, error context) so the
    // store write happens in exactly one place.
    let (pool_id, end_epoch, error_context) = match &event {
        StoragePoolEvent::StoragePoolCreated(created) => (
            &created.storage_pool_id,
            created.end_epoch,
            "failed to set storage pool end epoch",
        ),
        StoragePoolEvent::StoragePoolExtended(extended) => (
            &extended.storage_pool_id,
            extended.new_end_epoch,
            "failed to update storage pool end epoch",
        ),
    };

    self.inner
        .storage
        .set_storage_pool_end_epoch(pool_id, end_epoch)
        .context(error_context)?;

    event_handle.mark_as_complete();
    Ok(())
}

#[tracing::instrument(skip_all)]
async fn process_epoch_change_event(
&self,
Expand Down Expand Up @@ -3273,11 +3297,16 @@ impl StorageNodeInner {
}

/// Returns whether the blob is currently registered, evaluated against the
/// current committee epoch.
///
/// # Errors
///
/// Returns an error if the blob info cannot be read from storage.
fn is_blob_registered(&self, blob_id: &BlobId) -> Result<bool, anyhow::Error> {
    // Hoist the epoch lookup out of the closure so the predicate reads
    // clearly; the boolean is returned directly instead of via a redundant
    // `if … { return Ok(true); } Ok(false)` scaffold.
    let epoch = self.current_committee_epoch();
    Ok(self
        .storage
        .get_blob_info(blob_id)
        .context("could not retrieve blob info")?
        .is_some_and(|blob_info| blob_info.is_registered(epoch)))
}

fn notify_registration(&self, blob_id: &BlobId) {
Expand Down Expand Up @@ -3321,11 +3350,16 @@ impl StorageNodeInner {
}

/// Returns whether the blob is currently certified, evaluated against the
/// current committee epoch.
///
/// # Errors
///
/// Returns an error if the blob info cannot be read from storage.
fn is_blob_certified(&self, blob_id: &BlobId) -> Result<bool, anyhow::Error> {
    // Hoist the epoch lookup out of the closure so the predicate reads
    // clearly; the boolean is returned directly instead of via a redundant
    // `if … { return Ok(true); } Ok(false)` scaffold.
    let epoch = self.current_committee_epoch();
    Ok(self
        .storage
        .get_blob_info(blob_id)
        .context("could not retrieve blob info")?
        .is_some_and(|blob_info| blob_info.is_certified(epoch)))
}

/// Returns true if the blob is currently not certified.
Expand Down
95 changes: 61 additions & 34 deletions crates/walrus-service/src/node/blob_event_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use std::{num::NonZeroUsize, sync::Arc};

use sui_macros::fail_point_async;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use walrus_core::BlobId;
use walrus_sui::types::{BlobCertified, BlobDeleted, BlobEvent, BlobRegistered, InvalidBlobId};
use walrus_core::{BlobId, Epoch};
use walrus_sui::types::{BlobEvent, InvalidBlobId};
use walrus_utils::metrics::monitored_scope;

use self::pending_events::{PendingEventCounter, PendingEventGuard};
Expand Down Expand Up @@ -119,12 +119,19 @@ impl BackgroundEventProcessor {
match blob_event {
BlobEvent::Certified(event) => {
let _scope = monitored_scope::monitored_scope("ProcessEvent::BlobEvent::Certified");
self.process_blob_certified_event(event_handle, event, checkpoint_position)
.await?;
self.process_blob_certified_event(
event_handle,
event.blob_id,
event.epoch,
event.is_extension,
checkpoint_position,
)
.await?;
}
BlobEvent::Deleted(event) => {
let _scope = monitored_scope::monitored_scope("ProcessEvent::BlobEvent::Deleted");
self.process_blob_deleted_event(event_handle, event).await?;
self.process_blob_deleted_event(event_handle, event.blob_id, event.epoch)
.await?;
}
BlobEvent::InvalidBlobID(event) => {
let _scope =
Expand All @@ -135,7 +142,26 @@ impl BackgroundEventProcessor {
// TODO (WAL-424): Implement DenyListBlobDeleted event handling.
todo!("DenyListBlobDeleted event handling is not yet implemented");
}
BlobEvent::Registered(_) => {
BlobEvent::PooledBlobCertified(event) => {
let _scope = monitored_scope::monitored_scope(
"ProcessEvent::BlobEvent::PooledBlobCertified",
);
self.process_blob_certified_event(
event_handle,
event.blob_id,
event.epoch,
false,
checkpoint_position,
)
.await?;
}
BlobEvent::PooledBlobDeleted(event) => {
let _scope =
monitored_scope::monitored_scope("ProcessEvent::BlobEvent::PooledBlobDeleted");
self.process_blob_deleted_event(event_handle, event.blob_id, event.epoch)
.await?;
}
BlobEvent::Registered(_) | BlobEvent::PooledBlobRegistered(_) => {
unreachable!("registered event should be processed immediately");
}
}
Expand All @@ -148,7 +174,9 @@ impl BackgroundEventProcessor {
async fn process_blob_certified_event(
&self,
event_handle: EventHandle,
event: BlobCertified,
blob_id: BlobId,
epoch: Epoch,
is_extension: bool,
checkpoint_position: CheckpointEventPosition,
) -> anyhow::Result<()> {
let start = tokio::time::Instant::now();
Expand All @@ -166,28 +194,28 @@ impl BackgroundEventProcessor {
sui_macros::fail_point_if!(
"skip_non_extension_certified_event_triggered_blob_sync",
|| {
skip_blob_sync_in_test = !event.is_extension;
skip_blob_sync_in_test = !is_extension;
}
);

if skip_blob_sync_in_test
|| !self.node.is_blob_certified(&event.blob_id)?
|| !self.node.is_blob_certified(&blob_id)?
|| self.node.storage.node_status()?.is_catching_up()
|| (current_event_epoch.is_some()
&& self
.node
.is_stored_at_all_shards_at_epoch(
&event.blob_id,
&blob_id,
current_event_epoch.expect("just checked that current event epoch is set"),
)
.await?)
{
event_handle.mark_as_complete();

tracing::debug!(
%event.blob_id,
%event.epoch,
%event.is_extension,
%blob_id,
%epoch,
%is_extension,
"skipping syncing blob during certified event processing",
);

Expand All @@ -201,14 +229,14 @@ impl BackgroundEventProcessor {

self.node
.maybe_apply_live_upload_deferral(
event.blob_id,
blob_id,
checkpoint_position.checkpoint_sequence_number,
)
.await;

// Slivers and (possibly) metadata are not stored, so initiate blob sync.
self.blob_sync_handler
.start_sync(event.blob_id, event.epoch, Some(event_handle))
.start_sync(blob_id, epoch, Some(event_handle))
.await?;

walrus_utils::with_label!(histogram_set, metrics::STATUS_QUEUED)
Expand All @@ -220,14 +248,14 @@ impl BackgroundEventProcessor {
/// Processes a blob deleted event.
#[tracing::instrument(
skip_all,
fields(walrus.blob_id = %event.blob_id, walrus.epoch = tracing::field::Empty),
fields(walrus.blob_id = %blob_id, walrus.epoch = tracing::field::Empty),
)]
async fn process_blob_deleted_event(
&self,
event_handle: EventHandle,
event: BlobDeleted,
blob_id: BlobId,
epoch: Epoch,
) -> anyhow::Result<()> {
let blob_id = event.blob_id;
let current_committee_epoch = self.node.current_committee_epoch();
tracing::Span::current().record("walrus.epoch", current_committee_epoch);

Expand All @@ -246,13 +274,13 @@ impl BackgroundEventProcessor {
// *Important*: We use the event's epoch for this check (as opposed to the current
// epoch) as subsequent certify or delete events may update the `blob_info`; so we
// cannot remove it even if it is no longer valid in the *current* epoch
if blob_info.can_data_be_deleted(event.epoch)
if blob_info.can_data_be_deleted(epoch)
&& self.node.garbage_collection_config.enable_data_deletion
{
tracing::debug!("deleting data for deleted blob");
self.node
.storage
.attempt_to_delete_blob_data(&blob_id, event.epoch, &self.node.metrics)
.attempt_to_delete_blob_data(&blob_id, epoch, &self.node.metrics)
.await?;
}
} else if self
Expand Down Expand Up @@ -387,8 +415,12 @@ impl BlobEventProcessor {
.storage
.update_blob_info(event_handle.index(), &blob_event)?;

if let BlobEvent::Registered(event) = &blob_event {
self.handle_registered_event(event_handle, event).await?;
if matches!(
&blob_event,
BlobEvent::Registered(_) | BlobEvent::PooledBlobRegistered(_)
) {
self.handle_registered_event(event_handle, blob_event.blob_id())
.await?;
return Ok(());
}

Expand All @@ -406,7 +438,7 @@ impl BlobEventProcessor {
async fn handle_registered_event(
&self,
event_handle: EventHandle,
event: &BlobRegistered,
blob_id: BlobId,
) -> anyhow::Result<()> {
// Registered event is marked as complete immediately. We need to process registered events
// as fast as possible to catch up to the latest event in order to not miss blob sliver
Expand All @@ -417,22 +449,17 @@ impl BlobEventProcessor {
// registered events.
let _scope = monitored_scope::monitored_scope("ProcessEvent::BlobEvent::Registered");

self.node.notify_registration(&event.blob_id);
self.node.notify_registration(&blob_id);

if let Err(error) = self.dispatch_task(
event.blob_id,
BackgroundTask::FlushPendingCaches {
blob_id: event.blob_id,
},
) {
if let Err(error) =
self.dispatch_task(blob_id, BackgroundTask::FlushPendingCaches { blob_id })
{
tracing::error!(
blob_id = %event.blob_id,
%blob_id,
?error,
"failed to enqueue cache flush after registration, flushing inline",
);
self.node
.flush_pending_caches_with_logging(event.blob_id)
.await;
self.node.flush_pending_caches_with_logging(blob_id).await;
}

event_handle.mark_as_complete();
Expand Down
6 changes: 3 additions & 3 deletions crates/walrus-service/src/node/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,9 @@ impl TelemetryLabel for BlobEvent {
BlobEvent::Deleted(_) => "deleted",
BlobEvent::InvalidBlobID(_) => "invalid-blob",
BlobEvent::DenyListBlobDeleted(_) => "deny-list-deleted",
BlobEvent::PooledBlobRegistered(_) => "pooled-blob-registered",
BlobEvent::PooledBlobCertified(_) => "pooled-blob-certified",
BlobEvent::PooledBlobDeleted(_) => "pooled-blob-deleted",
}
}
}
Expand Down Expand Up @@ -374,9 +377,6 @@ impl TelemetryLabel for StoragePoolEvent {
fn label(&self) -> &'static str {
match self {
StoragePoolEvent::StoragePoolCreated(_) => "storage-pool-created",
StoragePoolEvent::PooledBlobRegistered(_) => "pooled-blob-registered",
StoragePoolEvent::PooledBlobCertified(_) => "pooled-blob-certified",
StoragePoolEvent::PooledBlobDeleted(_) => "pooled-blob-deleted",
StoragePoolEvent::StoragePoolExtended(_) => "storage-pool-extended",
}
}
Expand Down
13 changes: 13 additions & 0 deletions crates/walrus-service/src/node/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ impl Storage {
}

/// Returns the highest epoch for which garbage collection was completed.
#[allow(dead_code)]
pub(crate) fn garbage_collector_last_completed_epoch(&self) -> Result<Epoch, TypedStoreError> {
Ok(self
.garbage_collector_table
Expand Down Expand Up @@ -670,6 +671,18 @@ impl Storage {
self.blob_info.get_per_object_info(object_id)
}

// === Storage Pool Accessors ===

/// Sets the end epoch for a storage pool.
///
/// Thin delegation to the blob-info table: records `end_epoch` for
/// `storage_pool_id`. Called for both pool creation and pool extension
/// events, so an existing entry for the pool may be updated in place —
/// TODO confirm overwrite semantics against the blob-info implementation.
///
/// # Errors
///
/// Propagates any `TypedStoreError` from the underlying store write.
pub(crate) fn set_storage_pool_end_epoch(
&self,
storage_pool_id: &ObjectID,
end_epoch: Epoch,
) -> Result<(), TypedStoreError> {
self.blob_info
.set_storage_pool_end_epoch(storage_pool_id, end_epoch)
}

/// Returns the current event cursor and the next event index.
#[tracing::instrument(skip_all)]
pub fn get_event_cursor_and_next_index(
Expand Down
Loading
Loading