Skip to content

Commit bfc69c7

Browse files
feat: optimize prefetch
1 parent b0902d9 commit bfc69c7

5 files changed

Lines changed: 204 additions & 86 deletions

File tree

crates/blockchain-tree/src/chain.rs

Lines changed: 36 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -3,35 +3,37 @@
33
//! A [`Chain`] contains the state of accounts for the chain after execution of its constituent
44
//! blocks, as well as a list of the blocks the chain is composed of.
55
6-
use super::externals::TreeExternals;
7-
use crate::BundleStateDataRef;
6+
use std::{
7+
collections::{BTreeMap, HashMap},
8+
ops::{Deref, DerefMut},
9+
time::Instant,
10+
};
11+
812
use reth_blockchain_tree_api::{
9-
error::{BlockchainTreeError, InsertBlockErrorKind},
10-
BlockAttachment, BlockValidationKind,
13+
BlockAttachment,
14+
BlockValidationKind, error::{BlockchainTreeError, InsertBlockErrorKind},
1115
};
1216
use reth_consensus::{Consensus, ConsensusError, PostExecutionInput};
1317
use reth_db_api::database::Database;
1418
use reth_evm::execute::{BlockExecutorProvider, Executor};
1519
use reth_execution_errors::BlockExecutionError;
1620
use reth_execution_types::{Chain, ExecutionOutcome};
1721
use reth_primitives::{
18-
BlockHash, BlockNumber, ForkBlock, GotExpected, Header, SealedBlockWithSenders, SealedHeader,
19-
B256, U256,
22+
B256, BlockHash, BlockNumber, ForkBlock, GotExpected, Header, SealedBlockWithSenders,
23+
SealedHeader, U256,
2024
};
2125
use reth_provider::{
22-
providers::{BundleStateProvider, ConsistentDbView},
23-
FullExecutionDataProvider, ProviderError, StateRootProvider,
26+
FullExecutionDataProvider,
27+
ProviderError, providers::{BundleStateProvider, ConsistentDbView}, StateRootProvider,
2428
};
2529
use reth_revm::{database::StateProviderDatabase, primitives::EvmState};
26-
use reth_trie::{updates::TrieUpdates, HashedPostState};
30+
use reth_trie::{HashedPostState, updates::TrieUpdates};
2731
use reth_trie_parallel::parallel_root::ParallelStateRoot;
2832
use reth_trie_prefetch::TriePrefetch;
29-
use std::{
30-
collections::{BTreeMap, HashMap},
31-
ops::{Deref, DerefMut},
32-
sync::Arc,
33-
time::Instant,
34-
};
33+
34+
use crate::BundleStateDataRef;
35+
36+
use super::externals::TreeExternals;
3537

