diff --git a/crates/boundless-market/Cargo.toml b/crates/boundless-market/Cargo.toml index a49ffef24..dfcdfc94b 100644 --- a/crates/boundless-market/Cargo.toml +++ b/crates/boundless-market/Cargo.toml @@ -61,9 +61,9 @@ tracing = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter"] } utoipa = { workspace = true } -[build-dependencies] -serde_json = { workspace = true } - [dev-dependencies] boundless-test-utils = { workspace = true } -tracing-test = { workspace = true } +tracing-test = { workspace = true, features = ["no-env-filter"] } + +[build-dependencies] +serde_json = { workspace = true } diff --git a/crates/boundless-market/src/contracts/boundless_market.rs b/crates/boundless-market/src/contracts/boundless_market.rs index 71ebb53a5..494b4bb85 100644 --- a/crates/boundless-market/src/contracts/boundless_market.rs +++ b/crates/boundless-market/src/contracts/boundless_market.rs @@ -110,9 +110,18 @@ pub enum MarketError { #[error("Other error: {0:?}")] Error(#[from] anyhow::Error), - /// Timeout reached. - #[error("Timeout: 0x{0:x}")] - TimeoutReached(U256), + /// Transaction confirmation timed out. + #[error( + "Transaction confirmation timed out: tx_hash={tx_hash}, nonce={nonce:?}, source={source:?}" + )] + TxnTimedOut { + /// The transaction hash that timed out + tx_hash: B256, + /// The nonce used for the transaction, if known + nonce: Option, + /// The underlying error that caused the timeout + source: anyhow::Error, + }, } impl From for MarketError { @@ -482,9 +491,43 @@ impl BoundlessMarketService

