diff --git a/crates/boundless-market/Cargo.toml b/crates/boundless-market/Cargo.toml index a49ffef24..6963e0aa1 100644 --- a/crates/boundless-market/Cargo.toml +++ b/crates/boundless-market/Cargo.toml @@ -66,4 +66,4 @@ serde_json = { workspace = true } [dev-dependencies] boundless-test-utils = { workspace = true } -tracing-test = { workspace = true } +tracing-test = { workspace = true, features = ["no-env-filter"] } diff --git a/crates/boundless-market/src/contracts/boundless_market.rs b/crates/boundless-market/src/contracts/boundless_market.rs index 160b60c19..1446d234b 100644 --- a/crates/boundless-market/src/contracts/boundless_market.rs +++ b/crates/boundless-market/src/contracts/boundless_market.rs @@ -28,21 +28,20 @@ use alloy::{ signers::Signer, }; -use alloy_sol_types::{SolCall, SolEvent}; +use alloy_sol_types::{SolCall, SolEvent, SolInterface}; use anyhow::{anyhow, Context, Result}; use risc0_ethereum_contracts::event_query::EventQueryConfig; use thiserror::Error; -use crate::{ - contracts::token::{IERC20Permit, IHitPoints::IHitPointsErrors, Permit, IERC20}, - deployments::collateral_token_supports_permit, -}; - use super::{ eip712_domain, AssessorReceipt, EIP712DomainSaltless, Fulfillment, - IBoundlessMarket::{self, IBoundlessMarketInstance, ProofDelivered}, + IBoundlessMarket::{self, IBoundlessMarketErrors, IBoundlessMarketInstance, ProofDelivered}, Offer, ProofRequest, RequestError, RequestId, RequestStatus, TxnErr, TXN_CONFIRM_TIMEOUT, }; +use crate::{ + contracts::token::{IERC20Permit, IHitPoints::IHitPointsErrors, Permit, IERC20}, + deployments::collateral_token_supports_permit, +}; /// Fraction of collateral the protocol gives to the prover who fills an order that was locked by another prover but expired /// This is determined by the constant SLASHING_BURN_BPS defined in the BoundlessMarket contract. @@ -113,6 +112,16 @@ pub enum MarketError { /// Timeout reached. #[error("Timeout: 0x{0:x}")] TimeoutReached(U256), + + /// Payment requirements failed + #[error("Payment requirements failed during order fulfillment: {0:?}")] + PaymentRequirementsFailed(IBoundlessMarketErrors), + + /// Payment requirements failed, unable to decode error + #[error( + "Payment requirements failed during order fulfillment: unrecognized error payload {0:?}" + )] + PaymentRequirementsFailedUnknownError(Bytes), } impl From for MarketError { @@ -209,6 +218,39 @@ fn extract_tx_log( } } +fn validate_fulfill_receipt(receipt: TransactionReceipt) -> Result<(), MarketError> { + for (idx, log) in receipt.inner.logs().iter().enumerate() { + if log.topic0().is_some_and(|topic| { + *topic == IBoundlessMarket::PaymentRequirementsFailed::SIGNATURE_HASH + }) { + match log.log_decode::() { + Ok(decoded) => { + let raw_error = Bytes::copy_from_slice(decoded.inner.data.error.as_ref()); + match IBoundlessMarketErrors::abi_decode(&raw_error) { + Ok(err) => tracing::warn!( + tx_hash = ?receipt.transaction_hash, + log_index = idx, + "Payment requirements failed for at least one fulfillment: {err:?}" + ), + Err(_) => tracing::warn!( + tx_hash = ?receipt.transaction_hash, + log_index = idx, + raw = ?raw_error, + "Payment requirements failed for at least one fulfillment, but error payload was unrecognized" + ), + } + } + Err(err) => tracing::warn!( + tx_hash = ?receipt.transaction_hash, + log_index = idx, + "Failed to decode PaymentRequirementsFailed event: {err:?}" + ), + } + } + } + Ok(()) +} + /// Data returned when querying for a RequestSubmitted event #[derive(Debug, Clone)] pub struct RequestSubmittedEventData { @@ -729,7 +771,7 @@ impl BoundlessMarketService

{ tracing::info!("Submitted proof for batch {:?}: {}", fill_ids, receipt.transaction_hash); - Ok(()) + validate_fulfill_receipt(receipt) } /// Fulfill a batch of requests by delivering the proof for each application and withdraw from the prover balance. @@ -751,7 +793,7 @@ impl BoundlessMarketService

