diff --git a/crates/broker/src/config.rs b/crates/broker/src/config.rs index 8985d1edb..856c148a0 100644 --- a/crates/broker/src/config.rs +++ b/crates/broker/src/config.rs @@ -283,6 +283,11 @@ pub struct MarketConf { /// - "lock_cycle_price": Process lock-and-fulfill orders by highest ETH price per cycle, then fulfill-after-lock-expire randomly #[serde(default, alias = "expired_order_fulfillment_priority")] pub order_commitment_priority: OrderCommitmentPriority, + /// Whether to cancel Bento proving sessions when the order is no longer actionable + /// If false (default), Bento proving continues even if the order cannot be fulfilled in the + /// market. This should remain false to avoid losing partial PoVW jobs. + #[serde(default)] + pub cancel_proving_expired_orders: bool, } impl Default for MarketConf { @@ -321,6 +326,7 @@ impl Default for MarketConf { max_concurrent_preflights: defaults::max_concurrent_preflights(), order_pricing_priority: OrderPricingPriority::default(), order_commitment_priority: OrderCommitmentPriority::default(), + cancel_proving_expired_orders: false, } } } diff --git a/crates/broker/src/proving.rs b/crates/broker/src/proving.rs index 5d88f9652..204529b00 100644 --- a/crates/broker/src/proving.rs +++ b/crates/broker/src/proving.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::future::pending; use std::time::Duration; use crate::{ @@ -37,11 +36,11 @@ pub enum ProvingErr { #[error("{code} Proving failed after retries: {0:#}", code = self.code())] ProvingFailed(anyhow::Error), - #[error("{code} Request fulfilled by another prover", code = self.code())] - ExternallyFulfilled, + #[error("{code} Order not actionable, should cancel: {0}", code = self.code())] + ShouldCancel(&'static str), - #[error("{code} Proving timed out", code = self.code())] - ProvingTimedOut, + #[error("{code} Proof completed but order not actionable: {0}", code = self.code())] + CompletedNotActionable(&'static str), #[error("{code} Unexpected error: {0:#}", code = self.code())] UnexpectedError(#[from] anyhow::Error), @@ -53,8 +52,8 @@ impl CodedError for ProvingErr { fn code(&self) -> &str { match self { ProvingErr::ProvingFailed(_) => "[B-PRO-501]", - ProvingErr::ExternallyFulfilled => "[B-PRO-502]", - ProvingErr::ProvingTimedOut => "[B-PRO-503]", + ProvingErr::ShouldCancel(_) => "[B-PRO-502]", + ProvingErr::CompletedNotActionable(_) => "[B-PRO-503]", ProvingErr::UnexpectedError(_) => "[B-PRO-500]", } } @@ -80,18 +79,6 @@ impl ProvingService { Ok(Self { db, prover, config, order_state_tx, priority_requestors }) } - async fn cancel_stark_session(&self, proof_id: &str, order_id: &str, reason: &str) { - if let Err(err) = self.prover.cancel_stark(proof_id).await { - tracing::warn!( - "Failed to cancel proof {} for {} order {}: {}", - proof_id, - reason, - order_id, - err - ); - } - } - async fn monitor_proof_internal( &self, order_id: &str, @@ -208,42 +195,46 @@ impl ProvingService { let proof_id = order.proof_id.as_ref().context("Order should have proof ID")?; + // Check config: should we cancel jobs when orders become not actionable? + let should_cancel_on_not_actionable = + self.config.lock_all().map(|c| c.market.cancel_proving_expired_orders).unwrap_or(false); + let timeout_duration = { - let expiry_timestamp_secs = - order.expire_timestamp.expect("Order should have expiry set"); + let expiry_timestamp_secs = order.request.expires_at(); let now = now_timestamp(); Duration::from_secs(expiry_timestamp_secs.saturating_sub(now)) }; - // Only subscribe to order state events for FulfillAfterLockExpire orders - let mut order_state_rx = if matches!( - order.fulfillment_type, - FulfillmentType::FulfillAfterLockExpire - ) { - let rx = self.order_state_tx.subscribe(); + // Track whether order is not actionable (expired or fulfilled externally) + let mut not_actionable_reason: Option<&'static str> = None; + + let mut order_state_rx = self.order_state_tx.subscribe(); + if matches!(order.fulfillment_type, FulfillmentType::FulfillAfterLockExpire) + || order.expire_timestamp.unwrap() < now_timestamp() + { // Check if the order has already been fulfilled before starting proof match self.db.is_request_fulfilled(request_id).await { Ok(true) => { tracing::debug!( - "Order {} (request {}) was already fulfilled, skipping proof", + "Order {} (request {}) was already fulfilled before monitoring started", order_id, request_id ); - self.cancel_stark_session(proof_id, &order_id, "externally fulfilled").await; - return Err(ProvingErr::ExternallyFulfilled); + if should_cancel_on_not_actionable { + return Err(ProvingErr::ShouldCancel("Already fulfilled")); + } else { + not_actionable_reason = Some("Already fulfilled"); + } } - Ok(false) => Some(rx), + Ok(false) => {} Err(e) => { tracing::warn!( "Failed to check fulfillment status for order {}, will continue proving: {e:?}", order_id, ); - Some(rx) } } - } else { - None - }; + } let monitor_task = self.monitor_proof_internal( &order_id, @@ -253,10 +244,6 @@ impl ProvingService { ); tokio::pin!(monitor_task); - // Note: this timeout may not exactly match the order expiry exactly due to - // discrepancy between wall clock and monotonic clock from the timeout, - // but this time, along with aggregation and submission time, should never - // exceed the actual order expiry. let timeout_future = tokio::time::sleep(timeout_duration); tokio::pin!(timeout_future); @@ -264,37 +251,76 @@ impl ProvingService { tokio::select! { // Proof monitoring completed res = &mut monitor_task => { - break res.with_context(|| { + let status = res.with_context(|| { format!("Monitoring proof failed for order {order_id}, proof_id: {proof_id}") }).map_err(ProvingErr::ProvingFailed)?; + + // If order not actionable, return error instead of success + if let Some(reason) = not_actionable_reason { + tracing::info!( + "Proof completed for order {} but order not actionable: {}", + order_id, + reason + ); + return Err(ProvingErr::CompletedNotActionable(reason)); + } + + break status; } - // Timeout occurred + + // Request expiry timeout _ = &mut timeout_future => { - tracing::debug!( - "Proving timed out for order {}, cancelling proof {}", - order_id, - proof_id - ); - self.cancel_stark_session(proof_id, &order_id, "timed out").await; - return Err(ProvingErr::ProvingTimedOut); - } - // External fulfillment notification (only active for FulfillAfterLockExpire orders) - Some(recv_res) = async { - match &mut order_state_rx { - Some(rx) => Some(rx.recv().await), - None => pending::>>().await, + tracing::debug!("Order {order_id} expired during proving"); + + if should_cancel_on_not_actionable { + return Err(ProvingErr::ShouldCancel("Order expired")); + } else { + tracing::debug!("Waiting for proof completion for capacity tracking"); + not_actionable_reason = Some("Order expired"); + // Disarm timeout so it doesn't fire again + timeout_future.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(365 * 24 * 60 * 60)); } - } => { + } + + // External fulfillment notification + recv_res = order_state_rx.recv() => { match recv_res { Ok(OrderStateChange::Fulfilled { request_id: fulfilled_request_id }) if fulfilled_request_id == request_id => { - tracing::debug!( - "Order {} (request {}) was fulfilled by another prover, cancelling proof {}", - order_id, - request_id, - proof_id - ); - self.cancel_stark_session(proof_id, &order_id, "externally fulfilled").await; - return Err(ProvingErr::ExternallyFulfilled); + // Determine if this makes the order not actionable based on order type + let is_not_actionable = match order.fulfillment_type { + FulfillmentType::FulfillAfterLockExpire => { + // Always not actionable - someone else fulfilled it + true + } + FulfillmentType::LockAndFulfill => { + // Only not actionable if lock already expired + now_timestamp() >= order.request.lock_expires_at() + } + FulfillmentType::FulfillWithoutLocking => { + // Always not actionable - someone else fulfilled it + true + } + }; + + if is_not_actionable { + tracing::debug!( + "Order {} (request {}) fulfilled externally and not actionable", + order_id, + request_id + ); + + if should_cancel_on_not_actionable { + return Err(ProvingErr::ShouldCancel("Externally fulfilled")); + } else { + tracing::debug!("Waiting for proof completion for capacity tracking"); + not_actionable_reason = Some("Externally fulfilled"); + } + } else { + tracing::trace!( + "Order {} fulfilled externally but lock not expired yet, continuing", + order_id + ); + } } Ok(_) => { // Fulfillment for a different request, continue monitoring @@ -356,9 +382,17 @@ impl ProvingService { tracing::error!("Failed to set aggregation status for order {order_id}: {e:?}"); } } - Err(ProvingErr::ExternallyFulfilled) => { - tracing::info!("Order {order_id} was fulfilled by another prover, cancelled proof"); - handle_order_failure(&self.db, &order_id, "Externally fulfilled").await; + Err(ProvingErr::ShouldCancel(reason)) => { + tracing::info!("Order {order_id} not actionable, cancelling proof: {reason}"); + // Config says to cancel - release capacity immediately + cancel_proof_and_fail_order(&self.prover, &self.db, &self.config, &order, reason) + .await; + } + Err(ProvingErr::CompletedNotActionable(reason)) => { + tracing::info!( + "Order {order_id} proof completed but order not actionable: {reason}" + ); + handle_order_failure(&self.db, &order_id, reason).await; } Err(err) => { tracing::error!( @@ -378,31 +412,17 @@ impl ProvingService { self.db.get_active_proofs().await.context("Failed to get active proofs")?; tracing::info!("Found {} proofs currently proving", current_proofs.len()); - let now = crate::now_timestamp(); for order in current_proofs { let order_id = order.id(); - if order.expire_timestamp.unwrap() < now { - tracing::warn!("Order {} had expired on proving task start", order_id); - cancel_proof_and_fail_order( - &self.prover, - &self.db, - &order, - "Order expired on startup", - ) - .await; - } - let prove_serv = self.clone(); if order.proof_id.is_none() { tracing::error!("Order in status Proving missing proof_id: {order_id}"); - handle_order_failure(&prove_serv.db, &order_id, "Proving status missing proof_id") - .await; + handle_order_failure(&self.db, &order_id, "Proving status missing proof_id").await; continue; } - // TODO: Manage these tasks in a joinset? - // They should all be fail-able without triggering a larger failure so it should be - // fine. + // Spawn monitoring task - it will handle expiry/cancellation based on config + let prove_serv = self.clone(); tokio::spawn(async move { prove_serv.prove_and_update_db(order).await }); } @@ -753,7 +773,7 @@ mod tests { let request_id = U256::from(123); let proof_id = prover.prove_stark(&image_id, &input_id, vec![]).await.unwrap(); - // Test 1: FulfillAfterLockExpire order cancelled by matching fulfillment event + // Test 1: FulfillAfterLockExpire order waits for completion when fulfilled externally let order = create_test_order( request_id, image_id.clone(), @@ -771,13 +791,13 @@ mod tests { proving_service_clone.monitor_proof_with_timeout(order_clone).await }); - // Send fulfillment event for the same request - should cancel proof + // Send fulfillment event - should wait for proof completion (default config) send_order_state_event(order_state_tx.clone(), OrderStateChange::Fulfilled { request_id }) .await; let result = monitor_task.await.unwrap(); assert!(result.is_err()); - assert!(result.unwrap_err().to_string().contains("Request fulfilled by another prover")); + assert!(result.unwrap_err().to_string().contains("order not actionable")); // Test 2: FulfillAfterLockExpire order ignores different request ID let request_id_2 = U256::from(456); @@ -812,6 +832,6 @@ mod tests { assert!(result_2.is_ok()); assert_eq!(result_2.unwrap(), OrderStatus::PendingAgg); - assert!(logs_contain("was fulfilled by another prover")); + assert!(logs_contain("fulfilled externally")); } } diff --git a/crates/broker/src/reaper.rs b/crates/broker/src/reaper.rs index c20de166d..26c89f1ce 100644 --- a/crates/broker/src/reaper.rs +++ b/crates/broker/src/reaper.rs @@ -35,9 +35,6 @@ pub enum ReaperError { #[error("{code} Config error {0}", code = self.code())] ConfigReadErr(#[from] ConfigErr), - - #[error("{code} Failed to update expired order status: {0}", code = self.code())] - UpdateFailed(anyhow::Error), } impl CodedError for ReaperError { @@ -45,7 +42,6 @@ impl CodedError for ReaperError { match self { ReaperError::DbError(_) => "[B-REAP-001]", ReaperError::ConfigReadErr(_) => "[B-REAP-002]", - ReaperError::UpdateFailed(_) => "[B-REAP-003]", } } } @@ -80,19 +76,11 @@ impl ReaperTask { cancel_proof_and_fail_order( &self.prover, &self.db, + &self.config, &order, "Order expired in reaper", ) .await; - match self.db.set_order_failure(&order_id, "Order expired").await { - Ok(()) => { - warn!("Order {} has expired, marked as failed", order_id); - } - Err(err) => { - error!("Failed to update status for expired order {}: {}", order_id, err); - return Err(ReaperError::UpdateFailed(err.into())); - } - } } } @@ -263,9 +251,9 @@ mod tests { let stored_expired2 = db.get_order(&expired_order2.id()).await.unwrap().unwrap(); assert_eq!(stored_expired1.status, OrderStatus::Failed); - assert_eq!(stored_expired1.error_msg, Some("Order expired".to_string())); + assert_eq!(stored_expired1.error_msg, Some("Order expired in reaper".to_string())); assert_eq!(stored_expired2.status, OrderStatus::Failed); - assert_eq!(stored_expired2.error_msg, Some("Order expired".to_string())); + assert_eq!(stored_expired2.error_msg, Some("Order expired in reaper".to_string())); // Check non-expired orders remain unchanged let stored_active = db.get_order(&active_order.id()).await.unwrap().unwrap(); @@ -311,7 +299,7 @@ mod tests { for order in orders { let stored_order = db.get_order(&order.id()).await.unwrap().unwrap(); assert_eq!(stored_order.status, OrderStatus::Failed); - assert_eq!(stored_order.error_msg, Some("Order expired".to_string())); + assert_eq!(stored_order.error_msg, Some("Order expired in reaper".to_string())); } } } diff --git a/crates/broker/src/utils.rs b/crates/broker/src/utils.rs index 8233ad8dd..03beb2807 100644 --- a/crates/broker/src/utils.rs +++ b/crates/broker/src/utils.rs @@ -31,15 +31,31 @@ pub const ERC1271_MAX_GAS_FOR_CHECK: u64 = 100000; pub async fn cancel_proof_and_fail_order( prover: &crate::provers::ProverObj, db: &crate::db::DbObj, + config: &crate::config::ConfigLock, order: &Order, failure_reason: &'static str, ) { let order_id = order.id(); - if let Some(proof_id) = order.proof_id.as_ref() { - if matches!(order.status, OrderStatus::Proving) { - tracing::debug!("Cancelling proof {} for order {}", proof_id, order_id); - if let Err(err) = prover.cancel_stark(proof_id).await { - tracing::warn!("[B-UTL-001] Failed to cancel proof {proof_id} with reason: {failure_reason} for order {order_id}: {err}"); + + let should_cancel = match config.lock_all() { + Ok(conf) => conf.market.cancel_proving_expired_orders, + Err(err) => { + tracing::warn!( + "[B-UTL-002] Failed to read config for cancellation decision; skipping cancel: {err:?}" + ); + false + } + }; + + if should_cancel { + if let Some(proof_id) = order.proof_id.as_ref() { + if matches!(order.status, OrderStatus::Proving) { + tracing::debug!("Cancelling proof {} for order {}", proof_id, order_id); + if let Err(err) = prover.cancel_stark(proof_id).await { + tracing::warn!( + "[B-UTL-001] Failed to cancel proof {proof_id} with reason: {failure_reason} for order {order_id}: {err}" + ); + } } } }