From 011eea2ffbf1c6059f9dacfbf0d61859c699e031 Mon Sep 17 00:00:00 2001 From: mintybasil <163682877+mintybasil@users.noreply.github.com> Date: Thu, 13 Nov 2025 22:44:18 +0000 Subject: [PATCH 01/11] Add check for PaymentRequirementsFulfilled --- .../src/contracts/boundless_market.rs | 40 ++++++++++++++----- crates/broker/src/submitter.rs | 10 +++++ 2 files changed, 39 insertions(+), 11 deletions(-) diff --git a/crates/boundless-market/src/contracts/boundless_market.rs b/crates/boundless-market/src/contracts/boundless_market.rs index 160b60c19..34edef26f 100644 --- a/crates/boundless-market/src/contracts/boundless_market.rs +++ b/crates/boundless-market/src/contracts/boundless_market.rs @@ -28,7 +28,7 @@ use alloy::{ signers::Signer, }; -use alloy_sol_types::{SolCall, SolEvent}; +use alloy_sol_types::{SolCall, SolEvent, SolInterface}; use anyhow::{anyhow, Context, Result}; use risc0_ethereum_contracts::event_query::EventQueryConfig; use thiserror::Error; @@ -37,10 +37,9 @@ use crate::{ contracts::token::{IERC20Permit, IHitPoints::IHitPointsErrors, Permit, IERC20}, deployments::collateral_token_supports_permit, }; - use super::{ eip712_domain, AssessorReceipt, EIP712DomainSaltless, Fulfillment, - IBoundlessMarket::{self, IBoundlessMarketInstance, ProofDelivered}, + IBoundlessMarket::{self, IBoundlessMarketInstance, ProofDelivered, IBoundlessMarketErrors}, Offer, ProofRequest, RequestError, RequestId, RequestStatus, TxnErr, TXN_CONFIRM_TIMEOUT, }; @@ -113,6 +112,14 @@ pub enum MarketError { /// Timeout reached. #[error("Timeout: 0x{0:x}")] TimeoutReached(U256), + + /// Payment requirements failed + #[error("Payment requirements failed during order fulfillment: {0:?}")] + PaymentRequirementsFailed(IBoundlessMarketErrors), + + /// Payment requirements failed, unable to decode error + #[error("Payment requirements failed during order fulfillment: error unknown")] + PaymentRequirementsFailedUnknownError, } impl From for MarketError { @@ -209,6 +216,17 @@ fn extract_tx_log( } } +fn validate_fulfill_receipt(receipt: TransactionReceipt) -> Result<(), MarketError> { + if let Some(log) = receipt.decoded_log::() { + match IBoundlessMarketErrors::abi_decode(&**log.error) { + Ok(err) => Err(MarketError::PaymentRequirementsFailed(err)), + Err(_) => Err(MarketError::PaymentRequirementsFailedUnknownError) + } + } else { + Ok(()) + } +} + /// Data returned when querying for a RequestSubmitted event #[derive(Debug, Clone)] pub struct RequestSubmittedEventData { @@ -729,7 +747,7 @@ impl BoundlessMarketService

{ tracing::info!("Submitted proof for batch {:?}: {}", fill_ids, receipt.transaction_hash); - Ok(()) + validate_fulfill_receipt(receipt) } /// Fulfill a batch of requests by delivering the proof for each application and withdraw from the prover balance. @@ -751,7 +769,7 @@ impl BoundlessMarketService

{ tracing::info!("Submitted proof for batch {:?}: {}", fill_ids, receipt.transaction_hash); - Ok(()) + validate_fulfill_receipt(receipt) } /// Combined function to submit a new merkle root to the set-verifier and call `fulfill`. @@ -784,7 +802,7 @@ impl BoundlessMarketService

{ tracing::info!("Submitted merkle root and proof for batch {}", tx_receipt.transaction_hash); - Ok(()) + validate_fulfill_receipt(tx_receipt) } /// Combined function to submit a new merkle root to the set-verifier and call `fulfillAndWithdraw`. @@ -813,7 +831,7 @@ impl BoundlessMarketService

{ tracing::info!("Submitted merkle root and proof for batch {}", tx_receipt.transaction_hash); - Ok(()) + validate_fulfill_receipt(tx_receipt) } /// A combined call to `IBoundlessMarket.priceRequest` and `IBoundlessMarket.fulfill`. @@ -856,7 +874,7 @@ impl BoundlessMarketService

{ tracing::info!("Fulfilled proof for batch {}", tx_receipt.transaction_hash); - Ok(()) + validate_fulfill_receipt(tx_receipt) } /// A combined call to `IBoundlessMarket.priceRequest` and `IBoundlessMarket.fulfillAndWithdraw`. @@ -899,7 +917,7 @@ impl BoundlessMarketService

{ tracing::info!("Fulfilled proof for batch {}", tx_receipt.transaction_hash); - Ok(()) + validate_fulfill_receipt(tx_receipt) } /// Combined function to submit a new merkle root to the set-verifier and call `priceAndfulfill`. @@ -938,7 +956,7 @@ impl BoundlessMarketService

{ tracing::info!("Submitted merkle root and proof for batch {}", tx_receipt.transaction_hash); - Ok(()) + validate_fulfill_receipt(tx_receipt) } /// Combined function to submit a new merkle root to the set-verifier and call `priceAndFulfillAndWithdraw`. @@ -977,7 +995,7 @@ impl BoundlessMarketService

{ tracing::info!("Submitted merkle root and proof for batch {}", tx_receipt.transaction_hash); - Ok(()) + validate_fulfill_receipt(tx_receipt) } /// Checks if a request is locked in. diff --git a/crates/broker/src/submitter.rs b/crates/broker/src/submitter.rs index e0d312775..bee100a1f 100644 --- a/crates/broker/src/submitter.rs +++ b/crates/broker/src/submitter.rs @@ -585,6 +585,16 @@ where ); return Ok(()); } + Err(SubmitterErr::MarketError(MarketError::PaymentRequirementsFailedUnknownError)) => { + tracing::warn!("Payment requirement failed for one or more orders, will not retry"); + errors.push(SubmitterErr::MarketError(MarketError::PaymentRequirementsFailedUnknownError)); + break; + } + Err(SubmitterErr::MarketError(MarketError::PaymentRequirementsFailed(err))) => { + tracing::warn!("Payment requirement failed for one or more orders: {err:?}, will not retry"); + errors.push(SubmitterErr::MarketError(MarketError::PaymentRequirementsFailed(err))); + break; + } Err(err) => { tracing::warn!( "Batch submission attempt {}/{} failed. Error: {err:?}", From 0327b7566d0de807f5045457d73a4b8f3b4f2b80 Mon Sep 17 00:00:00 2001 From: mintybasil <163682877+mintybasil@users.noreply.github.com> Date: Thu, 13 Nov 2025 22:46:34 +0000 Subject: [PATCH 02/11] Add fulfillment check before proving --- crates/broker/src/order_monitor.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/crates/broker/src/order_monitor.rs b/crates/broker/src/order_monitor.rs index a2e0741f5..255800675 100644 --- a/crates/broker/src/order_monitor.rs +++ b/crates/broker/src/order_monitor.rs @@ -469,8 +469,12 @@ where } else if !is_within_deadline(&order, current_block_timestamp, min_deadline) { self.skip_order(&order, "expired").await; } else if is_target_time_reached(&order, current_block_timestamp) { - tracing::info!("Request 0x{:x} was locked by another prover but expired unfulfilled, setting status to pending proving", order.request.id); - candidate_orders.push(order); + if self.market.is_fulfilled(order.request.id).await? { + tracing::info!("Order already fulfilled by another prover, skipping 0x{:x}", order.request.id); + } else { + tracing::info!("Request 0x{:x} was locked by another prover but expired unfulfilled, setting status to pending proving", order.request.id); + candidate_orders.push(order); + } } } From 687aef79536b1e496052f835a0cb84b494661a0a Mon Sep 17 00:00:00 2001 From: mintybasil <163682877+mintybasil@users.noreply.github.com> Date: Sun, 16 Nov 2025 17:20:23 +0000 Subject: [PATCH 03/11] Skip fulfilled order, fmt --- .../commands/prover/generate_config.rs.backup | 1473 +++++++++++++++++ crates/broker/src/order_monitor.rs | 6 +- crates/broker/src/submitter.rs | 16 +- 3 files changed, 1490 insertions(+), 5 deletions(-) create mode 100644 crates/boundless-cli/src/commands/prover/generate_config.rs.backup diff --git a/crates/boundless-cli/src/commands/prover/generate_config.rs.backup b/crates/boundless-cli/src/commands/prover/generate_config.rs.backup new file mode 100644 index 000000000..1e135f351 --- /dev/null +++ b/crates/boundless-cli/src/commands/prover/generate_config.rs.backup @@ -0,0 +1,1473 @@ +// Copyright 2025 Boundless Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::path::{Path, PathBuf}; + +use alloy::primitives::{Address, U256}; +use alloy::providers::Provider; +use anyhow::{bail, Context, Result}; +use chrono::Utc; +use clap::Args; +use inquire::{Confirm, Select, Text}; +use rand::Rng; +use url::Url; + +use super::benchmark::ProverBenchmark; +use crate::config::{GlobalConfig, ProverConfig, ProvingBackendConfig}; +use crate::config_file::Config; +use crate::display::{obscure_url, DisplayManager}; +use boundless_market::client::ClientBuilder; +use boundless_market::contracts::{RequestId, RequestInput, RequestInputType}; +use boundless_market::GuestEnv; +use risc0_zkvm::serde::from_slice; + +// Priority requestor addresses for market pricing analysis +const OG_OFFCHAIN_REQUESTOR: &str = "0xc197ebe12c7bcf1d9f3b415342bdbc795425335c"; +const OG_ONCHAIN_REQUESTOR: &str = "0xe198c6944cae382902a375b0b8673084270a7f8e"; +const SIGNAL_REQUESTOR: &str = "0x734df7809c4ef94da037449c287166d114503198"; + +// Cycle count range for signal requestor (50B to 54B cycles) +const SIGNAL_REQUESTOR_MIN_CYCLES: u64 = 50_000_000_000; +const SIGNAL_REQUESTOR_MAX_CYCLES: u64 = 54_000_000_000; + +// Number of blocks to query for market pricing analysis +const MARKET_PRICE_BLOCKS_TO_QUERY: u64 = 30000; + +// Chunk size for querying events to avoid RPC limits +const EVENT_QUERY_CHUNK_SIZE: u64 = 500; + +// Priority requestor list URLs +const PRIORITY_REQUESTOR_LIST_STANDARD: &str = + "https://requestors.boundless.network/boundless-recommended-priority-list.standard.json"; +const PRIORITY_REQUESTOR_LIST_LARGE: &str = + "https://requestors.boundless.network/boundless-recommended-priority-list.large.json"; + +// Peak performance threshold for enabling large requestor list (kHz) +const LARGE_REQUESTOR_LIST_THRESHOLD_KHZ: f64 = 4000.0; + +// Default minimum price per mega-cycle in collateral token (ZKC) for fulfilling +// orders locked by other provers that exceeded their lock timeout +const DEFAULT_MIN_MCYCLE_PRICE_COLLATERAL_TOKEN: &str = "0.0005"; + +mod selection_strings { + // Benchmark performance options + pub const BENCHMARK_RUN_SUITE: &str = + "Run the Boundless benchmark suite (requires a Bento instance running)"; + pub const BENCHMARK_MANUAL_ENTRY: &str = "Manually set peak performance (in kHz)"; + + // File handling strategy options + pub const FILE_MODIFY_EXISTING: &str = "Modify existing"; + pub const FILE_GENERATE_NEW: &str = "Generate new"; + pub const FILE_CANCEL: &str = "Cancel"; +} + +#[derive(Debug, Clone)] +struct MarketPricing { + median: f64, + percentile_25: f64, + sample_size: usize, +} + +/// Generate optimized broker.toml and compose.yml configuration files +#[derive(Args, Clone, Debug)] +pub struct ProverGenerateConfig { + /// Path to output broker.toml file + #[clap(long, default_value = "./broker.toml")] + pub broker_toml_file: PathBuf, + + /// Path to output compose.yml file + #[clap(long, default_value = "./compose.yml")] + pub compose_yml_file: PathBuf, + + /// Skip creating backups of existing files + #[clap(long)] + pub skip_backup: bool, +} + +#[derive(Debug)] +#[allow(dead_code)] +struct WizardConfig { + num_threads: usize, + num_gpus: usize, + max_exec_agents: usize, + max_concurrent_preflights: usize, + max_concurrent_proofs: usize, + peak_prove_khz: f64, + priority_requestor_lists: Vec, + max_collateral: String, + min_mcycle_price: String, + min_mcycle_price_collateral_token: String, +} + +#[derive(Debug, Clone, Copy)] +enum FileHandlingStrategy { + ModifyExisting, + GenerateNew, +} + +impl ProverGenerateConfig { + /// Run the generate-config command + pub async fn run(&self, global_config: &GlobalConfig) -> Result<()> { + let display = DisplayManager::new(); + + display.header("Boundless Prover Configuration Wizard"); + display.note("This wizard helps you create Broker and Bento configuration files,"); + display.note("customized for your prover setup, that allow you to compete in the"); + display.note("market and earn rewards."); + display.separator(); + + // Check file handling strategy + let broker_strategy = + self.ask_file_handling_strategy(&self.broker_toml_file, "broker.toml", &display)?; + let compose_strategy = + self.ask_file_handling_strategy(&self.compose_yml_file, "compose.yml", &display)?; + + display.separator(); + + // Run wizard to collect configuration + let config = self.run_wizard(&display, global_config).await?; + + display.separator(); + display.header("Generating Configuration Files"); + + // Backup and generate broker.toml + if let Some(backup_path) = self.backup_file(&self.broker_toml_file)? { + display.item_colored("Backup saved", backup_path.display(), "cyan"); + } + self.generate_broker_toml(&config, broker_strategy, &display)?; + display.item_colored("Created", self.broker_toml_file.display(), "green"); + + // Backup and generate compose.yml + if let Some(backup_path) = self.backup_file(&self.compose_yml_file)? { + display.item_colored("Backup saved", backup_path.display(), "cyan"); + } + self.generate_compose_yml(&config, compose_strategy)?; + display.item_colored("Created", self.compose_yml_file.display(), "green"); + + display.separator(); + self.show_success_message(&config, &display)?; + + Ok(()) + } + + async fn run_wizard( + &self, + display: &DisplayManager, + global_config: &GlobalConfig, + ) -> Result { + // Step 1: Machine configuration + display.step(1, 7, "Machine Configuration"); + + let run_on_single_machine = + Confirm::new("Do you plan to run your prover entirely on your current machine?") + .with_default(true) + .with_help_message("This wizard is optimized for single-machine setups") + .prompt() + .context("Failed to get user input")?; + + if !run_on_single_machine { + display.note("⚠ This wizard is optimized for single-machine setups."); + display.note(" Cluster setups may require additional manual configuration."); + display.note(&format!( + " Please refer to our documentation: {}", + "https://docs.boundless.network/provers/broker" + )); + + let continue_anyway = Confirm::new("Continue with configuration anyway?") + .with_default(false) + .with_help_message("Generated config may need manual adjustments for clusters") + .prompt() + .context("Failed to get confirmation")?; + + if !continue_anyway { + display.note("Configuration cancelled."); + bail!("Configuration cancelled by user"); + } + } + + let detected_threads = detect_cpu_threads()?; + display.item_colored("Detected", format!("{} CPU threads", detected_threads), "cyan"); + + let input = Text::new("How many CPU threads do you want to use?") + .with_default(&detected_threads.to_string()) + .prompt() + .context("Failed to get CPU thread count")?; + let num_threads = input.parse::().context("Invalid number format")?; + display.item_colored("Using", format!("{} CPU threads", num_threads), "green"); + + // Step 2: GPU configuration + display.separator(); + display.step(2, 7, "GPU Configuration"); + + let num_gpus = match detect_gpus() { + Ok(count) if count > 0 => { + display.item_colored("Detected", format!("{} GPU(s)", count), "cyan"); + count + } + _ => { + let input = Text::new("How many GPUs do you have?") + .with_default("1") + .with_help_message("Enter 0 if you don't have any GPUs") + .prompt() + .context("Failed to get GPU count")?; + input.parse::().context("Invalid number format")? + } + }; + + // Step 3: Calculated Configuration + display.separator(); + display.step(3, 7, "Calculated Configuration"); + + display.note("The following values are calculated based on your hardware:"); + display.note(""); + + let max_exec_agents = (num_threads.saturating_sub(4).saturating_sub(num_gpus * 2)) / 2; + display.note(" Formula: max_exec_agents ="); + display.note(" ("); + display.note(&format!(" {} threads", num_threads)); + display.note(" - 1 # reserve for postgres"); + display.note(" - 1 # reserve for redis"); + display.note(" - 1 # reserve for minio"); + display.note(&format!(" - {} GPUs × 2 # reserve two threads per GPU", num_gpus)); + display.note(" )"); + display.note(" / 2 # 2 threads per exec agent"); + display.item_colored(" Result", format!("{} exec agents", max_exec_agents), "cyan"); + display.note(""); + + let max_concurrent_preflights = max_exec_agents.saturating_sub(2).max(1); + display.note(" Formula: max_concurrent_preflights ="); + display.note(" ("); + display.note(&format!(" {} exec agents", max_exec_agents)); + display.note(" - 1 # reserve for proofs"); + display.note(" - 1 # reserve for mining"); + display.note(" )"); + display.item_colored( + " Result", + format!("{} concurrent preflights", max_concurrent_preflights), + "cyan", + ); + display.note(""); + + let max_concurrent_proofs = 1; + display.note(" Formula: max_concurrent_proofs = 1 (fixed)"); + display.item_colored( + " Result", + format!("{} concurrent proof", max_concurrent_proofs), + "cyan", + ); + + // Step 4: Performance Benchmarking + display.separator(); + display.step(4, 7, "Performance Benchmarking"); + + let peak_prove_khz = self.get_peak_performance(display, global_config).await?.floor(); + display.item_colored( + "Setting `peak_prove_khz` to", + format!("{:.0} kHz", peak_prove_khz), + "green", + ); + + // Step 5: Priority Requestor Lists + display.separator(); + display.step(5, 7, "Priority Requestor Lists"); + + display.note("Requestor priority lists specify proof requestors that the broker should"); + display + .note("prioritize for proving. Requestors on these lists are considered more likely"); + display + .note("to request useful work with profitable pricing, and thus are prioritized over"); + display.note("other requestors."); + display.note(""); + display.note("Boundless Networks maintains recommended requestor lists:"); + display.note(" • Standard list: For all provers (general workloads)"); + display.note(&format!( + " • Large list: For provers >{:.0} kHz (includes high-cycle orders)", + LARGE_REQUESTOR_LIST_THRESHOLD_KHZ + )); + display.note(""); + + let priority_requestor_lists = if peak_prove_khz > LARGE_REQUESTOR_LIST_THRESHOLD_KHZ { + display.note(&format!( + "Your cluster's peak performance of {:.0} kHz qualifies for large orders.", + peak_prove_khz + )); + display.note("We recommend enabling both the standard and large requestor lists."); + vec![ + PRIORITY_REQUESTOR_LIST_STANDARD.to_string(), + PRIORITY_REQUESTOR_LIST_LARGE.to_string(), + ] + } else { + display.note(&format!( + "Your cluster's peak performance of {:.0} kHz is suitable for standard orders.", + peak_prove_khz + )); + display.note("We recommend enabling the standard requestor list."); + vec![PRIORITY_REQUESTOR_LIST_STANDARD.to_string()] + }; + + for list in &priority_requestor_lists { + display.item_colored(" List", list, "cyan"); + } + + // Step 6: Collateral Configuration + display.separator(); + display.step(6, 7, "Collateral Configuration"); + + let recommended_collateral = if priority_requestor_lists.len() > 1 { "200" } else { "50" }; + + display.note(&format!( + "We recommend a max collateral of {} ZKC for your configuration.", + recommended_collateral + )); + display.note(" • 50 ZKC: Recommended for standard requestor list (lower risk)"); + display.note(" • 200 ZKC: Recommended for standard + large lists (higher rewards, higher risk)"); + display.note(""); + display.note("Higher collateral enables higher-reward orders but increases slashing risks."); + + let max_collateral = Text::new("Max collateral (ZKC):") + .with_default(recommended_collateral) + .with_help_message("Press Enter to use recommended value") + .prompt() + .context("Failed to get max collateral")?; + + display.item_colored("Max collateral", format!("{} ZKC", max_collateral), "green"); + + // Step 7: Pricing Configuration + display.separator(); + display.step(7, 7, "Pricing Configuration"); + + display.note("Analyzing recent market prices to determine competitive pricing..."); + display.note(""); + + // Get RPC URL for market query + let rpc_url = self.get_or_prompt_rpc_url(display)?; + + // Validate chain ID to ensure it's Base Mainnet + display.status("Status", "Validating RPC connection", "yellow"); + let temp_provider = alloy::providers::ProviderBuilder::new() + .connect(rpc_url.as_ref()) + .await + .context("Failed to connect to RPC provider")?; + + let chain_id = temp_provider + .get_chain_id() + .await + .context("Failed to query chain ID from RPC provider")?; + + if chain_id == 1 { + display.note("⚠ Detected Ethereum Mainnet (Chain ID: 1)"); + display.note(" Boundless Market is deployed on Base Mainnet (Chain ID: 8453)"); + display.note(" Please use a Base Mainnet RPC URL instead."); + bail!("Incorrect chain detected: Ethereum Mainnet. Base Mainnet required."); + } + + if chain_id != 8453 { + display.note(&format!("⚠ Detected Chain ID: {}", chain_id)); + display.note(" Boundless Market is deployed on Base Mainnet (Chain ID: 8453)"); + let continue_anyway = Confirm::new("Continue with this RPC anyway?") + .with_default(false) + .prompt() + .context("Failed to get confirmation")?; + if !continue_anyway { + bail!("Incorrect chain detected"); + } + } + + // Query market pricing with fallback to defaults + let market_pricing = match self.query_market_pricing(&rpc_url, display, global_config).await + { + Ok(pricing) => { + display.item_colored( + "Market analysis", + format!("{} orders analyzed", pricing.sample_size), + "green", + ); + display.item_colored( + "Median price", + format!( + "{:.10} ETH/Mcycle ({} Gwei/Mcycle, {:.0} wei/cycle)", + pricing.median, + pricing.median * 1e9, + pricing.median * 1e12 + ), + "cyan", + ); + display.item_colored( + "25th percentile", + format!( + "{:.10} ETH/Mcycle ({} Gwei/Mcycle, {:.0} wei/cycle)", + pricing.percentile_25, + pricing.percentile_25 * 1e9, + pricing.percentile_25 * 1e12 + ), + "cyan", + ); + Some(pricing) + } + Err(e) => { + display.note(&format!("⚠ Failed to query market prices: {}", e)); + display.note("Falling back to default pricing"); + None + } + }; + + // Prompt user to accept or override + let min_mcycle_price = if let Some(pricing) = market_pricing { + display.note(""); + display.note(&format!( + "Recommended minimum price: {:.10} ETH/Mcycle ({} Gwei/Mcycle, {:.0} wei/cycle)", + pricing.percentile_25, + pricing.percentile_25 * 1e9, + pricing.percentile_25 * 1e12 + )); + display.note(""); + display + .note("This value is computed based on recent market prices. It ensures you are"); + display + .note("priced competitively such that you will be able to lock and fulfill orders"); + display.note("for ETH rewards in the market."); + display.note(""); + + Text::new("Press Enter to accept or enter custom price:") + .with_default(&format!("{:.10}", pricing.percentile_25)) + .with_help_message("You can update this later in broker.toml") + .prompt() + .context("Failed to get price")? + } else { + // Fallback to manual entry if query failed + Text::new("Minimum price per megacycle (ETH):") + .with_default("0.00000001") + .with_help_message("You can update this later in broker.toml") + .prompt() + .context("Failed to get price")? + }; + + // Collateral token pricing + display.separator(); + display.note(""); + display.note("Collateral Token Pricing (Proof Races):"); + display.note(""); + display.note("When another prover fails to fulfill their locked order within the timeout,"); + display.note("they are slashed. A portion of their collateral (in ZKC) becomes available"); + display.note("as a reward for any prover who can fulfill that order in a 'proof race'."); + display.note(""); + display.note("The setting below controls the minimum ZKC reward your broker will accept"); + display.note("to participate in these proof races (competing to fulfill slashed orders)."); + display.note(""); + display.note("Example: If set to 0.0005 ZKC/Mcycle, a 1000 Mcycle slashed order must"); + display.note(" offer at least 0.5 ZKC reward for your broker to compete for it."); + display.note(""); + display.note(&format!( + "Default minimum price: {} ZKC/Mcycle", + DEFAULT_MIN_MCYCLE_PRICE_COLLATERAL_TOKEN + )); + display.note(""); + + let min_mcycle_price_collateral_token = + Text::new("Minimum price per Mcycle (in ZKC collateral rewards):") + .with_default(DEFAULT_MIN_MCYCLE_PRICE_COLLATERAL_TOKEN) + .with_help_message("You can update this later in broker.toml") + .prompt() + .context("Failed to get collateral token price")?; + + display.item_colored( + "Collateral price", + format!("{} ZKC/Mcycle", min_mcycle_price_collateral_token), + "green", + ); + + let price_f64 = min_mcycle_price.parse::().unwrap_or(0.0); + display.item_colored( + "Min price", + format!( + "{} ETH/Mcycle ({} Gwei/Mcycle, {:.0} wei/cycle)", + min_mcycle_price, + price_f64 * 1e9, + price_f64 * 1e12 + ), + "green", + ); + + Ok(WizardConfig { + num_threads, + num_gpus, + max_exec_agents, + max_concurrent_preflights, + max_concurrent_proofs, + peak_prove_khz, + priority_requestor_lists, + max_collateral, + min_mcycle_price, + min_mcycle_price_collateral_token, + }) + } + + fn try_extract_cycle_count(input: &RequestInput) -> Option { + // Check if inline input + if input.inputType != RequestInputType::Inline { + tracing::debug!("Skipping URL-based input for cycle count extraction"); + return None; + } + + // Decode GuestEnv + match GuestEnv::decode(&input.data) { + Ok(guest_env) => { + // Convert stdin bytes to u32 words for risc0 deserialization + // risc0 serde uses u32 words, need to convert from bytes + match bytemuck::try_cast_slice::(&guest_env.stdin) { + Ok(words) => { + // Decode first u64 from stdin words + match from_slice::(words) { + Ok(cycle_count) => { + tracing::trace!( + "Successfully decoded cycle count: {}", + cycle_count + ); + Some(cycle_count) + } + Err(e) => { + tracing::debug!("Failed to decode cycle count from stdin: {}", e); + None + } + } + } + Err(e) => { + tracing::debug!("Failed to convert stdin bytes to u32 words: {}", e); + None + } + } + } + Err(e) => { + tracing::debug!("Failed to decode GuestEnv: {}", e); + None + } + } + } + + // TODO: Migrate to getting market price from the indexer API once available. + async fn query_market_pricing( + &self, + rpc_url: &Url, + display: &DisplayManager, + global_config: &GlobalConfig, + ) -> Result { + display.status("Status", "Querying recent market prices", "yellow"); + display.note("This may take a moment..."); + + // Priority requestors to filter for + let priority_requestors: Vec

= vec![ + OG_OFFCHAIN_REQUESTOR.parse()?, + OG_ONCHAIN_REQUESTOR.parse()?, + SIGNAL_REQUESTOR.parse()?, + ]; + + // Build market client + let timeout = global_config.tx_timeout.unwrap_or(std::time::Duration::from_secs(300)); + let client = ClientBuilder::new() + .with_rpc_url(rpc_url.clone()) + .with_timeout(timeout) + .build() + .await + .context("Failed to create market client")?; + + // Get current block and calculate range + let current_block = client.provider().get_block_number().await?; + let start_block = current_block.saturating_sub(MARKET_PRICE_BLOCKS_TO_QUERY); + + display.note(&format!( + "Querying market prices from block {} to {} in chunks of {}", + start_block, current_block, EVENT_QUERY_CHUNK_SIZE + )); + + // Query RequestLocked events in chunks + let mut locked_logs = Vec::new(); + let mut chunk_start = start_block; + while chunk_start < current_block { + let chunk_end = (chunk_start + EVENT_QUERY_CHUNK_SIZE).min(current_block); + + let locked_filter = client + .boundless_market + .instance() + .RequestLocked_filter() + .from_block(chunk_start) + .to_block(chunk_end); + + let mut chunk_logs = + locked_filter.query().await.context("Failed to query RequestLocked events")?; + + locked_logs.append(&mut chunk_logs); + chunk_start = chunk_end + 1; + } + + display.note(&format!("Found {} locked orders", locked_logs.len())); + + // Query RequestFulfilled events in chunks + let mut fulfilled_logs = Vec::new(); + let mut chunk_start = start_block; + while chunk_start < current_block { + let chunk_end = (chunk_start + EVENT_QUERY_CHUNK_SIZE).min(current_block); + + let fulfilled_filter = client + .boundless_market + .instance() + .RequestFulfilled_filter() + .from_block(chunk_start) + .to_block(chunk_end); + + let mut chunk_logs = fulfilled_filter + .query() + .await + .context("Failed to query RequestFulfilled events")?; + + fulfilled_logs.append(&mut chunk_logs); + chunk_start = chunk_end + 1; + } + + display.note(&format!("Found {} fulfilled orders", fulfilled_logs.len())); + + // Build map of fulfilled requests with their block timestamps + let mut fulfilled_map: HashMap = HashMap::new(); + for (event, log_meta) in &fulfilled_logs { + let request_id: U256 = event.requestId; + if let Some(block_timestamp) = log_meta.block_timestamp { + fulfilled_map.insert(request_id, block_timestamp); + } + } + + // Process locked orders + let mut prices_per_mcycle: Vec = Vec::new(); + + for (event, log_meta) in &locked_logs { + let request_id: U256 = event.requestId; + let requestor = RequestId::from_lossy(event.request.id).addr; + + // Filter: only priority requestors + if !priority_requestors.contains(&requestor) { + continue; + } + + // Filter: only fulfilled orders + let fulfilled_timestamp = match fulfilled_map.get(&request_id) { + Some(×tamp) => timestamp, + None => continue, + }; + + // Get block timestamp for locked order from log metadata + let lock_timestamp = match log_meta.block_timestamp { + Some(ts) => ts, + None => continue, + }; + + // Calculate lock deadline + let lock_deadline = event.request.offer.lock_deadline(); + + // Filter: only orders fulfilled before lockExpiry + if fulfilled_timestamp > lock_deadline { + continue; + } + + // Calculate price at lock time using Offer.price_at + let locked_price = match event.request.offer.price_at(lock_timestamp) { + Ok(price) => price, + Err(_) => { + bail!( + "Failed to calculate price at lock time for request ID 0x{:x}", + request_id + ); + } + }; + + // Extract or estimate cycle count + let cycles = if requestor == SIGNAL_REQUESTOR.parse::
()? { + let mut rng = rand::rng(); + let random_cycles = + rng.random_range(SIGNAL_REQUESTOR_MIN_CYCLES..=SIGNAL_REQUESTOR_MAX_CYCLES); + tracing::trace!( + "Signal requestor detected, using random cycle count between {}B and {}B: {}", + SIGNAL_REQUESTOR_MIN_CYCLES / 1_000_000_000, + SIGNAL_REQUESTOR_MAX_CYCLES / 1_000_000_000, + random_cycles + ); + random_cycles + } else { + match Self::try_extract_cycle_count(&event.request.input) { + Some(cycles) => { + tracing::debug!( + "Using decoded cycle count: {} for request ID 0x{:x}", + cycles, + request_id + ); + cycles + } + None => { + bail!("Failed to extract cycle count from order generator request input for request ID 0x{:x}", request_id); + } + } + }; + + // Calculate price per megacycle + // locked_price is in wei, need to convert to eth per mcycle + if locked_price > U256::ZERO { + let price_f64 = locked_price.to_string().parse::().unwrap_or(0.0); + let price_per_cycle = price_f64 / cycles as f64; + let price_per_mcycle = price_per_cycle * 1_000_000.0; + + // Convert from wei to eth + let price_per_mcycle_eth = price_per_mcycle / 1e18; + + prices_per_mcycle.push(price_per_mcycle_eth); + // Gwei per mcycle + let price_per_mcycle_gwei = price_per_mcycle / 1e9; + let locked_price_eth = price_f64 / 1e18; + tracing::debug!("Added price per mcycle: {} ETH/Mcycle ({:.2} Gwei/Mcycle, cycles: {}, locked_price: {} wei ({} eth) for request ID 0x{:x}", price_per_mcycle_eth, price_per_mcycle_gwei, cycles, locked_price, locked_price_eth, request_id); + } + } + + if prices_per_mcycle.is_empty() { + bail!("No valid market data found for pricing analysis"); + } + + let sample_size = prices_per_mcycle.len(); + display.note(&format!("Analyzed {} qualifying orders", sample_size)); + + // Sort prices for percentile calculations + prices_per_mcycle.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)); + + // Calculate median (50th percentile) + let median_price = if prices_per_mcycle.len() % 2 == 0 { + let mid = prices_per_mcycle.len() / 2; + (prices_per_mcycle[mid - 1] + prices_per_mcycle[mid]) / 2.0 + } else { + prices_per_mcycle[prices_per_mcycle.len() / 2] + }; + + // Calculate 25th percentile + let percentile_25_idx = ((prices_per_mcycle.len() as f64) * 0.25) as usize; + let percentile_25 = prices_per_mcycle[percentile_25_idx.min(prices_per_mcycle.len() - 1)]; + + Ok(MarketPricing { median: median_price, percentile_25, sample_size }) + } + + async fn get_peak_performance( + &self, + display: &DisplayManager, + global_config: &GlobalConfig, + ) -> Result { + display.note("Configuration requires an estimate of the peak performance of your proving"); + display.note("cluster."); + display.note(""); + display.note("Boundless provides a benchmarking suite for estimating your cluster's"); + display.note("peak performance."); + display.warning("The benchmark suite requires access to a running Bento proving cluster."); + display.note("See https://docs.boundless.network/provers/quick-start#running-a-test-proof for information on how to run Bento."); + display.note(""); + + let choice = Select::new( + "How would you like to set the peak performance?", + vec![ + selection_strings::BENCHMARK_RUN_SUITE, + selection_strings::BENCHMARK_MANUAL_ENTRY, + ], + ) + .prompt() + .context("Failed to get benchmark choice")?; + + match choice { + selection_strings::BENCHMARK_RUN_SUITE => { + // Get RPC URL before running benchmark + display.separator(); + display.status("Status", "Checking RPC configuration", "yellow"); + let rpc_url = self.get_or_prompt_rpc_url(display)?; + + // Try to detect Bento at localhost + let default_bento_url = "http://localhost:8081"; + let bento_available = check_bento_health(default_bento_url).await.is_ok(); + + if bento_available { + display.item_colored("Bento", "Detected at http://localhost:8081", "green"); + + let use_detected = Confirm::new("Use this Bento instance for benchmarking?") + .with_default(true) + .prompt() + .context("Failed to get confirmation")?; + + if use_detected { + display.status("Status", "Running benchmark", "yellow"); + display.note("This may take several minutes..."); + + match self.run_benchmark(default_bento_url, &rpc_url, global_config).await { + Ok(khz) => { + let adjusted_khz = khz * 0.75; + display.item_colored( + "Benchmark result", + format!("{:.2} kHz", khz), + "cyan", + ); + display.item_colored( + "Adjusted (75%)", + format!("{:.2} kHz", adjusted_khz), + "cyan", + ); + return Ok(adjusted_khz); + } + Err(e) => { + display.note(&format!("⚠ Benchmark failed: {}", e)); + display.note("Falling back to manual input..."); + } + } + } + } + + // If not detected or user chose not to use detected, ask for custom URL + if !bento_available { + display.note("⚠ Bento not detected at http://localhost:8081"); + } + + let provide_url = + Confirm::new("Do you have a Bento instance running at a different URL?") + .with_default(false) + .prompt() + .context("Failed to get URL confirmation")?; + + if provide_url { + let bento_url = Text::new("What is your Bento URL?") + .with_help_message("e.g., http://your-server:8081") + .prompt() + .context("Failed to get Bento URL")?; + + if check_bento_health(&bento_url).await.is_ok() { + display.status("Status", "Running benchmark", "yellow"); + display.note("This may take several minutes..."); + + match self.run_benchmark(&bento_url, &rpc_url, global_config).await { + Ok(khz) => { + let adjusted_khz = khz * 0.75; + display.item_colored( + "Benchmark result", + format!("{:.2} kHz", khz), + "cyan", + ); + display.item_colored( + "Adjusted (75%)", + format!("{:.2} kHz", adjusted_khz), + "cyan", + ); + return Ok(adjusted_khz); + } + Err(e) => { + display.note(&format!("⚠ Benchmark failed: {}", e)); + display.note("Falling back to manual input..."); + } + } + } else { + display.note(&format!("⚠ Could not connect to Bento at {}", bento_url)); + display.note("Falling back to manual input..."); + } + } + + // Fall through to manual input + let khz_str = Text::new("Peak performance (kHz):") + .with_default("100") + .with_help_message("You can update this later in broker.toml") + .prompt() + .context("Failed to get peak performance")?; + + khz_str.parse::().context("Invalid performance value") + } + selection_strings::BENCHMARK_MANUAL_ENTRY => { + let khz_str = Text::new("Peak performance (kHz):") + .with_default("100") + .with_help_message("You can update this later in broker.toml") + .prompt() + .context("Failed to get peak performance")?; + + khz_str.parse::().context("Invalid performance value") + } + _ => unreachable!(), + } + } + + async fn run_benchmark( + &self, + bento_url: &str, + rpc_url: &Url, + global_config: &GlobalConfig, + ) -> Result { + // Use the hardcoded test request ID for benchmarking + // TODO: use a representative request ID for benchmarking. This is a OG order of ~500M cycles. + let request_id = "0xc197ebe12c7bcf1d9f3b415342bdbc795425335c01379fa6" + .parse::() + .context("Failed to parse request ID")?; + + // Create the benchmark command with proper configuration + let benchmark = ProverBenchmark { + request_ids: vec![request_id], + prover_config: ProverConfig { + prover_rpc_url: Some(rpc_url.clone()), + private_key: None, + prover_address: None, + deployment: None, + proving_backend: ProvingBackendConfig { + bento_api_url: bento_url.to_string(), + bento_api_key: None, + use_default_prover: false, + skip_health_check: false, + }, + }, + }; + + // Execute the benchmark and return the worst KHz value + benchmark.run(global_config).await + } + + fn get_or_prompt_rpc_url(&self, display: &DisplayManager) -> Result { + // Try to load existing prover configuration + if let Ok(config) = Config::load() { + if let Some(_prover_config) = config.prover { + // Try to load the full ProverConfig with RPC URL from environment/secrets + let full_config = ProverConfig { + prover_rpc_url: None, + private_key: None, + prover_address: None, + deployment: None, + proving_backend: ProvingBackendConfig { + bento_api_url: "http://localhost:8081".to_string(), + bento_api_key: None, + use_default_prover: false, + skip_health_check: true, + }, + }; + + if let Ok(loaded_config) = full_config.load_from_files() { + if let Some(rpc_url) = loaded_config.prover_rpc_url { + display.item_colored("RPC URL", obscure_url(rpc_url.as_ref()), "green"); + return Ok(rpc_url); + } + } + } + } + + // Check environment variable + if let Ok(rpc_url) = std::env::var("PROVER_RPC_URL") { + let url = + rpc_url.parse::().context("Invalid PROVER_RPC_URL environment variable")?; + display.item_colored("RPC URL", obscure_url(url.as_ref()), "green"); + return Ok(url); + } + + // No RPC URL found, prompt user + display.note("⚠ No RPC URL configured for prover"); + display.note("An RPC URL is required to fetch benchmark request data from the"); + display.note("blockchain."); + display.note(""); + + let rpc_url = + Text::new("Enter Base Mainnet RPC URL:").prompt().context("Failed to get RPC URL")?; + + let url = rpc_url.parse::().context("Invalid RPC URL format")?; + + display.item_colored("RPC URL", obscure_url(url.as_ref()), "green"); + Ok(url) + } + + fn backup_file(&self, file_path: &Path) -> Result> { + // Skip if backup flag is set or file doesn't exist + if self.skip_backup || !file_path.exists() { + return Ok(None); + } + + // Create backup directory + let home = dirs::home_dir().context("Failed to get home directory")?; + let backup_dir = home.join(".boundless").join("backups"); + std::fs::create_dir_all(&backup_dir).context("Failed to create backup directory")?; + + // Create timestamped backup filename + let timestamp = Utc::now().format("%Y%m%d_%H%M%S"); + let filename = file_path.file_name().context("Invalid file path")?.to_string_lossy(); + let backup_filename = format!("{}.{}.bak", filename, timestamp); + let backup_path = backup_dir.join(backup_filename); + + // Copy file to backup location + std::fs::copy(file_path, &backup_path) + .with_context(|| format!("Failed to create backup at {}", backup_path.display()))?; + + Ok(Some(backup_path)) + } + + fn strip_tagged_section(content: &str, opening_tag: &str, closing_tag: &str) -> String { + if let Some(start) = content.find(opening_tag) { + if let Some(end) = content[start..].find(closing_tag) { + let mut section_end = start + end + closing_tag.len(); + + // Also skip the newline after the closing tag if present + if section_end < content.len() && content.as_bytes()[section_end] == b'\n' { + section_end += 1; + } + + let before = &content[..start]; + let after = &content[section_end..]; + + format!("{}{}", before, after) + } else { + content.to_string() + } + } else { + content.to_string() + } + } + + fn generate_broker_toml( + &self, + config: &WizardConfig, + strategy: FileHandlingStrategy, + display: &DisplayManager, + ) -> Result<()> { + // Load source (template or existing file) + let source = match strategy { + FileHandlingStrategy::ModifyExisting => std::fs::read_to_string(&self.broker_toml_file) + .context("Failed to read existing broker.toml")?, + FileHandlingStrategy::GenerateNew => { + include_str!("../../../../../broker-template.toml").to_string() + } + }; + + // Parse with toml_edit (preserves comments and formatting) + let mut doc = source.parse::().context("Failed to parse TOML")?; + + // Create CLI wizard metadata section with pretty tags + let metadata_section = format!( + "### [CLI Wizard Metadata] #####\n\ + # Generated by boundless-cli v{} on {}\n\ + # Hardware: {} threads, {} GPU(s)\n\ + # Peak performance: {:.0} kHz\n\ + ### [End] ###\n\n", + env!("CARGO_PKG_VERSION"), + Utc::now().format("%Y-%m-%d"), + config.num_threads, + config.num_gpus, + config.peak_prove_khz + ); + + // Get current prefix + let current_prefix = doc + .as_table() + .decor() + .prefix() + .map(|s| s.as_str().unwrap_or("").to_string()) + .unwrap_or_default(); + + // Strip disclaimer section (from template) + let cleaned_prefix = + Self::strip_tagged_section(¤t_prefix, "### [Disclaimer] ###", "### [End] ###"); + + // Strip any existing CLI wizard metadata (from previous runs) + let cleaned_prefix = Self::strip_tagged_section( + &cleaned_prefix, + "### [CLI Wizard Metadata] #####", + "### [End] ###", + ); + + // Add new metadata section + doc.as_table_mut() + .decor_mut() + .set_prefix(format!("{}{}", metadata_section, cleaned_prefix)); + + // Update market section + if let Some(market) = doc.get_mut("market").and_then(|v| v.as_table_mut()) { + // Update peak_prove_khz with calculation comment + if let Some(item) = market.get_mut("peak_prove_khz") { + let comment = + format!("\n# Calculated from benchmark: {:.2} kHz\n", config.peak_prove_khz); + if let Some(value) = item.as_value_mut() { + value.decor_mut().set_prefix(comment); + } + *item = toml_edit::value(config.peak_prove_khz as i64); + } + + // Update max_collateral + if let Some(item) = market.get_mut("max_collateral") { + *item = toml_edit::value(config.max_collateral.clone()); + } + + // Update max_concurrent_proofs with calculation comment + if let Some(item) = market.get_mut("max_concurrent_proofs") { + let comment = format!("\n# Set based on GPU count: {} GPU(s)\n", config.num_gpus); + if let Some(value) = item.as_value_mut() { + value.decor_mut().set_prefix(comment); + } + *item = toml_edit::value(config.max_concurrent_proofs as i64); + } + + // Update priority_requestor_lists + if let Some(item) = market.get_mut("priority_requestor_lists") { + let mut arr = toml_edit::Array::new(); + for list in &config.priority_requestor_lists { + arr.push(list.clone()); + } + *item = toml_edit::value(arr); + } + + // Update max_concurrent_preflights with calculation comment + if let Some(item) = market.get_mut("max_concurrent_preflights") { + let comment = format!( + "\n# Calculated:\n\ + # max_concurrent_preflights = (\n\ + # (\n\ + # {} threads\n\ + # - 1 # reserve for postgres\n\ + # - 1 # reserve for redis\n\ + # - 1 # reserve for minio\n\ + # - {} GPUs × 2 # reserve two threads per GPU\n\ + # )\n\ + # / 2 # 2 threads per exec agent\n\ + # - 1 # reserve for proofs\n\ + # - 1 # reserve for mining\n\ + # )\n\ + # = {}\n", + config.num_threads, config.num_gpus, config.max_concurrent_preflights + ); + if let Some(value) = item.as_value_mut() { + value.decor_mut().set_prefix(comment); + } + *item = toml_edit::value(config.max_concurrent_preflights as i64); + } + + // Update min_mcycle_price + if let Some(item) = market.get_mut("min_mcycle_price") { + let should_update = match strategy { + FileHandlingStrategy::ModifyExisting => { + // Get existing price and compare with recommended price + let existing_price_str = item.as_str().unwrap_or("0"); + let existing_price = existing_price_str.parse::().unwrap_or(0.0); + let recommended_price = + config.min_mcycle_price.parse::().unwrap_or(0.0); + + if existing_price <= recommended_price && existing_price > 0.0 { + // Existing price is already competitive, don't raise it + display.note(""); + display.note(&format!( + "Your min_mcycle_price is already priced competitively at {} ETH/Mcycle. Not modifying.", + existing_price_str + )); + display.note(""); + false + } else { + // Recommended price is lower (more competitive), update it + true + } + } + FileHandlingStrategy::GenerateNew => true, + }; + + if should_update { + *item = toml_edit::value(config.min_mcycle_price.clone()); + } + } + } + + // Write to file + std::fs::write(&self.broker_toml_file, doc.to_string()) + .context("Failed to write broker.toml")?; + + Ok(()) + } + + fn generate_compose_yml( + &self, + config: &WizardConfig, + strategy: FileHandlingStrategy, + ) -> Result<()> { + // We use string manipulation instead of YAML parsing libraries because + // compose.yml uses YAML anchors (&) and aliases (*) which are + // not preserved by most Rust YAML libraries (serde_yaml, etc.). + // This ensures all comments, formatting, and anchor definitions remain intact. + + // Load source (template or existing file) + let mut content = match strategy { + FileHandlingStrategy::ModifyExisting => std::fs::read_to_string(&self.compose_yml_file) + .context("Failed to read existing compose.yml")?, + FileHandlingStrategy::GenerateNew => { + include_str!("../../../../../compose.yml").to_string() + } + }; + + // Update exec_agent replicas + content = self.update_exec_agent_replicas(content, config.max_exec_agents)?; + + // Add additional GPU agents if needed + if config.num_gpus > 1 { + content = self.add_gpu_agents(content, config.num_gpus)?; + } + + // Write to file + std::fs::write(&self.compose_yml_file, content).context("Failed to write compose.yml")?; + + Ok(()) + } + + fn update_exec_agent_replicas(&self, content: String, replicas: usize) -> Result { + let lines: Vec<&str> = content.lines().collect(); + let mut result: Vec = Vec::new(); + let mut in_exec_agent = false; + let mut in_deploy = false; + + for line in lines { + let mut updated_line = line.to_string(); + + // Track if we're in the exec_agent section + if line.starts_with(" exec_agent:") { + in_exec_agent = true; + in_deploy = false; + } else if in_exec_agent + && line.starts_with(" ") + && !line.starts_with(" ") + && line.len() > 2 + { + // We've hit another service at the same level, exit exec_agent section + in_exec_agent = false; + in_deploy = false; + } + + // Track if we're in the deploy subsection + if in_exec_agent && line.trim().starts_with("deploy:") { + in_deploy = true; + } else if in_deploy && !line.starts_with(" ") && !line.trim().is_empty() { + // Exit deploy section if we hit a line at same or lower indentation + in_deploy = false; + } + + // Update replicas line if we're in the right section + if in_exec_agent && in_deploy && line.trim().starts_with("replicas:") { + let indent = line.chars().take_while(|c| c.is_whitespace()).collect::(); + updated_line = format!("{}replicas: {}", indent, replicas); + } + + result.push(updated_line); + } + + Ok(result.join("\n")) + } + + fn add_gpu_agents(&self, content: String, num_gpus: usize) -> Result { + let lines: Vec<&str> = content.lines().collect(); + let mut result: Vec = Vec::new(); + + // Find gpu_prove_agent0 section boundaries + let mut gpu_agent_start = None; + let mut gpu_agent_end = None; + + for (i, line) in lines.iter().enumerate() { + if line.starts_with(" gpu_prove_agent0:") { + gpu_agent_start = Some(i); + } else if gpu_agent_start.is_some() && gpu_agent_end.is_none() { + // Look for next service at same indentation level (2 spaces, followed by a letter) + if line.starts_with(" ") + && !line.starts_with(" ") + && line.len() > 2 + && line.chars().nth(2).unwrap().is_alphabetic() + { + gpu_agent_end = Some(i); + break; + } + } + } + + let start = + gpu_agent_start.context("Could not find gpu_prove_agent0 section in compose.yml")?; + let end = gpu_agent_end.unwrap_or(lines.len()); + + // Extract the gpu_prove_agent0 section + let gpu_agent_lines: Vec<&str> = lines[start..end].to_vec(); + + // Build result: everything up to and including gpu_prove_agent0 + for line in &lines[..end] { + result.push(line.to_string()); + } + + // Add additional GPU agents + for i in 1..num_gpus { + result.push(String::new()); // Empty line between services + + for line in &gpu_agent_lines { + let mut new_line = line.to_string(); + // Replace service name + if new_line.contains("gpu_prove_agent0") { + new_line = + new_line.replace("gpu_prove_agent0", &format!("gpu_prove_agent{}", i)); + } + // Replace device_ids + if new_line.contains(r#"device_ids: ["0"]"#) { + new_line = new_line + .replace(r#"device_ids: ["0"]"#, &format!(r#"device_ids: ["{}"]"#, i)); + } + result.push(new_line); + } + } + + // Add remaining content + for line in &lines[end..] { + result.push(line.to_string()); + } + + Ok(result.join("\n")) + } + + fn ask_file_handling_strategy( + &self, + file_path: &Path, + file_type: &str, + display: &DisplayManager, + ) -> Result { + if !file_path.exists() { + return Ok(FileHandlingStrategy::GenerateNew); + } + + display.item_colored( + "Found", + format!("existing {} at {}", file_type, file_path.display()), + "yellow", + ); + + let options = vec![ + selection_strings::FILE_MODIFY_EXISTING, + selection_strings::FILE_GENERATE_NEW, + selection_strings::FILE_CANCEL, + ]; + + let choice = + Select::new(&format!("What would you like to do with {}?", file_type), options) + .with_help_message("Modify preserves your customizations and comments") + .prompt() + .context("Failed to get file handling choice")?; + + match choice { + selection_strings::FILE_MODIFY_EXISTING => Ok(FileHandlingStrategy::ModifyExisting), + selection_strings::FILE_GENERATE_NEW => Ok(FileHandlingStrategy::GenerateNew), + selection_strings::FILE_CANCEL => { + bail!("Configuration cancelled by user"); + } + _ => unreachable!(), + } + } + + fn show_success_message(&self, config: &WizardConfig, display: &DisplayManager) -> Result<()> { + display.header("✓ Configuration Complete!"); + + display.note("Generated files:"); + display.item_colored(" Broker config", self.broker_toml_file.display(), "green"); + display.item_colored(" Compose config", self.compose_yml_file.display(), "green"); + + display.separator(); + display.note("Next steps:"); + display.note("1. Set the following environment variables:"); + + // Check if env vars are set + let private_key_set = + std::env::var("PROVER_PRIVATE_KEY").or_else(|_| std::env::var("PRIVATE_KEY")).is_ok(); + let prover_rpc_url_set = std::env::var("PROVER_RPC_URL").is_ok(); + let legacy_rpc_url_set = std::env::var("RPC_URL").is_ok(); + let reward_address_set = std::env::var("REWARD_ADDRESS").is_ok(); + + if private_key_set { + display.item_colored(" PROVER_PRIVATE_KEY", "✓ Already set", "green"); + } else { + display.note(" export PROVER_PRIVATE_KEY="); + } + + match (prover_rpc_url_set, legacy_rpc_url_set) { + (true, _) => { + display.item_colored(" PROVER_RPC_URL", "✓ Already set", "green"); + } + (false, true) => { + display.item_colored( + " RPC_URL (legacy)", + "✓ Already set (fallback in use)", + "yellow", + ); + display.note(" # Preferred: export PROVER_RPC_URL="); + } + (false, false) => { + display.note(" export PROVER_RPC_URL="); + } + } + + if reward_address_set { + display.item_colored(" REWARD_ADDRESS", "✓ Already set", "green"); + } else { + display.warning(" REWARD_ADDRESS env variable is not set."); + display.note(" This is required in order to receive ZK mining rewards for your work."); + display.note(" (This does not effect the ETH market fees you receive from fulfilling proving requests."); + display.note(" Learn more: https://docs.boundless.network/zkc/mining/overview"); + display.note(""); + display.note(" Option 1: export REWARD_ADDRESS="); + display.note(" Option 2: Set POVW_LOG_ID in compose.yml to your reward address"); + } + + display.note(""); + display.note(&format!( + "2. Ensure you have a minimum of {} ZKC collateral in your prover address:", + config.max_collateral + )); + display.note(" boundless prover balance-collateral"); + + display.note(""); + display.note("3. Start your prover:"); + display.note(" just prover up"); + + display.note(""); + display.note("4. Monitor your prover:"); + display.note(" just prover logs"); + + display.separator(); + display.note("For more information, visit:"); + display.note("https://docs.boundless.network/provers/broker"); + + Ok(()) + } +} + +// CPU thread detection +fn detect_cpu_threads() -> Result { + Ok(num_cpus::get()) +} + +// GPU detection +fn detect_gpus() -> Result { + // Try to detect NVIDIA GPUs using nvidia-smi + let output = std::process::Command::new("nvidia-smi").arg("--list-gpus").output(); + + match output { + Ok(output) if output.status.success() => { + let stdout = String::from_utf8_lossy(&output.stdout); + let count = stdout.lines().filter(|line| line.contains("GPU")).count(); + Ok(count) + } + _ => bail!("Could not detect GPUs automatically using `nvidia-smi --list-gpus`"), + } +} + +// Bento health check +async fn check_bento_health(bento_url: &str) -> Result<()> { + let url = Url::parse(bento_url).context("Invalid Bento URL")?; + let health_url = url.join("health").context("Failed to construct health check URL")?; + + reqwest::get(health_url.clone()) + .await + .with_context(|| format!("Failed to connect to Bento at {}", health_url))? + .error_for_status() + .context("Bento health check returned error status")?; + + Ok(()) +} diff --git a/crates/broker/src/order_monitor.rs b/crates/broker/src/order_monitor.rs index 255800675..68f6abf96 100644 --- a/crates/broker/src/order_monitor.rs +++ b/crates/broker/src/order_monitor.rs @@ -470,7 +470,11 @@ where self.skip_order(&order, "expired").await; } else if is_target_time_reached(&order, current_block_timestamp) { if self.market.is_fulfilled(order.request.id).await? { - tracing::info!("Order already fulfilled by another prover, skipping 0x{:x}", order.request.id); + tracing::debug!( + "Lock expiry timeout occurred, but 0x{:x} was already fulfilled by another prover. Skipping.", + order.request.id + ); + self.skip_order(&order, "was fulfilled by other").await; } else { tracing::info!("Request 0x{:x} was locked by another prover but expired unfulfilled, setting status to pending proving", order.request.id); candidate_orders.push(order); diff --git a/crates/broker/src/submitter.rs b/crates/broker/src/submitter.rs index bee100a1f..2b199fb49 100644 --- a/crates/broker/src/submitter.rs +++ b/crates/broker/src/submitter.rs @@ -585,14 +585,22 @@ where ); return Ok(()); } - Err(SubmitterErr::MarketError(MarketError::PaymentRequirementsFailedUnknownError)) => { - tracing::warn!("Payment requirement failed for one or more orders, will not retry"); - errors.push(SubmitterErr::MarketError(MarketError::PaymentRequirementsFailedUnknownError)); + Err(SubmitterErr::MarketError( + MarketError::PaymentRequirementsFailedUnknownError, + )) => { + tracing::warn!( + "Payment requirement failed for one or more orders, will not retry" + ); + errors.push(SubmitterErr::MarketError( + MarketError::PaymentRequirementsFailedUnknownError, + )); break; } Err(SubmitterErr::MarketError(MarketError::PaymentRequirementsFailed(err))) => { tracing::warn!("Payment requirement failed for one or more orders: {err:?}, will not retry"); - errors.push(SubmitterErr::MarketError(MarketError::PaymentRequirementsFailed(err))); + errors.push(SubmitterErr::MarketError(MarketError::PaymentRequirementsFailed( + err, + ))); break; } Err(err) => { From 8970a6c0a422aa6442db62d8c7d3665d96ff1bb0 Mon Sep 17 00:00:00 2001 From: austinabell Date: Fri, 21 Nov 2025 15:52:51 -0500 Subject: [PATCH 04/11] fmt --- .../src/contracts/boundless_market.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/boundless-market/src/contracts/boundless_market.rs b/crates/boundless-market/src/contracts/boundless_market.rs index 34edef26f..6d184cf66 100644 --- a/crates/boundless-market/src/contracts/boundless_market.rs +++ b/crates/boundless-market/src/contracts/boundless_market.rs @@ -33,15 +33,15 @@ use anyhow::{anyhow, Context, Result}; use risc0_ethereum_contracts::event_query::EventQueryConfig; use thiserror::Error; -use crate::{ - contracts::token::{IERC20Permit, IHitPoints::IHitPointsErrors, Permit, IERC20}, - deployments::collateral_token_supports_permit, -}; use super::{ eip712_domain, AssessorReceipt, EIP712DomainSaltless, Fulfillment, - IBoundlessMarket::{self, IBoundlessMarketInstance, ProofDelivered, IBoundlessMarketErrors}, + IBoundlessMarket::{self, IBoundlessMarketErrors, IBoundlessMarketInstance, ProofDelivered}, Offer, ProofRequest, RequestError, RequestId, RequestStatus, TxnErr, TXN_CONFIRM_TIMEOUT, }; +use crate::{ + contracts::token::{IERC20Permit, IHitPoints::IHitPointsErrors, Permit, IERC20}, + deployments::collateral_token_supports_permit, +}; /// Fraction of collateral the protocol gives to the prover who fills an order that was locked by another prover but expired /// This is determined by the constant SLASHING_BURN_BPS defined in the BoundlessMarket contract. @@ -220,7 +220,7 @@ fn validate_fulfill_receipt(receipt: TransactionReceipt) -> Result<(), MarketErr if let Some(log) = receipt.decoded_log::() { match IBoundlessMarketErrors::abi_decode(&**log.error) { Ok(err) => Err(MarketError::PaymentRequirementsFailed(err)), - Err(_) => Err(MarketError::PaymentRequirementsFailedUnknownError) + Err(_) => Err(MarketError::PaymentRequirementsFailedUnknownError), } } else { Ok(()) From 9e3f1a2bf10b816f3cc2de9eb68a833e3cc99ace Mon Sep 17 00:00:00 2001 From: austinabell Date: Fri, 21 Nov 2025 16:20:02 -0500 Subject: [PATCH 05/11] remove re-added backup file --- .../commands/prover/generate_config.rs.backup | 1473 ----------------- 1 file changed, 1473 deletions(-) delete mode 100644 crates/boundless-cli/src/commands/prover/generate_config.rs.backup diff --git a/crates/boundless-cli/src/commands/prover/generate_config.rs.backup b/crates/boundless-cli/src/commands/prover/generate_config.rs.backup deleted file mode 100644 index 1e135f351..000000000 --- a/crates/boundless-cli/src/commands/prover/generate_config.rs.backup +++ /dev/null @@ -1,1473 +0,0 @@ -// Copyright 2025 Boundless Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; -use std::path::{Path, PathBuf}; - -use alloy::primitives::{Address, U256}; -use alloy::providers::Provider; -use anyhow::{bail, Context, Result}; -use chrono::Utc; -use clap::Args; -use inquire::{Confirm, Select, Text}; -use rand::Rng; -use url::Url; - -use super::benchmark::ProverBenchmark; -use crate::config::{GlobalConfig, ProverConfig, ProvingBackendConfig}; -use crate::config_file::Config; -use crate::display::{obscure_url, DisplayManager}; -use boundless_market::client::ClientBuilder; -use boundless_market::contracts::{RequestId, RequestInput, RequestInputType}; -use boundless_market::GuestEnv; -use risc0_zkvm::serde::from_slice; - -// Priority requestor addresses for market pricing analysis -const OG_OFFCHAIN_REQUESTOR: &str = "0xc197ebe12c7bcf1d9f3b415342bdbc795425335c"; -const OG_ONCHAIN_REQUESTOR: &str = "0xe198c6944cae382902a375b0b8673084270a7f8e"; -const SIGNAL_REQUESTOR: &str = "0x734df7809c4ef94da037449c287166d114503198"; - -// Cycle count range for signal requestor (50B to 54B cycles) -const SIGNAL_REQUESTOR_MIN_CYCLES: u64 = 50_000_000_000; -const SIGNAL_REQUESTOR_MAX_CYCLES: u64 = 54_000_000_000; - -// Number of blocks to query for market pricing analysis -const MARKET_PRICE_BLOCKS_TO_QUERY: u64 = 30000; - -// Chunk size for querying events to avoid RPC limits -const EVENT_QUERY_CHUNK_SIZE: u64 = 500; - -// Priority requestor list URLs -const PRIORITY_REQUESTOR_LIST_STANDARD: &str = - "https://requestors.boundless.network/boundless-recommended-priority-list.standard.json"; -const PRIORITY_REQUESTOR_LIST_LARGE: &str = - "https://requestors.boundless.network/boundless-recommended-priority-list.large.json"; - -// Peak performance threshold for enabling large requestor list (kHz) -const LARGE_REQUESTOR_LIST_THRESHOLD_KHZ: f64 = 4000.0; - -// Default minimum price per mega-cycle in collateral token (ZKC) for fulfilling -// orders locked by other provers that exceeded their lock timeout -const DEFAULT_MIN_MCYCLE_PRICE_COLLATERAL_TOKEN: &str = "0.0005"; - -mod selection_strings { - // Benchmark performance options - pub const BENCHMARK_RUN_SUITE: &str = - "Run the Boundless benchmark suite (requires a Bento instance running)"; - pub const BENCHMARK_MANUAL_ENTRY: &str = "Manually set peak performance (in kHz)"; - - // File handling strategy options - pub const FILE_MODIFY_EXISTING: &str = "Modify existing"; - pub const FILE_GENERATE_NEW: &str = "Generate new"; - pub const FILE_CANCEL: &str = "Cancel"; -} - -#[derive(Debug, Clone)] -struct MarketPricing { - median: f64, - percentile_25: f64, - sample_size: usize, -} - -/// Generate optimized broker.toml and compose.yml configuration files -#[derive(Args, Clone, Debug)] -pub struct ProverGenerateConfig { - /// Path to output broker.toml file - #[clap(long, default_value = "./broker.toml")] - pub broker_toml_file: PathBuf, - - /// Path to output compose.yml file - #[clap(long, default_value = "./compose.yml")] - pub compose_yml_file: PathBuf, - - /// Skip creating backups of existing files - #[clap(long)] - pub skip_backup: bool, -} - -#[derive(Debug)] -#[allow(dead_code)] -struct WizardConfig { - num_threads: usize, - num_gpus: usize, - max_exec_agents: usize, - max_concurrent_preflights: usize, - max_concurrent_proofs: usize, - peak_prove_khz: f64, - priority_requestor_lists: Vec, - max_collateral: String, - min_mcycle_price: String, - min_mcycle_price_collateral_token: String, -} - -#[derive(Debug, Clone, Copy)] -enum FileHandlingStrategy { - ModifyExisting, - GenerateNew, -} - -impl ProverGenerateConfig { - /// Run the generate-config command - pub async fn run(&self, global_config: &GlobalConfig) -> Result<()> { - let display = DisplayManager::new(); - - display.header("Boundless Prover Configuration Wizard"); - display.note("This wizard helps you create Broker and Bento configuration files,"); - display.note("customized for your prover setup, that allow you to compete in the"); - display.note("market and earn rewards."); - display.separator(); - - // Check file handling strategy - let broker_strategy = - self.ask_file_handling_strategy(&self.broker_toml_file, "broker.toml", &display)?; - let compose_strategy = - self.ask_file_handling_strategy(&self.compose_yml_file, "compose.yml", &display)?; - - display.separator(); - - // Run wizard to collect configuration - let config = self.run_wizard(&display, global_config).await?; - - display.separator(); - display.header("Generating Configuration Files"); - - // Backup and generate broker.toml - if let Some(backup_path) = self.backup_file(&self.broker_toml_file)? { - display.item_colored("Backup saved", backup_path.display(), "cyan"); - } - self.generate_broker_toml(&config, broker_strategy, &display)?; - display.item_colored("Created", self.broker_toml_file.display(), "green"); - - // Backup and generate compose.yml - if let Some(backup_path) = self.backup_file(&self.compose_yml_file)? { - display.item_colored("Backup saved", backup_path.display(), "cyan"); - } - self.generate_compose_yml(&config, compose_strategy)?; - display.item_colored("Created", self.compose_yml_file.display(), "green"); - - display.separator(); - self.show_success_message(&config, &display)?; - - Ok(()) - } - - async fn run_wizard( - &self, - display: &DisplayManager, - global_config: &GlobalConfig, - ) -> Result { - // Step 1: Machine configuration - display.step(1, 7, "Machine Configuration"); - - let run_on_single_machine = - Confirm::new("Do you plan to run your prover entirely on your current machine?") - .with_default(true) - .with_help_message("This wizard is optimized for single-machine setups") - .prompt() - .context("Failed to get user input")?; - - if !run_on_single_machine { - display.note("⚠ This wizard is optimized for single-machine setups."); - display.note(" Cluster setups may require additional manual configuration."); - display.note(&format!( - " Please refer to our documentation: {}", - "https://docs.boundless.network/provers/broker" - )); - - let continue_anyway = Confirm::new("Continue with configuration anyway?") - .with_default(false) - .with_help_message("Generated config may need manual adjustments for clusters") - .prompt() - .context("Failed to get confirmation")?; - - if !continue_anyway { - display.note("Configuration cancelled."); - bail!("Configuration cancelled by user"); - } - } - - let detected_threads = detect_cpu_threads()?; - display.item_colored("Detected", format!("{} CPU threads", detected_threads), "cyan"); - - let input = Text::new("How many CPU threads do you want to use?") - .with_default(&detected_threads.to_string()) - .prompt() - .context("Failed to get CPU thread count")?; - let num_threads = input.parse::().context("Invalid number format")?; - display.item_colored("Using", format!("{} CPU threads", num_threads), "green"); - - // Step 2: GPU configuration - display.separator(); - display.step(2, 7, "GPU Configuration"); - - let num_gpus = match detect_gpus() { - Ok(count) if count > 0 => { - display.item_colored("Detected", format!("{} GPU(s)", count), "cyan"); - count - } - _ => { - let input = Text::new("How many GPUs do you have?") - .with_default("1") - .with_help_message("Enter 0 if you don't have any GPUs") - .prompt() - .context("Failed to get GPU count")?; - input.parse::().context("Invalid number format")? - } - }; - - // Step 3: Calculated Configuration - display.separator(); - display.step(3, 7, "Calculated Configuration"); - - display.note("The following values are calculated based on your hardware:"); - display.note(""); - - let max_exec_agents = (num_threads.saturating_sub(4).saturating_sub(num_gpus * 2)) / 2; - display.note(" Formula: max_exec_agents ="); - display.note(" ("); - display.note(&format!(" {} threads", num_threads)); - display.note(" - 1 # reserve for postgres"); - display.note(" - 1 # reserve for redis"); - display.note(" - 1 # reserve for minio"); - display.note(&format!(" - {} GPUs × 2 # reserve two threads per GPU", num_gpus)); - display.note(" )"); - display.note(" / 2 # 2 threads per exec agent"); - display.item_colored(" Result", format!("{} exec agents", max_exec_agents), "cyan"); - display.note(""); - - let max_concurrent_preflights = max_exec_agents.saturating_sub(2).max(1); - display.note(" Formula: max_concurrent_preflights ="); - display.note(" ("); - display.note(&format!(" {} exec agents", max_exec_agents)); - display.note(" - 1 # reserve for proofs"); - display.note(" - 1 # reserve for mining"); - display.note(" )"); - display.item_colored( - " Result", - format!("{} concurrent preflights", max_concurrent_preflights), - "cyan", - ); - display.note(""); - - let max_concurrent_proofs = 1; - display.note(" Formula: max_concurrent_proofs = 1 (fixed)"); - display.item_colored( - " Result", - format!("{} concurrent proof", max_concurrent_proofs), - "cyan", - ); - - // Step 4: Performance Benchmarking - display.separator(); - display.step(4, 7, "Performance Benchmarking"); - - let peak_prove_khz = self.get_peak_performance(display, global_config).await?.floor(); - display.item_colored( - "Setting `peak_prove_khz` to", - format!("{:.0} kHz", peak_prove_khz), - "green", - ); - - // Step 5: Priority Requestor Lists - display.separator(); - display.step(5, 7, "Priority Requestor Lists"); - - display.note("Requestor priority lists specify proof requestors that the broker should"); - display - .note("prioritize for proving. Requestors on these lists are considered more likely"); - display - .note("to request useful work with profitable pricing, and thus are prioritized over"); - display.note("other requestors."); - display.note(""); - display.note("Boundless Networks maintains recommended requestor lists:"); - display.note(" • Standard list: For all provers (general workloads)"); - display.note(&format!( - " • Large list: For provers >{:.0} kHz (includes high-cycle orders)", - LARGE_REQUESTOR_LIST_THRESHOLD_KHZ - )); - display.note(""); - - let priority_requestor_lists = if peak_prove_khz > LARGE_REQUESTOR_LIST_THRESHOLD_KHZ { - display.note(&format!( - "Your cluster's peak performance of {:.0} kHz qualifies for large orders.", - peak_prove_khz - )); - display.note("We recommend enabling both the standard and large requestor lists."); - vec![ - PRIORITY_REQUESTOR_LIST_STANDARD.to_string(), - PRIORITY_REQUESTOR_LIST_LARGE.to_string(), - ] - } else { - display.note(&format!( - "Your cluster's peak performance of {:.0} kHz is suitable for standard orders.", - peak_prove_khz - )); - display.note("We recommend enabling the standard requestor list."); - vec![PRIORITY_REQUESTOR_LIST_STANDARD.to_string()] - }; - - for list in &priority_requestor_lists { - display.item_colored(" List", list, "cyan"); - } - - // Step 6: Collateral Configuration - display.separator(); - display.step(6, 7, "Collateral Configuration"); - - let recommended_collateral = if priority_requestor_lists.len() > 1 { "200" } else { "50" }; - - display.note(&format!( - "We recommend a max collateral of {} ZKC for your configuration.", - recommended_collateral - )); - display.note(" • 50 ZKC: Recommended for standard requestor list (lower risk)"); - display.note(" • 200 ZKC: Recommended for standard + large lists (higher rewards, higher risk)"); - display.note(""); - display.note("Higher collateral enables higher-reward orders but increases slashing risks."); - - let max_collateral = Text::new("Max collateral (ZKC):") - .with_default(recommended_collateral) - .with_help_message("Press Enter to use recommended value") - .prompt() - .context("Failed to get max collateral")?; - - display.item_colored("Max collateral", format!("{} ZKC", max_collateral), "green"); - - // Step 7: Pricing Configuration - display.separator(); - display.step(7, 7, "Pricing Configuration"); - - display.note("Analyzing recent market prices to determine competitive pricing..."); - display.note(""); - - // Get RPC URL for market query - let rpc_url = self.get_or_prompt_rpc_url(display)?; - - // Validate chain ID to ensure it's Base Mainnet - display.status("Status", "Validating RPC connection", "yellow"); - let temp_provider = alloy::providers::ProviderBuilder::new() - .connect(rpc_url.as_ref()) - .await - .context("Failed to connect to RPC provider")?; - - let chain_id = temp_provider - .get_chain_id() - .await - .context("Failed to query chain ID from RPC provider")?; - - if chain_id == 1 { - display.note("⚠ Detected Ethereum Mainnet (Chain ID: 1)"); - display.note(" Boundless Market is deployed on Base Mainnet (Chain ID: 8453)"); - display.note(" Please use a Base Mainnet RPC URL instead."); - bail!("Incorrect chain detected: Ethereum Mainnet. Base Mainnet required."); - } - - if chain_id != 8453 { - display.note(&format!("⚠ Detected Chain ID: {}", chain_id)); - display.note(" Boundless Market is deployed on Base Mainnet (Chain ID: 8453)"); - let continue_anyway = Confirm::new("Continue with this RPC anyway?") - .with_default(false) - .prompt() - .context("Failed to get confirmation")?; - if !continue_anyway { - bail!("Incorrect chain detected"); - } - } - - // Query market pricing with fallback to defaults - let market_pricing = match self.query_market_pricing(&rpc_url, display, global_config).await - { - Ok(pricing) => { - display.item_colored( - "Market analysis", - format!("{} orders analyzed", pricing.sample_size), - "green", - ); - display.item_colored( - "Median price", - format!( - "{:.10} ETH/Mcycle ({} Gwei/Mcycle, {:.0} wei/cycle)", - pricing.median, - pricing.median * 1e9, - pricing.median * 1e12 - ), - "cyan", - ); - display.item_colored( - "25th percentile", - format!( - "{:.10} ETH/Mcycle ({} Gwei/Mcycle, {:.0} wei/cycle)", - pricing.percentile_25, - pricing.percentile_25 * 1e9, - pricing.percentile_25 * 1e12 - ), - "cyan", - ); - Some(pricing) - } - Err(e) => { - display.note(&format!("⚠ Failed to query market prices: {}", e)); - display.note("Falling back to default pricing"); - None - } - }; - - // Prompt user to accept or override - let min_mcycle_price = if let Some(pricing) = market_pricing { - display.note(""); - display.note(&format!( - "Recommended minimum price: {:.10} ETH/Mcycle ({} Gwei/Mcycle, {:.0} wei/cycle)", - pricing.percentile_25, - pricing.percentile_25 * 1e9, - pricing.percentile_25 * 1e12 - )); - display.note(""); - display - .note("This value is computed based on recent market prices. It ensures you are"); - display - .note("priced competitively such that you will be able to lock and fulfill orders"); - display.note("for ETH rewards in the market."); - display.note(""); - - Text::new("Press Enter to accept or enter custom price:") - .with_default(&format!("{:.10}", pricing.percentile_25)) - .with_help_message("You can update this later in broker.toml") - .prompt() - .context("Failed to get price")? - } else { - // Fallback to manual entry if query failed - Text::new("Minimum price per megacycle (ETH):") - .with_default("0.00000001") - .with_help_message("You can update this later in broker.toml") - .prompt() - .context("Failed to get price")? - }; - - // Collateral token pricing - display.separator(); - display.note(""); - display.note("Collateral Token Pricing (Proof Races):"); - display.note(""); - display.note("When another prover fails to fulfill their locked order within the timeout,"); - display.note("they are slashed. A portion of their collateral (in ZKC) becomes available"); - display.note("as a reward for any prover who can fulfill that order in a 'proof race'."); - display.note(""); - display.note("The setting below controls the minimum ZKC reward your broker will accept"); - display.note("to participate in these proof races (competing to fulfill slashed orders)."); - display.note(""); - display.note("Example: If set to 0.0005 ZKC/Mcycle, a 1000 Mcycle slashed order must"); - display.note(" offer at least 0.5 ZKC reward for your broker to compete for it."); - display.note(""); - display.note(&format!( - "Default minimum price: {} ZKC/Mcycle", - DEFAULT_MIN_MCYCLE_PRICE_COLLATERAL_TOKEN - )); - display.note(""); - - let min_mcycle_price_collateral_token = - Text::new("Minimum price per Mcycle (in ZKC collateral rewards):") - .with_default(DEFAULT_MIN_MCYCLE_PRICE_COLLATERAL_TOKEN) - .with_help_message("You can update this later in broker.toml") - .prompt() - .context("Failed to get collateral token price")?; - - display.item_colored( - "Collateral price", - format!("{} ZKC/Mcycle", min_mcycle_price_collateral_token), - "green", - ); - - let price_f64 = min_mcycle_price.parse::().unwrap_or(0.0); - display.item_colored( - "Min price", - format!( - "{} ETH/Mcycle ({} Gwei/Mcycle, {:.0} wei/cycle)", - min_mcycle_price, - price_f64 * 1e9, - price_f64 * 1e12 - ), - "green", - ); - - Ok(WizardConfig { - num_threads, - num_gpus, - max_exec_agents, - max_concurrent_preflights, - max_concurrent_proofs, - peak_prove_khz, - priority_requestor_lists, - max_collateral, - min_mcycle_price, - min_mcycle_price_collateral_token, - }) - } - - fn try_extract_cycle_count(input: &RequestInput) -> Option { - // Check if inline input - if input.inputType != RequestInputType::Inline { - tracing::debug!("Skipping URL-based input for cycle count extraction"); - return None; - } - - // Decode GuestEnv - match GuestEnv::decode(&input.data) { - Ok(guest_env) => { - // Convert stdin bytes to u32 words for risc0 deserialization - // risc0 serde uses u32 words, need to convert from bytes - match bytemuck::try_cast_slice::(&guest_env.stdin) { - Ok(words) => { - // Decode first u64 from stdin words - match from_slice::(words) { - Ok(cycle_count) => { - tracing::trace!( - "Successfully decoded cycle count: {}", - cycle_count - ); - Some(cycle_count) - } - Err(e) => { - tracing::debug!("Failed to decode cycle count from stdin: {}", e); - None - } - } - } - Err(e) => { - tracing::debug!("Failed to convert stdin bytes to u32 words: {}", e); - None - } - } - } - Err(e) => { - tracing::debug!("Failed to decode GuestEnv: {}", e); - None - } - } - } - - // TODO: Migrate to getting market price from the indexer API once available. - async fn query_market_pricing( - &self, - rpc_url: &Url, - display: &DisplayManager, - global_config: &GlobalConfig, - ) -> Result { - display.status("Status", "Querying recent market prices", "yellow"); - display.note("This may take a moment..."); - - // Priority requestors to filter for - let priority_requestors: Vec
= vec![ - OG_OFFCHAIN_REQUESTOR.parse()?, - OG_ONCHAIN_REQUESTOR.parse()?, - SIGNAL_REQUESTOR.parse()?, - ]; - - // Build market client - let timeout = global_config.tx_timeout.unwrap_or(std::time::Duration::from_secs(300)); - let client = ClientBuilder::new() - .with_rpc_url(rpc_url.clone()) - .with_timeout(timeout) - .build() - .await - .context("Failed to create market client")?; - - // Get current block and calculate range - let current_block = client.provider().get_block_number().await?; - let start_block = current_block.saturating_sub(MARKET_PRICE_BLOCKS_TO_QUERY); - - display.note(&format!( - "Querying market prices from block {} to {} in chunks of {}", - start_block, current_block, EVENT_QUERY_CHUNK_SIZE - )); - - // Query RequestLocked events in chunks - let mut locked_logs = Vec::new(); - let mut chunk_start = start_block; - while chunk_start < current_block { - let chunk_end = (chunk_start + EVENT_QUERY_CHUNK_SIZE).min(current_block); - - let locked_filter = client - .boundless_market - .instance() - .RequestLocked_filter() - .from_block(chunk_start) - .to_block(chunk_end); - - let mut chunk_logs = - locked_filter.query().await.context("Failed to query RequestLocked events")?; - - locked_logs.append(&mut chunk_logs); - chunk_start = chunk_end + 1; - } - - display.note(&format!("Found {} locked orders", locked_logs.len())); - - // Query RequestFulfilled events in chunks - let mut fulfilled_logs = Vec::new(); - let mut chunk_start = start_block; - while chunk_start < current_block { - let chunk_end = (chunk_start + EVENT_QUERY_CHUNK_SIZE).min(current_block); - - let fulfilled_filter = client - .boundless_market - .instance() - .RequestFulfilled_filter() - .from_block(chunk_start) - .to_block(chunk_end); - - let mut chunk_logs = fulfilled_filter - .query() - .await - .context("Failed to query RequestFulfilled events")?; - - fulfilled_logs.append(&mut chunk_logs); - chunk_start = chunk_end + 1; - } - - display.note(&format!("Found {} fulfilled orders", fulfilled_logs.len())); - - // Build map of fulfilled requests with their block timestamps - let mut fulfilled_map: HashMap = HashMap::new(); - for (event, log_meta) in &fulfilled_logs { - let request_id: U256 = event.requestId; - if let Some(block_timestamp) = log_meta.block_timestamp { - fulfilled_map.insert(request_id, block_timestamp); - } - } - - // Process locked orders - let mut prices_per_mcycle: Vec = Vec::new(); - - for (event, log_meta) in &locked_logs { - let request_id: U256 = event.requestId; - let requestor = RequestId::from_lossy(event.request.id).addr; - - // Filter: only priority requestors - if !priority_requestors.contains(&requestor) { - continue; - } - - // Filter: only fulfilled orders - let fulfilled_timestamp = match fulfilled_map.get(&request_id) { - Some(×tamp) => timestamp, - None => continue, - }; - - // Get block timestamp for locked order from log metadata - let lock_timestamp = match log_meta.block_timestamp { - Some(ts) => ts, - None => continue, - }; - - // Calculate lock deadline - let lock_deadline = event.request.offer.lock_deadline(); - - // Filter: only orders fulfilled before lockExpiry - if fulfilled_timestamp > lock_deadline { - continue; - } - - // Calculate price at lock time using Offer.price_at - let locked_price = match event.request.offer.price_at(lock_timestamp) { - Ok(price) => price, - Err(_) => { - bail!( - "Failed to calculate price at lock time for request ID 0x{:x}", - request_id - ); - } - }; - - // Extract or estimate cycle count - let cycles = if requestor == SIGNAL_REQUESTOR.parse::
()? { - let mut rng = rand::rng(); - let random_cycles = - rng.random_range(SIGNAL_REQUESTOR_MIN_CYCLES..=SIGNAL_REQUESTOR_MAX_CYCLES); - tracing::trace!( - "Signal requestor detected, using random cycle count between {}B and {}B: {}", - SIGNAL_REQUESTOR_MIN_CYCLES / 1_000_000_000, - SIGNAL_REQUESTOR_MAX_CYCLES / 1_000_000_000, - random_cycles - ); - random_cycles - } else { - match Self::try_extract_cycle_count(&event.request.input) { - Some(cycles) => { - tracing::debug!( - "Using decoded cycle count: {} for request ID 0x{:x}", - cycles, - request_id - ); - cycles - } - None => { - bail!("Failed to extract cycle count from order generator request input for request ID 0x{:x}", request_id); - } - } - }; - - // Calculate price per megacycle - // locked_price is in wei, need to convert to eth per mcycle - if locked_price > U256::ZERO { - let price_f64 = locked_price.to_string().parse::().unwrap_or(0.0); - let price_per_cycle = price_f64 / cycles as f64; - let price_per_mcycle = price_per_cycle * 1_000_000.0; - - // Convert from wei to eth - let price_per_mcycle_eth = price_per_mcycle / 1e18; - - prices_per_mcycle.push(price_per_mcycle_eth); - // Gwei per mcycle - let price_per_mcycle_gwei = price_per_mcycle / 1e9; - let locked_price_eth = price_f64 / 1e18; - tracing::debug!("Added price per mcycle: {} ETH/Mcycle ({:.2} Gwei/Mcycle, cycles: {}, locked_price: {} wei ({} eth) for request ID 0x{:x}", price_per_mcycle_eth, price_per_mcycle_gwei, cycles, locked_price, locked_price_eth, request_id); - } - } - - if prices_per_mcycle.is_empty() { - bail!("No valid market data found for pricing analysis"); - } - - let sample_size = prices_per_mcycle.len(); - display.note(&format!("Analyzed {} qualifying orders", sample_size)); - - // Sort prices for percentile calculations - prices_per_mcycle.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)); - - // Calculate median (50th percentile) - let median_price = if prices_per_mcycle.len() % 2 == 0 { - let mid = prices_per_mcycle.len() / 2; - (prices_per_mcycle[mid - 1] + prices_per_mcycle[mid]) / 2.0 - } else { - prices_per_mcycle[prices_per_mcycle.len() / 2] - }; - - // Calculate 25th percentile - let percentile_25_idx = ((prices_per_mcycle.len() as f64) * 0.25) as usize; - let percentile_25 = prices_per_mcycle[percentile_25_idx.min(prices_per_mcycle.len() - 1)]; - - Ok(MarketPricing { median: median_price, percentile_25, sample_size }) - } - - async fn get_peak_performance( - &self, - display: &DisplayManager, - global_config: &GlobalConfig, - ) -> Result { - display.note("Configuration requires an estimate of the peak performance of your proving"); - display.note("cluster."); - display.note(""); - display.note("Boundless provides a benchmarking suite for estimating your cluster's"); - display.note("peak performance."); - display.warning("The benchmark suite requires access to a running Bento proving cluster."); - display.note("See https://docs.boundless.network/provers/quick-start#running-a-test-proof for information on how to run Bento."); - display.note(""); - - let choice = Select::new( - "How would you like to set the peak performance?", - vec![ - selection_strings::BENCHMARK_RUN_SUITE, - selection_strings::BENCHMARK_MANUAL_ENTRY, - ], - ) - .prompt() - .context("Failed to get benchmark choice")?; - - match choice { - selection_strings::BENCHMARK_RUN_SUITE => { - // Get RPC URL before running benchmark - display.separator(); - display.status("Status", "Checking RPC configuration", "yellow"); - let rpc_url = self.get_or_prompt_rpc_url(display)?; - - // Try to detect Bento at localhost - let default_bento_url = "http://localhost:8081"; - let bento_available = check_bento_health(default_bento_url).await.is_ok(); - - if bento_available { - display.item_colored("Bento", "Detected at http://localhost:8081", "green"); - - let use_detected = Confirm::new("Use this Bento instance for benchmarking?") - .with_default(true) - .prompt() - .context("Failed to get confirmation")?; - - if use_detected { - display.status("Status", "Running benchmark", "yellow"); - display.note("This may take several minutes..."); - - match self.run_benchmark(default_bento_url, &rpc_url, global_config).await { - Ok(khz) => { - let adjusted_khz = khz * 0.75; - display.item_colored( - "Benchmark result", - format!("{:.2} kHz", khz), - "cyan", - ); - display.item_colored( - "Adjusted (75%)", - format!("{:.2} kHz", adjusted_khz), - "cyan", - ); - return Ok(adjusted_khz); - } - Err(e) => { - display.note(&format!("⚠ Benchmark failed: {}", e)); - display.note("Falling back to manual input..."); - } - } - } - } - - // If not detected or user chose not to use detected, ask for custom URL - if !bento_available { - display.note("⚠ Bento not detected at http://localhost:8081"); - } - - let provide_url = - Confirm::new("Do you have a Bento instance running at a different URL?") - .with_default(false) - .prompt() - .context("Failed to get URL confirmation")?; - - if provide_url { - let bento_url = Text::new("What is your Bento URL?") - .with_help_message("e.g., http://your-server:8081") - .prompt() - .context("Failed to get Bento URL")?; - - if check_bento_health(&bento_url).await.is_ok() { - display.status("Status", "Running benchmark", "yellow"); - display.note("This may take several minutes..."); - - match self.run_benchmark(&bento_url, &rpc_url, global_config).await { - Ok(khz) => { - let adjusted_khz = khz * 0.75; - display.item_colored( - "Benchmark result", - format!("{:.2} kHz", khz), - "cyan", - ); - display.item_colored( - "Adjusted (75%)", - format!("{:.2} kHz", adjusted_khz), - "cyan", - ); - return Ok(adjusted_khz); - } - Err(e) => { - display.note(&format!("⚠ Benchmark failed: {}", e)); - display.note("Falling back to manual input..."); - } - } - } else { - display.note(&format!("⚠ Could not connect to Bento at {}", bento_url)); - display.note("Falling back to manual input..."); - } - } - - // Fall through to manual input - let khz_str = Text::new("Peak performance (kHz):") - .with_default("100") - .with_help_message("You can update this later in broker.toml") - .prompt() - .context("Failed to get peak performance")?; - - khz_str.parse::().context("Invalid performance value") - } - selection_strings::BENCHMARK_MANUAL_ENTRY => { - let khz_str = Text::new("Peak performance (kHz):") - .with_default("100") - .with_help_message("You can update this later in broker.toml") - .prompt() - .context("Failed to get peak performance")?; - - khz_str.parse::().context("Invalid performance value") - } - _ => unreachable!(), - } - } - - async fn run_benchmark( - &self, - bento_url: &str, - rpc_url: &Url, - global_config: &GlobalConfig, - ) -> Result { - // Use the hardcoded test request ID for benchmarking - // TODO: use a representative request ID for benchmarking. This is a OG order of ~500M cycles. - let request_id = "0xc197ebe12c7bcf1d9f3b415342bdbc795425335c01379fa6" - .parse::() - .context("Failed to parse request ID")?; - - // Create the benchmark command with proper configuration - let benchmark = ProverBenchmark { - request_ids: vec![request_id], - prover_config: ProverConfig { - prover_rpc_url: Some(rpc_url.clone()), - private_key: None, - prover_address: None, - deployment: None, - proving_backend: ProvingBackendConfig { - bento_api_url: bento_url.to_string(), - bento_api_key: None, - use_default_prover: false, - skip_health_check: false, - }, - }, - }; - - // Execute the benchmark and return the worst KHz value - benchmark.run(global_config).await - } - - fn get_or_prompt_rpc_url(&self, display: &DisplayManager) -> Result { - // Try to load existing prover configuration - if let Ok(config) = Config::load() { - if let Some(_prover_config) = config.prover { - // Try to load the full ProverConfig with RPC URL from environment/secrets - let full_config = ProverConfig { - prover_rpc_url: None, - private_key: None, - prover_address: None, - deployment: None, - proving_backend: ProvingBackendConfig { - bento_api_url: "http://localhost:8081".to_string(), - bento_api_key: None, - use_default_prover: false, - skip_health_check: true, - }, - }; - - if let Ok(loaded_config) = full_config.load_from_files() { - if let Some(rpc_url) = loaded_config.prover_rpc_url { - display.item_colored("RPC URL", obscure_url(rpc_url.as_ref()), "green"); - return Ok(rpc_url); - } - } - } - } - - // Check environment variable - if let Ok(rpc_url) = std::env::var("PROVER_RPC_URL") { - let url = - rpc_url.parse::().context("Invalid PROVER_RPC_URL environment variable")?; - display.item_colored("RPC URL", obscure_url(url.as_ref()), "green"); - return Ok(url); - } - - // No RPC URL found, prompt user - display.note("⚠ No RPC URL configured for prover"); - display.note("An RPC URL is required to fetch benchmark request data from the"); - display.note("blockchain."); - display.note(""); - - let rpc_url = - Text::new("Enter Base Mainnet RPC URL:").prompt().context("Failed to get RPC URL")?; - - let url = rpc_url.parse::().context("Invalid RPC URL format")?; - - display.item_colored("RPC URL", obscure_url(url.as_ref()), "green"); - Ok(url) - } - - fn backup_file(&self, file_path: &Path) -> Result> { - // Skip if backup flag is set or file doesn't exist - if self.skip_backup || !file_path.exists() { - return Ok(None); - } - - // Create backup directory - let home = dirs::home_dir().context("Failed to get home directory")?; - let backup_dir = home.join(".boundless").join("backups"); - std::fs::create_dir_all(&backup_dir).context("Failed to create backup directory")?; - - // Create timestamped backup filename - let timestamp = Utc::now().format("%Y%m%d_%H%M%S"); - let filename = file_path.file_name().context("Invalid file path")?.to_string_lossy(); - let backup_filename = format!("{}.{}.bak", filename, timestamp); - let backup_path = backup_dir.join(backup_filename); - - // Copy file to backup location - std::fs::copy(file_path, &backup_path) - .with_context(|| format!("Failed to create backup at {}", backup_path.display()))?; - - Ok(Some(backup_path)) - } - - fn strip_tagged_section(content: &str, opening_tag: &str, closing_tag: &str) -> String { - if let Some(start) = content.find(opening_tag) { - if let Some(end) = content[start..].find(closing_tag) { - let mut section_end = start + end + closing_tag.len(); - - // Also skip the newline after the closing tag if present - if section_end < content.len() && content.as_bytes()[section_end] == b'\n' { - section_end += 1; - } - - let before = &content[..start]; - let after = &content[section_end..]; - - format!("{}{}", before, after) - } else { - content.to_string() - } - } else { - content.to_string() - } - } - - fn generate_broker_toml( - &self, - config: &WizardConfig, - strategy: FileHandlingStrategy, - display: &DisplayManager, - ) -> Result<()> { - // Load source (template or existing file) - let source = match strategy { - FileHandlingStrategy::ModifyExisting => std::fs::read_to_string(&self.broker_toml_file) - .context("Failed to read existing broker.toml")?, - FileHandlingStrategy::GenerateNew => { - include_str!("../../../../../broker-template.toml").to_string() - } - }; - - // Parse with toml_edit (preserves comments and formatting) - let mut doc = source.parse::().context("Failed to parse TOML")?; - - // Create CLI wizard metadata section with pretty tags - let metadata_section = format!( - "### [CLI Wizard Metadata] #####\n\ - # Generated by boundless-cli v{} on {}\n\ - # Hardware: {} threads, {} GPU(s)\n\ - # Peak performance: {:.0} kHz\n\ - ### [End] ###\n\n", - env!("CARGO_PKG_VERSION"), - Utc::now().format("%Y-%m-%d"), - config.num_threads, - config.num_gpus, - config.peak_prove_khz - ); - - // Get current prefix - let current_prefix = doc - .as_table() - .decor() - .prefix() - .map(|s| s.as_str().unwrap_or("").to_string()) - .unwrap_or_default(); - - // Strip disclaimer section (from template) - let cleaned_prefix = - Self::strip_tagged_section(¤t_prefix, "### [Disclaimer] ###", "### [End] ###"); - - // Strip any existing CLI wizard metadata (from previous runs) - let cleaned_prefix = Self::strip_tagged_section( - &cleaned_prefix, - "### [CLI Wizard Metadata] #####", - "### [End] ###", - ); - - // Add new metadata section - doc.as_table_mut() - .decor_mut() - .set_prefix(format!("{}{}", metadata_section, cleaned_prefix)); - - // Update market section - if let Some(market) = doc.get_mut("market").and_then(|v| v.as_table_mut()) { - // Update peak_prove_khz with calculation comment - if let Some(item) = market.get_mut("peak_prove_khz") { - let comment = - format!("\n# Calculated from benchmark: {:.2} kHz\n", config.peak_prove_khz); - if let Some(value) = item.as_value_mut() { - value.decor_mut().set_prefix(comment); - } - *item = toml_edit::value(config.peak_prove_khz as i64); - } - - // Update max_collateral - if let Some(item) = market.get_mut("max_collateral") { - *item = toml_edit::value(config.max_collateral.clone()); - } - - // Update max_concurrent_proofs with calculation comment - if let Some(item) = market.get_mut("max_concurrent_proofs") { - let comment = format!("\n# Set based on GPU count: {} GPU(s)\n", config.num_gpus); - if let Some(value) = item.as_value_mut() { - value.decor_mut().set_prefix(comment); - } - *item = toml_edit::value(config.max_concurrent_proofs as i64); - } - - // Update priority_requestor_lists - if let Some(item) = market.get_mut("priority_requestor_lists") { - let mut arr = toml_edit::Array::new(); - for list in &config.priority_requestor_lists { - arr.push(list.clone()); - } - *item = toml_edit::value(arr); - } - - // Update max_concurrent_preflights with calculation comment - if let Some(item) = market.get_mut("max_concurrent_preflights") { - let comment = format!( - "\n# Calculated:\n\ - # max_concurrent_preflights = (\n\ - # (\n\ - # {} threads\n\ - # - 1 # reserve for postgres\n\ - # - 1 # reserve for redis\n\ - # - 1 # reserve for minio\n\ - # - {} GPUs × 2 # reserve two threads per GPU\n\ - # )\n\ - # / 2 # 2 threads per exec agent\n\ - # - 1 # reserve for proofs\n\ - # - 1 # reserve for mining\n\ - # )\n\ - # = {}\n", - config.num_threads, config.num_gpus, config.max_concurrent_preflights - ); - if let Some(value) = item.as_value_mut() { - value.decor_mut().set_prefix(comment); - } - *item = toml_edit::value(config.max_concurrent_preflights as i64); - } - - // Update min_mcycle_price - if let Some(item) = market.get_mut("min_mcycle_price") { - let should_update = match strategy { - FileHandlingStrategy::ModifyExisting => { - // Get existing price and compare with recommended price - let existing_price_str = item.as_str().unwrap_or("0"); - let existing_price = existing_price_str.parse::().unwrap_or(0.0); - let recommended_price = - config.min_mcycle_price.parse::().unwrap_or(0.0); - - if existing_price <= recommended_price && existing_price > 0.0 { - // Existing price is already competitive, don't raise it - display.note(""); - display.note(&format!( - "Your min_mcycle_price is already priced competitively at {} ETH/Mcycle. Not modifying.", - existing_price_str - )); - display.note(""); - false - } else { - // Recommended price is lower (more competitive), update it - true - } - } - FileHandlingStrategy::GenerateNew => true, - }; - - if should_update { - *item = toml_edit::value(config.min_mcycle_price.clone()); - } - } - } - - // Write to file - std::fs::write(&self.broker_toml_file, doc.to_string()) - .context("Failed to write broker.toml")?; - - Ok(()) - } - - fn generate_compose_yml( - &self, - config: &WizardConfig, - strategy: FileHandlingStrategy, - ) -> Result<()> { - // We use string manipulation instead of YAML parsing libraries because - // compose.yml uses YAML anchors (&) and aliases (*) which are - // not preserved by most Rust YAML libraries (serde_yaml, etc.). - // This ensures all comments, formatting, and anchor definitions remain intact. - - // Load source (template or existing file) - let mut content = match strategy { - FileHandlingStrategy::ModifyExisting => std::fs::read_to_string(&self.compose_yml_file) - .context("Failed to read existing compose.yml")?, - FileHandlingStrategy::GenerateNew => { - include_str!("../../../../../compose.yml").to_string() - } - }; - - // Update exec_agent replicas - content = self.update_exec_agent_replicas(content, config.max_exec_agents)?; - - // Add additional GPU agents if needed - if config.num_gpus > 1 { - content = self.add_gpu_agents(content, config.num_gpus)?; - } - - // Write to file - std::fs::write(&self.compose_yml_file, content).context("Failed to write compose.yml")?; - - Ok(()) - } - - fn update_exec_agent_replicas(&self, content: String, replicas: usize) -> Result { - let lines: Vec<&str> = content.lines().collect(); - let mut result: Vec = Vec::new(); - let mut in_exec_agent = false; - let mut in_deploy = false; - - for line in lines { - let mut updated_line = line.to_string(); - - // Track if we're in the exec_agent section - if line.starts_with(" exec_agent:") { - in_exec_agent = true; - in_deploy = false; - } else if in_exec_agent - && line.starts_with(" ") - && !line.starts_with(" ") - && line.len() > 2 - { - // We've hit another service at the same level, exit exec_agent section - in_exec_agent = false; - in_deploy = false; - } - - // Track if we're in the deploy subsection - if in_exec_agent && line.trim().starts_with("deploy:") { - in_deploy = true; - } else if in_deploy && !line.starts_with(" ") && !line.trim().is_empty() { - // Exit deploy section if we hit a line at same or lower indentation - in_deploy = false; - } - - // Update replicas line if we're in the right section - if in_exec_agent && in_deploy && line.trim().starts_with("replicas:") { - let indent = line.chars().take_while(|c| c.is_whitespace()).collect::(); - updated_line = format!("{}replicas: {}", indent, replicas); - } - - result.push(updated_line); - } - - Ok(result.join("\n")) - } - - fn add_gpu_agents(&self, content: String, num_gpus: usize) -> Result { - let lines: Vec<&str> = content.lines().collect(); - let mut result: Vec = Vec::new(); - - // Find gpu_prove_agent0 section boundaries - let mut gpu_agent_start = None; - let mut gpu_agent_end = None; - - for (i, line) in lines.iter().enumerate() { - if line.starts_with(" gpu_prove_agent0:") { - gpu_agent_start = Some(i); - } else if gpu_agent_start.is_some() && gpu_agent_end.is_none() { - // Look for next service at same indentation level (2 spaces, followed by a letter) - if line.starts_with(" ") - && !line.starts_with(" ") - && line.len() > 2 - && line.chars().nth(2).unwrap().is_alphabetic() - { - gpu_agent_end = Some(i); - break; - } - } - } - - let start = - gpu_agent_start.context("Could not find gpu_prove_agent0 section in compose.yml")?; - let end = gpu_agent_end.unwrap_or(lines.len()); - - // Extract the gpu_prove_agent0 section - let gpu_agent_lines: Vec<&str> = lines[start..end].to_vec(); - - // Build result: everything up to and including gpu_prove_agent0 - for line in &lines[..end] { - result.push(line.to_string()); - } - - // Add additional GPU agents - for i in 1..num_gpus { - result.push(String::new()); // Empty line between services - - for line in &gpu_agent_lines { - let mut new_line = line.to_string(); - // Replace service name - if new_line.contains("gpu_prove_agent0") { - new_line = - new_line.replace("gpu_prove_agent0", &format!("gpu_prove_agent{}", i)); - } - // Replace device_ids - if new_line.contains(r#"device_ids: ["0"]"#) { - new_line = new_line - .replace(r#"device_ids: ["0"]"#, &format!(r#"device_ids: ["{}"]"#, i)); - } - result.push(new_line); - } - } - - // Add remaining content - for line in &lines[end..] { - result.push(line.to_string()); - } - - Ok(result.join("\n")) - } - - fn ask_file_handling_strategy( - &self, - file_path: &Path, - file_type: &str, - display: &DisplayManager, - ) -> Result { - if !file_path.exists() { - return Ok(FileHandlingStrategy::GenerateNew); - } - - display.item_colored( - "Found", - format!("existing {} at {}", file_type, file_path.display()), - "yellow", - ); - - let options = vec![ - selection_strings::FILE_MODIFY_EXISTING, - selection_strings::FILE_GENERATE_NEW, - selection_strings::FILE_CANCEL, - ]; - - let choice = - Select::new(&format!("What would you like to do with {}?", file_type), options) - .with_help_message("Modify preserves your customizations and comments") - .prompt() - .context("Failed to get file handling choice")?; - - match choice { - selection_strings::FILE_MODIFY_EXISTING => Ok(FileHandlingStrategy::ModifyExisting), - selection_strings::FILE_GENERATE_NEW => Ok(FileHandlingStrategy::GenerateNew), - selection_strings::FILE_CANCEL => { - bail!("Configuration cancelled by user"); - } - _ => unreachable!(), - } - } - - fn show_success_message(&self, config: &WizardConfig, display: &DisplayManager) -> Result<()> { - display.header("✓ Configuration Complete!"); - - display.note("Generated files:"); - display.item_colored(" Broker config", self.broker_toml_file.display(), "green"); - display.item_colored(" Compose config", self.compose_yml_file.display(), "green"); - - display.separator(); - display.note("Next steps:"); - display.note("1. Set the following environment variables:"); - - // Check if env vars are set - let private_key_set = - std::env::var("PROVER_PRIVATE_KEY").or_else(|_| std::env::var("PRIVATE_KEY")).is_ok(); - let prover_rpc_url_set = std::env::var("PROVER_RPC_URL").is_ok(); - let legacy_rpc_url_set = std::env::var("RPC_URL").is_ok(); - let reward_address_set = std::env::var("REWARD_ADDRESS").is_ok(); - - if private_key_set { - display.item_colored(" PROVER_PRIVATE_KEY", "✓ Already set", "green"); - } else { - display.note(" export PROVER_PRIVATE_KEY="); - } - - match (prover_rpc_url_set, legacy_rpc_url_set) { - (true, _) => { - display.item_colored(" PROVER_RPC_URL", "✓ Already set", "green"); - } - (false, true) => { - display.item_colored( - " RPC_URL (legacy)", - "✓ Already set (fallback in use)", - "yellow", - ); - display.note(" # Preferred: export PROVER_RPC_URL="); - } - (false, false) => { - display.note(" export PROVER_RPC_URL="); - } - } - - if reward_address_set { - display.item_colored(" REWARD_ADDRESS", "✓ Already set", "green"); - } else { - display.warning(" REWARD_ADDRESS env variable is not set."); - display.note(" This is required in order to receive ZK mining rewards for your work."); - display.note(" (This does not effect the ETH market fees you receive from fulfilling proving requests."); - display.note(" Learn more: https://docs.boundless.network/zkc/mining/overview"); - display.note(""); - display.note(" Option 1: export REWARD_ADDRESS="); - display.note(" Option 2: Set POVW_LOG_ID in compose.yml to your reward address"); - } - - display.note(""); - display.note(&format!( - "2. Ensure you have a minimum of {} ZKC collateral in your prover address:", - config.max_collateral - )); - display.note(" boundless prover balance-collateral"); - - display.note(""); - display.note("3. Start your prover:"); - display.note(" just prover up"); - - display.note(""); - display.note("4. Monitor your prover:"); - display.note(" just prover logs"); - - display.separator(); - display.note("For more information, visit:"); - display.note("https://docs.boundless.network/provers/broker"); - - Ok(()) - } -} - -// CPU thread detection -fn detect_cpu_threads() -> Result { - Ok(num_cpus::get()) -} - -// GPU detection -fn detect_gpus() -> Result { - // Try to detect NVIDIA GPUs using nvidia-smi - let output = std::process::Command::new("nvidia-smi").arg("--list-gpus").output(); - - match output { - Ok(output) if output.status.success() => { - let stdout = String::from_utf8_lossy(&output.stdout); - let count = stdout.lines().filter(|line| line.contains("GPU")).count(); - Ok(count) - } - _ => bail!("Could not detect GPUs automatically using `nvidia-smi --list-gpus`"), - } -} - -// Bento health check -async fn check_bento_health(bento_url: &str) -> Result<()> { - let url = Url::parse(bento_url).context("Invalid Bento URL")?; - let health_url = url.join("health").context("Failed to construct health check URL")?; - - reqwest::get(health_url.clone()) - .await - .with_context(|| format!("Failed to connect to Bento at {}", health_url))? - .error_for_status() - .context("Bento health check returned error status")?; - - Ok(()) -} From db22e9db1751357b7234e5b2d1fdfa6da9ac93b2 Mon Sep 17 00:00:00 2001 From: austinabell Date: Fri, 21 Nov 2025 17:08:01 -0500 Subject: [PATCH 06/11] include raw error message in error message --- .../src/contracts/boundless_market.rs | 11 +++++++---- crates/broker/src/submitter.rs | 6 +++--- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/crates/boundless-market/src/contracts/boundless_market.rs b/crates/boundless-market/src/contracts/boundless_market.rs index 6d184cf66..9cae74e51 100644 --- a/crates/boundless-market/src/contracts/boundless_market.rs +++ b/crates/boundless-market/src/contracts/boundless_market.rs @@ -118,8 +118,10 @@ pub enum MarketError { PaymentRequirementsFailed(IBoundlessMarketErrors), /// Payment requirements failed, unable to decode error - #[error("Payment requirements failed during order fulfillment: error unknown")] - PaymentRequirementsFailedUnknownError, + #[error( + "Payment requirements failed during order fulfillment: unrecognized error payload {0:?}" + )] + PaymentRequirementsFailedUnknownError(Bytes), } impl From for MarketError { @@ -218,9 +220,10 @@ fn extract_tx_log( fn validate_fulfill_receipt(receipt: TransactionReceipt) -> Result<(), MarketError> { if let Some(log) = receipt.decoded_log::() { - match IBoundlessMarketErrors::abi_decode(&**log.error) { + let raw_error = Bytes::copy_from_slice(&log.error); + match IBoundlessMarketErrors::abi_decode(&raw_error) { Ok(err) => Err(MarketError::PaymentRequirementsFailed(err)), - Err(_) => Err(MarketError::PaymentRequirementsFailedUnknownError), + Err(_) => Err(MarketError::PaymentRequirementsFailedUnknownError(raw_error)), } } else { Ok(()) diff --git a/crates/broker/src/submitter.rs b/crates/broker/src/submitter.rs index 2b199fb49..ab0b19dd9 100644 --- a/crates/broker/src/submitter.rs +++ b/crates/broker/src/submitter.rs @@ -586,13 +586,13 @@ where return Ok(()); } Err(SubmitterErr::MarketError( - MarketError::PaymentRequirementsFailedUnknownError, + MarketError::PaymentRequirementsFailedUnknownError(raw), )) => { tracing::warn!( - "Payment requirement failed for one or more orders, will not retry" + "Payment requirement failed for one or more orders, will not retry (raw error: {raw:?})" ); errors.push(SubmitterErr::MarketError( - MarketError::PaymentRequirementsFailedUnknownError, + MarketError::PaymentRequirementsFailedUnknownError(raw), )); break; } From 8320cca70c3372f61661e1788bab843d00865dac Mon Sep 17 00:00:00 2001 From: austinabell Date: Fri, 21 Nov 2025 18:29:00 -0500 Subject: [PATCH 07/11] add sanity check for fulfillment status before finishing proving --- crates/broker/src/lib.rs | 23 +++++++++-------- crates/broker/src/proving.rs | 49 ++++++++++++++++++++++++++++++++++-- 2 files changed, 59 insertions(+), 13 deletions(-) diff --git a/crates/broker/src/lib.rs b/crates/broker/src/lib.rs index ff6f8af77..0fe9bf440 100644 --- a/crates/broker/src/lib.rs +++ b/crates/broker/src/lib.rs @@ -22,7 +22,7 @@ use crate::storage::create_uri_handler; use alloy::{ network::Ethereum, primitives::{Address, Bytes, FixedBytes, U256}, - providers::{Provider, WalletProvider}, + providers::{DynProvider, Provider, WalletProvider}, signers::local::PrivateKeySigner, }; use anyhow::{Context, Result}; @@ -890,16 +890,19 @@ where Arc::new(provers::DefaultProver::new()) }; + let prover_addr = self.provider.default_signer_address(); + let (pricing_tx, pricing_rx) = mpsc::channel(PRICING_CHANNEL_CAPACITY); - let collateral_token_decimals = BoundlessMarketService::new( + let market = Arc::new(BoundlessMarketService::new( self.deployment().boundless_market_address, - self.provider.clone(), - Address::ZERO, - ) - .collateral_token_decimals() - .await - .context("Failed to get stake token decimals. Possible RPC error.")?; + DynProvider::new(self.provider.clone()), + prover_addr, + )); + let collateral_token_decimals = market + .collateral_token_decimals() + .await + .context("Failed to get stake token decimals. Possible RPC error.")?; // Spin up the order picker to pre-flight and find orders to lock let order_picker = Arc::new(order_picker::OrderPicker::new( @@ -932,6 +935,7 @@ where config.clone(), order_state_tx.clone(), self.priority_requestors.clone(), + market.clone(), ) .await .context("Failed to initialize proving service")?, @@ -947,9 +951,6 @@ where Ok(()) }); - let prover_addr = - self.args.private_key.as_ref().expect("Private key must be set").address(); - let order_monitor = Arc::new(order_monitor::OrderMonitor::new( self.db.clone(), self.provider.clone(), diff --git a/crates/broker/src/proving.rs b/crates/broker/src/proving.rs index d9fb33bcd..39fcc126e 100644 --- a/crates/broker/src/proving.rs +++ b/crates/broker/src/proving.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use crate::{ config::ConfigLock, @@ -27,7 +27,9 @@ use crate::{ utils::cancel_proof_and_fail_order, FulfillmentType, Order, OrderStateChange, OrderStatus, }; +use alloy::providers::DynProvider; use anyhow::{Context, Result}; +use boundless_market::contracts::boundless_market::BoundlessMarketService; use thiserror::Error; use tokio_util::sync::CancellationToken; @@ -66,6 +68,7 @@ pub struct ProvingService { config: ConfigLock, order_state_tx: tokio::sync::broadcast::Sender, priority_requestors: PriorityRequestors, + fulfillment_market: Arc>, } impl ProvingService { @@ -75,8 +78,9 @@ impl ProvingService { config: ConfigLock, order_state_tx: tokio::sync::broadcast::Sender, priority_requestors: PriorityRequestors, + fulfillment_market: Arc>, ) -> Result { - Ok(Self { db, prover, config, order_state_tx, priority_requestors }) + Ok(Self { db, prover, config, order_state_tx, priority_requestors, fulfillment_market }) } async fn monitor_proof_internal( @@ -339,6 +343,7 @@ impl ProvingService { async fn prove_and_update_db(&self, mut order: Order) { let order_id = order.id(); + let request_id = order.request.id; let (proof_retry_count, proof_retry_sleep_ms) = { let config = self.config.lock_all().unwrap(); @@ -378,6 +383,22 @@ impl ProvingService { Ok(order_status) => { tracing::info!("Successfully completed proof monitoring for order {order_id}"); + let is_fulfilled = self + .fulfillment_market + .is_fulfilled(request_id) + .await + .inspect_err(|e| { + tracing::warn!( + "Failed to sanity check fulfillment status for order {order_id}: {e:?}" + ); + }) + .unwrap_or(false); + if is_fulfilled { + tracing::warn!("Fulfillment event was missed, skipping aggregation for fulfilled order {order_id}"); + handle_order_failure(&self.db, &order_id, "Fulfilled before aggregation").await; + return; + } + if let Err(e) = self.db.set_aggregation_status(&order_id, order_status).await { tracing::error!("Failed to set aggregation status for order {order_id}: {e:?}"); } @@ -494,15 +515,32 @@ mod tests { FulfillmentType, OrderStatus, }; use alloy::primitives::{Address, Bytes, U256}; + use alloy::providers::{DynProvider, Provider, ProviderBuilder}; + use alloy::transports::mock::Asserter; use boundless_market::contracts::{ Offer, Predicate, ProofRequest, RequestInput, RequestInputType, Requirements, }; use boundless_test_utils::guests::{ECHO_ELF, ECHO_ID}; use chrono::Utc; + use hex::encode; use risc0_zkvm::sha::Digest; use std::sync::Arc; use tracing_test::traced_test; + fn mock_market(responses: Vec) -> Arc> { + let asserter = Asserter::new(); + for fulfilled in responses { + let mut data = [0u8; 32]; + if fulfilled { + data[31] = 1; + } + asserter.push_success(&format!("0x{}", encode(data))); + } + + let provider = ProviderBuilder::new().connect_mocked_client(asserter).erased(); + Arc::new(BoundlessMarketService::new(Address::ZERO, provider, Address::ZERO)) + } + fn create_test_order( request_id: U256, image_id: String, @@ -573,6 +611,7 @@ mod tests { async fn prove_order() { let db: DbObj = Arc::new(SqliteDb::new("sqlite::memory:").await.unwrap()); let config = ConfigLock::default(); + let market = mock_market(vec![false; 4]); let prover: ProverObj = Arc::new(DefaultProver::new()); let image_id = Digest::from(ECHO_ID).to_string(); @@ -590,6 +629,7 @@ mod tests { config.clone(), order_state_tx, priority_requestors, + market.clone(), ) .await .unwrap(); @@ -619,6 +659,7 @@ mod tests { config.clone(), order_state_tx.clone(), priority_requestors, + market.clone(), ) .await .unwrap(); @@ -656,6 +697,7 @@ mod tests { let config = ConfigLock::default(); let prover: ProverObj = Arc::new(DefaultProver::new()); + let market = mock_market(vec![false; 6]); let image_id = Digest::from(ECHO_ID).to_string(); prover.upload_image(&image_id, ECHO_ELF.to_vec()).await.unwrap(); @@ -675,6 +717,7 @@ mod tests { config.clone(), order_state_tx, priority_requestors, + market, ) .await .unwrap(); @@ -750,6 +793,7 @@ mod tests { let db: DbObj = Arc::new(SqliteDb::new("sqlite::memory:").await.unwrap()); let config = ConfigLock::default(); let prover: ProverObj = Arc::new(DefaultProver::new()); + let market = mock_market(vec![false; 6]); let image_id = Digest::from(ECHO_ID).to_string(); prover.upload_image(&image_id, ECHO_ELF.to_vec()).await.unwrap(); @@ -766,6 +810,7 @@ mod tests { config.clone(), order_state_tx.clone(), priority_requestors, + market, ) .await .unwrap(); From abec7b6022e5db3fc5c5cb8a5a2eb5b8644f2a6a Mon Sep 17 00:00:00 2001 From: austinabell Date: Sat, 22 Nov 2025 11:41:58 -0500 Subject: [PATCH 08/11] fix to only cancel submitting if secondary fulfilling --- crates/broker/src/proving.rs | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/crates/broker/src/proving.rs b/crates/broker/src/proving.rs index 39fcc126e..8f3401e6c 100644 --- a/crates/broker/src/proving.rs +++ b/crates/broker/src/proving.rs @@ -383,20 +383,23 @@ impl ProvingService { Ok(order_status) => { tracing::info!("Successfully completed proof monitoring for order {order_id}"); - let is_fulfilled = self - .fulfillment_market - .is_fulfilled(request_id) - .await - .inspect_err(|e| { - tracing::warn!( + if order.fulfillment_type == FulfillmentType::FulfillAfterLockExpire { + let is_fulfilled = self + .fulfillment_market + .is_fulfilled(request_id) + .await + .inspect_err(|e| { + tracing::warn!( "Failed to sanity check fulfillment status for order {order_id}: {e:?}" ); - }) - .unwrap_or(false); - if is_fulfilled { - tracing::warn!("Fulfillment event was missed, skipping aggregation for fulfilled order {order_id}"); - handle_order_failure(&self.db, &order_id, "Fulfilled before aggregation").await; - return; + }) + .unwrap_or(false); + if is_fulfilled { + tracing::warn!("Fulfillment event was missed, skipping aggregation for fulfilled order {order_id}"); + handle_order_failure(&self.db, &order_id, "Fulfilled before aggregation") + .await; + return; + } } if let Err(e) = self.db.set_aggregation_status(&order_id, order_status).await { From 212ef6967d743bdb34b7bc97f6e0d0c51520f48e Mon Sep 17 00:00:00 2001 From: austinabell Date: Sat, 22 Nov 2025 11:43:07 -0500 Subject: [PATCH 09/11] add comment to give context about fulfillment check --- crates/broker/src/proving.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/broker/src/proving.rs b/crates/broker/src/proving.rs index 8f3401e6c..aaa0f2485 100644 --- a/crates/broker/src/proving.rs +++ b/crates/broker/src/proving.rs @@ -383,6 +383,8 @@ impl ProvingService { Ok(order_status) => { tracing::info!("Successfully completed proof monitoring for order {order_id}"); + // Note: this sanity check isn't strictly necessary, but is to avoid submitting the + // order when the fulfillment event was missed. if order.fulfillment_type == FulfillmentType::FulfillAfterLockExpire { let is_fulfilled = self .fulfillment_market From cbdaba517e0ca5a4b0a9968eadb7103a639bbbd0 Mon Sep 17 00:00:00 2001 From: austinabell Date: Sat, 22 Nov 2025 12:24:40 -0500 Subject: [PATCH 10/11] update e2e test --- crates/boundless-market/tests/e2e.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/crates/boundless-market/tests/e2e.rs b/crates/boundless-market/tests/e2e.rs index f0b101279..ae88969fe 100644 --- a/crates/boundless-market/tests/e2e.rs +++ b/crates/boundless-market/tests/e2e.rs @@ -21,8 +21,9 @@ use alloy::{ use alloy_primitives::Bytes; use boundless_market::{ contracts::{ - boundless_market::{FulfillmentTx, UnlockedRequest}, + boundless_market::{FulfillmentTx, MarketError, UnlockedRequest}, hit_points::default_allowance, + IBoundlessMarket::IBoundlessMarketErrors, AssessorReceipt, FulfillmentData, FulfillmentDataType, Offer, Predicate, ProofRequest, RequestId, RequestStatus, Requirements, }, @@ -434,11 +435,17 @@ async fn test_e2e_no_payment() { }; let balance_before = ctx.prover_market.balance_of(some_other_address).await.unwrap(); - // fulfill the request. - ctx.prover_market + // fulfill the request. This call should fail payment requirements since the lock belongs + // to a different prover, but the request itself still becomes fulfilled on-chain. + let err = ctx + .prover_market .fulfill(FulfillmentTx::new(vec![fulfillment.clone()], assessor_fill.clone())) .await - .unwrap(); + .expect_err("expected payment requirement failure for mismatched locker"); + match err { + MarketError::PaymentRequirementsFailed(IBoundlessMarketErrors::RequestIsLocked(_)) => {} + other => panic!("unexpected error when fulfilling with mismatched prover: {other:?}"), + } assert!(ctx.customer_market.is_fulfilled(request_id).await.unwrap()); let balance_after = ctx.prover_market.balance_of(some_other_address).await.unwrap(); assert!(balance_before == balance_after); From 0533e00a8e358bded39a3db456a0623bdf31ac6e Mon Sep 17 00:00:00 2001 From: austinabell Date: Tue, 25 Nov 2025 19:18:49 -0500 Subject: [PATCH 11/11] update to warn log on payment error event instead of erroring for the batch transaction --- crates/boundless-market/Cargo.toml | 2 +- .../src/contracts/boundless_market.rs | 35 +++++++++++++++---- crates/boundless-market/tests/e2e.rs | 18 ++++------ 3 files changed, 36 insertions(+), 19 deletions(-) diff --git a/crates/boundless-market/Cargo.toml b/crates/boundless-market/Cargo.toml index a49ffef24..6963e0aa1 100644 --- a/crates/boundless-market/Cargo.toml +++ b/crates/boundless-market/Cargo.toml @@ -66,4 +66,4 @@ serde_json = { workspace = true } [dev-dependencies] boundless-test-utils = { workspace = true } -tracing-test = { workspace = true } +tracing-test = { workspace = true, features = ["no-env-filter"] } diff --git a/crates/boundless-market/src/contracts/boundless_market.rs b/crates/boundless-market/src/contracts/boundless_market.rs index 9cae74e51..1446d234b 100644 --- a/crates/boundless-market/src/contracts/boundless_market.rs +++ b/crates/boundless-market/src/contracts/boundless_market.rs @@ -219,15 +219,36 @@ fn extract_tx_log( } fn validate_fulfill_receipt(receipt: TransactionReceipt) -> Result<(), MarketError> { - if let Some(log) = receipt.decoded_log::() { - let raw_error = Bytes::copy_from_slice(&log.error); - match IBoundlessMarketErrors::abi_decode(&raw_error) { - Ok(err) => Err(MarketError::PaymentRequirementsFailed(err)), - Err(_) => Err(MarketError::PaymentRequirementsFailedUnknownError(raw_error)), + for (idx, log) in receipt.inner.logs().iter().enumerate() { + if log.topic0().is_some_and(|topic| { + *topic == IBoundlessMarket::PaymentRequirementsFailed::SIGNATURE_HASH + }) { + match log.log_decode::() { + Ok(decoded) => { + let raw_error = Bytes::copy_from_slice(decoded.inner.data.error.as_ref()); + match IBoundlessMarketErrors::abi_decode(&raw_error) { + Ok(err) => tracing::warn!( + tx_hash = ?receipt.transaction_hash, + log_index = idx, + "Payment requirements failed for at least one fulfillment: {err:?}" + ), + Err(_) => tracing::warn!( + tx_hash = ?receipt.transaction_hash, + log_index = idx, + raw = ?raw_error, + "Payment requirements failed for at least one fulfillment, but error payload was unrecognized" + ), + } + } + Err(err) => tracing::warn!( + tx_hash = ?receipt.transaction_hash, + log_index = idx, + "Failed to decode PaymentRequirementsFailed event: {err:?}" + ), + } } - } else { - Ok(()) } + Ok(()) } /// Data returned when querying for a RequestSubmitted event diff --git a/crates/boundless-market/tests/e2e.rs b/crates/boundless-market/tests/e2e.rs index ae88969fe..d544b253d 100644 --- a/crates/boundless-market/tests/e2e.rs +++ b/crates/boundless-market/tests/e2e.rs @@ -21,9 +21,8 @@ use alloy::{ use alloy_primitives::Bytes; use boundless_market::{ contracts::{ - boundless_market::{FulfillmentTx, MarketError, UnlockedRequest}, + boundless_market::{FulfillmentTx, UnlockedRequest}, hit_points::default_allowance, - IBoundlessMarket::IBoundlessMarketErrors, AssessorReceipt, FulfillmentData, FulfillmentDataType, Offer, Predicate, ProofRequest, RequestId, RequestStatus, Requirements, }, @@ -375,6 +374,7 @@ async fn test_e2e_price_and_fulfill_batch() { } #[tokio::test] +#[traced_test] async fn test_e2e_no_payment() { // Setup anvil let anvil = Anvil::new().spawn(); @@ -435,17 +435,13 @@ async fn test_e2e_no_payment() { }; let balance_before = ctx.prover_market.balance_of(some_other_address).await.unwrap(); - // fulfill the request. This call should fail payment requirements since the lock belongs - // to a different prover, but the request itself still becomes fulfilled on-chain. - let err = ctx - .prover_market + // fulfill the request. This call emits a PaymentRequirementsFailed log since the lock + // belongs to a different prover, but the request itself still becomes fulfilled on-chain. + ctx.prover_market .fulfill(FulfillmentTx::new(vec![fulfillment.clone()], assessor_fill.clone())) .await - .expect_err("expected payment requirement failure for mismatched locker"); - match err { - MarketError::PaymentRequirementsFailed(IBoundlessMarketErrors::RequestIsLocked(_)) => {} - other => panic!("unexpected error when fulfilling with mismatched prover: {other:?}"), - } + .expect("fulfillment should succeed even if payment requirements fail"); + assert!(logs_contain("Payment requirements failed for at least one fulfillment")); assert!(ctx.customer_market.is_fulfilled(request_id).await.unwrap()); let balance_after = ctx.prover_market.balance_of(some_other_address).await.unwrap(); assert!(balance_before == balance_after);