{ } let client_sig_bytes = client_sig.into(); + let receipt = self.lock_request_with_nonce(request, client_sig_bytes, None, None).await?; + + self.check_collateral_balance().await?; + + Ok(receipt.block_number.context("TXN Receipt missing block number")?) + } + + async fn lock_request_with_nonce( + &self, + request: &ProofRequest, + client_sig_bytes: Bytes, + nonce: Option, + fees: Option<(u128, u128)>, + ) -> Result { tracing::trace!("Calling lockRequest({:x?}, {:x?})", request, client_sig_bytes); - let call = self.instance.lockRequest(request.clone(), client_sig_bytes).from(self.caller); + let mut call = + self.instance.lockRequest(request.clone(), client_sig_bytes).from(self.caller); + + if let Some(nonce_value) = nonce { + tracing::debug!( + "Retrying lockRequest({:x}) with explicit nonce {}", + request.id, + nonce_value + ); + call = call.nonce(nonce_value); + } + + if let Some((max_fee, max_priority_fee)) = fees { + tracing::debug!( + "Retrying lockRequest({:x}) with bumped fees (max_fee={}, priority_fee={})", + request.id, + max_fee, + max_priority_fee + ); + call = call.max_fee_per_gas(max_fee).max_priority_fee_per_gas(max_priority_fee); + } tracing::trace!("Sending tx {}", format!("{:?}", call)); let pending_tx = call.send().await?; @@ -505,9 +548,252 @@ impl BoundlessMarketService

{ receipt.transaction_hash ); - self.check_collateral_balance().await?; + Ok(receipt) + } - Ok(receipt.block_number.context("TXN Receipt missing block number")?) + async fn bump_fees_for_replacement( + &self, + tx_hash: B256, + previous: Option<(u128, u128)>, + ) -> Option<(u128, u128)> { + let (base_max_fee, base_priority_fee) = if let Some(fees) = previous { + fees + } else { + match self.instance.provider().get_transaction_by_hash(tx_hash).await { + Ok(Some(tx)) => { + (tx.max_fee_per_gas(), tx.max_priority_fee_per_gas().unwrap_or_default()) + } + Ok(None) => { + tracing::debug!( + "Previous transaction {} not yet available when bumping fees", + tx_hash + ); + return None; + } + Err(err) => { + tracing::warn!( + "Failed to fetch previous transaction {} for fee bump: {err:?}", + tx_hash + ); + return None; + } + } + }; + + let bump = |value: u128| value.saturating_mul(110).saturating_div(100).saturating_add(1); + + let estimate = match self.instance.provider().estimate_eip1559_fees().await { + Ok(fees) => Some(fees), + Err(err) => { + tracing::warn!("Failed to estimate eip1559 fees for replacement tx: {err:?}"); + None + } + }; + + let bumped_max = bump(base_max_fee); + let bumped_priority = bump(base_priority_fee); + + let final_max_fee = + estimate.map_or(bumped_max, |fees| bumped_max.max(fees.max_fee_per_gas)); + let final_priority_fee = estimate + .map_or(bumped_priority, |fees| bumped_priority.max(fees.max_priority_fee_per_gas)); + + Some((final_max_fee, final_priority_fee)) + } + + /// Lock the request to the prover with automatic retry on timeout. + /// + /// This method attempts to lock the request, and if the transaction times out, + /// it will retry with the same nonce for better RBF (Replace-By-Fee) handling. + /// Before each retry, it checks if any previous pending transactions have been confirmed. + /// + /// This method should be called from the address of the prover. + pub async fn lock_request_with_retry( + &self, + request: &ProofRequest, + client_sig: impl Into, + max_retry_attempts: u32, + ) -> Result { + let max_attempts = max_retry_attempts.max(1); + let client_sig_bytes = client_sig.into(); + let mut pending_tx_hashes: Vec = Vec::new(); + let mut retry_nonce: Option = None; + let mut replacement_fees: Option<(u128, u128)> = None; + + let mut attempt = 1; + while attempt <= max_attempts { + if let Some(block_number) = + self.finalize_if_confirmed(&pending_tx_hashes, "before retry").await? + { + return Ok(block_number); + } + + tracing::info!( + "Locking request: 0x{:x} for stake: {} (attempt {}/{})", + request.id, + request.offer.lockCollateral, + attempt, + max_attempts + ); + + match self + .lock_request_with_nonce( + request, + client_sig_bytes.clone(), + retry_nonce, + replacement_fees, + ) + .await + { + Ok(receipt) => { + if attempt > 1 { + tracing::info!("Lock succeeded on retry for {:x}", request.id); + } + self.check_collateral_balance().await?; + return Ok(receipt.block_number.context("TXN Receipt missing block number")?); + } + Err(err @ MarketError::TxnTimedOut { tx_hash, nonce, .. }) => { + if attempt < max_attempts { + tracing::warn!( + "Lock confirmation timeout for {:x}, will retry", + request.id + ); + pending_tx_hashes.push(tx_hash); + let resolved_nonce = match nonce { + Some(value) => Some(value), + None => { + self.fetch_nonce_with_retry( + tx_hash, + self.receipt_query_config.retry_count, + self.receipt_query_config.retry_interval, + ) + .await + } + }; + if let Some(nonce_value) = resolved_nonce { + retry_nonce = Some(nonce_value); + } else { + tracing::warn!( + "Failed to determine nonce for timed-out tx {}; aborting retries", + tx_hash + ); + return Err(err); + } + replacement_fees = + self.bump_fees_for_replacement(tx_hash, replacement_fees).await; + attempt += 1; + continue; + } + if let Some(block_number) = self + .finalize_if_confirmed(&pending_tx_hashes, "while handling timeout failure") + .await? + { + return Ok(block_number); + } + return Err(err); + } + Err(err @ (MarketError::TxnError(_) | MarketError::Error(_))) => { + let err_str = err.to_string().to_lowercase(); + if err_str.contains("nonce too low") { + tracing::info!( + "Nonce too low encountered with {} pending tx hashes", + pending_tx_hashes.len() + ); + tracing::debug!( + "Nonce too low but previous transaction receipt not yet available, waiting..." + ); + tokio::time::sleep(self.receipt_query_config.retry_interval).await; + continue; + } + if let Some(block_number) = self + .finalize_if_confirmed( + &pending_tx_hashes, + "while handling nonce-too-low error", + ) + .await? + { + return Ok(block_number); + } + return Err(err); + } + Err(err) => { + if let Some(block_number) = self + .finalize_if_confirmed(&pending_tx_hashes, "while handling error") + .await? + { + return Ok(block_number); + } + return Err(err); + } + } + } + + Err(MarketError::Error(anyhow!("Max retry attempts exceeded"))) + } + + async fn find_confirmed_receipt(&self, pending_hashes: &[B256]) -> Option { + for &pending_hash in pending_hashes { + match self.instance.provider().get_transaction_receipt(pending_hash).await { + Ok(Some(receipt)) => { + if receipt.status() { + return Some(receipt); + } + } + Ok(None) => {} + Err(err) => { + tracing::debug!( + "Failed to fetch transaction receipt for {}: {err:?}", + pending_hash + ); + } + } + } + None + } + + async fn finalize_if_confirmed( + &self, + pending_hashes: &[B256], + context: &str, + ) -> Result, MarketError> { + if let Some(receipt) = self.find_confirmed_receipt(pending_hashes).await { + tracing::info!("Pending transaction {} confirmed {context}", receipt.transaction_hash); + self.check_collateral_balance().await?; + return Ok(Some(receipt.block_number.context("TXN Receipt missing block number")?)); + } + Ok(None) + } + + async fn fetch_nonce_with_retry( + &self, + tx_hash: B256, + attempts: usize, + interval: Duration, + ) -> Option { + if attempts == 0 { + return None; + } + for attempt in 0..attempts.max(1) { + match self.instance.provider().get_transaction_by_hash(tx_hash).await { + Ok(Some(tx)) => { + let nonce = tx.nonce(); + tracing::debug!("Tx {} broadcasted with nonce {}", tx_hash, nonce); + return Some(nonce); + } + Ok(None) => {} + Err(err) => { + tracing::debug!( + "Failed to fetch transaction {} nonce (attempt {}): {err:?}", + tx_hash, + attempt + 1 + ); + } + } + if attempt + 1 < attempts { + tokio::time::sleep(interval).await; + } + } + None } /// Lock the request to the prover, giving them exclusive rights to be paid to @@ -568,15 +854,11 @@ impl BoundlessMarketService

