Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 46 additions & 48 deletions crates/engine/tree/src/tree/payload_processor/multiproof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use reth_trie_parallel::{
root::ParallelStateRootError,
};
use std::{
collections::{BTreeMap, VecDeque},
collections::BTreeMap,
ops::DerefMut,
sync::{
mpsc::{channel, Receiver, Sender},
Expand All @@ -34,10 +34,6 @@ use std::{
};
use tracing::{debug, error, instrument, trace};

/// Default upper bound for inflight multiproof calculations. These would be sitting in the queue
/// waiting to be processed.
const DEFAULT_MULTIPROOF_INFLIGHT_LIMIT: usize = 128;

/// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the
/// state.
#[derive(Default, Debug)]
Expand Down Expand Up @@ -337,17 +333,10 @@ impl MultiproofInput {
}

/// Manages concurrent multiproof calculations.
/// Takes care of not having more calculations in flight than a given maximum
/// concurrency, further calculation requests are queued and spawn later, after
/// availability has been signaled.
#[derive(Debug)]
pub struct MultiproofManager {
/// Maximum number of proof calculations allowed to be inflight at once.
inflight_limit: usize,
/// Currently running calculations.
inflight: usize,
/// Queued calculations.
pending: VecDeque<PendingMultiproofTask>,
/// Executor for tasks
executor: WorkloadExecutor,
/// Handle to the proof worker pools (storage and account).
Expand Down Expand Up @@ -376,22 +365,16 @@ impl MultiproofManager {
proof_worker_handle: ProofWorkerHandle,
) -> Self {
Self {
pending: VecDeque::with_capacity(DEFAULT_MULTIPROOF_INFLIGHT_LIMIT),
inflight_limit: DEFAULT_MULTIPROOF_INFLIGHT_LIMIT,
executor,
inflight: 0,
executor,
metrics,
proof_worker_handle,
missed_leaves_storage_roots: Default::default(),
}
}

const fn is_full(&self) -> bool {
self.inflight >= self.inflight_limit
}

/// Spawns a new multiproof calculation or enqueues it if the inflight limit is reached.
fn spawn_or_queue(&mut self, input: PendingMultiproofTask) {
/// Spawns a new multiproof calculation.
fn spawn(&mut self, input: PendingMultiproofTask) {
// If there are no proof targets, we can just send an empty multiproof back immediately
if input.proof_targets_is_empty() {
debug!(
Expand All @@ -402,27 +385,9 @@ impl MultiproofManager {
return
}

if self.is_full() {
self.pending.push_back(input);
self.metrics.pending_multiproofs_histogram.record(self.pending.len() as f64);
return;
}

self.spawn_multiproof_task(input);
}

/// Signals that a multiproof calculation has finished and there's room to
/// spawn a new calculation if needed.
fn on_calculation_complete(&mut self) {
self.inflight = self.inflight.saturating_sub(1);
self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);

if let Some(input) = self.pending.pop_front() {
self.metrics.pending_multiproofs_histogram.record(self.pending.len() as f64);
self.spawn_multiproof_task(input);
}
}

/// Spawns a multiproof task, dispatching to `spawn_storage_proof` if the input is a storage
/// multiproof, and dispatching to `spawn_multiproof` otherwise.
fn spawn_multiproof_task(&mut self, input: PendingMultiproofTask) {
Expand Down Expand Up @@ -508,6 +473,24 @@ impl MultiproofManager {

self.inflight += 1;
self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);
self.metrics
.pending_storage_multiproofs_histogram
.record(self.proof_worker_handle.pending_storage_tasks() as f64);
self.metrics
.pending_account_multiproofs_histogram
.record(self.proof_worker_handle.pending_account_tasks() as f64);
}

/// Signals that a multiproof calculation has finished.
fn on_calculation_complete(&mut self) {
self.inflight = self.inflight.saturating_sub(1);
self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);
self.metrics
.pending_storage_multiproofs_histogram
.record(self.proof_worker_handle.pending_storage_tasks() as f64);
self.metrics
.pending_account_multiproofs_histogram
.record(self.proof_worker_handle.pending_account_tasks() as f64);
}

/// Spawns a single multiproof calculation task.
Expand Down Expand Up @@ -598,6 +581,12 @@ impl MultiproofManager {

self.inflight += 1;
self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);
self.metrics
.pending_storage_multiproofs_histogram
.record(self.proof_worker_handle.pending_storage_tasks() as f64);
self.metrics
.pending_account_multiproofs_histogram
.record(self.proof_worker_handle.pending_account_tasks() as f64);
}
}

Expand All @@ -606,8 +595,10 @@ impl MultiproofManager {
pub(crate) struct MultiProofTaskMetrics {
/// Histogram of inflight multiproofs.
pub inflight_multiproofs_histogram: Histogram,
/// Histogram of pending multiproofs.
pub pending_multiproofs_histogram: Histogram,
/// Histogram of pending storage multiproofs in the queue.
pub pending_storage_multiproofs_histogram: Histogram,
/// Histogram of pending account multiproofs in the queue.
pub pending_account_multiproofs_histogram: Histogram,

/// Histogram of the number of prefetch proof target accounts.
pub prefetch_proof_targets_accounts_histogram: Histogram,
Expand Down Expand Up @@ -657,8 +648,7 @@ pub(crate) struct MultiProofTaskMetrics {
#[derive(Debug)]
pub(super) struct MultiProofTask {
/// The size of proof targets chunk to spawn in one calculation.
///
/// If [`None`], then chunking is disabled.
/// If None, chunking is disabled and all targets are processed in a single proof.
chunk_size: Option<usize>,
/// Task configuration.
config: MultiProofConfig,
Expand Down Expand Up @@ -738,10 +728,14 @@ impl MultiProofTask {

// Process proof targets in chunks.
let mut chunks = 0;
let should_chunk = !self.multiproof_manager.is_full();

// Only chunk if account or storage workers are available to take advantage of parallelism.
let should_chunk =
self.multiproof_manager.proof_worker_handle.has_available_account_workers() ||
self.multiproof_manager.proof_worker_handle.has_available_storage_workers();

let mut spawn = |proof_targets| {
self.multiproof_manager.spawn_or_queue(
self.multiproof_manager.spawn(
MultiproofInput {
config: self.config.clone(),
source: None,
Expand Down Expand Up @@ -873,10 +867,14 @@ impl MultiProofTask {

// Process state updates in chunks.
let mut chunks = 0;
let should_chunk = !self.multiproof_manager.is_full();

let mut spawned_proof_targets = MultiProofTargets::default();

// Only chunk if account or storage workers are available to take advantage of parallelism.
let should_chunk =
self.multiproof_manager.proof_worker_handle.has_available_account_workers() ||
self.multiproof_manager.proof_worker_handle.has_available_storage_workers();

let mut spawn = |hashed_state_update| {
let proof_targets = get_proof_targets(
&hashed_state_update,
Expand All @@ -885,7 +883,7 @@ impl MultiProofTask {
);
spawned_proof_targets.extend_ref(&proof_targets);

self.multiproof_manager.spawn_or_queue(
self.multiproof_manager.spawn(
MultiproofInput {
config: self.config.clone(),
source: Some(source),
Expand Down Expand Up @@ -954,7 +952,7 @@ impl MultiProofTask {
/// so that the proofs for accounts and storage slots that were already fetched are not
/// requested again.
/// 2. Using the proof targets, a new multiproof is calculated using
/// [`MultiproofManager::spawn_or_queue`].
/// [`MultiproofManager::spawn`].
/// * If the list of proof targets is empty, the [`MultiProofMessage::EmptyProof`] message is
/// sent back to this task along with the original state update.
/// * Otherwise, the multiproof is calculated and the [`MultiProofMessage::ProofCalculated`]
Expand Down
Loading
Loading