1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

25 changes: 15 additions & 10 deletions crates/engine/tree/benches/state_root_task.rs
@@ -228,16 +228,21 @@ fn bench_state_root(c: &mut Criterion) {
},
|(genesis_hash, mut payload_processor, provider, state_updates)| {
black_box({
let mut handle = payload_processor.spawn(
Default::default(),
core::iter::empty::<
Result<Recovered<TransactionSigned>, core::convert::Infallible>,
>(),
StateProviderBuilder::new(provider.clone(), genesis_hash, None),
ConsistentDbView::new_with_latest_tip(provider).unwrap(),
TrieInput::default(),
&TreeConfig::default(),
);
let mut handle = payload_processor
.spawn(
Default::default(),
core::iter::empty::<
Result<
Recovered<TransactionSigned>,
core::convert::Infallible,
>,
>(),
StateProviderBuilder::new(provider.clone(), genesis_hash, None),
ConsistentDbView::new_with_latest_tip(provider).unwrap(),
TrieInput::default(),
&TreeConfig::default(),
)
.expect("failed to spawn payload processor task");

let mut state_hook = handle.state_hook();

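Note: `spawn` is now fallible. The bench above fails fast with `.expect(...)`, while the production call site in `payload_validator.rs` further down propagates the error with `?`. A minimal sketch of the caller-side pattern, using stand-in types rather than reth's actual signatures:

```rust
// Sketch only: `ProviderResult`, `PayloadProcessor`, and `PayloadHandle`
// are stand-ins for the reth types touched by this PR.
type ProviderResult<T> = Result<T, String>;

struct PayloadHandle;
struct PayloadProcessor;

impl PayloadProcessor {
    // Construction can now fail (e.g. the proof task cannot be set up),
    // so errors surface at spawn time instead of inside a background task.
    fn spawn(&mut self) -> ProviderResult<PayloadHandle> {
        Ok(PayloadHandle)
    }
}

// Production callers propagate the error...
fn validate(processor: &mut PayloadProcessor) -> ProviderResult<()> {
    let _handle = processor.spawn()?;
    Ok(())
}

fn main() {
    let mut processor = PayloadProcessor;
    validate(&mut processor).expect("validation failed");
    // ...while benches and tests unwrap with an explicit message.
    let _handle = processor.spawn().expect("failed to spawn payload processor task");
}
```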
61 changes: 27 additions & 34 deletions crates/engine/tree/src/tree/payload_processor/mod.rs
@@ -26,13 +26,13 @@ use reth_evm::{
};
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, StateProviderFactory,
StateReader,
providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, ProviderResult,
StateProviderFactory, StateReader,
};
use reth_revm::{db::BundleState, state::EvmState};
use reth_trie::TrieInput;
use reth_trie_parallel::{
proof_task::{ProofTaskCtx, ProofTaskManager},
proof_task::{new_proof_task_handle, ProofTaskCtx},
root::ParallelStateRootError,
};
use reth_trie_sparse::{
@@ -58,7 +58,7 @@ use configured_sparse_trie::ConfiguredSparseTrie;
/// Default parallelism thresholds to use with the [`ParallelSparseTrie`].
///
/// These values were determined by performing benchmarks using gradually increasing values to judge
/// the affects. Below 100 throughput would generally be equal or slightly less, while above 150 it
/// the effects. Below 100 throughput would generally be equal or slightly less, while above 150 it
/// would deteriorate to the point where PST might as well not be used.
pub const PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS: ParallelismThresholds =
ParallelismThresholds { min_revealed_nodes: 100, min_updated_nodes: 100 };
Expand All @@ -69,7 +69,7 @@ pub struct PayloadProcessor<Evm>
where
Evm: ConfigureEvm,
{
/// The executor used by to spawn tasks.
/// The executor used to spawn tasks.
executor: WorkloadExecutor,
/// The most recent cache used for execution.
execution_cache: ExecutionCache,
@@ -163,9 +163,9 @@ where
///
/// This task runs until there are no further updates to process.
///
///
/// This returns a handle to await the final state root and to interact with the tasks (e.g.
/// canceling)
#[allow(clippy::type_complexity)]
pub fn spawn<P, I: ExecutableTxIterator<Evm>>(
&mut self,
env: ExecutionEnv<Evm>,
@@ -174,7 +174,7 @@ where
consistent_view: ConsistentDbView<P>,
trie_input: TrieInput,
config: &TreeConfig,
) -> PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>
) -> ProviderResult<PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>>
where
P: DatabaseProviderFactory<Provider: BlockReader>
+ BlockReader
@@ -196,20 +196,20 @@
state_root_config.prefix_sets.clone(),
);
let max_proof_task_concurrency = config.max_proof_task_concurrency() as usize;
let proof_task = ProofTaskManager::new(
let proof_task_handle = new_proof_task_handle(
self.executor.handle().clone(),
state_root_config.consistent_view.clone(),
task_ctx,
max_proof_task_concurrency,
);
)?;

// We set it to half of the proof task concurrency, because often for each multiproof we
// spawn one Tokio task for the account proof, and one Tokio task for the storage proof.
let max_multi_proof_task_concurrency = max_proof_task_concurrency / 2;
let max_multi_proof_task_concurrency = (max_proof_task_concurrency / 2).max(1);
let multi_proof_task = MultiProofTask::new(
state_root_config,
self.executor.clone(),
proof_task.handle(),
proof_task_handle.clone(),
to_sparse_trie,
max_multi_proof_task_concurrency,
config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
@@ -238,26 +238,14 @@ where
let (state_root_tx, state_root_rx) = channel();

// Spawn the sparse trie task using any stored trie and parallel trie configuration.
self.spawn_sparse_trie_task(sparse_trie_rx, proof_task.handle(), state_root_tx);
self.spawn_sparse_trie_task(sparse_trie_rx, proof_task_handle, state_root_tx);

// spawn the proof task
self.executor.spawn_blocking(move || {
if let Err(err) = proof_task.run() {
// At least log if there is an error at any point
tracing::error!(
target: "engine::root",
?err,
"Storage proof task returned an error"
);
}
});

PayloadHandle {
Ok(PayloadHandle {
to_multi_proof,
prewarm_handle,
state_root: Some(state_root_rx),
transactions: execution_rx,
}
})
}

/// Spawns a task that exclusively handles cache prewarming for transaction execution.
@@ -857,14 +845,19 @@ mod tests {
PrecompileCacheMap::default(),
);
let provider = BlockchainProvider::new(factory).unwrap();
let mut handle = payload_processor.spawn(
Default::default(),
core::iter::empty::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>(),
StateProviderBuilder::new(provider.clone(), genesis_hash, None),
ConsistentDbView::new_with_latest_tip(provider).unwrap(),
TrieInput::from_state(hashed_state),
&TreeConfig::default(),
);
let mut handle =
payload_processor
.spawn(
Default::default(),
core::iter::empty::<
Result<Recovered<TransactionSigned>, core::convert::Infallible>,
>(),
StateProviderBuilder::new(provider.clone(), genesis_hash, None),
ConsistentDbView::new_with_latest_tip(provider).unwrap(),
TrieInput::from_state(hashed_state),
&TreeConfig::default(),
)
.expect("failed to spawn payload processor task");

let mut state_hook = handle.state_hook();

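Note: the core refactor in this file replaces the three-step setup (`ProofTaskManager::new(...)`, then `.handle()`, then manually spawning `proof_task.run()` on the blocking executor) with a single `new_proof_task_handle(...)` call, which is presumed to spawn the worker internally and hand back a clonable handle while keeping the old drop-to-terminate semantics. A self-contained sketch of that ownership pattern, using plain threads and illustrative names in place of reth's executor and message types:

```rust
// Sketch only: the real implementation lives in
// reth_trie_parallel::proof_task and uses different types.
use crossbeam_channel::{unbounded, Sender};
use std::thread;

#[derive(Clone)]
struct ProofTaskHandle {
    to_worker: Sender<u64>, // stand-in for proof task messages
}

fn new_proof_task_handle() -> std::io::Result<ProofTaskHandle> {
    let (to_worker, from_handles) = unbounded::<u64>();
    // The worker loop runs until every Sender clone is dropped; `recv`
    // then errors and the loop exits, so no explicit shutdown is needed.
    thread::Builder::new().name("proof-worker".into()).spawn(move || {
        while let Ok(task) = from_handles.recv() {
            println!("processing proof task {task}");
        }
    })?;
    Ok(ProofTaskHandle { to_worker })
}

fn main() -> std::io::Result<()> {
    let handle = new_proof_task_handle()?;
    let for_multiproof = handle.clone(); // e.g. cloned for MultiProofTask
    handle.to_worker.send(1).unwrap();
    for_multiproof.to_worker.send(2).unwrap();
    drop(handle);
    drop(for_multiproof); // last handle dropped -> worker exits
    thread::sleep(std::time::Duration::from_millis(50));
    Ok(())
}
```

Note also the new `(max_proof_task_concurrency / 2).max(1)` clamp: with a configured proof task concurrency of 1, the old integer division yielded a multiproof concurrency of 0.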
11 changes: 6 additions & 5 deletions crates/engine/tree/src/tree/payload_processor/multiproof.rs
@@ -1204,7 +1204,7 @@ mod tests {
use alloy_primitives::map::B256Set;
use reth_provider::{providers::ConsistentDbView, test_utils::create_test_provider_factory};
use reth_trie::{MultiProof, TrieInput};
use reth_trie_parallel::proof_task::{ProofTaskCtx, ProofTaskManager};
use reth_trie_parallel::proof_task::{new_proof_task_handle, ProofTaskCtx};
use revm_primitives::{B256, U256};
use std::sync::Arc;

@@ -1231,15 +1231,16 @@
config.state_sorted.clone(),
config.prefix_sets.clone(),
);
let proof_task = ProofTaskManager::new(
let proof_task_handle = new_proof_task_handle(
executor.handle().clone(),
config.consistent_view.clone(),
task_ctx,
1,
);
1, // max_concurrency for test
)
.expect("Failed to create proof task handle for multiproof test");
let channel = channel();

MultiProofTask::new(config, executor, proof_task.handle(), channel.0, 1, None)
MultiProofTask::new(config, executor, proof_task_handle, channel.0, 1, None)
}

#[test]
2 changes: 1 addition & 1 deletion crates/engine/tree/src/tree/payload_validator.rs
@@ -885,7 +885,7 @@ where
consistent_view,
trie_input,
&self.config,
),
)?,
StateRootStrategy::StateRootTask,
)
// if prefix sets are not empty, we spawn a task that exclusively handles cache
1 change: 1 addition & 0 deletions crates/trie/parallel/Cargo.toml
@@ -36,6 +36,7 @@ derive_more.workspace = true
rayon.workspace = true
itertools.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread"] }
crossbeam-channel.workspace = true

# `metrics` feature
reth-metrics = { workspace = true, optional = true }
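Note: this dependency backs the channel swap in `proof.rs` below. A plausible motivation, though the PR does not state it: `crossbeam_channel::Receiver` is `Send + Sync + Clone`, unlike `std::sync::mpsc::Receiver`, so receivers can be stored in shared collections and drained from other threads. A quick property check:

```rust
// General crossbeam-channel facts, not code from this PR.
use crossbeam_channel::unbounded;

fn assert_send_sync_clone<T: Send + Sync + Clone>(_t: &T) {}

fn main() {
    let (tx, rx) = unbounded::<u32>();
    assert_send_sync_clone(&rx); // fails to compile for std::sync::mpsc::Receiver
    tx.send(7).unwrap();
    let rx2 = rx.clone(); // receivers may share one queue
    assert_eq!(rx2.recv().unwrap(), 7);
}
```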
47 changes: 27 additions & 20 deletions crates/trie/parallel/src/proof.rs
@@ -14,8 +14,7 @@ use dashmap::DashMap;
use itertools::Itertools;
use reth_execution_errors::StorageRootError;
use reth_provider::{
providers::ConsistentDbView, BlockReader, DBProvider, DatabaseProviderFactory, FactoryTx,
ProviderError,
providers::ConsistentDbView, BlockReader, DBProvider, DatabaseProviderFactory, ProviderError,
};
use reth_storage_errors::db::DatabaseError;
use reth_trie::{
@@ -34,7 +33,7 @@ use reth_trie_common::{
proof::{DecodedProofNodes, ProofRetainer},
};
use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
use std::sync::{mpsc::Receiver, Arc};
use std::sync::Arc;
use tracing::trace;

/// Parallel proof calculator.
Expand All @@ -59,7 +58,7 @@ pub struct ParallelProof<Factory: DatabaseProviderFactory> {
/// Provided by the user to give the necessary context to retain extra proofs.
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
/// Handle to the storage proof task.
storage_proof_task_handle: ProofTaskManagerHandle<FactoryTx<Factory>>,
storage_proof_task_handle: ProofTaskManagerHandle<<Factory::Provider as DBProvider>::Tx>,
/// Cached storage proof roots for missed leaves; this maps
/// hashed (missed) addresses to their storage proof roots.
missed_leaves_storage_roots: Arc<DashMap<B256, B256>>,
Expand All @@ -75,7 +74,7 @@ impl<Factory: DatabaseProviderFactory> ParallelProof<Factory> {
state_sorted: Arc<HashedPostStateSorted>,
prefix_sets: Arc<TriePrefixSetsMut>,
missed_leaves_storage_roots: Arc<DashMap<B256, B256>>,
storage_proof_task_handle: ProofTaskManagerHandle<FactoryTx<Factory>>,
storage_proof_task_handle: ProofTaskManagerHandle<<Factory::Provider as DBProvider>::Tx>,
) -> Self {
Self {
view,
@@ -118,7 +117,7 @@
hashed_address: B256,
prefix_set: PrefixSet,
target_slots: B256Set,
) -> Receiver<Result<DecodedStorageMultiProof, ParallelStateRootError>> {
) -> crossbeam_channel::Receiver<Result<DecodedStorageMultiProof, ParallelStateRootError>> {
let input = StorageProofInput::new(
hashed_address,
prefix_set,
@@ -127,9 +126,8 @@
self.multi_added_removed_keys.clone(),
);

let (sender, receiver) = std::sync::mpsc::channel();
let _ =
self.storage_proof_task_handle.queue_task(ProofTaskKind::StorageProof(input, sender));
let (sender, receiver) = crossbeam_channel::unbounded();
self.storage_proof_task_handle.queue_task(ProofTaskKind::StorageProof(input, sender));
receiver
}

@@ -323,6 +321,17 @@ where
}
}
}

// Drain receivers for accounts the walker never touched (e.g. destroyed targets) so workers
// can deliver their results without hitting a closed channel.
for (hashed_address, rx) in storage_proof_receivers {
let decoded_storage_multiproof = rx.recv().map_err(|e| {
ParallelStateRootError::StorageRoot(StorageRootError::Database(
DatabaseError::Other(format!("channel closed for {hashed_address}: {e}")),
))
})??;
collected_decoded_storages.insert(hashed_address, decoded_storage_multiproof);
}
let _ = hash_builder.root();

let stats = tracker.finish();
@@ -368,7 +377,7 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::proof_task::{ProofTaskCtx, ProofTaskManager};
use crate::proof_task::{new_proof_task_handle, ProofTaskCtx};
use alloy_primitives::{
keccak256,
map::{B256Set, DefaultHashBuilder},
@@ -447,13 +456,13 @@ mod tests {

let task_ctx =
ProofTaskCtx::new(Default::default(), Default::default(), Default::default());
let proof_task =
ProofTaskManager::new(rt.handle().clone(), consistent_view.clone(), task_ctx, 1);
let proof_task_handle = proof_task.handle();

// keep the join handle around to make sure it does not return any errors
// after we compute the state root
let join_handle = rt.spawn_blocking(move || proof_task.run());
let proof_task_handle = new_proof_task_handle(
rt.handle().clone(),
consistent_view.clone(),
task_ctx,
1, // max_concurrency for test
)
.expect("Failed to create proof task");

let parallel_result = ParallelProof::new(
consistent_view,
@@ -489,9 +498,7 @@
// then compare the entire thing for any mask differences
assert_eq!(parallel_result, sequential_result_decoded);

// drop the handle to terminate the task and then block on the proof task handle to make
// sure it does not return any errors
// Drop the handle to release transaction pool resources
drop(proof_task_handle);
rt.block_on(join_handle).unwrap().expect("The proof task should not return an error");
}
}
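Note: the drain loop added in this file is the behavioral fix: receivers for accounts the trie walker never consumed (e.g. destroyed targets) are now drained rather than dropped, so in-flight storage proof workers never send into a disconnected channel. A minimal sketch of the idiom with simplified types:

```rust
// Sketch only: the real code maps hashed addresses to
// DecodedStorageMultiProof results.
use crossbeam_channel::{unbounded, Receiver};
use std::collections::HashMap;
use std::thread;

fn main() {
    let mut receivers: HashMap<&'static str, Receiver<Result<u64, String>>> =
        HashMap::new();
    for account in ["alice", "bob"] {
        let (tx, rx) = unbounded();
        receivers.insert(account, rx);
        // A worker computing a storage proof for `account`.
        thread::spawn(move || {
            tx.send(Ok(42)).unwrap();
        });
    }

    // Drain every outstanding receiver so each worker's send is delivered,
    // surfacing both channel errors and proof errors.
    for (account, rx) in receivers {
        match rx.recv() {
            Ok(Ok(root)) => println!("{account}: storage root {root}"),
            Ok(Err(e)) => eprintln!("{account}: proof error: {e}"),
            Err(e) => eprintln!("{account}: channel closed: {e}"),
        }
    }
}
```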