{ ) -> Result { let tx_hash = *pending_tx.tx_hash(); - // Get the nonce of the transaction for debugging purposes. - // It is possible that the transaction is not found immediately after broadcast, so we don't error if it's not found. - let tx_result = self.instance.provider().get_transaction_by_hash(tx_hash).await; - if let Ok(Some(tx)) = tx_result { - let nonce = tx.nonce(); - tracing::debug!("Tx {} broadcasted with nonce {}", tx_hash, nonce); - } else { + // Attempt to fetch the nonce for logging/error reporting, but don't fail if it's missing. + let mut nonce = self.fetch_nonce_with_retry(tx_hash, 5, Duration::from_millis(100)).await; + if nonce.is_none() { tracing::debug!( - "Tx {} not found immediately after broadcast. Can't get nonce.", + "Tx {} not yet available when fetching nonce; continuing without cached nonce", tx_hash ); } @@ -598,19 +880,46 @@ impl BoundlessMarketService

{ tokio::time::sleep(self.receipt_query_config.retry_interval).await; } - Err(anyhow!( - "Transaction {:?} confirmed, but receipt was not found after {} retries.", + if nonce.is_none() { + nonce = self + .fetch_nonce_with_retry( + tx_hash, + self.receipt_query_config.retry_count, + self.receipt_query_config.retry_interval, + ) + .await; + } + Err(MarketError::TxnTimedOut { tx_hash, - self.receipt_query_config.retry_count - ) - .into()) + nonce, + source: anyhow!( + "Transaction {:?} confirmed, but receipt was not found after {} retries.", + tx_hash, + self.receipt_query_config.retry_count + ), + }) + } + Err(e) => { + if nonce.is_none() { + nonce = self + .fetch_nonce_with_retry( + tx_hash, + self.receipt_query_config.retry_count, + self.receipt_query_config.retry_interval, + ) + .await; + } + Err(MarketError::TxnTimedOut { + tx_hash, + nonce, + source: anyhow!( + "failed to confirm tx {:?} within timeout {:?}: {}", + tx_hash, + self.timeout, + e + ), + }) } - Err(e) => Err(MarketError::TxnConfirmationError(anyhow!( - "failed to confirm tx {:?} within timeout {:?}: {}", - tx_hash, - self.timeout, - e - ))), } } @@ -884,12 +1193,7 @@ impl BoundlessMarketService

