Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 61 additions & 4 deletions crates/walrus-service/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2651,12 +2651,16 @@ impl StorageNodeInner {
&self.encoding_config
}

/// Waits until the recovery deferral for the given blob ID expires
/// and cancels the recovery deferral upon timeout.
/// Waits until the recovery deferral for the given blob ID is cancelled or expires.
///
/// This method only waits; it does not remove the deferral entry, which is cleaned up by
/// `clear_recovery_deferral` or the background cleanup task.
///
/// Returns `true` when a deferral existed and the caller waited for it.
pub async fn wait_until_recovery_deferral_expires(
&self,
blob_id: &BlobId,
) -> anyhow::Result<Arc<tokio_util::sync::CancellationToken>> {
) -> anyhow::Result<bool> {
let (until, token) = {
Comment thread
sadhansood marked this conversation as resolved.
let map = self.recovery_deferrals.read().await;
if let Some((u, existing_token)) = map.get(blob_id).cloned() {
Expand All @@ -2681,7 +2685,7 @@ impl StorageNodeInner {
self.metrics.recovery_deferral_waiters.dec();
}

Ok(token)
Ok(until.is_some())
Comment thread
sadhansood marked this conversation as resolved.
}

/// Returns the node capability object ID.
Expand Down Expand Up @@ -4668,6 +4672,59 @@ mod tests {
.expect("storage node creation in setup should not fail")
}

#[tokio::test]
async fn wait_until_recovery_deferral_expires_returns_false_without_deferral() -> TestResult {
    // Serialize against other tests that share global node state.
    let _lock = global_test_lock().lock().await;
    let node = StorageNodeHandle::builder().build().await?;

    // No deferral was ever registered for this blob, so the call must return
    // immediately and report that nothing was waited on.
    let observed_deferral = node
        .as_ref()
        .inner
        .wait_until_recovery_deferral_expires(&random_blob_id())
        .await?;
    assert!(!observed_deferral);

    Ok(())
}

#[tokio::test]
async fn wait_until_recovery_deferral_expires_returns_true_with_deferral() -> TestResult {
    // Serialize against other tests that share global node state.
    let _lock = global_test_lock().lock().await;
    let node = StorageNodeHandle::builder().build().await?;
    let inner = node.as_ref().inner.clone();
    let blob_id = random_blob_id();

    // Register a deferral far enough in the future that it cannot expire on
    // its own during the test, forcing the waiter to block until it is
    // explicitly cancelled below.
    let deferral = (
        std::time::Instant::now() + Duration::from_secs(60),
        Arc::new(tokio_util::sync::CancellationToken::new()),
    );
    inner
        .recovery_deferrals
        .write()
        .await
        .insert(blob_id, deferral);
    inner.update_recovery_deferral_size_metric(1);

    // Start the waiter on a separate task so this test can cancel the
    // deferral while the waiter is blocked.
    let waiter = {
        let inner = inner.clone();
        tokio::spawn(async move { inner.wait_until_recovery_deferral_expires(&blob_id).await })
    };

    // Only cancel once the task is observably waiting (waiter gauge == 1);
    // otherwise the cancellation could race ahead of the wait call.
    retry_until_success_or_timeout(TIMEOUT, || async {
        match inner.metrics.recovery_deferral_waiters.get() {
            1 => Ok(()),
            _ => Err(()),
        }
    })
    .await
    .expect("waiter should start waiting on the deferral");
    inner.clear_recovery_deferral(&blob_id).await;

    // The waiter must report that it actually waited on a deferral.
    assert!(waiter.await??);

    Ok(())
}

mod get_storage_confirmation {
use fastcrypto::traits::VerifyingKey;

Expand Down
62 changes: 53 additions & 9 deletions crates/walrus-service/src/node/blob_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,14 @@ use super::{
StorageNodeInner,
committee::CommitteeService,
contract_service::SystemContractService,
metrics::{self, NodeMetricSet, STATUS_IN_PROGRESS, STATUS_QUEUED},
metrics::{
self,
LIVE_UPLOAD_DEFERRAL_OUTCOME_AVOIDED_RECOVERY,
LIVE_UPLOAD_DEFERRAL_OUTCOME_RECOVERY_NEEDED,
NodeMetricSet,
STATUS_IN_PROGRESS,
STATUS_QUEUED,
},
storage::Storage,
system_events::{CompletableHandle, EventHandle},
};
Expand Down Expand Up @@ -418,7 +425,8 @@ impl BlobSyncHandler {
},

(guard, start) = async {
synchronizer.wait_for_recovery_window().await;
let waited_for_live_upload_deferral =
synchronizer.wait_for_recovery_window().await;

let active_start = tokio::time::Instant::now();

Expand All @@ -433,7 +441,9 @@ impl BlobSyncHandler {

let decrement_guard = GaugeGuard::acquire(&in_progress_gauge);

synchronizer.run(permits.sliver_pairs).await;
synchronizer
.run(permits.sliver_pairs, waited_for_live_upload_deferral)
.await;

(decrement_guard, active_start)
} => {
Expand Down Expand Up @@ -545,22 +555,27 @@ impl BlobSynchronizer {
&self.node.metrics
}

async fn wait_for_recovery_window(&self) {
/// Returns whether this sync observed and waited out a live-upload recovery deferral.
async fn wait_for_recovery_window(&self) -> bool {
match self
.node
.wait_until_recovery_deferral_expires(&self.blob_id)
.await
{
Ok(_token) => {
Ok(waited_for_live_upload_deferral) => {
self.node.clear_recovery_deferral(&self.blob_id).await;
waited_for_live_upload_deferral
}
Err(err) => {
tracing::warn!(?err, "failed to wait out recovery deferral");
false
}
Err(err) => tracing::warn!(?err, "failed to wait out recovery deferral"),
}
}

/// Runs the synchronizer until blob sync is complete.
#[tracing::instrument(skip_all)]
async fn run(self, sliver_permits: Arc<Semaphore>) {
async fn run(self, sliver_permits: Arc<Semaphore>, waited_for_live_upload_deferral: bool) {
let this = Arc::new(self);
let histograms = &this.metrics().recover_blob_part_duration_seconds;

Expand All @@ -569,18 +584,47 @@ impl BlobSynchronizer {
.current_event_epoch()
.await
.expect("current event epoch should be set");
if this
let storage_state_after_deferral = match this
.node
.is_stored_at_all_shards_at_epoch(&this.blob_id, current_epoch)
.await
.unwrap_or(false)
{
Ok(is_stored) => Some(is_stored),
Err(error) => {
tracing::warn!(
%this.blob_id,
?error,
"failed to determine whether recovery is still needed after waiting for \
live-upload deferral"
);
None
}
};

if storage_state_after_deferral == Some(true) {
tracing::debug!(
"blob already stored at all owned shards for current epoch, skipping recovery"
);
if waited_for_live_upload_deferral {
walrus_utils::with_label!(
this.node.metrics.live_upload_deferral_outcome_total,
LIVE_UPLOAD_DEFERRAL_OUTCOME_AVOIDED_RECOVERY
)
.inc();
}
return;
}

// Only record an outcome when a live-upload deferral actually existed and the post-wait
// storage check completed successfully.
if waited_for_live_upload_deferral && storage_state_after_deferral == Some(false) {
walrus_utils::with_label!(
this.node.metrics.live_upload_deferral_outcome_total,
LIVE_UPLOAD_DEFERRAL_OUTCOME_RECOVERY_NEEDED
)
.inc();
}

let shared_metadata = this
.clone()
.recover_metadata()
Expand Down
6 changes: 6 additions & 0 deletions crates/walrus-service/src/node/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ pub(crate) const STATUS_IN_PROGRESS: &str = "in-progress";
pub(crate) const STATUS_STARTED: &str = "started";
pub(crate) const STATUS_COMPLETED: &str = "completed";
pub(crate) const STATUS_HIGHEST_FINISHED: &str = "highest_finished";
pub(crate) const LIVE_UPLOAD_DEFERRAL_OUTCOME_AVOIDED_RECOVERY: &str = "avoided_recovery";
pub(crate) const LIVE_UPLOAD_DEFERRAL_OUTCOME_RECOVERY_NEEDED: &str = "recovery_needed";

type U64GaugeVec = GenericGaugeVec<AtomicU64>;
type U64Gauge = GenericGauge<AtomicU64>;
Expand Down Expand Up @@ -95,6 +97,10 @@ walrus_utils::metrics::define_metric_set! {
#[help = "The number of recovery tasks currently waiting for deferrals to expire"]
recovery_deferral_waiters: IntGauge[],

#[help = "Total number of live-upload deferrals that were waited out, by whether recovery \
was still needed afterward"]
live_upload_deferral_outcome_total: IntCounterVec["outcome"],

#[help = "Total number of sliver instances returned"]
slivers_retrieved_total: IntCounterVec["sliver_type"],

Expand Down
Loading