{ tracing::info!("Submitted proof for batch {:?}: {}", fill_ids, receipt.transaction_hash); - Ok(()) + validate_fulfill_receipt(receipt) } /// Combined function to submit a new merkle root to the set-verifier and call `fulfill`. @@ -784,7 +826,7 @@ impl BoundlessMarketService

{ tracing::info!("Submitted merkle root and proof for batch {}", tx_receipt.transaction_hash); - Ok(()) + validate_fulfill_receipt(tx_receipt) } /// Combined function to submit a new merkle root to the set-verifier and call `fulfillAndWithdraw`. @@ -813,7 +855,7 @@ impl BoundlessMarketService

{ tracing::info!("Submitted merkle root and proof for batch {}", tx_receipt.transaction_hash); - Ok(()) + validate_fulfill_receipt(tx_receipt) } /// A combined call to `IBoundlessMarket.priceRequest` and `IBoundlessMarket.fulfill`. @@ -856,7 +898,7 @@ impl BoundlessMarketService

{ tracing::info!("Fulfilled proof for batch {}", tx_receipt.transaction_hash); - Ok(()) + validate_fulfill_receipt(tx_receipt) } /// A combined call to `IBoundlessMarket.priceRequest` and `IBoundlessMarket.fulfillAndWithdraw`. @@ -899,7 +941,7 @@ impl BoundlessMarketService

{ tracing::info!("Fulfilled proof for batch {}", tx_receipt.transaction_hash); - Ok(()) + validate_fulfill_receipt(tx_receipt) } /// Combined function to submit a new merkle root to the set-verifier and call `priceAndfulfill`. @@ -938,7 +980,7 @@ impl BoundlessMarketService

{ tracing::info!("Submitted merkle root and proof for batch {}", tx_receipt.transaction_hash); - Ok(()) + validate_fulfill_receipt(tx_receipt) } /// Combined function to submit a new merkle root to the set-verifier and call `priceAndFulfillAndWithdraw`. @@ -977,7 +1019,7 @@ impl BoundlessMarketService

