Skip to content

Commit 3ee7c73

Browse files
committed
refactor(engine): streamline proof task management with new handle
- Replaced `ProofTaskManager` with a new `new_proof_task_handle` function for improved task management. - Updated related code to utilize the new proof task handle, enhancing clarity and reducing complexity. - Adjusted tests to reflect changes in proof task initialization and handling.
1 parent 2cf0c25 commit 3ee7c73

File tree

3 files changed

+23
-47
lines changed

3 files changed

+23
-47
lines changed

crates/engine/tree/src/tree/payload_processor/mod.rs

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use reth_provider::{
3232
use reth_revm::{db::BundleState, state::EvmState};
3333
use reth_trie::TrieInput;
3434
use reth_trie_parallel::{
35-
proof_task::{ProofTaskCtx, ProofTaskManager},
35+
proof_task::{new_proof_task_handle, ProofTaskCtx},
3636
root::ParallelStateRootError,
3737
};
3838
use reth_trie_sparse::{
@@ -196,23 +196,21 @@ where
196196
state_root_config.prefix_sets.clone(),
197197
);
198198
let max_proof_task_concurrency = config.max_proof_task_concurrency() as usize;
199-
let storage_worker_count = config.storage_proof_workers();
200-
let proof_task = ProofTaskManager::new(
199+
let proof_task_handle = new_proof_task_handle(
201200
self.executor.handle().clone(),
202201
state_root_config.consistent_view.clone(),
203202
task_ctx,
204203
max_proof_task_concurrency,
205-
storage_worker_count,
206204
)
207-
.expect("Failed to create ProofTaskManager with storage workers");
205+
.expect("Failed to create proof task handle");
208206

209207
// We set it to half of the proof task concurrency, because often for each multiproof we
210208
// spawn one Tokio task for the account proof, and one Tokio task for the storage proof.
211209
let max_multi_proof_task_concurrency = max_proof_task_concurrency / 2;
212210
let multi_proof_task = MultiProofTask::new(
213211
state_root_config,
214212
self.executor.clone(),
215-
proof_task.handle(),
213+
proof_task_handle.clone(),
216214
to_sparse_trie,
217215
max_multi_proof_task_concurrency,
218216
config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
@@ -241,19 +239,7 @@ where
241239
let (state_root_tx, state_root_rx) = channel();
242240

243241
// Spawn the sparse trie task using any stored trie and parallel trie configuration.
244-
self.spawn_sparse_trie_task(sparse_trie_rx, proof_task.handle(), state_root_tx);
245-
246-
// spawn the proof task
247-
self.executor.spawn_blocking(move || {
248-
if let Err(err) = proof_task.run() {
249-
// At least log if there is an error at any point
250-
tracing::error!(
251-
target: "engine::root",
252-
?err,
253-
"Storage proof task returned an error"
254-
);
255-
}
256-
});
242+
self.spawn_sparse_trie_task(sparse_trie_rx, proof_task_handle, state_root_tx);
257243

258244
PayloadHandle {
259245
to_multi_proof,

crates/engine/tree/src/tree/payload_processor/multiproof.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1204,7 +1204,7 @@ mod tests {
12041204
use alloy_primitives::map::B256Set;
12051205
use reth_provider::{providers::ConsistentDbView, test_utils::create_test_provider_factory};
12061206
use reth_trie::{MultiProof, TrieInput};
1207-
use reth_trie_parallel::proof_task::{ProofTaskCtx, ProofTaskManager};
1207+
use reth_trie_parallel::proof_task::{new_proof_task_handle, ProofTaskCtx};
12081208
use revm_primitives::{B256, U256};
12091209
use std::sync::Arc;
12101210

@@ -1231,17 +1231,16 @@ mod tests {
12311231
config.state_sorted.clone(),
12321232
config.prefix_sets.clone(),
12331233
);
1234-
let proof_task = ProofTaskManager::new(
1234+
let proof_task_handle = new_proof_task_handle(
12351235
executor.handle().clone(),
12361236
config.consistent_view.clone(),
12371237
task_ctx,
1238-
1,
1239-
1, // storage_worker_count for test
1238+
1, // max_concurrency for test
12401239
)
1241-
.expect("Failed to create ProofTaskManager for multiproof test");
1240+
.expect("Failed to create proof task handle for multiproof test");
12421241
let channel = channel();
12431242

1244-
MultiProofTask::new(config, executor, proof_task.handle(), channel.0, 1, None)
1243+
MultiProofTask::new(config, executor, proof_task_handle, channel.0, 1, None)
12451244
}
12461245

12471246
#[test]

crates/trie/parallel/src/proof.rs

Lines changed: 13 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,7 @@ use dashmap::DashMap;
1414
use itertools::Itertools;
1515
use reth_execution_errors::StorageRootError;
1616
use reth_provider::{
17-
providers::ConsistentDbView, BlockReader, DBProvider, DatabaseProviderFactory, FactoryTx,
18-
ProviderError,
17+
providers::ConsistentDbView, BlockReader, DBProvider, DatabaseProviderFactory, ProviderError,
1918
};
2019
use reth_storage_errors::db::DatabaseError;
2120
use reth_trie::{
@@ -34,7 +33,7 @@ use reth_trie_common::{
3433
proof::{DecodedProofNodes, ProofRetainer},
3534
};
3635
use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
37-
use std::sync::{mpsc::Receiver, Arc};
36+
use std::sync::Arc;
3837
use tracing::trace;
3938

4039
/// Parallel proof calculator.
@@ -59,7 +58,7 @@ pub struct ParallelProof<Factory: DatabaseProviderFactory> {
5958
/// Provided by the user to give the necessary context to retain extra proofs.
6059
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
6160
/// Handle to the storage proof task.
62-
storage_proof_task_handle: ProofTaskManagerHandle<FactoryTx<Factory>>,
61+
storage_proof_task_handle: ProofTaskManagerHandle<<Factory::Provider as DBProvider>::Tx>,
6362
/// Cached storage proof roots for missed leaves; this maps
6463
/// hashed (missed) addresses to their storage proof roots.
6564
missed_leaves_storage_roots: Arc<DashMap<B256, B256>>,
@@ -75,7 +74,7 @@ impl<Factory: DatabaseProviderFactory> ParallelProof<Factory> {
7574
state_sorted: Arc<HashedPostStateSorted>,
7675
prefix_sets: Arc<TriePrefixSetsMut>,
7776
missed_leaves_storage_roots: Arc<DashMap<B256, B256>>,
78-
storage_proof_task_handle: ProofTaskManagerHandle<FactoryTx<Factory>>,
77+
storage_proof_task_handle: ProofTaskManagerHandle<<Factory::Provider as DBProvider>::Tx>,
7978
) -> Self {
8079
Self {
8180
view,
@@ -118,18 +117,17 @@ where
118117
hashed_address: B256,
119118
prefix_set: PrefixSet,
120119
target_slots: B256Set,
121-
) -> Receiver<Result<DecodedStorageMultiProof, ParallelStateRootError>> {
120+
) -> crossbeam_channel::Receiver<Result<DecodedStorageMultiProof, ParallelStateRootError>> {
122121
let input = StorageProofInput::new(
123122
hashed_address,
124123
prefix_set,
125-
Arc::new(target_slots),
124+
target_slots,
126125
self.collect_branch_node_masks,
127126
self.multi_added_removed_keys.clone(),
128127
);
129128

130-
let (sender, receiver) = std::sync::mpsc::channel();
131-
let _ =
132-
self.storage_proof_task_handle.queue_task(ProofTaskKind::StorageProof(input, sender));
129+
let (sender, receiver) = crossbeam_channel::unbounded();
130+
self.storage_proof_task_handle.queue_task(ProofTaskKind::StorageProof(input, sender));
133131
receiver
134132
}
135133

@@ -368,7 +366,7 @@ where
368366
#[cfg(test)]
369367
mod tests {
370368
use super::*;
371-
use crate::proof_task::{ProofTaskCtx, ProofTaskManager};
369+
use crate::proof_task::{new_proof_task_handle, ProofTaskCtx};
372370
use alloy_primitives::{
373371
keccak256,
374372
map::{B256Set, DefaultHashBuilder},
@@ -447,19 +445,13 @@ mod tests {
447445

448446
let task_ctx =
449447
ProofTaskCtx::new(Default::default(), Default::default(), Default::default());
450-
let proof_task = ProofTaskManager::new(
448+
let proof_task_handle = new_proof_task_handle(
451449
rt.handle().clone(),
452450
consistent_view.clone(),
453451
task_ctx,
454-
1,
455-
1, // storage_worker_count for test
452+
1, // max_concurrency for test
456453
)
457454
.expect("Failed to create proof task");
458-
let proof_task_handle = proof_task.handle();
459-
460-
// keep the join handle around to make sure it does not return any errors
461-
// after we compute the state root
462-
let join_handle = rt.spawn_blocking(move || proof_task.run());
463455

464456
let parallel_result = ParallelProof::new(
465457
consistent_view,
@@ -495,9 +487,8 @@ mod tests {
495487
// then compare the entire thing for any mask differences
496488
assert_eq!(parallel_result, sequential_result_decoded);
497489

498-
// drop the handle to terminate the task and then block on the proof task handle to make
499-
// sure it does not return any errors
490+
// Drop the handle to release transaction pool resources
491+
// Note: No manager loop to join in the new design - handle manages lifecycle via Drop
500492
drop(proof_task_handle);
501-
rt.block_on(join_handle).unwrap().expect("The proof task should not return an error");
502493
}
503494
}

0 commit comments

Comments
 (0)