Commit 1003fb8

refactor: remove unused worker count and availability tracking from multiproof manager
- Eliminated the worker count variable and associated logic from the multiproof manager.
- Updated multiproof spawning to check for available account workers instead of a general worker count.
- Cleaned up related comments and code for clarity and maintainability.
1 parent d78ccde commit 1003fb8
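
The shape of the refactor, reduced to a standalone sketch: the availability counter moves out of the consumer (`MultiproofManager`) and into the pool handle, which seeds it from the worker count and answers an advisory query. The `PoolHandle` type below is hypothetical; it mirrors the pattern only, not the actual reth types.

use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

/// Hypothetical stand-in for a pool handle that owns its availability counter.
#[derive(Clone)]
struct PoolHandle {
    available: Arc<AtomicUsize>,
}

impl PoolHandle {
    fn new(worker_count: usize) -> Self {
        Self { available: Arc::new(AtomicUsize::new(worker_count)) }
    }

    /// Advisory query: callers use it to decide whether chunking pays off.
    fn has_available_workers(&self) -> bool {
        self.available.load(Ordering::Relaxed) > 0
    }
}

fn main() {
    let handle = PoolHandle::new(4);
    // The consumer no longer owns a counter; it just asks the pool.
    assert!(handle.has_available_workers());
}

The query is deliberately approximate: with relaxed ordering and a separate load, a worker can become busy between check and dispatch, which is acceptable because the answer only tunes whether a multiproof gets chunked.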

File tree

3 files changed: +89 -50 lines changed

crates/engine/tree/src/tree/payload_processor/mod.rs

Lines changed: 0 additions & 2 deletions
@@ -210,7 +210,6 @@ where
         );
         let storage_worker_count = config.storage_worker_count();
         let account_worker_count = config.account_worker_count();
-        let worker_count = storage_worker_count + account_worker_count;
         let proof_handle = ProofWorkerHandle::new(
             self.executor.handle().clone(),
             consistent_view,
@@ -223,7 +222,6 @@ where
             state_root_config,
             self.executor.clone(),
             proof_handle.clone(),
-            worker_count,
             to_sparse_trie,
             config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
         );

crates/engine/tree/src/tree/payload_processor/multiproof.rs

Lines changed: 7 additions & 43 deletions
@@ -27,15 +27,13 @@ use std::{
     collections::BTreeMap,
     ops::DerefMut,
     sync::{
-        atomic::{AtomicUsize, Ordering},
        mpsc::{channel, Receiver, Sender},
        Arc,
    },
     time::{Duration, Instant},
 };
 use tracing::{debug, error, instrument, trace};
 
-
 /// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the
 /// state.
 #[derive(Default, Debug)]
@@ -355,9 +353,6 @@ pub struct MultiproofManager {
     /// a big account change into different chunks, which may repeatedly
     /// revisit missed leaves.
     missed_leaves_storage_roots: Arc<DashMap<B256, B256>>,
-    /// Counter tracking available workers. Workers decrement when starting work,
-    /// increment when finishing. Used to determine whether to chunk multiproofs.
-    available_workers: Arc<AtomicUsize>,
     /// Metrics
     metrics: MultiProofTaskMetrics,
 }
@@ -368,23 +363,16 @@ impl MultiproofManager {
         executor: WorkloadExecutor,
         metrics: MultiProofTaskMetrics,
         proof_worker_handle: ProofWorkerHandle,
-        worker_count: usize,
     ) -> Self {
         Self {
             inflight: 0,
             executor,
             metrics,
             proof_worker_handle,
             missed_leaves_storage_roots: Default::default(),
-            available_workers: Arc::new(AtomicUsize::new(worker_count)),
         }
     }
 
-    /// Returns true if there are available workers to process tasks.
-    fn has_available_workers(&self) -> bool {
-        self.available_workers.load(Ordering::Relaxed) > 0
-    }
-
     /// Spawns a new multiproof calculation.
     fn spawn(&mut self, input: PendingMultiproofTask) {
         // If there are no proof targets, we can just send an empty multiproof back immediately
@@ -428,15 +416,8 @@ impl MultiproofManager {
 
         let storage_proof_worker_handle = self.proof_worker_handle.clone();
         let missed_leaves_storage_roots = self.missed_leaves_storage_roots.clone();
-        let available_workers = self.available_workers.clone();
 
         self.executor.spawn_blocking(move || {
-            let decremented = available_workers
-                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
-                    current.checked_sub(1)
-                })
-                .is_ok();
-
             let storage_targets = proof_targets.len();
 
             trace!(
@@ -488,11 +469,6 @@ impl MultiproofManager {
                         .send(MultiProofMessage::ProofCalculationError(error.into()));
                 }
             }
-
-            // Increment at the end - worker is now available again
-            if decremented {
-                available_workers.fetch_add(1, Ordering::Relaxed);
-            }
         });
 
         self.inflight += 1;
@@ -518,15 +494,8 @@ impl MultiproofManager {
         } = multiproof_input;
         let account_proof_worker_handle = self.proof_worker_handle.clone();
         let missed_leaves_storage_roots = self.missed_leaves_storage_roots.clone();
-        let available_workers = self.available_workers.clone();
 
         self.executor.spawn_blocking(move || {
-            let decremented = available_workers
-                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
-                    current.checked_sub(1)
-                })
-                .is_ok();
-
             let account_targets = proof_targets.len();
             let storage_targets = proof_targets.values().map(|slots| slots.len()).sum::<usize>();
 
@@ -596,11 +565,6 @@ impl MultiproofManager {
                         .send(MultiProofMessage::ProofCalculationError(error.into()));
                 }
             }
-
-            // Increment available workers after no more calculations are expected.
-            if decremented {
-                available_workers.fetch_add(1, Ordering::Relaxed);
-            }
         });
 
         self.inflight += 1;
@@ -690,7 +654,6 @@ impl MultiProofTask {
         config: MultiProofConfig,
         executor: WorkloadExecutor,
         proof_worker_handle: ProofWorkerHandle,
-        worker_count: usize,
         to_sparse_trie: Sender<SparseTrieUpdate>,
         chunk_size: Option<usize>,
     ) -> Self {
@@ -710,7 +673,6 @@ impl MultiProofTask {
                 executor,
                 metrics.clone(),
                 proof_worker_handle,
-                worker_count,
             ),
             metrics,
         }
@@ -745,8 +707,9 @@ impl MultiProofTask {
         // Process proof targets in chunks.
         let mut chunks = 0;
 
-        // Only chunk if workers are available to take advantage of parallelism.
-        let should_chunk = self.multiproof_manager.has_available_workers();
+        // Only chunk if account workers are available to take advantage of parallelism.
+        let should_chunk =
+            self.multiproof_manager.proof_worker_handle.has_available_account_workers();
 
         let mut spawn = |proof_targets| {
             self.multiproof_manager.spawn(
@@ -884,8 +847,9 @@ impl MultiProofTask {
 
         let mut spawned_proof_targets = MultiProofTargets::default();
 
-        // Only chunk if workers are available to take advantage of parallelism.
-        let should_chunk = self.multiproof_manager.has_available_workers();
+        // Only chunk if account workers are available to take advantage of parallelism.
+        let should_chunk =
+            self.multiproof_manager.proof_worker_handle.has_available_account_workers();
 
         let mut spawn = |hashed_state_update| {
             let proof_targets = get_proof_targets(
@@ -1249,7 +1213,7 @@ mod tests {
             ProofWorkerHandle::new(executor.handle().clone(), consistent_view, task_ctx, 1, 1);
         let channel = channel();
 
-        MultiProofTask::new(config, executor, proof_handle, 1, channel.0, Some(1))
+        MultiProofTask::new(config, executor, proof_handle, channel.0, Some(1))
     }
 
     #[test]
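
Both chunking call sites above now gate on has_available_account_workers() rather than a combined storage-plus-account count, so the decision keys on the pool that actually executes multiproofs. The effect of the gate, as a runnable sketch (a Vec and a closure stand in for the real MultiProofTargets and spawn path; the size threshold is illustrative):

/// Sketch: split work into chunks only when idle account workers exist.
fn spawn_chunked<T: Clone>(
    targets: Vec<T>,
    chunk_size: usize,
    has_available_account_workers: bool,
    mut spawn: impl FnMut(Vec<T>),
) {
    if has_available_account_workers && targets.len() > chunk_size {
        // Idle workers exist, so chunks can run in parallel.
        for chunk in targets.chunks(chunk_size) {
            spawn(chunk.to_vec());
        }
    } else {
        // Every worker is busy: a single task avoids per-chunk overhead.
        spawn(targets);
    }
}

fn main() {
    let mut spawned = Vec::new();
    spawn_chunked((0..10).collect(), 3, true, |chunk| spawned.push(chunk));
    assert_eq!(spawned.len(), 4); // chunks of 3, 3, 3, 1
}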

crates/trie/parallel/src/proof_task.rs

Lines changed: 82 additions & 5 deletions
@@ -51,6 +51,7 @@ use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
 use reth_trie_sparse::provider::{RevealedNode, TrieNodeProvider, TrieNodeProviderFactory};
 use std::{
     sync::{
+        atomic::{AtomicUsize, Ordering},
         mpsc::{channel, Receiver, Sender},
         Arc,
     },
@@ -116,6 +117,7 @@ fn storage_worker_loop<Factory>(
     task_ctx: ProofTaskCtx,
     work_rx: CrossbeamReceiver<StorageWorkerJob>,
     worker_id: usize,
+    available_workers: Arc<AtomicUsize>,
     #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
 ) where
     Factory: DatabaseProviderFactory<Provider: BlockReader>,
@@ -145,6 +147,12 @@ fn storage_worker_loop<Factory>(
     let mut storage_nodes_processed = 0u64;
 
     while let Ok(job) = work_rx.recv() {
+        // Reserve a worker slot with saturating decrement to prevent underflow.
+        // Only decrement if counter is above zero.
+        let decremented = available_workers
+            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| current.checked_sub(1))
+            .is_ok();
+
         match job {
             StorageWorkerJob::StorageProof { input, result_sender } => {
                 let hashed_address = input.hashed_address;
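
The reserve/release pair introduced here (and mirrored in the account worker loop below) is a small underflow-safe protocol. Isolated as a runnable sketch using only std atomics, matching the diff's fetch_update + checked_sub idiom:

use std::sync::atomic::{AtomicUsize, Ordering};

fn main() {
    // A pool with two idle workers.
    let available = AtomicUsize::new(2);

    // Reserve: checked_sub returns None at zero, so fetch_update fails
    // instead of wrapping the counter around to usize::MAX.
    let reserve = |counter: &AtomicUsize| {
        counter
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| n.checked_sub(1))
            .is_ok()
    };

    assert!(reserve(&available));  // 2 -> 1
    assert!(reserve(&available));  // 1 -> 0
    assert!(!reserve(&available)); // still 0: no underflow

    // Release pairs only with a successful reserve, so the counter can
    // never drift above the true number of workers.
    available.fetch_add(1, Ordering::Relaxed);
    assert_eq!(available.load(Ordering::Relaxed), 1);
}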
@@ -186,6 +194,12 @@ fn storage_worker_loop<Factory>(
                     total_processed = storage_proofs_processed,
                     "Storage proof completed"
                 );
+
+                // Release worker slot only if we successfully reserved it.
+                // Prevents counter from wrapping to usize::MAX on underflow.
+                if decremented {
+                    available_workers.fetch_add(1, Ordering::Relaxed);
+                }
             }
 
             StorageWorkerJob::BlindedStorageNode { account, path, result_sender } => {
@@ -224,6 +238,12 @@ fn storage_worker_loop<Factory>(
                     total_processed = storage_nodes_processed,
                     "Blinded storage node completed"
                 );
+
+                // Release worker slot only if we successfully reserved it.
+                // Prevents counter from wrapping to usize::MAX on underflow.
+                if decremented {
+                    available_workers.fetch_add(1, Ordering::Relaxed);
+                }
             }
         }
     }
@@ -246,9 +266,11 @@ fn storage_worker_loop<Factory>(
 ///
 /// Each worker:
 /// 1. Receives `AccountWorkerJob` from crossbeam unbounded channel
-/// 2. Computes result using its dedicated long-lived transaction
-/// 3. Sends result directly to original caller via `std::mpsc`
-/// 4. Repeats until channel closes (graceful shutdown)
+/// 2. Decrements availability counter to mark itself as busy
+/// 3. Computes result using its dedicated long-lived transaction
+/// 4. Sends result directly to original caller via `std::mpsc`
+/// 5. Increments availability counter to mark itself as available
+/// 6. Repeats until channel closes (graceful shutdown)
 ///
 /// # Transaction Reuse
 ///
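
The six steps above, in loop form. A schematic, compilable rendition with placeholder job and result types; the real workers use crossbeam channels and compute trie proofs instead of the toy value here:

use std::{
    sync::{
        atomic::{AtomicUsize, Ordering},
        mpsc::{channel, Receiver, Sender},
        Arc,
    },
    thread,
};

struct Job {
    result_sender: Sender<u64>,
}

fn worker_loop(work_rx: Receiver<Job>, available_workers: Arc<AtomicUsize>) {
    // 1. Receive jobs until the channel closes.
    while let Ok(job) = work_rx.recv() {
        // 2. Mark this worker busy (underflow-safe decrement).
        let decremented = available_workers
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| n.checked_sub(1))
            .is_ok();

        // 3./4. Compute the result and send it straight to the caller;
        // a dropped receiver is not fatal to the worker.
        let _ = job.result_sender.send(42);

        // 5. Mark this worker available again, only if step 2 succeeded.
        if decremented {
            available_workers.fetch_add(1, Ordering::Relaxed);
        }
    }
    // 6. Channel closed: graceful shutdown.
}

fn main() {
    let available = Arc::new(AtomicUsize::new(1));
    let (work_tx, work_rx) = channel();
    let worker = thread::spawn({
        let available = available.clone();
        move || worker_loop(work_rx, available)
    });

    let (result_tx, result_rx) = channel();
    work_tx.send(Job { result_sender: result_tx }).unwrap();
    assert_eq!(result_rx.recv().unwrap(), 42);

    drop(work_tx); // close the channel so the worker exits
    worker.join().unwrap();
}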
@@ -269,6 +291,7 @@ fn account_worker_loop<Factory>(
     work_rx: CrossbeamReceiver<AccountWorkerJob>,
     storage_work_tx: CrossbeamSender<StorageWorkerJob>,
     worker_id: usize,
+    available_workers: Arc<AtomicUsize>,
     #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
 ) where
     Factory: DatabaseProviderFactory<Provider: BlockReader>,
@@ -298,6 +321,12 @@ fn account_worker_loop<Factory>(
     let mut account_nodes_processed = 0u64;
 
     while let Ok(job) = work_rx.recv() {
+        // Reserve a worker slot with saturating decrement to prevent underflow.
+        // Only decrement if counter is above zero.
+        let decremented = available_workers
+            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| current.checked_sub(1))
+            .is_ok();
+
         match job {
             AccountWorkerJob::AccountMultiproof { mut input, result_sender } => {
                 let span = tracing::debug_span!(
@@ -381,6 +410,12 @@ fn account_worker_loop<Factory>(
                     "Account multiproof completed"
                 );
                 drop(_span_guard);
+
+                // Release worker slot only if we successfully reserved it.
+                // Prevents counter from wrapping to usize::MAX on underflow.
+                if decremented {
+                    available_workers.fetch_add(1, Ordering::Relaxed);
+                }
             }
 
             AccountWorkerJob::BlindedAccountNode { path, result_sender } => {
@@ -420,6 +455,12 @@ fn account_worker_loop<Factory>(
                     "Blinded account node completed"
                 );
                 drop(_span_guard);
+
+                // Release worker slot only if we successfully reserved it.
+                // Prevents counter from wrapping to usize::MAX on underflow.
+                if decremented {
+                    available_workers.fetch_add(1, Ordering::Relaxed);
+                }
             }
         }
     }
@@ -866,6 +907,12 @@ pub struct ProofWorkerHandle {
     storage_work_tx: CrossbeamSender<StorageWorkerJob>,
     /// Direct sender to account worker pool
     account_work_tx: CrossbeamSender<AccountWorkerJob>,
+    /// Counter tracking available storage workers. Workers decrement when starting work,
+    /// increment when finishing. Used to determine whether to chunk multiproofs.
+    storage_available_workers: Arc<AtomicUsize>,
+    /// Counter tracking available account workers. Workers decrement when starting work,
+    /// increment when finishing. Used to determine whether to chunk multiproofs.
+    account_available_workers: Arc<AtomicUsize>,
 }
 
 impl ProofWorkerHandle {
@@ -893,6 +940,10 @@ impl ProofWorkerHandle {
         let (storage_work_tx, storage_work_rx) = unbounded::<StorageWorkerJob>();
         let (account_work_tx, account_work_rx) = unbounded::<AccountWorkerJob>();
 
+        // Initialize availability counters
+        let storage_available_workers = Arc::new(AtomicUsize::new(storage_worker_count));
+        let account_available_workers = Arc::new(AtomicUsize::new(account_worker_count));
+
         tracing::debug!(
             target: "trie::proof_task",
             storage_worker_count,
@@ -910,6 +961,7 @@ impl ProofWorkerHandle {
             let view_clone = view.clone();
             let task_ctx_clone = task_ctx.clone();
             let work_rx_clone = storage_work_rx.clone();
+            let storage_available_workers_clone = storage_available_workers.clone();
 
             executor.spawn_blocking(move || {
                 #[cfg(feature = "metrics")]
@@ -921,6 +973,7 @@ impl ProofWorkerHandle {
                     task_ctx_clone,
                     work_rx_clone,
                     worker_id,
+                    storage_available_workers_clone,
                     #[cfg(feature = "metrics")]
                     metrics,
                 )
@@ -946,6 +999,7 @@ impl ProofWorkerHandle {
             let task_ctx_clone = task_ctx.clone();
             let work_rx_clone = account_work_rx.clone();
             let storage_work_tx_clone = storage_work_tx.clone();
+            let account_available_workers_clone = account_available_workers.clone();
 
             executor.spawn_blocking(move || {
                 #[cfg(feature = "metrics")]
@@ -958,6 +1012,7 @@ impl ProofWorkerHandle {
                     work_rx_clone,
                     storage_work_tx_clone,
                     worker_id,
+                    account_available_workers_clone,
                     #[cfg(feature = "metrics")]
                     metrics,
                 )
@@ -972,7 +1027,12 @@ impl ProofWorkerHandle {
 
         drop(_guard);
 
-        Self::new_handle(storage_work_tx, account_work_tx)
+        Self::new_handle(
+            storage_work_tx,
+            account_work_tx,
+            storage_available_workers,
+            account_available_workers,
+        )
     }
 
     /// Creates a new [`ProofWorkerHandle`] with direct access to worker pools.
@@ -981,8 +1041,25 @@ impl ProofWorkerHandle {
     const fn new_handle(
         storage_work_tx: CrossbeamSender<StorageWorkerJob>,
         account_work_tx: CrossbeamSender<AccountWorkerJob>,
+        storage_available_workers: Arc<AtomicUsize>,
+        account_available_workers: Arc<AtomicUsize>,
     ) -> Self {
-        Self { storage_work_tx, account_work_tx }
+        Self {
+            storage_work_tx,
+            account_work_tx,
+            storage_available_workers,
+            account_available_workers,
+        }
+    }
+
+    /// Returns true if there are available storage workers to process tasks.
+    pub fn has_available_storage_workers(&self) -> bool {
+        self.storage_available_workers.load(Ordering::Relaxed) > 0
+    }
+
+    /// Returns true if there are available account workers to process tasks.
+    pub fn has_available_account_workers(&self) -> bool {
+        self.account_available_workers.load(Ordering::Relaxed) > 0
     }
 
     /// Dispatch a storage proof computation to storage worker pool
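
Because both counters live behind Arcs inside the handle, every clone of ProofWorkerHandle observes the same availability state. A runnable sketch of that sharing, with a miniature stand-in type that mirrors the two query methods added above:

use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

/// Miniature stand-in for ProofWorkerHandle's availability queries.
#[derive(Clone)]
struct Handle {
    storage_available_workers: Arc<AtomicUsize>,
    account_available_workers: Arc<AtomicUsize>,
}

impl Handle {
    fn has_available_storage_workers(&self) -> bool {
        self.storage_available_workers.load(Ordering::Relaxed) > 0
    }

    fn has_available_account_workers(&self) -> bool {
        self.account_available_workers.load(Ordering::Relaxed) > 0
    }
}

fn main() {
    let handle = Handle {
        storage_available_workers: Arc::new(AtomicUsize::new(1)),
        account_available_workers: Arc::new(AtomicUsize::new(0)),
    };
    let clone = handle.clone();

    // Clones share the counters, so all holders see one view of the pools.
    assert!(clone.has_available_storage_workers());
    assert!(!clone.has_available_account_workers());
}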