{ tracing::info!("Submitted merkle root and proof for batch {}", tx_receipt.transaction_hash); - Ok(()) + validate_fulfill_receipt(tx_receipt) } /// Checks if a request is locked in. diff --git a/crates/boundless-market/tests/e2e.rs b/crates/boundless-market/tests/e2e.rs index f0b101279..d544b253d 100644 --- a/crates/boundless-market/tests/e2e.rs +++ b/crates/boundless-market/tests/e2e.rs @@ -374,6 +374,7 @@ async fn test_e2e_price_and_fulfill_batch() { } #[tokio::test] +#[traced_test] async fn test_e2e_no_payment() { // Setup anvil let anvil = Anvil::new().spawn(); @@ -434,11 +435,13 @@ async fn test_e2e_no_payment() { }; let balance_before = ctx.prover_market.balance_of(some_other_address).await.unwrap(); - // fulfill the request. + // fulfill the request. This call emits a PaymentRequirementsFailed log since the lock + // belongs to a different prover, but the request itself still becomes fulfilled on-chain. ctx.prover_market .fulfill(FulfillmentTx::new(vec![fulfillment.clone()], assessor_fill.clone())) .await - .unwrap(); + .expect("fulfillment should succeed even if payment requirements fail"); + assert!(logs_contain("Payment requirements failed for at least one fulfillment")); assert!(ctx.customer_market.is_fulfilled(request_id).await.unwrap()); let balance_after = ctx.prover_market.balance_of(some_other_address).await.unwrap(); assert!(balance_before == balance_after); diff --git a/crates/broker/src/lib.rs b/crates/broker/src/lib.rs index 5506186ec..9e1e2b196 100644 --- a/crates/broker/src/lib.rs +++ b/crates/broker/src/lib.rs @@ -22,7 +22,7 @@ use crate::storage::create_uri_handler; use alloy::{ network::Ethereum, primitives::{Address, Bytes, FixedBytes, U256}, - providers::{Provider, WalletProvider}, + providers::{DynProvider, Provider, WalletProvider}, signers::local::PrivateKeySigner, }; use anyhow::{Context, Result}; @@ -897,16 +897,19 @@ where Arc::new(provers::DefaultProver::new()) }; + let prover_addr = self.provider.default_signer_address(); + let (pricing_tx, pricing_rx) = mpsc::channel(PRICING_CHANNEL_CAPACITY); - let collateral_token_decimals = BoundlessMarketService::new( + let market = Arc::new(BoundlessMarketService::new( self.deployment().boundless_market_address, - self.provider.clone(), - Address::ZERO, - ) - .collateral_token_decimals() - .await - .context("Failed to get stake token decimals. Possible RPC error.")?; + DynProvider::new(self.provider.clone()), + prover_addr, + )); + let collateral_token_decimals = market + .collateral_token_decimals() + .await + .context("Failed to get stake token decimals. Possible RPC error.")?; // Spin up the order picker to pre-flight and find orders to lock let order_picker = Arc::new(order_picker::OrderPicker::new( @@ -939,6 +942,7 @@ where config.clone(), order_state_tx.clone(), self.priority_requestors.clone(), + market.clone(), ) .await .context("Failed to initialize proving service")?, @@ -954,9 +958,6 @@ where Ok(()) }); - let prover_addr = - self.args.private_key.as_ref().expect("Private key must be set").address(); - let order_monitor = Arc::new(order_monitor::OrderMonitor::new( self.db.clone(), self.provider.clone(), diff --git a/crates/broker/src/order_monitor.rs b/crates/broker/src/order_monitor.rs index a2e0741f5..68f6abf96 100644 --- a/crates/broker/src/order_monitor.rs +++ b/crates/broker/src/order_monitor.rs @@ -469,8 +469,16 @@ where } else if !is_within_deadline(&order, current_block_timestamp, min_deadline) { self.skip_order(&order, "expired").await; } else if is_target_time_reached(&order, current_block_timestamp) { - tracing::info!("Request 0x{:x} was locked by another prover but expired unfulfilled, setting status to pending proving", order.request.id); - candidate_orders.push(order); + if self.market.is_fulfilled(order.request.id).await? { + tracing::debug!( + "Lock expiry timeout occurred, but 0x{:x} was already fulfilled by another prover. Skipping.", + order.request.id + ); + self.skip_order(&order, "was fulfilled by other").await; + } else { + tracing::info!("Request 0x{:x} was locked by another prover but expired unfulfilled, setting status to pending proving", order.request.id); + candidate_orders.push(order); + } } } diff --git a/crates/broker/src/proving.rs b/crates/broker/src/proving.rs index d9fb33bcd..aaa0f2485 100644 --- a/crates/broker/src/proving.rs +++ b/crates/broker/src/proving.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use crate::{ config::ConfigLock, @@ -27,7 +27,9 @@ use crate::{ utils::cancel_proof_and_fail_order, FulfillmentType, Order, OrderStateChange, OrderStatus, }; +use alloy::providers::DynProvider; use anyhow::{Context, Result}; +use boundless_market::contracts::boundless_market::BoundlessMarketService; use thiserror::Error; use tokio_util::sync::CancellationToken; @@ -66,6 +68,7 @@ pub struct ProvingService { config: ConfigLock, order_state_tx: tokio::sync::broadcast::Sender, priority_requestors: PriorityRequestors, + fulfillment_market: Arc>, } impl ProvingService { @@ -75,8 +78,9 @@ impl ProvingService { config: ConfigLock, order_state_tx: tokio::sync::broadcast::Sender, priority_requestors: PriorityRequestors, + fulfillment_market: Arc>, ) -> Result { - Ok(Self { db, prover, config, order_state_tx, priority_requestors }) + Ok(Self { db, prover, config, order_state_tx, priority_requestors, fulfillment_market }) } async fn monitor_proof_internal( @@ -339,6 +343,7 @@ impl ProvingService { async fn prove_and_update_db(&self, mut order: Order) { let order_id = order.id(); + let request_id = order.request.id; let (proof_retry_count, proof_retry_sleep_ms) = { let config = self.config.lock_all().unwrap(); @@ -378,6 +383,27 @@ impl ProvingService { Ok(order_status) => { tracing::info!("Successfully completed proof monitoring for order {order_id}"); + // Note: this sanity check isn't strictly necessary, but is to avoid submitting the + // order when the fulfillment event was missed. + if order.fulfillment_type == FulfillmentType::FulfillAfterLockExpire { + let is_fulfilled = self + .fulfillment_market + .is_fulfilled(request_id) + .await + .inspect_err(|e| { + tracing::warn!( + "Failed to sanity check fulfillment status for order {order_id}: {e:?}" + ); + }) + .unwrap_or(false); + if is_fulfilled { + tracing::warn!("Fulfillment event was missed, skipping aggregation for fulfilled order {order_id}"); + handle_order_failure(&self.db, &order_id, "Fulfilled before aggregation") + .await; + return; + } + } + if let Err(e) = self.db.set_aggregation_status(&order_id, order_status).await { tracing::error!("Failed to set aggregation status for order {order_id}: {e:?}"); } @@ -494,15 +520,32 @@ mod tests { FulfillmentType, OrderStatus, }; use alloy::primitives::{Address, Bytes, U256}; + use alloy::providers::{DynProvider, Provider, ProviderBuilder}; + use alloy::transports::mock::Asserter; use boundless_market::contracts::{ Offer, Predicate, ProofRequest, RequestInput, RequestInputType, Requirements, }; use boundless_test_utils::guests::{ECHO_ELF, ECHO_ID}; use chrono::Utc; + use hex::encode; use risc0_zkvm::sha::Digest; use std::sync::Arc; use tracing_test::traced_test; + fn mock_market(responses: Vec) -> Arc> { + let asserter = Asserter::new(); + for fulfilled in responses { + let mut data = [0u8; 32]; + if fulfilled { + data[31] = 1; + } + asserter.push_success(&format!("0x{}", encode(data))); + } + + let provider = ProviderBuilder::new().connect_mocked_client(asserter).erased(); + Arc::new(BoundlessMarketService::new(Address::ZERO, provider, Address::ZERO)) + } + fn create_test_order( request_id: U256, image_id: String, @@ -573,6 +616,7 @@ mod tests { async fn prove_order() { let db: DbObj = Arc::new(SqliteDb::new("sqlite::memory:").await.unwrap()); let config = ConfigLock::default(); + let market = mock_market(vec![false; 4]); let prover: ProverObj = Arc::new(DefaultProver::new()); let image_id = Digest::from(ECHO_ID).to_string(); @@ -590,6 +634,7 @@ mod tests { config.clone(), order_state_tx, priority_requestors, + market.clone(), ) .await .unwrap(); @@ -619,6 +664,7 @@ mod tests { config.clone(), order_state_tx.clone(), priority_requestors, + market.clone(), ) .await .unwrap(); @@ -656,6 +702,7 @@ mod tests { let config = ConfigLock::default(); let prover: ProverObj = Arc::new(DefaultProver::new()); + let market = mock_market(vec![false; 6]); let image_id = Digest::from(ECHO_ID).to_string(); prover.upload_image(&image_id, ECHO_ELF.to_vec()).await.unwrap(); @@ -675,6 +722,7 @@ mod tests { config.clone(), order_state_tx, priority_requestors, + market, ) .await .unwrap(); @@ -750,6 +798,7 @@ mod tests { let db: DbObj = Arc::new(SqliteDb::new("sqlite::memory:").await.unwrap()); let config = ConfigLock::default(); let prover: ProverObj = Arc::new(DefaultProver::new()); + let market = mock_market(vec![false; 6]); let image_id = Digest::from(ECHO_ID).to_string(); prover.upload_image(&image_id, ECHO_ELF.to_vec()).await.unwrap(); @@ -766,6 +815,7 @@ mod tests { config.clone(), order_state_tx.clone(), priority_requestors, + market, ) .await .unwrap(); diff --git a/crates/broker/src/submitter.rs b/crates/broker/src/submitter.rs index e0d312775..ab0b19dd9 100644 --- a/crates/broker/src/submitter.rs +++ b/crates/broker/src/submitter.rs @@ -585,6 +585,24 @@ where ); return Ok(()); } + Err(SubmitterErr::MarketError( + MarketError::PaymentRequirementsFailedUnknownError(raw), + )) => { + tracing::warn!( + "Payment requirement failed for one or more orders, will not retry (raw error: {raw:?})" + ); + errors.push(SubmitterErr::MarketError( + MarketError::PaymentRequirementsFailedUnknownError(raw), + )); + break; + } + Err(SubmitterErr::MarketError(MarketError::PaymentRequirementsFailed(err))) => { + tracing::warn!("Payment requirement failed for one or more orders: {err:?}, will not retry"); + errors.push(SubmitterErr::MarketError(MarketError::PaymentRequirementsFailed( + err, + ))); + break; + } Err(err) => { tracing::warn!( "Batch submission attempt {}/{} failed. Error: {err:?}",