Skip to content

Commit 0a14106

Browse files
committed
update pooled blob reference counting
1 parent bb38f32 commit 0a14106

6 files changed

Lines changed: 257 additions & 527 deletions

File tree

crates/walrus-service/src/node.rs

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1576,13 +1576,7 @@ impl StorageNode {
15761576
.await?;
15771577
}
15781578
EventStreamElement::ContractEvent(ContractEvent::StoragePoolEvent(pool_event)) => {
1579-
self.process_storage_pool_event(
1580-
blob_event_processor,
1581-
event_handle,
1582-
pool_event,
1583-
checkpoint_position,
1584-
)
1585-
.await?;
1579+
self.process_storage_pool_event(event_handle, pool_event)?;
15861580
}
15871581
EventStreamElement::ContractEvent(ContractEvent::DenyListEvent(_event)) => {
15881582
// TODO: Implement DenyListEvent handling (WAL-424)
@@ -1624,19 +1618,29 @@ impl StorageNode {
16241618
}
16251619

16261620
#[tracing::instrument(skip_all)]
1627-
async fn process_storage_pool_event(
1621+
fn process_storage_pool_event(
16281622
&self,
1629-
blob_event_processor: &BlobEventProcessor,
16301623
event_handle: EventHandle,
16311624
event: StoragePoolEvent,
1632-
checkpoint_position: CheckpointEventPosition,
16331625
) -> anyhow::Result<()> {
16341626
let _scope = monitored_scope::monitored_scope("ProcessEvent::StoragePoolEvent");
16351627

16361628
tracing::debug!(?event, "{} event received", event.name());
1637-
blob_event_processor
1638-
.process_storage_pool_event(event_handle, event, checkpoint_position)
1639-
.await?;
1629+
match event {
1630+
StoragePoolEvent::StoragePoolCreated(ref created) => {
1631+
self.inner
1632+
.storage
1633+
.set_storage_pool_end_epoch(&created.storage_pool_id, created.end_epoch)
1634+
.context("failed to set storage pool end epoch")?;
1635+
}
1636+
StoragePoolEvent::StoragePoolExtended(ref extended) => {
1637+
self.inner
1638+
.storage
1639+
.set_storage_pool_end_epoch(&extended.storage_pool_id, extended.new_end_epoch)
1640+
.context("failed to update storage pool end epoch")?;
1641+
}
1642+
}
1643+
event_handle.mark_as_complete();
16401644
Ok(())
16411645
}
16421646

crates/walrus-service/src/node/blob_event_processor.rs

Lines changed: 6 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,10 @@
33

44
use std::{num::NonZeroUsize, sync::Arc};
55

6-
use anyhow::Context as _;
76
use sui_macros::fail_point_async;
87
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
98
use walrus_core::{BlobId, Epoch};
10-
use walrus_sui::types::{BlobEvent, InvalidBlobId, StoragePoolEvent};
9+
use walrus_sui::types::{BlobEvent, InvalidBlobId};
1110
use walrus_utils::metrics::monitored_scope;
1211

1312
use self::pending_events::{PendingEventCounter, PendingEventGuard};
@@ -416,8 +415,11 @@ impl BlobEventProcessor {
416415
.storage
417416
.update_blob_info(event_handle.index(), &blob_event)?;
418417

419-
if let BlobEvent::Registered(event) = &blob_event {
420-
self.handle_registered_event(event_handle, event.blob_id)
418+
if matches!(
419+
&blob_event,
420+
BlobEvent::Registered(_) | BlobEvent::PooledBlobRegistered(_)
421+
) {
422+
self.handle_registered_event(event_handle, blob_event.blob_id())
421423
.await?;
422424
return Ok(());
423425
}
@@ -464,78 +466,6 @@ impl BlobEventProcessor {
464466
Ok(())
465467
}
466468

467-
/// Processes a storage pool event.
468-
///
469-
/// For blob-related events (Registered, Certified, Deleted), this updates the blob info and
470-
/// storage pool tables synchronously, then delegates to the background processor for
471-
/// post-processing (same as regular blob events). For storage-level events (Created,
472-
/// Extended), only the storage pool end_epoch table is updated.
473-
pub(super) async fn process_storage_pool_event(
474-
&self,
475-
event_handle: EventHandle,
476-
event: StoragePoolEvent,
477-
checkpoint_position: CheckpointEventPosition,
478-
) -> anyhow::Result<()> {
479-
match event {
480-
StoragePoolEvent::StoragePoolCreated(ref created) => {
481-
self.node
482-
.storage
483-
.set_storage_pool_end_epoch(&created.storage_pool_id, created.end_epoch)
484-
.context("failed to set storage pool end epoch")?;
485-
event_handle.mark_as_complete();
486-
}
487-
StoragePoolEvent::PooledBlobRegistered(ref registered) => {
488-
self.node
489-
.storage
490-
.process_pooled_blob_registered(event_handle.index(), registered)
491-
.context("failed to process pooled blob registered")?;
492-
493-
self.handle_registered_event(event_handle, registered.blob_id)
494-
.await?;
495-
}
496-
StoragePoolEvent::PooledBlobCertified(ref certified) => {
497-
self.node
498-
.storage
499-
.process_pooled_blob_certified(event_handle.index(), certified)
500-
.context("failed to process pooled blob certified")?;
501-
502-
let blob_event = BlobEvent::PooledBlobCertified(certified.clone());
503-
self.dispatch_task(
504-
blob_event.blob_id(),
505-
BackgroundTask::ProcessEvent {
506-
event_handle,
507-
blob_event,
508-
checkpoint_position,
509-
},
510-
)?;
511-
}
512-
StoragePoolEvent::PooledBlobDeleted(ref deleted) => {
513-
self.node
514-
.storage
515-
.process_pooled_blob_deleted(event_handle.index(), deleted)
516-
.context("failed to process pooled blob deleted")?;
517-
518-
let blob_event = BlobEvent::PooledBlobDeleted(deleted.clone());
519-
self.dispatch_task(
520-
blob_event.blob_id(),
521-
BackgroundTask::ProcessEvent {
522-
event_handle,
523-
blob_event,
524-
checkpoint_position,
525-
},
526-
)?;
527-
}
528-
StoragePoolEvent::StoragePoolExtended(ref extended) => {
529-
self.node
530-
.storage
531-
.set_storage_pool_end_epoch(&extended.storage_pool_id, extended.new_end_epoch)
532-
.context("failed to update storage pool end epoch")?;
533-
event_handle.mark_as_complete();
534-
}
535-
}
536-
Ok(())
537-
}
538-
539469
pub fn get_pending_event_counter(&self) -> PendingEventCounter {
540470
self.background_pending_event_count.clone()
541471
}

crates/walrus-service/src/node/metrics.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -377,9 +377,6 @@ impl TelemetryLabel for StoragePoolEvent {
377377
fn label(&self) -> &'static str {
378378
match self {
379379
StoragePoolEvent::StoragePoolCreated(_) => "storage-pool-created",
380-
StoragePoolEvent::PooledBlobRegistered(_) => "pooled-blob-registered",
381-
StoragePoolEvent::PooledBlobCertified(_) => "pooled-blob-certified",
382-
StoragePoolEvent::PooledBlobDeleted(_) => "pooled-blob-deleted",
383380
StoragePoolEvent::StoragePoolExtended(_) => "storage-pool-extended",
384381
}
385382
}

crates/walrus-service/src/node/storage.rs

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -683,36 +683,6 @@ impl Storage {
683683
.set_storage_pool_end_epoch(storage_pool_id, end_epoch)
684684
}
685685

686-
/// Processes a `PooledBlobRegistered` event.
687-
pub(crate) fn process_pooled_blob_registered(
688-
&self,
689-
event_index: u64,
690-
event: &walrus_sui::types::PooledBlobRegistered,
691-
) -> Result<(), TypedStoreError> {
692-
self.blob_info
693-
.process_pooled_blob_registered(event_index, event)
694-
}
695-
696-
/// Processes a `PooledBlobCertified` event.
697-
pub(crate) fn process_pooled_blob_certified(
698-
&self,
699-
event_index: u64,
700-
event: &walrus_sui::types::PooledBlobCertified,
701-
) -> Result<(), TypedStoreError> {
702-
self.blob_info
703-
.process_pooled_blob_certified(event_index, event)
704-
}
705-
706-
/// Processes a `PooledBlobDeleted` event.
707-
pub(crate) fn process_pooled_blob_deleted(
708-
&self,
709-
event_index: u64,
710-
event: &walrus_sui::types::PooledBlobDeleted,
711-
) -> Result<(), TypedStoreError> {
712-
self.blob_info
713-
.process_pooled_blob_deleted(event_index, event)
714-
}
715-
716686
/// Returns the current event cursor and the next event index.
717687
#[tracing::instrument(skip_all)]
718688
pub fn get_event_cursor_and_next_index(

0 commit comments

Comments
 (0)