Skip to content

Commit 452e530

Browse files
committed
refactor: improve worker availability tracking in proof tasks
- We no longer seed the counters with the worker count. In ProofWorkerHandle::new we switched to zero-initializing both atomics
1 parent a1b36ea commit 452e530

File tree

1 file changed

+25
-40
lines changed

1 file changed

+25
-40
lines changed

crates/trie/parallel/src/proof_task.rs

Lines changed: 25 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -146,12 +146,12 @@ fn storage_worker_loop<Factory>(
146146
let mut storage_proofs_processed = 0u64;
147147
let mut storage_nodes_processed = 0u64;
148148

149+
// Initially mark this worker as available.
150+
available_workers.fetch_add(1, Ordering::Relaxed);
151+
149152
while let Ok(job) = work_rx.recv() {
150-
// Reserve a worker slot with saturating decrement to prevent underflow.
151-
// Only decrement if counter is above zero.
152-
let decremented = available_workers
153-
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| current.checked_sub(1))
154-
.is_ok();
153+
// Mark worker as busy.
154+
available_workers.fetch_sub(1, Ordering::Relaxed);
155155

156156
match job {
157157
StorageWorkerJob::StorageProof { input, result_sender } => {
@@ -195,11 +195,8 @@ fn storage_worker_loop<Factory>(
195195
"Storage proof completed"
196196
);
197197

198-
// Release worker slot only if we successfully reserved it.
199-
// Prevents counter from wrapping to usize::MAX on underflow.
200-
if decremented {
201-
available_workers.fetch_add(1, Ordering::Relaxed);
202-
}
198+
// Mark worker as available again.
199+
available_workers.fetch_add(1, Ordering::Relaxed);
203200
}
204201

205202
StorageWorkerJob::BlindedStorageNode { account, path, result_sender } => {
@@ -239,11 +236,8 @@ fn storage_worker_loop<Factory>(
239236
"Blinded storage node completed"
240237
);
241238

242-
// Release worker slot only if we successfully reserved it.
243-
// Prevents counter from wrapping to usize::MAX on underflow.
244-
if decremented {
245-
available_workers.fetch_add(1, Ordering::Relaxed);
246-
}
239+
// Mark worker as available again.
240+
available_workers.fetch_add(1, Ordering::Relaxed);
247241
}
248242
}
249243
}
@@ -264,13 +258,9 @@ fn storage_worker_loop<Factory>(
264258
///
265259
/// # Lifecycle
266260
///
267-
/// Each worker:
268-
/// 1. Receives `AccountWorkerJob` from crossbeam unbounded channel
269-
/// 2. Decrements availability counter to mark itself as busy
270-
/// 3. Computes result using its dedicated long-lived transaction
271-
/// 4. Sends result directly to original caller via `std::mpsc`
272-
/// 5. Increments availability counter to mark itself as available
273-
/// 6. Repeats until channel closes (graceful shutdown)
261+
/// Each worker initializes its providers, advertises availability, then loops:
262+
/// receive an account job, mark busy, process the work, respond, and mark available again.
263+
/// The loop ends gracefully once the channel closes.
274264
///
275265
/// # Transaction Reuse
276266
///
@@ -320,12 +310,12 @@ fn account_worker_loop<Factory>(
320310
let mut account_proofs_processed = 0u64;
321311
let mut account_nodes_processed = 0u64;
322312

313+
// Count this worker as available only after successful initialization.
314+
available_workers.fetch_add(1, Ordering::Relaxed);
315+
323316
while let Ok(job) = work_rx.recv() {
324-
// Reserve a worker slot with saturating decrement to prevent underflow.
325-
// Only decrement if counter is above zero.
326-
let decremented = available_workers
327-
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| current.checked_sub(1))
328-
.is_ok();
317+
// Mark worker as busy.
318+
available_workers.fetch_sub(1, Ordering::Relaxed);
329319

330320
match job {
331321
AccountWorkerJob::AccountMultiproof { mut input, result_sender } => {
@@ -411,11 +401,8 @@ fn account_worker_loop<Factory>(
411401
);
412402
drop(_span_guard);
413403

414-
// Release worker slot only if we successfully reserved it.
415-
// Prevents counter from wrapping to usize::MAX on underflow.
416-
if decremented {
417-
available_workers.fetch_add(1, Ordering::Relaxed);
418-
}
404+
// Mark worker as available again.
405+
available_workers.fetch_add(1, Ordering::Relaxed);
419406
}
420407

421408
AccountWorkerJob::BlindedAccountNode { path, result_sender } => {
@@ -456,11 +443,8 @@ fn account_worker_loop<Factory>(
456443
);
457444
drop(_span_guard);
458445

459-
// Release worker slot only if we successfully reserved it.
460-
// Prevents counter from wrapping to usize::MAX on underflow.
461-
if decremented {
462-
available_workers.fetch_add(1, Ordering::Relaxed);
463-
}
446+
// Mark worker as available again.
447+
available_workers.fetch_add(1, Ordering::Relaxed);
464448
}
465449
}
466450
}
@@ -940,9 +924,10 @@ impl ProofWorkerHandle {
940924
let (storage_work_tx, storage_work_rx) = unbounded::<StorageWorkerJob>();
941925
let (account_work_tx, account_work_rx) = unbounded::<AccountWorkerJob>();
942926

943-
// Initialize availability counters
944-
let storage_available_workers = Arc::new(AtomicUsize::new(storage_worker_count));
945-
let account_available_workers = Arc::new(AtomicUsize::new(account_worker_count));
927+
// Initialize availability counters at zero. Each worker will increment when it
928+
// successfully initializes, ensuring only healthy workers are counted.
929+
let storage_available_workers = Arc::new(AtomicUsize::new(0));
930+
let account_available_workers = Arc::new(AtomicUsize::new(0));
946931

947932
tracing::debug!(
948933
target: "trie::proof_task",

0 commit comments

Comments
 (0)