Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions crates/walrus-service/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2653,10 +2653,12 @@ impl StorageNodeInner {

/// Waits until the recovery deferral for the given blob ID expires
/// and cancels the recovery deferral upon timeout.
///
/// Returns `true` when a deferral existed and the caller waited for it.
pub async fn wait_until_recovery_deferral_expires(
&self,
blob_id: &BlobId,
) -> anyhow::Result<Arc<tokio_util::sync::CancellationToken>> {
) -> anyhow::Result<bool> {
let (until, token) = {
let map = self.recovery_deferrals.read().await;
if let Some((u, existing_token)) = map.get(blob_id).cloned() {
Expand All @@ -2681,7 +2683,7 @@ impl StorageNodeInner {
self.metrics.recovery_deferral_waiters.dec();
}

Ok(token)
Ok(until.is_some())
}

/// Returns the node capability object ID.
Expand Down
46 changes: 39 additions & 7 deletions crates/walrus-service/src/node/blob_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,14 @@ use super::{
StorageNodeInner,
committee::CommitteeService,
contract_service::SystemContractService,
metrics::{self, NodeMetricSet, STATUS_IN_PROGRESS, STATUS_QUEUED},
metrics::{
self,
LIVE_UPLOAD_DEFERRAL_OUTCOME_AVOIDED_RECOVERY,
LIVE_UPLOAD_DEFERRAL_OUTCOME_RECOVERY_NEEDED,
NodeMetricSet,
STATUS_IN_PROGRESS,
STATUS_QUEUED,
},
storage::Storage,
system_events::{CompletableHandle, EventHandle},
};
Expand Down Expand Up @@ -418,7 +425,7 @@ impl BlobSyncHandler {
},

(guard, start) = async {
synchronizer.wait_for_recovery_window().await;
let waited_for_deferral = synchronizer.wait_for_recovery_window().await;

let active_start = tokio::time::Instant::now();

Expand All @@ -433,7 +440,9 @@ impl BlobSyncHandler {

let decrement_guard = GaugeGuard::acquire(&in_progress_gauge);

synchronizer.run(permits.sliver_pairs).await;
synchronizer
.run(permits.sliver_pairs, waited_for_deferral)
.await;

(decrement_guard, active_start)
} => {
Expand Down Expand Up @@ -545,22 +554,26 @@ impl BlobSynchronizer {
&self.node.metrics
}

async fn wait_for_recovery_window(&self) {
async fn wait_for_recovery_window(&self) -> bool {
match self
.node
.wait_until_recovery_deferral_expires(&self.blob_id)
.await
{
Ok(_token) => {
Ok(waited_for_deferral) => {
self.node.clear_recovery_deferral(&self.blob_id).await;
waited_for_deferral
}
Err(err) => {
tracing::warn!(?err, "failed to wait out recovery deferral");
false
}
Err(err) => tracing::warn!(?err, "failed to wait out recovery deferral"),
}
}

/// Runs the synchronizer until blob sync is complete.
#[tracing::instrument(skip_all)]
async fn run(self, sliver_permits: Arc<Semaphore>) {
async fn run(self, sliver_permits: Arc<Semaphore>, waited_for_deferral: bool) {
let this = Arc::new(self);
let histograms = &this.metrics().recover_blob_part_duration_seconds;

Expand All @@ -578,9 +591,28 @@ impl BlobSynchronizer {
tracing::debug!(
"blob already stored at all owned shards for current epoch, skipping recovery"
);
if waited_for_deferral {
this.node
.metrics
.live_upload_deferral_avoided_recovery_total
.inc();
walrus_utils::with_label!(
this.node.metrics.live_upload_deferral_outcome_total,
LIVE_UPLOAD_DEFERRAL_OUTCOME_AVOIDED_RECOVERY
)
.inc();
}
Comment on lines +594 to +604
Copy link
Collaborator

@shuowang12 shuowang12 Mar 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering if there is difference between live_upload_deferral_avoided_recovery_total and live_upload_deferral_outcome_total[LIVE_UPLOAD_DEFERRAL_OUTCOME_AVOIDED_RECOVERY].

return;
}

if waited_for_deferral {
walrus_utils::with_label!(
this.node.metrics.live_upload_deferral_outcome_total,
LIVE_UPLOAD_DEFERRAL_OUTCOME_RECOVERY_NEEDED
)
.inc();
}

let shared_metadata = this
.clone()
.recover_metadata()
Expand Down
9 changes: 9 additions & 0 deletions crates/walrus-service/src/node/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ pub(crate) const STATUS_IN_PROGRESS: &str = "in-progress";
pub(crate) const STATUS_STARTED: &str = "started";
pub(crate) const STATUS_COMPLETED: &str = "completed";
pub(crate) const STATUS_HIGHEST_FINISHED: &str = "highest_finished";
pub(crate) const LIVE_UPLOAD_DEFERRAL_OUTCOME_AVOIDED_RECOVERY: &str = "avoided_recovery";
pub(crate) const LIVE_UPLOAD_DEFERRAL_OUTCOME_RECOVERY_NEEDED: &str = "recovery_needed";

type U64GaugeVec = GenericGaugeVec<AtomicU64>;
type U64Gauge = GenericGauge<AtomicU64>;
Expand Down Expand Up @@ -95,6 +97,13 @@ walrus_utils::metrics::define_metric_set! {
#[help = "The number of recovery tasks currently waiting for deferrals to expire"]
recovery_deferral_waiters: IntGauge[],

#[help = "Total number of blob recoveries avoided because data was already present after \
waiting for a live-upload deferral"]
live_upload_deferral_avoided_recovery_total: IntCounter[],

#[help = "Total number of waited live-upload deferrals by outcome"]
live_upload_deferral_outcome_total: IntCounterVec["outcome"],

#[help = "Total number of sliver instances returned"]
slivers_retrieved_total: IntCounterVec["sliver_type"],

Expand Down
Loading