diff --git a/crates/ethereum/payload/src/lib.rs b/crates/ethereum/payload/src/lib.rs index d092267c4be..d2f36a4dee9 100644 --- a/crates/ethereum/payload/src/lib.rs +++ b/crates/ethereum/payload/src/lib.rs @@ -37,7 +37,10 @@ use reth_transaction_pool::{ ValidPoolTransaction, }; use revm::context_interface::Block as _; -use std::sync::Arc; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; use tracing::{debug, trace, warn}; mod config; @@ -114,7 +117,13 @@ where &self, config: PayloadConfig, ) -> Result { - let args = BuildArguments::new(Default::default(), config, Default::default(), None); + let args = BuildArguments::new( + Default::default(), + config, + Default::default(), + None, + Arc::new(AtomicBool::new(false)), + ); default_ethereum_payload( self.evm_config.clone(), @@ -149,7 +158,7 @@ where Pool: TransactionPool>, F: FnOnce(BestTransactionsAttributes) -> BestTransactionsIter, { - let BuildArguments { mut cached_reads, config, cancel, best_payload } = args; + let BuildArguments { mut cached_reads, config, cancel, best_payload, is_resolving } = args; let PayloadConfig { parent_header, attributes } = config; let state_provider = client.state_by_block_hash(parent_header.hash())?; @@ -228,6 +237,13 @@ where return Ok(BuildOutcome::Cancelled) } + // check if the payload is being resolved, if so we should stop adding more transactions + // and return whatever payload we have built so far + if is_resolving.load(Ordering::Relaxed) { + debug!(target: "payload_builder", "payload is being resolved, stopping transaction processing"); + break + } + // convert tx to a signed transaction let tx = pool_tx.to_consensus(); diff --git a/crates/optimism/payload/src/builder.rs b/crates/optimism/payload/src/builder.rs index e44e28fc8f4..0bac1f12d55 100644 --- a/crates/optimism/payload/src/builder.rs +++ b/crates/optimism/payload/src/builder.rs @@ -39,7 +39,13 @@ use reth_revm::{ use reth_storage_api::{errors::ProviderError, StateProvider, StateProviderFactory}; use reth_transaction_pool::{BestTransactionsAttributes, PoolTransaction, TransactionPool}; use revm::context::{Block, BlockEnv}; -use std::{marker::PhantomData, sync::Arc}; +use std::{ + marker::PhantomData, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, +}; use tracing::{debug, trace, warn}; /// Optimism's payload builder @@ -180,7 +186,7 @@ where Txs: PayloadTransactions + OpPooledTx>, { - let BuildArguments { mut cached_reads, config, cancel, best_payload } = args; + let BuildArguments { mut cached_reads, config, cancel, best_payload, is_resolving } = args; let ctx = OpPayloadBuilderCtx { evm_config: self.evm_config.clone(), @@ -189,6 +195,7 @@ where config, cancel, best_payload, + is_resolving, }; let builder = OpBuilder::new(best); @@ -225,6 +232,7 @@ where config, cancel: Default::default(), best_payload: Default::default(), + is_resolving: Arc::new(AtomicBool::new(false)), }; let state_provider = self.client.state_by_block_hash(ctx.parent().hash())?; @@ -279,6 +287,7 @@ where cached_reads: Default::default(), cancel: Default::default(), best_payload: None, + is_resolving: Arc::new(AtomicBool::new(false)), }; self.build_payload(args, |_| NoopPayloadTransactions::::default())? .into_payload() @@ -558,6 +567,8 @@ pub struct OpPayloadBuilderCtx< pub cancel: CancelOnDrop, /// The currently best payload. pub best_payload: Option>, + /// Flag indicating whether the payload job is being resolved. + pub is_resolving: Arc, } impl OpPayloadBuilderCtx @@ -597,6 +608,13 @@ where is_better_payload(self.best_payload.as_ref(), total_fees) } + /// Returns `true` if the payload job is being resolved. + /// + /// When this returns `true`, the builder should finish as quickly as possible. + pub fn is_resolving(&self) -> bool { + self.is_resolving.load(Ordering::Relaxed) + } + /// Prepares a [`BlockBuilder`] for the next block. pub fn block_builder<'a, DB: Database>( &'a self, @@ -740,6 +758,13 @@ where return Ok(Some(())) } + // check if the payload is being resolved, if so we should stop adding more transactions + // and return whatever payload we have built so far + if self.is_resolving() { + debug!(target: "payload_builder", "payload is being resolved, stopping transaction processing"); + return Ok(Some(())) + } + let gas_used = match builder.execute_transaction(tx.clone()) { Ok(gas_used) => gas_used, Err(BlockExecutionError::Validation(BlockValidationError::InvalidTx { diff --git a/crates/payload/basic/src/lib.rs b/crates/payload/basic/src/lib.rs index 84a4f64a4e8..ac0b8dbdbb8 100644 --- a/crates/payload/basic/src/lib.rs +++ b/crates/payload/basic/src/lib.rs @@ -26,7 +26,10 @@ use std::{ future::Future, ops::Deref, pin::Pin, - sync::Arc, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, task::{Context, Poll}, time::{Duration, SystemTime, UNIX_EPOCH}, }; @@ -177,6 +180,7 @@ where payload_task_guard: self.payload_task_guard.clone(), metrics: Default::default(), builder: self.builder.clone(), + is_resolving: Arc::new(AtomicBool::new(false)), }; // start the first job right away @@ -328,6 +332,12 @@ where /// /// See [`PayloadBuilder`] builder: Builder, + /// Flag to indicate that the payload is being resolved. + /// + /// This is set to `true` when [`PayloadJob::resolve`] is called, signaling to any + /// in-progress build tasks that they should finish as quickly as possible and return + /// whatever payload they have built so far rather than waiting for more transactions. + is_resolving: Arc, } impl BasicPayloadJob @@ -349,11 +359,17 @@ where self.metrics.inc_initiated_payload_builds(); let cached_reads = self.cached_reads.take().unwrap_or_default(); let builder = self.builder.clone(); + let is_resolving = self.is_resolving.clone(); self.executor.spawn_blocking(Box::pin(async move { // acquire the permit for executing the task let _permit = guard.acquire().await; - let args = - BuildArguments { cached_reads, config: payload_config, cancel, best_payload }; + let args = BuildArguments { + cached_reads, + config: payload_config, + cancel, + best_payload, + is_resolving, + }; let result = builder.try_build(args); let _ = tx.send(result); })); @@ -463,6 +479,9 @@ where &mut self, kind: PayloadKind, ) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) { + // Signal to any in-progress build tasks that we're resolving + self.is_resolving.store(true, Ordering::SeqCst); + let best_payload = self.best_payload.payload().cloned(); if best_payload.is_none() && self.pending_block.is_none() { // ensure we have a job scheduled if we don't have a best payload yet and none is active @@ -480,6 +499,7 @@ where config: self.config.clone(), cancel: CancelOnDrop::default(), best_payload: None, + is_resolving: self.is_resolving.clone(), }; match self.builder.on_missing_payload(args) { @@ -804,6 +824,11 @@ pub struct BuildArguments { pub cancel: CancelOnDrop, /// The best payload achieved so far. pub best_payload: Option, + /// Flag indicating whether the payload job is being resolved. + /// + /// When `true`, the builder should finish as quickly as possible and return whatever + /// payload it has built so far, rather than waiting for more transactions. + pub is_resolving: Arc, } impl BuildArguments { @@ -813,8 +838,16 @@ impl BuildArguments { config: PayloadConfig>, cancel: CancelOnDrop, best_payload: Option, + is_resolving: Arc, ) -> Self { - Self { cached_reads, config, cancel, best_payload } + Self { cached_reads, config, cancel, best_payload, is_resolving } + } + + /// Returns `true` if the payload job is being resolved. + /// + /// When this returns `true`, the builder should finish as quickly as possible. + pub fn is_resolving(&self) -> bool { + self.is_resolving.load(Ordering::Relaxed) } } diff --git a/crates/payload/basic/src/stack.rs b/crates/payload/basic/src/stack.rs index ba5c927b9f3..b0f80ec3697 100644 --- a/crates/payload/basic/src/stack.rs +++ b/crates/payload/basic/src/stack.rs @@ -194,7 +194,7 @@ where &self, args: BuildArguments, ) -> Result, PayloadBuilderError> { - let BuildArguments { cached_reads, config, cancel, best_payload } = args; + let BuildArguments { cached_reads, config, cancel, best_payload, is_resolving } = args; let PayloadConfig { parent_header, attributes } = config; match attributes { @@ -210,6 +210,7 @@ where None } }), + is_resolving, }; self.left.try_build(left_args).map(|out| out.map_payload(Either::Left)) } @@ -225,6 +226,7 @@ where None } }), + is_resolving, }; self.right.try_build(right_args).map(|out| out.map_payload(Either::Right)) }