From 415b8db2915937ef89aaf48c23bc1711967105c4 Mon Sep 17 00:00:00 2001 From: Yong Kang Date: Thu, 23 Oct 2025 04:11:01 +0000 Subject: [PATCH 01/20] rebase --- crates/engine/tree/src/tree/payload_processor/multiproof.rs | 6 +++--- docs/vocs/docs/pages/cli/reth/node.mdx | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs index 1e5b226f591..adeac7131c2 100644 --- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs +++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs @@ -35,8 +35,8 @@ use std::{ use tracing::{debug, error, instrument, trace}; /// Default upper bound for inflight multiproof calculations. These would be sitting in the queue -/// waiting to be processed. -const DEFAULT_MULTIPROOF_INFLIGHT_LIMIT: usize = 128; +/// waiting to +pub const DEFAULT_MULTIPROOF_INFLIGHT_LIMIT: usize = 256; /// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the /// state. @@ -386,7 +386,7 @@ impl MultiproofManager { } } - const fn is_full(&self) -> bool { + fn is_full(&self) -> bool { self.inflight >= self.inflight_limit } diff --git a/docs/vocs/docs/pages/cli/reth/node.mdx b/docs/vocs/docs/pages/cli/reth/node.mdx index 3fc6988dc69..dedcd5b385e 100644 --- a/docs/vocs/docs/pages/cli/reth/node.mdx +++ b/docs/vocs/docs/pages/cli/reth/node.mdx @@ -1009,4 +1009,4 @@ Tracing: Defaults to TRACE if not specified. [default: debug] -``` \ No newline at end of file +``` From d85cc5f7ea3188fed6a479ce981ed1b37e3bcadd Mon Sep 17 00:00:00 2001 From: Yong Kang Date: Tue, 21 Oct 2025 08:37:43 +0000 Subject: [PATCH 02/20] fmt --- crates/engine/tree/src/tree/payload_processor/multiproof.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs index adeac7131c2..2ebc67d81c4 100644 --- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs +++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs @@ -35,7 +35,7 @@ use std::{ use tracing::{debug, error, instrument, trace}; /// Default upper bound for inflight multiproof calculations. These would be sitting in the queue -/// waiting to +/// waiting to pub const DEFAULT_MULTIPROOF_INFLIGHT_LIMIT: usize = 256; /// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the From 452457e0c7817d0d963acf43c2105c2097c61810 Mon Sep 17 00:00:00 2001 From: Yong Kang Date: Tue, 21 Oct 2025 08:42:51 +0000 Subject: [PATCH 03/20] clippy --- crates/engine/tree/src/tree/payload_processor/multiproof.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs index 2ebc67d81c4..a9ce00ff447 100644 --- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs +++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs @@ -386,7 +386,7 @@ impl MultiproofManager { } } - fn is_full(&self) -> bool { + const fn is_full(&self) -> bool { self.inflight >= self.inflight_limit } From 3c676ac7c511bd743b809e006750f680d3d038e9 Mon Sep 17 00:00:00 2001 From: Yong Kang Date: Tue, 21 Oct 2025 08:57:07 +0000 Subject: [PATCH 04/20] remove --- crates/engine/tree/src/tree/payload_processor/multiproof.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs index a9ce00ff447..e357a95c4b7 100644 --- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs +++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs @@ -35,8 +35,8 @@ use std::{ use tracing::{debug, error, instrument, trace}; /// Default upper bound for inflight multiproof calculations. These would be sitting in the queue -/// waiting to -pub const DEFAULT_MULTIPROOF_INFLIGHT_LIMIT: usize = 256; +/// waiting to be processed. +const DEFAULT_MULTIPROOF_INFLIGHT_LIMIT: usize = 256; /// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the /// state. From 042b4a79fcdb4e6d45a25a32732d542b6af388c8 Mon Sep 17 00:00:00 2001 From: Yong Kang Date: Tue, 21 Oct 2025 10:35:32 +0000 Subject: [PATCH 05/20] perf: removed pending --- .../src/tree/payload_processor/multiproof.rs | 35 ++----------------- 1 file changed, 3 insertions(+), 32 deletions(-) diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs index e357a95c4b7..2f227ee5266 100644 --- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs +++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs @@ -24,7 +24,7 @@ use reth_trie_parallel::{ root::ParallelStateRootError, }; use std::{ - collections::{BTreeMap, VecDeque}, + collections::BTreeMap, ops::DerefMut, sync::{ mpsc::{channel, Receiver, Sender}, @@ -34,10 +34,6 @@ use std::{ }; use tracing::{debug, error, instrument, trace}; -/// Default upper bound for inflight multiproof calculations. These would be sitting in the queue -/// waiting to be processed. -const DEFAULT_MULTIPROOF_INFLIGHT_LIMIT: usize = 256; - /// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the /// state. #[derive(Default, Debug)] @@ -337,17 +333,11 @@ impl MultiproofInput { } /// Manages concurrent multiproof calculations. -/// Takes care of not having more calculations in flight than a given maximum -/// concurrency, further calculation requests are queued and spawn later, after -/// availability has been signaled. +/// Takes care of spawning multiproof calculations and tracking in-flight work. #[derive(Debug)] pub struct MultiproofManager { - /// Maximum number of proof calculations allowed to be inflight at once. - inflight_limit: usize, /// Currently running calculations. inflight: usize, - /// Queued calculations. - pending: VecDeque, /// Executor for tasks executor: WorkloadExecutor, /// Handle to the proof worker pools (storage and account). @@ -376,8 +366,6 @@ impl MultiproofManager { proof_worker_handle: ProofWorkerHandle, ) -> Self { Self { - pending: VecDeque::with_capacity(DEFAULT_MULTIPROOF_INFLIGHT_LIMIT), - inflight_limit: DEFAULT_MULTIPROOF_INFLIGHT_LIMIT, executor, inflight: 0, metrics, @@ -386,11 +374,7 @@ impl MultiproofManager { } } - const fn is_full(&self) -> bool { - self.inflight >= self.inflight_limit - } - - /// Spawns a new multiproof calculation or enqueues it if the inflight limit is reached. + /// Spawns a new multiproof calculation for the provided input. fn spawn_or_queue(&mut self, input: PendingMultiproofTask) { // If there are no proof targets, we can just send an empty multiproof back immediately if input.proof_targets_is_empty() { @@ -402,12 +386,6 @@ impl MultiproofManager { return } - if self.is_full() { - self.pending.push_back(input); - self.metrics.pending_multiproofs_histogram.record(self.pending.len() as f64); - return; - } - self.spawn_multiproof_task(input); } @@ -416,11 +394,6 @@ impl MultiproofManager { fn on_calculation_complete(&mut self) { self.inflight = self.inflight.saturating_sub(1); self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64); - - if let Some(input) = self.pending.pop_front() { - self.metrics.pending_multiproofs_histogram.record(self.pending.len() as f64); - self.spawn_multiproof_task(input); - } } /// Spawns a multiproof task, dispatching to `spawn_storage_proof` if the input is a storage @@ -606,8 +579,6 @@ impl MultiproofManager { pub(crate) struct MultiProofTaskMetrics { /// Histogram of inflight multiproofs. pub inflight_multiproofs_histogram: Histogram, - /// Histogram of pending multiproofs. - pub pending_multiproofs_histogram: Histogram, /// Histogram of the number of prefetch proof target accounts. pub prefetch_proof_targets_accounts_histogram: Histogram, From 089dacd72b23bcc039d10a6f36831fbc5aeebf66 Mon Sep 17 00:00:00 2001 From: Yong Kang Date: Tue, 21 Oct 2025 10:43:29 +0000 Subject: [PATCH 06/20] Revert "perf: removed pending" This reverts commit 941942484f55265a40cdcbc3477c8a3f264a23f4. --- .../src/tree/payload_processor/multiproof.rs | 35 +++++++++++++++++-- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs index 2f227ee5266..e357a95c4b7 100644 --- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs +++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs @@ -24,7 +24,7 @@ use reth_trie_parallel::{ root::ParallelStateRootError, }; use std::{ - collections::BTreeMap, + collections::{BTreeMap, VecDeque}, ops::DerefMut, sync::{ mpsc::{channel, Receiver, Sender}, @@ -34,6 +34,10 @@ use std::{ }; use tracing::{debug, error, instrument, trace}; +/// Default upper bound for inflight multiproof calculations. These would be sitting in the queue +/// waiting to be processed. +const DEFAULT_MULTIPROOF_INFLIGHT_LIMIT: usize = 256; + /// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the /// state. #[derive(Default, Debug)] @@ -333,11 +337,17 @@ impl MultiproofInput { } /// Manages concurrent multiproof calculations. -/// Takes care of spawning multiproof calculations and tracking in-flight work. +/// Takes care of not having more calculations in flight than a given maximum +/// concurrency, further calculation requests are queued and spawn later, after +/// availability has been signaled. #[derive(Debug)] pub struct MultiproofManager { + /// Maximum number of proof calculations allowed to be inflight at once. + inflight_limit: usize, /// Currently running calculations. inflight: usize, + /// Queued calculations. + pending: VecDeque, /// Executor for tasks executor: WorkloadExecutor, /// Handle to the proof worker pools (storage and account). @@ -366,6 +376,8 @@ impl MultiproofManager { proof_worker_handle: ProofWorkerHandle, ) -> Self { Self { + pending: VecDeque::with_capacity(DEFAULT_MULTIPROOF_INFLIGHT_LIMIT), + inflight_limit: DEFAULT_MULTIPROOF_INFLIGHT_LIMIT, executor, inflight: 0, metrics, @@ -374,7 +386,11 @@ impl MultiproofManager { } } - /// Spawns a new multiproof calculation for the provided input. + const fn is_full(&self) -> bool { + self.inflight >= self.inflight_limit + } + + /// Spawns a new multiproof calculation or enqueues it if the inflight limit is reached. fn spawn_or_queue(&mut self, input: PendingMultiproofTask) { // If there are no proof targets, we can just send an empty multiproof back immediately if input.proof_targets_is_empty() { @@ -386,6 +402,12 @@ impl MultiproofManager { return } + if self.is_full() { + self.pending.push_back(input); + self.metrics.pending_multiproofs_histogram.record(self.pending.len() as f64); + return; + } + self.spawn_multiproof_task(input); } @@ -394,6 +416,11 @@ impl MultiproofManager { fn on_calculation_complete(&mut self) { self.inflight = self.inflight.saturating_sub(1); self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64); + + if let Some(input) = self.pending.pop_front() { + self.metrics.pending_multiproofs_histogram.record(self.pending.len() as f64); + self.spawn_multiproof_task(input); + } } /// Spawns a multiproof task, dispatching to `spawn_storage_proof` if the input is a storage @@ -579,6 +606,8 @@ impl MultiproofManager { pub(crate) struct MultiProofTaskMetrics { /// Histogram of inflight multiproofs. pub inflight_multiproofs_histogram: Histogram, + /// Histogram of pending multiproofs. + pub pending_multiproofs_histogram: Histogram, /// Histogram of the number of prefetch proof target accounts. pub prefetch_proof_targets_accounts_histogram: Histogram, From 139938d0aaf6b2624bcee2ab3a394ac812aea666 Mon Sep 17 00:00:00 2001 From: Yong Kang Date: Tue, 21 Oct 2025 11:17:30 +0000 Subject: [PATCH 07/20] refactor: rm pending queue --- .../src/tree/payload_processor/multiproof.rs | 66 +++++-------------- 1 file changed, 16 insertions(+), 50 deletions(-) diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs index e357a95c4b7..3c6c1a92765 100644 --- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs +++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs @@ -24,7 +24,7 @@ use reth_trie_parallel::{ root::ParallelStateRootError, }; use std::{ - collections::{BTreeMap, VecDeque}, + collections::BTreeMap, ops::DerefMut, sync::{ mpsc::{channel, Receiver, Sender}, @@ -34,10 +34,6 @@ use std::{ }; use tracing::{debug, error, instrument, trace}; -/// Default upper bound for inflight multiproof calculations. These would be sitting in the queue -/// waiting to be processed. -const DEFAULT_MULTIPROOF_INFLIGHT_LIMIT: usize = 256; - /// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the /// state. #[derive(Default, Debug)] @@ -337,17 +333,10 @@ impl MultiproofInput { } /// Manages concurrent multiproof calculations. -/// Takes care of not having more calculations in flight than a given maximum -/// concurrency, further calculation requests are queued and spawn later, after -/// availability has been signaled. #[derive(Debug)] pub struct MultiproofManager { - /// Maximum number of proof calculations allowed to be inflight at once. - inflight_limit: usize, /// Currently running calculations. inflight: usize, - /// Queued calculations. - pending: VecDeque, /// Executor for tasks executor: WorkloadExecutor, /// Handle to the proof worker pools (storage and account). @@ -376,22 +365,16 @@ impl MultiproofManager { proof_worker_handle: ProofWorkerHandle, ) -> Self { Self { - pending: VecDeque::with_capacity(DEFAULT_MULTIPROOF_INFLIGHT_LIMIT), - inflight_limit: DEFAULT_MULTIPROOF_INFLIGHT_LIMIT, - executor, inflight: 0, + executor, metrics, proof_worker_handle, missed_leaves_storage_roots: Default::default(), } } - const fn is_full(&self) -> bool { - self.inflight >= self.inflight_limit - } - - /// Spawns a new multiproof calculation or enqueues it if the inflight limit is reached. - fn spawn_or_queue(&mut self, input: PendingMultiproofTask) { + /// Spawns a new multiproof calculation. + fn spawn(&mut self, input: PendingMultiproofTask) { // If there are no proof targets, we can just send an empty multiproof back immediately if input.proof_targets_is_empty() { debug!( @@ -402,27 +385,9 @@ impl MultiproofManager { return } - if self.is_full() { - self.pending.push_back(input); - self.metrics.pending_multiproofs_histogram.record(self.pending.len() as f64); - return; - } - self.spawn_multiproof_task(input); } - /// Signals that a multiproof calculation has finished and there's room to - /// spawn a new calculation if needed. - fn on_calculation_complete(&mut self) { - self.inflight = self.inflight.saturating_sub(1); - self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64); - - if let Some(input) = self.pending.pop_front() { - self.metrics.pending_multiproofs_histogram.record(self.pending.len() as f64); - self.spawn_multiproof_task(input); - } - } - /// Spawns a multiproof task, dispatching to `spawn_storage_proof` if the input is a storage /// multiproof, and dispatching to `spawn_multiproof` otherwise. fn spawn_multiproof_task(&mut self, input: PendingMultiproofTask) { @@ -510,6 +475,12 @@ impl MultiproofManager { self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64); } + /// Signals that a multiproof calculation has finished. + fn on_calculation_complete(&mut self) { + self.inflight = self.inflight.saturating_sub(1); + self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64); + } + /// Spawns a single multiproof calculation task. fn spawn_multiproof(&mut self, multiproof_input: MultiproofInput) { let MultiproofInput { @@ -606,8 +577,6 @@ impl MultiproofManager { pub(crate) struct MultiProofTaskMetrics { /// Histogram of inflight multiproofs. pub inflight_multiproofs_histogram: Histogram, - /// Histogram of pending multiproofs. - pub pending_multiproofs_histogram: Histogram, /// Histogram of the number of prefetch proof target accounts. pub prefetch_proof_targets_accounts_histogram: Histogram, @@ -657,8 +626,7 @@ pub(crate) struct MultiProofTaskMetrics { #[derive(Debug)] pub(super) struct MultiProofTask { /// The size of proof targets chunk to spawn in one calculation. - /// - /// If [`None`], then chunking is disabled. + /// If None, chunking is disabled and all targets are processed in a single proof. chunk_size: Option, /// Task configuration. config: MultiProofConfig, @@ -738,10 +706,9 @@ impl MultiProofTask { // Process proof targets in chunks. let mut chunks = 0; - let should_chunk = !self.multiproof_manager.is_full(); let mut spawn = |proof_targets| { - self.multiproof_manager.spawn_or_queue( + self.multiproof_manager.spawn( MultiproofInput { config: self.config.clone(), source: None, @@ -756,7 +723,7 @@ impl MultiProofTask { chunks += 1; }; - if should_chunk && let Some(chunk_size) = self.chunk_size { + if let Some(chunk_size) = self.chunk_size { for proof_targets_chunk in proof_targets.chunks(chunk_size) { spawn(proof_targets_chunk); } @@ -873,7 +840,6 @@ impl MultiProofTask { // Process state updates in chunks. let mut chunks = 0; - let should_chunk = !self.multiproof_manager.is_full(); let mut spawned_proof_targets = MultiProofTargets::default(); @@ -885,7 +851,7 @@ impl MultiProofTask { ); spawned_proof_targets.extend_ref(&proof_targets); - self.multiproof_manager.spawn_or_queue( + self.multiproof_manager.spawn( MultiproofInput { config: self.config.clone(), source: Some(source), @@ -901,7 +867,7 @@ impl MultiProofTask { chunks += 1; }; - if should_chunk && let Some(chunk_size) = self.chunk_size { + if let Some(chunk_size) = self.chunk_size { for chunk in not_fetched_state_update.chunks(chunk_size) { spawn(chunk); } @@ -954,7 +920,7 @@ impl MultiProofTask { /// so that the proofs for accounts and storage slots that were already fetched are not /// requested again. /// 2. Using the proof targets, a new multiproof is calculated using - /// [`MultiproofManager::spawn_or_queue`]. + /// [`MultiproofManager::spawn`]. /// * If the list of proof targets is empty, the [`MultiProofMessage::EmptyProof`] message is /// sent back to this task along with the original state update. /// * Otherwise, the multiproof is calculated and the [`MultiProofMessage::ProofCalculated`] From 20e4ec02fe15e7cf9308cfeed49ca9cfdb580549 Mon Sep 17 00:00:00 2001 From: Yong Kang Date: Wed, 22 Oct 2025 13:21:01 +0000 Subject: [PATCH 08/20] feat: add worker availability tracking to multiproof manager - Introduced a counter for available workers to manage task processing. - Updated multiproof spawning logic to utilize worker availability for chunking. - Enhanced worker decrement/increment logic during task execution. --- .../tree/src/tree/payload_processor/mod.rs | 2 + .../src/tree/payload_processor/multiproof.rs | 47 +++++++++++++++++-- 2 files changed, 46 insertions(+), 3 deletions(-) diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index 8ab186dea5b..c4bb9e6cca4 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -210,6 +210,7 @@ where ); let storage_worker_count = config.storage_worker_count(); let account_worker_count = config.account_worker_count(); + let worker_count = storage_worker_count + account_worker_count; let proof_handle = ProofWorkerHandle::new( self.executor.handle().clone(), consistent_view, @@ -222,6 +223,7 @@ where state_root_config, self.executor.clone(), proof_handle.clone(), + worker_count, to_sparse_trie, config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()), ); diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs index 3c6c1a92765..05ccf9ef978 100644 --- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs +++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs @@ -27,6 +27,7 @@ use std::{ collections::BTreeMap, ops::DerefMut, sync::{ + atomic::{AtomicUsize, Ordering}, mpsc::{channel, Receiver, Sender}, Arc, }, @@ -353,6 +354,9 @@ pub struct MultiproofManager { /// a big account change into different chunks, which may repeatedly /// revisit missed leaves. missed_leaves_storage_roots: Arc>, + /// Counter tracking available workers. Workers decrement when starting work, + /// increment when finishing. Used to determine whether to chunk multiproofs. + available_workers: Arc, /// Metrics metrics: MultiProofTaskMetrics, } @@ -363,6 +367,7 @@ impl MultiproofManager { executor: WorkloadExecutor, metrics: MultiProofTaskMetrics, proof_worker_handle: ProofWorkerHandle, + worker_count: usize, ) -> Self { Self { inflight: 0, @@ -370,9 +375,15 @@ impl MultiproofManager { metrics, proof_worker_handle, missed_leaves_storage_roots: Default::default(), + available_workers: Arc::new(AtomicUsize::new(worker_count)), } } + /// Returns true if there are available workers to process tasks. + fn has_available_workers(&self) -> bool { + self.available_workers.load(Ordering::Relaxed) > 0 + } + /// Spawns a new multiproof calculation. fn spawn(&mut self, input: PendingMultiproofTask) { // If there are no proof targets, we can just send an empty multiproof back immediately @@ -416,8 +427,14 @@ impl MultiproofManager { let storage_proof_worker_handle = self.proof_worker_handle.clone(); let missed_leaves_storage_roots = self.missed_leaves_storage_roots.clone(); + let available_workers = self.available_workers.clone(); self.executor.spawn_blocking(move || { + // Try to reserve a worker slot without letting the counter underflow. + let decremented = available_workers + .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| current.checked_sub(1)) + .is_ok(); + let storage_targets = proof_targets.len(); trace!( @@ -469,6 +486,11 @@ impl MultiproofManager { .send(MultiProofMessage::ProofCalculationError(error.into())); } } + + // Increment at the end - worker is now available again + if decremented { + available_workers.fetch_add(1, Ordering::Relaxed); + } }); self.inflight += 1; @@ -494,8 +516,14 @@ impl MultiproofManager { } = multiproof_input; let account_proof_worker_handle = self.proof_worker_handle.clone(); let missed_leaves_storage_roots = self.missed_leaves_storage_roots.clone(); + let available_workers = self.available_workers.clone(); self.executor.spawn_blocking(move || { + // Try to reserve a worker slot without letting the counter underflow. + let decremented = available_workers + .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| current.checked_sub(1)) + .is_ok(); + let account_targets = proof_targets.len(); let storage_targets = proof_targets.values().map(|slots| slots.len()).sum::(); @@ -565,6 +593,11 @@ impl MultiproofManager { .send(MultiProofMessage::ProofCalculationError(error.into())); } } + + // Increment at the end - worker is now available again + if decremented { + available_workers.fetch_add(1, Ordering::Relaxed); + } }); self.inflight += 1; @@ -654,6 +687,7 @@ impl MultiProofTask { config: MultiProofConfig, executor: WorkloadExecutor, proof_worker_handle: ProofWorkerHandle, + worker_count: usize, to_sparse_trie: Sender, chunk_size: Option, ) -> Self { @@ -673,6 +707,7 @@ impl MultiProofTask { executor, metrics.clone(), proof_worker_handle, + worker_count, ), metrics, } @@ -707,6 +742,9 @@ impl MultiProofTask { // Process proof targets in chunks. let mut chunks = 0; + // Only chunk if workers are available to take advantage of parallelism. + let should_chunk = self.multiproof_manager.has_available_workers(); + let mut spawn = |proof_targets| { self.multiproof_manager.spawn( MultiproofInput { @@ -723,7 +761,7 @@ impl MultiProofTask { chunks += 1; }; - if let Some(chunk_size) = self.chunk_size { + if should_chunk && let Some(chunk_size) = self.chunk_size { for proof_targets_chunk in proof_targets.chunks(chunk_size) { spawn(proof_targets_chunk); } @@ -843,6 +881,9 @@ impl MultiProofTask { let mut spawned_proof_targets = MultiProofTargets::default(); + // Only chunk if workers are available to take advantage of parallelism. + let should_chunk = self.multiproof_manager.has_available_workers(); + let mut spawn = |hashed_state_update| { let proof_targets = get_proof_targets( &hashed_state_update, @@ -867,7 +908,7 @@ impl MultiProofTask { chunks += 1; }; - if let Some(chunk_size) = self.chunk_size { + if should_chunk && let Some(chunk_size) = self.chunk_size { for chunk in not_fetched_state_update.chunks(chunk_size) { spawn(chunk); } @@ -1205,7 +1246,7 @@ mod tests { ProofWorkerHandle::new(executor.handle().clone(), consistent_view, task_ctx, 1, 1); let channel = channel(); - MultiProofTask::new(config, executor, proof_handle, channel.0, Some(1)) + MultiProofTask::new(config, executor, proof_handle, 1, channel.0, Some(1)) } #[test] From 6ca5bd76e245dbd09bdc88a05e5e5e7cd8ea1f5c Mon Sep 17 00:00:00 2001 From: Yong Kang Date: Wed, 22 Oct 2025 13:24:40 +0000 Subject: [PATCH 09/20] fmt --- .../engine/tree/src/tree/payload_processor/multiproof.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs index 05ccf9ef978..dad56c18217 100644 --- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs +++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs @@ -432,7 +432,9 @@ impl MultiproofManager { self.executor.spawn_blocking(move || { // Try to reserve a worker slot without letting the counter underflow. let decremented = available_workers - .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| current.checked_sub(1)) + .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| { + current.checked_sub(1) + }) .is_ok(); let storage_targets = proof_targets.len(); @@ -521,7 +523,9 @@ impl MultiproofManager { self.executor.spawn_blocking(move || { // Try to reserve a worker slot without letting the counter underflow. let decremented = available_workers - .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| current.checked_sub(1)) + .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| { + current.checked_sub(1) + }) .is_ok(); let account_targets = proof_targets.len(); From 4d380549b9c92d05bb20569097896714cfaf01f6 Mon Sep 17 00:00:00 2001 From: Yong Kang Date: Wed, 22 Oct 2025 13:28:07 +0000 Subject: [PATCH 10/20] rm comment --- crates/engine/tree/src/tree/payload_processor/multiproof.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs index dad56c18217..85138454195 100644 --- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs +++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs @@ -430,7 +430,6 @@ impl MultiproofManager { let available_workers = self.available_workers.clone(); self.executor.spawn_blocking(move || { - // Try to reserve a worker slot without letting the counter underflow. let decremented = available_workers .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| { current.checked_sub(1) @@ -521,7 +520,6 @@ impl MultiproofManager { let available_workers = self.available_workers.clone(); self.executor.spawn_blocking(move || { - // Try to reserve a worker slot without letting the counter underflow. let decremented = available_workers .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| { current.checked_sub(1) From 562934694739a7ec1098c535e1d94ad644776fb5 Mon Sep 17 00:00:00 2001 From: Yong Kang Date: Wed, 22 Oct 2025 13:28:53 +0000 Subject: [PATCH 11/20] comment --- crates/engine/tree/src/tree/payload_processor/multiproof.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs index 85138454195..4d5a9a11709 100644 --- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs +++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs @@ -596,7 +596,7 @@ impl MultiproofManager { } } - // Increment at the end - worker is now available again + // Increment available workers after no more calculations are expected. if decremented { available_workers.fetch_add(1, Ordering::Relaxed); } From d78ccdee2beadc9137180d4fad8434e90b84e029 Mon Sep 17 00:00:00 2001 From: Yong Kang Date: Thu, 23 Oct 2025 04:11:24 +0000 Subject: [PATCH 12/20] rebase --- crates/engine/tree/src/tree/payload_processor/multiproof.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs index 4d5a9a11709..fccc3bda5f9 100644 --- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs +++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs @@ -35,6 +35,7 @@ use std::{ }; use tracing::{debug, error, instrument, trace}; + /// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the /// state. #[derive(Default, Debug)] From 1003fb8d2bbaa44694cb8f4727f0eb42cd789d35 Mon Sep 17 00:00:00 2001 From: Yong Kang Date: Wed, 22 Oct 2025 13:52:26 +0000 Subject: [PATCH 13/20] refactor: remove unused worker count and availability tracking from multiproof manager - Eliminated the worker count variable and associated logic from the multiproof manager. - Updated multiproof spawning to check for available account workers instead of a general worker count. - Cleaned up related comments and code for clarity and maintainability. --- .../tree/src/tree/payload_processor/mod.rs | 2 - .../src/tree/payload_processor/multiproof.rs | 50 ++--------- crates/trie/parallel/src/proof_task.rs | 87 +++++++++++++++++-- 3 files changed, 89 insertions(+), 50 deletions(-) diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index c4bb9e6cca4..8ab186dea5b 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -210,7 +210,6 @@ where ); let storage_worker_count = config.storage_worker_count(); let account_worker_count = config.account_worker_count(); - let worker_count = storage_worker_count + account_worker_count; let proof_handle = ProofWorkerHandle::new( self.executor.handle().clone(), consistent_view, @@ -223,7 +222,6 @@ where state_root_config, self.executor.clone(), proof_handle.clone(), - worker_count, to_sparse_trie, config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()), ); diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs index fccc3bda5f9..235dd9d58cc 100644 --- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs +++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs @@ -27,7 +27,6 @@ use std::{ collections::BTreeMap, ops::DerefMut, sync::{ - atomic::{AtomicUsize, Ordering}, mpsc::{channel, Receiver, Sender}, Arc, }, @@ -35,7 +34,6 @@ use std::{ }; use tracing::{debug, error, instrument, trace}; - /// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the /// state. #[derive(Default, Debug)] @@ -355,9 +353,6 @@ pub struct MultiproofManager { /// a big account change into different chunks, which may repeatedly /// revisit missed leaves. missed_leaves_storage_roots: Arc>, - /// Counter tracking available workers. Workers decrement when starting work, - /// increment when finishing. Used to determine whether to chunk multiproofs. - available_workers: Arc, /// Metrics metrics: MultiProofTaskMetrics, } @@ -368,7 +363,6 @@ impl MultiproofManager { executor: WorkloadExecutor, metrics: MultiProofTaskMetrics, proof_worker_handle: ProofWorkerHandle, - worker_count: usize, ) -> Self { Self { inflight: 0, @@ -376,15 +370,9 @@ impl MultiproofManager { metrics, proof_worker_handle, missed_leaves_storage_roots: Default::default(), - available_workers: Arc::new(AtomicUsize::new(worker_count)), } } - /// Returns true if there are available workers to process tasks. - fn has_available_workers(&self) -> bool { - self.available_workers.load(Ordering::Relaxed) > 0 - } - /// Spawns a new multiproof calculation. fn spawn(&mut self, input: PendingMultiproofTask) { // If there are no proof targets, we can just send an empty multiproof back immediately @@ -428,15 +416,8 @@ impl MultiproofManager { let storage_proof_worker_handle = self.proof_worker_handle.clone(); let missed_leaves_storage_roots = self.missed_leaves_storage_roots.clone(); - let available_workers = self.available_workers.clone(); self.executor.spawn_blocking(move || { - let decremented = available_workers - .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| { - current.checked_sub(1) - }) - .is_ok(); - let storage_targets = proof_targets.len(); trace!( @@ -488,11 +469,6 @@ impl MultiproofManager { .send(MultiProofMessage::ProofCalculationError(error.into())); } } - - // Increment at the end - worker is now available again - if decremented { - available_workers.fetch_add(1, Ordering::Relaxed); - } }); self.inflight += 1; @@ -518,15 +494,8 @@ impl MultiproofManager { } = multiproof_input; let account_proof_worker_handle = self.proof_worker_handle.clone(); let missed_leaves_storage_roots = self.missed_leaves_storage_roots.clone(); - let available_workers = self.available_workers.clone(); self.executor.spawn_blocking(move || { - let decremented = available_workers - .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| { - current.checked_sub(1) - }) - .is_ok(); - let account_targets = proof_targets.len(); let storage_targets = proof_targets.values().map(|slots| slots.len()).sum::(); @@ -596,11 +565,6 @@ impl MultiproofManager { .send(MultiProofMessage::ProofCalculationError(error.into())); } } - - // Increment available workers after no more calculations are expected. - if decremented { - available_workers.fetch_add(1, Ordering::Relaxed); - } }); self.inflight += 1; @@ -690,7 +654,6 @@ impl MultiProofTask { config: MultiProofConfig, executor: WorkloadExecutor, proof_worker_handle: ProofWorkerHandle, - worker_count: usize, to_sparse_trie: Sender, chunk_size: Option, ) -> Self { @@ -710,7 +673,6 @@ impl MultiProofTask { executor, metrics.clone(), proof_worker_handle, - worker_count, ), metrics, } @@ -745,8 +707,9 @@ impl MultiProofTask { // Process proof targets in chunks. let mut chunks = 0; - // Only chunk if workers are available to take advantage of parallelism. - let should_chunk = self.multiproof_manager.has_available_workers(); + // Only chunk if account workers are available to take advantage of parallelism. + let should_chunk = + self.multiproof_manager.proof_worker_handle.has_available_account_workers(); let mut spawn = |proof_targets| { self.multiproof_manager.spawn( @@ -884,8 +847,9 @@ impl MultiProofTask { let mut spawned_proof_targets = MultiProofTargets::default(); - // Only chunk if workers are available to take advantage of parallelism. - let should_chunk = self.multiproof_manager.has_available_workers(); + // Only chunk if account workers are available to take advantage of parallelism. + let should_chunk = + self.multiproof_manager.proof_worker_handle.has_available_account_workers(); let mut spawn = |hashed_state_update| { let proof_targets = get_proof_targets( @@ -1249,7 +1213,7 @@ mod tests { ProofWorkerHandle::new(executor.handle().clone(), consistent_view, task_ctx, 1, 1); let channel = channel(); - MultiProofTask::new(config, executor, proof_handle, 1, channel.0, Some(1)) + MultiProofTask::new(config, executor, proof_handle, channel.0, Some(1)) } #[test] diff --git a/crates/trie/parallel/src/proof_task.rs b/crates/trie/parallel/src/proof_task.rs index 6525500a2a2..0517141b4dd 100644 --- a/crates/trie/parallel/src/proof_task.rs +++ b/crates/trie/parallel/src/proof_task.rs @@ -51,6 +51,7 @@ use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory}; use reth_trie_sparse::provider::{RevealedNode, TrieNodeProvider, TrieNodeProviderFactory}; use std::{ sync::{ + atomic::{AtomicUsize, Ordering}, mpsc::{channel, Receiver, Sender}, Arc, }, @@ -116,6 +117,7 @@ fn storage_worker_loop( task_ctx: ProofTaskCtx, work_rx: CrossbeamReceiver, worker_id: usize, + available_workers: Arc, #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics, ) where Factory: DatabaseProviderFactory, @@ -145,6 +147,12 @@ fn storage_worker_loop( let mut storage_nodes_processed = 0u64; while let Ok(job) = work_rx.recv() { + // Reserve a worker slot with saturating decrement to prevent underflow. + // Only decrement if counter is above zero. + let decremented = available_workers + .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| current.checked_sub(1)) + .is_ok(); + match job { StorageWorkerJob::StorageProof { input, result_sender } => { let hashed_address = input.hashed_address; @@ -186,6 +194,12 @@ fn storage_worker_loop( total_processed = storage_proofs_processed, "Storage proof completed" ); + + // Release worker slot only if we successfully reserved it. + // Prevents counter from wrapping to usize::MAX on underflow. + if decremented { + available_workers.fetch_add(1, Ordering::Relaxed); + } } StorageWorkerJob::BlindedStorageNode { account, path, result_sender } => { @@ -224,6 +238,12 @@ fn storage_worker_loop( total_processed = storage_nodes_processed, "Blinded storage node completed" ); + + // Release worker slot only if we successfully reserved it. + // Prevents counter from wrapping to usize::MAX on underflow. + if decremented { + available_workers.fetch_add(1, Ordering::Relaxed); + } } } } @@ -246,9 +266,11 @@ fn storage_worker_loop( /// /// Each worker: /// 1. Receives `AccountWorkerJob` from crossbeam unbounded channel -/// 2. Computes result using its dedicated long-lived transaction -/// 3. Sends result directly to original caller via `std::mpsc` -/// 4. Repeats until channel closes (graceful shutdown) +/// 2. Decrements availability counter to mark itself as busy +/// 3. Computes result using its dedicated long-lived transaction +/// 4. Sends result directly to original caller via `std::mpsc` +/// 5. Increments availability counter to mark itself as available +/// 6. Repeats until channel closes (graceful shutdown) /// /// # Transaction Reuse /// @@ -269,6 +291,7 @@ fn account_worker_loop( work_rx: CrossbeamReceiver, storage_work_tx: CrossbeamSender, worker_id: usize, + available_workers: Arc, #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics, ) where Factory: DatabaseProviderFactory, @@ -298,6 +321,12 @@ fn account_worker_loop( let mut account_nodes_processed = 0u64; while let Ok(job) = work_rx.recv() { + // Reserve a worker slot with saturating decrement to prevent underflow. + // Only decrement if counter is above zero. + let decremented = available_workers + .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| current.checked_sub(1)) + .is_ok(); + match job { AccountWorkerJob::AccountMultiproof { mut input, result_sender } => { let span = tracing::debug_span!( @@ -381,6 +410,12 @@ fn account_worker_loop( "Account multiproof completed" ); drop(_span_guard); + + // Release worker slot only if we successfully reserved it. + // Prevents counter from wrapping to usize::MAX on underflow. + if decremented { + available_workers.fetch_add(1, Ordering::Relaxed); + } } AccountWorkerJob::BlindedAccountNode { path, result_sender } => { @@ -420,6 +455,12 @@ fn account_worker_loop( "Blinded account node completed" ); drop(_span_guard); + + // Release worker slot only if we successfully reserved it. + // Prevents counter from wrapping to usize::MAX on underflow. + if decremented { + available_workers.fetch_add(1, Ordering::Relaxed); + } } } } @@ -866,6 +907,12 @@ pub struct ProofWorkerHandle { storage_work_tx: CrossbeamSender, /// Direct sender to account worker pool account_work_tx: CrossbeamSender, + /// Counter tracking available storage workers. Workers decrement when starting work, + /// increment when finishing. Used to determine whether to chunk multiproofs. + storage_available_workers: Arc, + /// Counter tracking available account workers. Workers decrement when starting work, + /// increment when finishing. Used to determine whether to chunk multiproofs. + account_available_workers: Arc, } impl ProofWorkerHandle { @@ -893,6 +940,10 @@ impl ProofWorkerHandle { let (storage_work_tx, storage_work_rx) = unbounded::(); let (account_work_tx, account_work_rx) = unbounded::(); + // Initialize availability counters + let storage_available_workers = Arc::new(AtomicUsize::new(storage_worker_count)); + let account_available_workers = Arc::new(AtomicUsize::new(account_worker_count)); + tracing::debug!( target: "trie::proof_task", storage_worker_count, @@ -910,6 +961,7 @@ impl ProofWorkerHandle { let view_clone = view.clone(); let task_ctx_clone = task_ctx.clone(); let work_rx_clone = storage_work_rx.clone(); + let storage_available_workers_clone = storage_available_workers.clone(); executor.spawn_blocking(move || { #[cfg(feature = "metrics")] @@ -921,6 +973,7 @@ impl ProofWorkerHandle { task_ctx_clone, work_rx_clone, worker_id, + storage_available_workers_clone, #[cfg(feature = "metrics")] metrics, ) @@ -946,6 +999,7 @@ impl ProofWorkerHandle { let task_ctx_clone = task_ctx.clone(); let work_rx_clone = account_work_rx.clone(); let storage_work_tx_clone = storage_work_tx.clone(); + let account_available_workers_clone = account_available_workers.clone(); executor.spawn_blocking(move || { #[cfg(feature = "metrics")] @@ -958,6 +1012,7 @@ impl ProofWorkerHandle { work_rx_clone, storage_work_tx_clone, worker_id, + account_available_workers_clone, #[cfg(feature = "metrics")] metrics, ) @@ -972,7 +1027,12 @@ impl ProofWorkerHandle { drop(_guard); - Self::new_handle(storage_work_tx, account_work_tx) + Self::new_handle( + storage_work_tx, + account_work_tx, + storage_available_workers, + account_available_workers, + ) } /// Creates a new [`ProofWorkerHandle`] with direct access to worker pools. @@ -981,8 +1041,25 @@ impl ProofWorkerHandle { const fn new_handle( storage_work_tx: CrossbeamSender, account_work_tx: CrossbeamSender, + storage_available_workers: Arc, + account_available_workers: Arc, ) -> Self { - Self { storage_work_tx, account_work_tx } + Self { + storage_work_tx, + account_work_tx, + storage_available_workers, + account_available_workers, + } + } + + /// Returns true if there are available storage workers to process tasks. + pub fn has_available_storage_workers(&self) -> bool { + self.storage_available_workers.load(Ordering::Relaxed) > 0 + } + + /// Returns true if there are available account workers to process tasks. + pub fn has_available_account_workers(&self) -> bool { + self.account_available_workers.load(Ordering::Relaxed) > 0 } /// Dispatch a storage proof computation to storage worker pool From a1b36ea4273de18fb3e947794d76bc0d47b5e072 Mon Sep 17 00:00:00 2001 From: Yong Kang Date: Wed, 22 Oct 2025 13:56:55 +0000 Subject: [PATCH 14/20] update logic to or --- .../tree/src/tree/payload_processor/multiproof.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs index 235dd9d58cc..79b424e103a 100644 --- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs +++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs @@ -707,9 +707,10 @@ impl MultiProofTask { // Process proof targets in chunks. let mut chunks = 0; - // Only chunk if account workers are available to take advantage of parallelism. + // Only chunk if account or storage workers are available to take advantage of parallelism. let should_chunk = - self.multiproof_manager.proof_worker_handle.has_available_account_workers(); + self.multiproof_manager.proof_worker_handle.has_available_account_workers() || + self.multiproof_manager.proof_worker_handle.has_available_storage_workers(); let mut spawn = |proof_targets| { self.multiproof_manager.spawn( @@ -847,9 +848,10 @@ impl MultiProofTask { let mut spawned_proof_targets = MultiProofTargets::default(); - // Only chunk if account workers are available to take advantage of parallelism. + // Only chunk if account or storage workers are available to take advantage of parallelism. let should_chunk = - self.multiproof_manager.proof_worker_handle.has_available_account_workers(); + self.multiproof_manager.proof_worker_handle.has_available_account_workers() || + self.multiproof_manager.proof_worker_handle.has_available_storage_workers(); let mut spawn = |hashed_state_update| { let proof_targets = get_proof_targets( From 452e530d378a50d206ae81abd76c539e0123a4fc Mon Sep 17 00:00:00 2001 From: Yong Kang Date: Thu, 23 Oct 2025 02:10:21 +0000 Subject: [PATCH 15/20] refactor: improve worker availability tracking in proof tasks - We no longer seed the counters with the worker count. In ProofWorkerHandle::new we switched to zero-initializing both atomics --- crates/trie/parallel/src/proof_task.rs | 65 ++++++++++---------------- 1 file changed, 25 insertions(+), 40 deletions(-) diff --git a/crates/trie/parallel/src/proof_task.rs b/crates/trie/parallel/src/proof_task.rs index 0517141b4dd..fe7cfc8f085 100644 --- a/crates/trie/parallel/src/proof_task.rs +++ b/crates/trie/parallel/src/proof_task.rs @@ -146,12 +146,12 @@ fn storage_worker_loop( let mut storage_proofs_processed = 0u64; let mut storage_nodes_processed = 0u64; + // Initially mark this worker as available. + available_workers.fetch_add(1, Ordering::Relaxed); + while let Ok(job) = work_rx.recv() { - // Reserve a worker slot with saturating decrement to prevent underflow. - // Only decrement if counter is above zero. - let decremented = available_workers - .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| current.checked_sub(1)) - .is_ok(); + // Mark worker as busy. + available_workers.fetch_sub(1, Ordering::Relaxed); match job { StorageWorkerJob::StorageProof { input, result_sender } => { @@ -195,11 +195,8 @@ fn storage_worker_loop( "Storage proof completed" ); - // Release worker slot only if we successfully reserved it. - // Prevents counter from wrapping to usize::MAX on underflow. - if decremented { - available_workers.fetch_add(1, Ordering::Relaxed); - } + // Mark worker as available again. + available_workers.fetch_add(1, Ordering::Relaxed); } StorageWorkerJob::BlindedStorageNode { account, path, result_sender } => { @@ -239,11 +236,8 @@ fn storage_worker_loop( "Blinded storage node completed" ); - // Release worker slot only if we successfully reserved it. - // Prevents counter from wrapping to usize::MAX on underflow. - if decremented { - available_workers.fetch_add(1, Ordering::Relaxed); - } + // Mark worker as available again. + available_workers.fetch_add(1, Ordering::Relaxed); } } } @@ -264,13 +258,9 @@ fn storage_worker_loop( /// /// # Lifecycle /// -/// Each worker: -/// 1. Receives `AccountWorkerJob` from crossbeam unbounded channel -/// 2. Decrements availability counter to mark itself as busy -/// 3. Computes result using its dedicated long-lived transaction -/// 4. Sends result directly to original caller via `std::mpsc` -/// 5. Increments availability counter to mark itself as available -/// 6. Repeats until channel closes (graceful shutdown) +/// Each worker initializes its providers, advertises availability, then loops: +/// receive an account job, mark busy, process the work, respond, and mark available again. +/// The loop ends gracefully once the channel closes. /// /// # Transaction Reuse /// @@ -320,12 +310,12 @@ fn account_worker_loop( let mut account_proofs_processed = 0u64; let mut account_nodes_processed = 0u64; + // Count this worker as available only after successful initialization. + available_workers.fetch_add(1, Ordering::Relaxed); + while let Ok(job) = work_rx.recv() { - // Reserve a worker slot with saturating decrement to prevent underflow. - // Only decrement if counter is above zero. - let decremented = available_workers - .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| current.checked_sub(1)) - .is_ok(); + // Mark worker as busy. + available_workers.fetch_sub(1, Ordering::Relaxed); match job { AccountWorkerJob::AccountMultiproof { mut input, result_sender } => { @@ -411,11 +401,8 @@ fn account_worker_loop( ); drop(_span_guard); - // Release worker slot only if we successfully reserved it. - // Prevents counter from wrapping to usize::MAX on underflow. - if decremented { - available_workers.fetch_add(1, Ordering::Relaxed); - } + // Mark worker as available again. + available_workers.fetch_add(1, Ordering::Relaxed); } AccountWorkerJob::BlindedAccountNode { path, result_sender } => { @@ -456,11 +443,8 @@ fn account_worker_loop( ); drop(_span_guard); - // Release worker slot only if we successfully reserved it. - // Prevents counter from wrapping to usize::MAX on underflow. - if decremented { - available_workers.fetch_add(1, Ordering::Relaxed); - } + // Mark worker as available again. + available_workers.fetch_add(1, Ordering::Relaxed); } } } @@ -940,9 +924,10 @@ impl ProofWorkerHandle { let (storage_work_tx, storage_work_rx) = unbounded::(); let (account_work_tx, account_work_rx) = unbounded::(); - // Initialize availability counters - let storage_available_workers = Arc::new(AtomicUsize::new(storage_worker_count)); - let account_available_workers = Arc::new(AtomicUsize::new(account_worker_count)); + // Initialize availability counters at zero. Each worker will increment when it + // successfully initializes, ensuring only healthy workers are counted. + let storage_available_workers = Arc::new(AtomicUsize::new(0)); + let account_available_workers = Arc::new(AtomicUsize::new(0)); tracing::debug!( target: "trie::proof_task", From 2fca4ba235185a0fd8ae33d112d50c3ca38f03c0 Mon Sep 17 00:00:00 2001 From: Yong Kang Date: Thu, 23 Oct 2025 02:33:28 +0000 Subject: [PATCH 16/20] fmt --- crates/trie/parallel/src/proof_task.rs | 2 +- docs/vocs/docs/pages/cli/reth/node.mdx | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/trie/parallel/src/proof_task.rs b/crates/trie/parallel/src/proof_task.rs index fe7cfc8f085..adcf9b63bc6 100644 --- a/crates/trie/parallel/src/proof_task.rs +++ b/crates/trie/parallel/src/proof_task.rs @@ -904,7 +904,7 @@ impl ProofWorkerHandle { /// /// Returns a handle for submitting proof tasks to the worker pools. /// Workers run until the last handle is dropped. - /// + /// /// # Parameters /// - `executor`: Tokio runtime handle for spawning blocking tasks /// - `view`: Consistent database view for creating transactions diff --git a/docs/vocs/docs/pages/cli/reth/node.mdx b/docs/vocs/docs/pages/cli/reth/node.mdx index dedcd5b385e..3fc6988dc69 100644 --- a/docs/vocs/docs/pages/cli/reth/node.mdx +++ b/docs/vocs/docs/pages/cli/reth/node.mdx @@ -1009,4 +1009,4 @@ Tracing: Defaults to TRACE if not specified. [default: debug] -``` +``` \ No newline at end of file From 74d8d3c228a98fc30fb5abf62e1752d15a773e6a Mon Sep 17 00:00:00 2001 From: Yong Kang Date: Thu, 23 Oct 2025 02:58:15 +0000 Subject: [PATCH 17/20] fmt --- crates/trie/parallel/src/proof_task.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/trie/parallel/src/proof_task.rs b/crates/trie/parallel/src/proof_task.rs index adcf9b63bc6..fe7cfc8f085 100644 --- a/crates/trie/parallel/src/proof_task.rs +++ b/crates/trie/parallel/src/proof_task.rs @@ -904,7 +904,7 @@ impl ProofWorkerHandle { /// /// Returns a handle for submitting proof tasks to the worker pools. /// Workers run until the last handle is dropped. - /// + /// /// # Parameters /// - `executor`: Tokio runtime handle for spawning blocking tasks /// - `view`: Consistent database view for creating transactions From 90b73ed566c78dbf489434824d6011d4cdefa33e Mon Sep 17 00:00:00 2001 From: Yong Kang Date: Thu, 23 Oct 2025 04:38:40 +0000 Subject: [PATCH 18/20] feat: enhance multiproof metrics tracking - Added histograms for pending storage and account tasks in the MultiproofManager. - Implemented methods in ProofWorkerHandle to retrieve the count of pending storage and account tasks. - Updated multiproof processing logic to record these new metrics. --- .../src/tree/payload_processor/multiproof.rs | 17 +++++++++++++++++ crates/trie/parallel/src/proof_task.rs | 10 ++++++++++ 2 files changed, 27 insertions(+) diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs index 79b424e103a..44fe5fe896f 100644 --- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs +++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs @@ -473,6 +473,12 @@ impl MultiproofManager { self.inflight += 1; self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64); + self.metrics + .pending_storage_tasks_histogram + .record(self.proof_worker_handle.pending_storage_tasks() as f64); + self.metrics + .pending_account_tasks_histogram + .record(self.proof_worker_handle.pending_account_tasks() as f64); } /// Signals that a multiproof calculation has finished. @@ -569,6 +575,12 @@ impl MultiproofManager { self.inflight += 1; self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64); + self.metrics + .pending_storage_tasks_histogram + .record(self.proof_worker_handle.pending_storage_tasks() as f64); + self.metrics + .pending_account_tasks_histogram + .record(self.proof_worker_handle.pending_account_tasks() as f64); } } @@ -612,6 +624,11 @@ pub(crate) struct MultiProofTaskMetrics { pub first_update_wait_time_histogram: Histogram, /// Total time spent waiting for the last proof result. pub last_proof_wait_time_histogram: Histogram, + + /// Histogram of pending storage worker tasks in the queue. + pub pending_storage_tasks_histogram: Histogram, + /// Histogram of pending account worker tasks in the queue. + pub pending_account_tasks_histogram: Histogram, } /// Standalone task that receives a transaction state stream and updates relevant diff --git a/crates/trie/parallel/src/proof_task.rs b/crates/trie/parallel/src/proof_task.rs index fe7cfc8f085..18e93dc26a4 100644 --- a/crates/trie/parallel/src/proof_task.rs +++ b/crates/trie/parallel/src/proof_task.rs @@ -1047,6 +1047,16 @@ impl ProofWorkerHandle { self.account_available_workers.load(Ordering::Relaxed) > 0 } + /// Returns the number of pending storage tasks in the queue. + pub fn pending_storage_tasks(&self) -> usize { + self.storage_work_tx.len() + } + + /// Returns the number of pending account tasks in the queue. + pub fn pending_account_tasks(&self) -> usize { + self.account_work_tx.len() + } + /// Dispatch a storage proof computation to storage worker pool pub fn dispatch_storage_proof( &self, From feb317e06cf2af210d76c5c2fded1068d79cd1da Mon Sep 17 00:00:00 2001 From: Yong Kang Date: Thu, 23 Oct 2025 04:41:13 +0000 Subject: [PATCH 19/20] feat: record additional multiproof metrics - Added recording of pending storage and account tasks in the MultiproofManager upon calculation completion. - Enhanced metrics tracking to provide better insights into task management. --- crates/engine/tree/src/tree/payload_processor/multiproof.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs index 44fe5fe896f..0782f3b90e0 100644 --- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs +++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs @@ -485,6 +485,12 @@ impl MultiproofManager { fn on_calculation_complete(&mut self) { self.inflight = self.inflight.saturating_sub(1); self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64); + self.metrics + .pending_storage_tasks_histogram + .record(self.proof_worker_handle.pending_storage_tasks() as f64); + self.metrics + .pending_account_tasks_histogram + .record(self.proof_worker_handle.pending_account_tasks() as f64); } /// Spawns a single multiproof calculation task. From 2e87b731110b2d53fa0f56693fee84b86a3eafaa Mon Sep 17 00:00:00 2001 From: Yong Kang Date: Thu, 23 Oct 2025 04:53:52 +0000 Subject: [PATCH 20/20] fix naming --- .../src/tree/payload_processor/multiproof.rs | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs index 0782f3b90e0..698198c5694 100644 --- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs +++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs @@ -474,10 +474,10 @@ impl MultiproofManager { self.inflight += 1; self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64); self.metrics - .pending_storage_tasks_histogram + .pending_storage_multiproofs_histogram .record(self.proof_worker_handle.pending_storage_tasks() as f64); self.metrics - .pending_account_tasks_histogram + .pending_account_multiproofs_histogram .record(self.proof_worker_handle.pending_account_tasks() as f64); } @@ -486,10 +486,10 @@ impl MultiproofManager { self.inflight = self.inflight.saturating_sub(1); self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64); self.metrics - .pending_storage_tasks_histogram + .pending_storage_multiproofs_histogram .record(self.proof_worker_handle.pending_storage_tasks() as f64); self.metrics - .pending_account_tasks_histogram + .pending_account_multiproofs_histogram .record(self.proof_worker_handle.pending_account_tasks() as f64); } @@ -582,10 +582,10 @@ impl MultiproofManager { self.inflight += 1; self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64); self.metrics - .pending_storage_tasks_histogram + .pending_storage_multiproofs_histogram .record(self.proof_worker_handle.pending_storage_tasks() as f64); self.metrics - .pending_account_tasks_histogram + .pending_account_multiproofs_histogram .record(self.proof_worker_handle.pending_account_tasks() as f64); } } @@ -595,6 +595,10 @@ impl MultiproofManager { pub(crate) struct MultiProofTaskMetrics { /// Histogram of inflight multiproofs. pub inflight_multiproofs_histogram: Histogram, + /// Histogram of pending storage multiproofs in the queue. + pub pending_storage_multiproofs_histogram: Histogram, + /// Histogram of pending account multiproofs in the queue. + pub pending_account_multiproofs_histogram: Histogram, /// Histogram of the number of prefetch proof target accounts. pub prefetch_proof_targets_accounts_histogram: Histogram, @@ -630,11 +634,6 @@ pub(crate) struct MultiProofTaskMetrics { pub first_update_wait_time_histogram: Histogram, /// Total time spent waiting for the last proof result. pub last_proof_wait_time_histogram: Histogram, - - /// Histogram of pending storage worker tasks in the queue. - pub pending_storage_tasks_histogram: Histogram, - /// Histogram of pending account worker tasks in the queue. - pub pending_account_tasks_histogram: Histogram, } /// Standalone task that receives a transaction state stream and updates relevant