3638
/// A chain in the blockchain tree that has functionality to execute blocks and append them to
3739
/// itself.
@@ -227,22 +229,25 @@ impl AppendableChain {
227229
let block_hash = block.hash();
228230
let block = block.unseal();
229231

232+
let execute_start = Instant::now();
230233
let state = executor.execute((&block, U256::MAX, ancestor_blocks).into())?;
231234
externals.consensus.validate_block_post_execution(
232235
&block,
233236
PostExecutionInput::new(&state.receipts, &state.requests),
234237
)?;
235238

236-
let initial_execution_outcome = ExecutionOutcome::from((state, block.number));
239+
tracing::debug!(
240+
target: "blockchain_tree::chain",
241+
number = block.number,
242+
duration = ?execute_start.elapsed(),
243+
"executed and validated block"
244+
);
237245

238-
// stop the prefetch task.
239-
if let Some(interrupt_tx) = interrupt_tx {
240-
let _ = interrupt_tx.send(());
241-
}
246+
let initial_execution_outcome = ExecutionOutcome::from((state, block.number));
242247

243248
// check state root if the block extends the canonical chain __and__ if state root
244249
// validation was requested.
245-
if block_validation_kind.is_exhaustive() {
250+
let result = if block_validation_kind.is_exhaustive() {
246251
// calculate and check state root
247252
let start = Instant::now();
248253
let (state_root, trie_updates) = if block_attachment.is_canonical() {
@@ -285,7 +290,14 @@ impl AppendableChain {
285290
Ok((initial_execution_outcome, trie_updates))
286291
} else {
287292
Ok((initial_execution_outcome, None))
288-
}
293+
};
294+
295+
// stop the prefetch task.
296+
if let Some(interrupt_tx) = interrupt_tx {
297+
let _ = interrupt_tx.send(());
298+
};
299+
300+
result
289301
}
290302

291303
/// Validate and execute the given block, and append it to this chain.
@@ -358,18 +370,11 @@ impl AppendableChain {
358370
let (interrupt_tx, interrupt_rx) = tokio::sync::oneshot::channel();
359371

360372
let mut trie_prefetch = TriePrefetch::new();
361-
let consistent_view = if let Ok(view) =
362-
ConsistentDbView::new_with_latest_tip(externals.provider_factory.clone())
363-
{
364-
view
365-
} else {
366-
tracing::debug!("Failed to create consistent view for trie prefetch");
367-
return (None, None)
368-
};
373+
let provider_factory = externals.provider_factory.clone();
369374

370375
tokio::spawn({
371376
async move {
372-
trie_prefetch.run::<DB>(Arc::new(consistent_view), prefetch_rx, interrupt_rx).await;
377+
trie_prefetch.run::<DB>(provider_factory, prefetch_rx, interrupt_rx).await;
373378
}
374379
});
375380

crates/trie/parallel/src/parallel_root.rs

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,27 @@
1-
#[cfg(feature = "metrics")]
2-
use crate::metrics::ParallelStateRootMetrics;
3-
use crate::{stats::ParallelTrieTracker, storage_root_targets::StorageRootTargets};
1+
use std::collections::HashMap;
2+
43
use alloy_rlp::{BufMut, Encodable};
54
use rayon::prelude::*;
5+
use thiserror::Error;
6+
use tracing::*;
7+
68
use reth_db_api::database::Database;
79
use reth_execution_errors::StorageRootError;
810
use reth_primitives::B256;
9-
use reth_provider::{providers::ConsistentDbView, DatabaseProviderFactory, ProviderError};
11+
use reth_provider::{DatabaseProviderFactory, ProviderError, providers::ConsistentDbView};
1012
use reth_trie::{
13+
HashBuilder,
1114
hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory},
15+
HashedPostState,
16+
Nibbles,
1217
node_iter::{TrieElement, TrieNodeIter},
13-
trie_cursor::TrieCursorFactory,
14-
updates::TrieUpdates,
15-
walker::TrieWalker,
16-
HashBuilder, HashedPostState, Nibbles, StorageRoot, TrieAccount,
18+
StorageRoot, trie_cursor::TrieCursorFactory, TrieAccount, updates::TrieUpdates, walker::TrieWalker,
1719
};
1820
use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
19-
use std::collections::HashMap;
20-
use thiserror::Error;
21-
use tracing::*;
21+
22+
use crate::{stats::ParallelTrieTracker, storage_root_targets::StorageRootTargets};
23+
#[cfg(feature = "metrics")]
24+
use crate::metrics::ParallelStateRootMetrics;
2225

2326
/// Parallel incremental state root calculator.
2427
///
@@ -129,14 +132,17 @@ where
129132
hashed_cursor_factory.hashed_account_cursor().map_err(ProviderError::Database)?,
130133
);
131134

135+
let account_tree_start = std::time::Instant::now();
132136
let mut hash_builder = HashBuilder::default().with_updates(retain_updates);
133137
let mut account_rlp = Vec::with_capacity(128);
134138
while let Some(node) = account_node_iter.try_next().map_err(ProviderError::Database)? {
135139
match node {
136140
TrieElement::Branch(node) => {
141+
tracker.inc_branch();
137142
hash_builder.add_branch(node.key, node.value, node.children_are_in_trie);
138143
}
139144
TrieElement::Leaf(hashed_address, account) => {
145+
tracker.inc_leaf();
140146
let (storage_root, _, updates) = match storage_roots.remove(&hashed_address) {
141147
Some(result) => result,
142148
// Since we do not store all intermediate nodes in the database, there might
@@ -174,15 +180,18 @@ where
174180
prefix_sets.destroyed_accounts,
175181
);
176182

183+
let account_tree_duration = account_tree_start.elapsed();
177184
let stats = tracker.finish();
178185

179186
#[cfg(feature = "metrics")]
180187
self.metrics.record_state_trie(stats);
181188

182-
trace!(
189+
debug!(
183190
target: "trie::parallel_state_root",
184191
%root,
185192
duration = ?stats.duration(),
193+
account_tree_duration = ?account_tree_duration,
194+
storage_trees_duration = ?(stats.duration() - account_tree_duration),
186195
branches_added = stats.branches_added(),
187196
leaves_added = stats.leaves_added(),
188197
missed_leaves = stats.missed_leaves(),
@@ -218,11 +227,13 @@ impl From<ParallelStateRootError> for ProviderError {
218227

219228
#[cfg(test)]
220229
mod tests {
221-
use super::*;
222230
use rand::Rng;
223-
use reth_primitives::{keccak256, Account, Address, StorageEntry, U256};
224-
use reth_provider::{test_utils::create_test_provider_factory, HashingWriter};
225-
use reth_trie::{test_utils, HashedStorage};
231+
232+
use reth_primitives::{Account, Address, keccak256, StorageEntry, U256};
233+
use reth_provider::{HashingWriter, test_utils::create_test_provider_factory};
234+
use reth_trie::{HashedStorage, test_utils};
235+
236+
use super::*;
226237

227238
#[tokio::test]
228239
async fn random_parallel_root() {

crates/trie/prefetch/src/lib.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,11 @@
77
)]
88
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
99

10+
pub use prefetch::TriePrefetch;
1011
pub use reth_trie_parallel::StorageRootTargets;
1112

13+
/// Trie prefetch stats.
14+
pub mod stats;
15+
1216
/// Implementation of trie prefetch.
13-
mod prefetch;
14-
pub use prefetch::TriePrefetch;
17+
pub mod prefetch;

0 commit comments

Comments
 (0)