{ tracing::trace!("Calldata: {}", call.calldata()); let pending_tx = call.send().await?; tracing::debug!("Broadcasting tx {}", pending_tx.tx_hash()); - let tx_receipt = pending_tx - .with_timeout(Some(self.timeout)) - .get_receipt() - .await - .context("failed to confirm tx") - .map_err(MarketError::TxnConfirmationError)?; + let tx_receipt = self.get_receipt_with_retry(pending_tx).await?; tracing::info!("Submitted merkle root and proof for batch {}", tx_receipt.transaction_hash); @@ -923,12 +1227,7 @@ impl BoundlessMarketService

{ tracing::trace!("Calldata: {}", call.calldata()); let pending_tx = call.send().await?; tracing::debug!("Broadcasting tx {}", pending_tx.tx_hash()); - let tx_receipt = pending_tx - .with_timeout(Some(self.timeout)) - .get_receipt() - .await - .context("failed to confirm tx") - .map_err(MarketError::TxnConfirmationError)?; + let tx_receipt = self.get_receipt_with_retry(pending_tx).await?; tracing::info!("Submitted merkle root and proof for batch {}", tx_receipt.transaction_hash); diff --git a/crates/boundless-market/tests/e2e.rs b/crates/boundless-market/tests/e2e.rs index 4236491ac..3142668ff 100644 --- a/crates/boundless-market/tests/e2e.rs +++ b/crates/boundless-market/tests/e2e.rs @@ -36,6 +36,7 @@ use risc0_zkvm::{ sha::{Digest, Digestible}, ReceiptClaim, }; +use std::time::Duration; use tracing_test::traced_test; fn now_timestamp() -> u64 { @@ -574,3 +575,42 @@ async fn test_e2e_claim_digest_no_fulfillment_data() { assert_eq!(fulfillment_data, expected_fulfillment_data); assert_eq!(fulfillment_result.seal, fulfillment.seal); } + +#[tokio::test] +#[traced_test] +async fn test_lock_request_with_retry() { + let anvil = Anvil::new().block_time(1).spawn(); + + let mut ctx = create_test_ctx(&anvil).await.unwrap(); + + let request = new_request(1, &ctx).await; + + let request_id = + ctx.customer_market.submit_request(&request, &ctx.customer_signer).await.unwrap(); + + let deposit = default_allowance(); + ctx.prover_market.deposit_collateral_with_permit(deposit, &ctx.prover_signer).await.unwrap(); + + let eip712_domain = ctx.customer_market.eip712_domain().await.unwrap(); + let customer_sig = request + .sign_request(&ctx.customer_signer, eip712_domain.verifying_contract, anvil.chain_id()) + .await + .unwrap() + .as_bytes(); + + ctx.prover_market = ctx + .prover_market + .clone() + .with_timeout(Duration::from_millis(200)) + .with_receipt_retry_count(6); + + let block_number = + ctx.prover_market.lock_request_with_retry(&request, customer_sig, 2).await.unwrap(); + + assert!(ctx.customer_market.is_locked(request_id).await.unwrap()); + assert!( + ctx.customer_market.get_status(request_id, None).await.unwrap() == RequestStatus::Locked + ); + assert!(block_number > 0); + assert!(logs_contain("bumped fees")); +} diff --git a/crates/broker/src/config.rs b/crates/broker/src/config.rs index e53c03ba0..4db99631a 100644 --- a/crates/broker/src/config.rs +++ b/crates/broker/src/config.rs @@ -73,6 +73,10 @@ mod defaults { 2 } + pub const fn max_lock_retry_attempts() -> u32 { + 3 + } + pub const fn reaper_interval_secs() -> u32 { 60 } @@ -172,7 +176,7 @@ mod defaults { } pub const fn txn_timeout() -> u64 { - 45 + 30 } pub const fn single_txn_fulfill() -> bool { @@ -323,6 +327,12 @@ pub struct MarketConf { /// `gas_priority_mode = { custom = { base_fee_multiplier_percentage = 300, priority_fee_multiplier_percentage = 150, priority_fee_percentile = 15.0, dynamic_multiplier_percentage = 5 } }`. #[serde(default = "defaults::priority_mode")] pub gas_priority_mode: PriorityMode, + /// Maximum number of retry attempts for lock transactions (including initial attempt) + /// + /// When a lock transaction times out, it will be retried with the same nonce for better RBF handling. + /// Defaults to 3 (1 initial attempt + 2 retries). + #[serde(default = "defaults::max_lock_retry_attempts")] + pub max_lock_retry_attempts: u32, /// DEPRECATED: lockRequest priority gas /// @@ -446,6 +456,7 @@ impl Default for MarketConf { allow_client_addresses: None, deny_requestor_addresses: None, gas_priority_mode: defaults::priority_mode(), + max_lock_retry_attempts: defaults::max_lock_retry_attempts(), lockin_priority_gas: None, max_file_size: defaults::max_file_size(), max_fetch_retries: Some(2), diff --git a/crates/broker/src/order_monitor.rs b/crates/broker/src/order_monitor.rs index 40bc4252c..1927859f1 100644 --- a/crates/broker/src/order_monitor.rs +++ b/crates/broker/src/order_monitor.rs @@ -249,65 +249,75 @@ where return Err(OrderMonitorErr::AlreadyLocked); } - tracing::info!( - "Locking request: 0x{:x} for stake: {}", - request_id, - order.request.offer.lockCollateral - ); - let lock_block = - self.market.lock_request(&order.request, order.client_sig.clone()).await.map_err( - |e| -> OrderMonitorErr { - match e { - MarketError::TxnError(txn_err) => match txn_err { - TxnErr::BoundlessMarketErr( - IBoundlessMarketErrors::RequestIsLocked(_), - ) => OrderMonitorErr::AlreadyLocked, - _ => OrderMonitorErr::LockTxFailed(txn_err.to_string()), - }, - MarketError::RequestAlreadyLocked(_e) => OrderMonitorErr::AlreadyLocked, - MarketError::TxnConfirmationError(e) => { - OrderMonitorErr::LockTxNotConfirmed(e.to_string()) - } - MarketError::LockRevert(e) => { - // Note: lock revert could be for any number of reasons; - // 1/ someone may have locked in the block before us, - // 2/ the lock may have expired, - // 3/ the request may have been fulfilled, - // 4/ the requestor may have withdrawn their funds - // Currently we don't have a way to determine the cause of the revert. - OrderMonitorErr::LockTxFailed(format!("Tx hash 0x{e:x}")) + let max_lock_retry_attempts = { + let config = + self.config.lock_all().map_err(|e| OrderMonitorErr::UnexpectedError(e.into()))?; + config.market.max_lock_retry_attempts + }; + + // Attempt to lock with automatic retry on timeout + let lock_block = self + .market + .lock_request_with_retry( + &order.request, + order.client_sig.clone(), + max_lock_retry_attempts, + ) + .await + .map_err(|e| -> OrderMonitorErr { + match e { + MarketError::TxnError(txn_err) => match txn_err { + TxnErr::BoundlessMarketErr(IBoundlessMarketErrors::RequestIsLocked(_)) => { + OrderMonitorErr::AlreadyLocked } - MarketError::Error(e) => { - // Insufficient balance error is thrown both when the requestor has insufficient balance, - // Requestor having insufficient balance can happen and is out of our control. The prover - // having insufficient balance is unexpected as we should have checked for that before - // committing to locking the order. - let prover_addr_str = - self.prover_addr.to_string().to_lowercase().replace("0x", ""); - if e.to_string().contains("InsufficientBalance") { - if e.to_string().to_lowercase().contains(&prover_addr_str) { - OrderMonitorErr::InsufficientBalance - } else { - OrderMonitorErr::LockTxFailed(format!( - "Requestor has insufficient balance at lock time: {e}" - )) - } - } else if e.to_string().contains("RequestIsLocked") { - OrderMonitorErr::AlreadyLocked + _ => OrderMonitorErr::LockTxFailed(txn_err.to_string()), + }, + MarketError::RequestAlreadyLocked(_e) => OrderMonitorErr::AlreadyLocked, + MarketError::TxnConfirmationError(e) => { + OrderMonitorErr::LockTxNotConfirmed(e.to_string()) + } + MarketError::TxnTimedOut { source, .. } => { + OrderMonitorErr::LockTxNotConfirmed(source.to_string()) + } + MarketError::LockRevert(e) => { + // Note: lock revert could be for any number of reasons; + // 1/ someone may have locked in the block before us, + // 2/ the lock may have expired, + // 3/ the request may have been fulfilled, + // 4/ the requestor may have withdrawn their funds + // Currently we don't have a way to determine the cause of the revert. + OrderMonitorErr::LockTxFailed(format!("Tx hash 0x{e:x}")) + } + MarketError::Error(e) => { + // Insufficient balance error is thrown both when the requestor has insufficient balance, + // Requestor having insufficient balance can happen and is out of our control. The prover + // having insufficient balance is unexpected as we should have checked for that before + // committing to locking the order. + let prover_addr_str = + self.prover_addr.to_string().to_lowercase().replace("0x", ""); + if e.to_string().contains("InsufficientBalance") { + if e.to_string().to_lowercase().contains(&prover_addr_str) { + OrderMonitorErr::InsufficientBalance } else { - OrderMonitorErr::UnexpectedError(e) + OrderMonitorErr::LockTxFailed(format!( + "Requestor has insufficient balance at lock time: {e}" + )) } + } else if e.to_string().contains("RequestIsLocked") { + OrderMonitorErr::AlreadyLocked + } else { + OrderMonitorErr::UnexpectedError(e) } - _ => { - if e.to_string().contains("RequestIsLocked") { - OrderMonitorErr::AlreadyLocked - } else { - OrderMonitorErr::UnexpectedError(e.into()) - } + } + _ => { + if e.to_string().contains("RequestIsLocked") { + OrderMonitorErr::AlreadyLocked + } else { + OrderMonitorErr::UnexpectedError(e.into()) } } - }, - )?; + } + })?; // Fetch the block to retrieve the lock timestamp. This has been observed to return // inconsistent state between the receipt being available but the block not yet. diff --git a/crates/broker/src/submitter.rs b/crates/broker/src/submitter.rs index 98bf2b810..6d9db2cae 100644 --- a/crates/broker/src/submitter.rs +++ b/crates/broker/src/submitter.rs @@ -549,7 +549,10 @@ where } } - if let MarketError::TxnConfirmationError(_) = &err { + if matches!( + err, + MarketError::TxnConfirmationError(_) | MarketError::TxnTimedOut { .. } + ) { return Err(SubmitterErr::TxnConfirmationError(err)); }