Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 39 additions & 0 deletions crates/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const EXTENSION: &str = "toml";
/// The default prune block interval
pub const DEFAULT_BLOCK_INTERVAL: usize = 5;


/// Configuration for the reth node.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
Expand Down Expand Up @@ -125,6 +126,8 @@ pub struct StageConfig {
pub index_account_history: IndexHistoryConfig,
/// Index Storage History stage configuration.
pub index_storage_history: IndexHistoryConfig,
/// Snap Sync stage configuration.
pub snap_sync: SnapSyncConfig,
/// Common ETL related configuration.
pub etl: EtlConfig,
}
Expand Down Expand Up @@ -258,6 +261,39 @@ impl Default for SenderRecoveryConfig {
}
}

/// Snap Sync stage configuration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(default))]
pub struct SnapSyncConfig {
/// Enable snap sync stage.
pub enabled: bool,
/// Max account ranges per execution.
pub max_ranges_per_execution: usize,
/// Max response bytes per request.
pub max_response_bytes: u64,
/// Timeout for peer requests (seconds).
pub request_timeout_seconds: u64,
/// Range size for account hash ranges (in hash space units).
/// Larger values = fewer requests but more data per request.
pub range_size: u64,
/// Maximum number of retries for failed requests.
pub max_retries: u32,
}

impl Default for SnapSyncConfig {
fn default() -> Self {
Self {
enabled: false,
max_ranges_per_execution: 100,
max_response_bytes: 2 * 1024 * 1024, // 2MB
request_timeout_seconds: 30,
range_size: 0x10, // 16 hash values (very small default for testing)
max_retries: 3, // 3 retries by default
}
}
}

/// Execution stage configuration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
Expand All @@ -278,6 +314,8 @@ pub struct ExecutionConfig {
)
)]
pub max_duration: Option<Duration>,
/// External clean threshold for execution stage.
pub external_clean_threshold: u64,
}

impl Default for ExecutionConfig {
Expand All @@ -289,6 +327,7 @@ impl Default for ExecutionConfig {
max_cumulative_gas: Some(30_000_000 * 50_000),
// 10 minutes
max_duration: Some(Duration::from_secs(10 * 60)),
external_clean_threshold: 100_000,
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
#![cfg_attr(docsrs, feature(doc_cfg))]

pub mod config;
pub use config::{BodiesConfig, Config, PruneConfig};
pub use config::{BodiesConfig, Config, PruneConfig, StageConfig, SnapSyncConfig, SenderRecoveryConfig, ExecutionConfig};
4 changes: 4 additions & 0 deletions crates/stages/stages/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,16 @@ reth-stages-api.workspace = true
reth-static-file-types.workspace = true
reth-trie = { workspace = true, features = ["metrics"] }
reth-trie-db = { workspace = true, features = ["metrics"] }
reth-trie-common.workspace = true
reth-eth-wire-types.workspace = true

reth-testing-utils = { workspace = true, optional = true }

alloy-eips.workspace = true
alloy-primitives.workspace = true
alloy-consensus.workspace = true
alloy-trie = { workspace = true, features = ["ethereum"] }
alloy-rlp.workspace = true

# async
tokio = { workspace = true, features = ["sync"] }
Expand Down
79 changes: 67 additions & 12 deletions crates/stages/stages/src/sets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,15 @@ use alloy_primitives::B256;
use reth_config::config::StageConfig;
use reth_consensus::{ConsensusError, FullConsensus};
use reth_evm::ConfigureEvm;
use reth_network_p2p::{bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader};
use reth_network_p2p::{bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader, snap::client::SnapClient};
use reth_primitives_traits::{Block, NodePrimitives};
use reth_provider::HeaderSyncGapProvider;
use reth_prune_types::PruneModes;
use reth_stages_api::Stage;
use std::{ops::Not, sync::Arc};
use tokio::sync::watch;


/// A set containing all stages to run a fully syncing instance of reth.
///
/// A combination of (in order)
Expand Down Expand Up @@ -348,13 +349,16 @@ where
/// A set containing all stages that are required to execute pre-existing block data.
#[derive(Debug)]
#[non_exhaustive]
pub struct ExecutionStages<E: ConfigureEvm> {
pub struct ExecutionStages<E: ConfigureEvm, S = ()> {
/// Executor factory that will create executors.
evm_config: E,
/// Consensus instance for validating blocks.
consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
/// Configuration for each stage in the pipeline
stages_config: StageConfig,
/// Optional snap client for snap sync (when enabled)
/// `SnapSyncStage` is integrated into the pipeline when snap sync is enabled
snap_client: Option<Arc<S>>,
}

impl<E: ConfigureEvm> ExecutionStages<E> {
Expand All @@ -364,25 +368,76 @@ impl<E: ConfigureEvm> ExecutionStages<E> {
consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
stages_config: StageConfig,
) -> Self {
Self { evm_config: executor_provider, consensus, stages_config }
Self {
evm_config: executor_provider,
consensus,
stages_config,
snap_client: None,
}
}
}

impl<E, Provider> StageSet<Provider> for ExecutionStages<E>
impl<E: ConfigureEvm, S: SnapClient + Send + Sync + 'static> ExecutionStages<E, S> {
/// Create a new set of execution stages with snap client.
pub fn with_snap_client(
executor_provider: E,
consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
stages_config: StageConfig,
snap_client: Option<Arc<S>>,
) -> Self {
Self {
evm_config: executor_provider,
consensus,
stages_config,
snap_client,
}
}
}

impl<E, S, Provider> StageSet<Provider> for ExecutionStages<E, S>
where
E: ConfigureEvm + 'static,
S: reth_network_p2p::snap::client::SnapClient + Send + Sync + 'static,
Provider: reth_provider::DBProvider + reth_provider::StatsReader + reth_provider::HeaderProvider,
SenderRecoveryStage: Stage<Provider>,
ExecutionStage<E>: Stage<Provider>,
crate::stages::SnapSyncStage<S>: Stage<Provider>,
{
fn builder(self) -> StageSetBuilder<Provider> {
StageSetBuilder::default()
.add_stage(SenderRecoveryStage::new(self.stages_config.sender_recovery))
.add_stage(ExecutionStage::from_config(
self.evm_config,
self.consensus,
self.stages_config.execution,
self.stages_config.execution_external_clean_threshold(),
))
let mut builder = StageSetBuilder::default();

// Check if snap sync is enabled
if self.stages_config.snap_sync.enabled {
// Use snap sync stage when enabled - REPLACES SenderRecoveryStage, ExecutionStage, PruneSenderRecoveryStage
if let Some(snap_client) = self.snap_client {
builder = builder.add_stage(crate::stages::SnapSyncStage::new(
self.stages_config.snap_sync,
snap_client,
));
} else {
// Fall back to traditional stages if snap client not provided
builder = builder
.add_stage(SenderRecoveryStage::new(self.stages_config.sender_recovery))
.add_stage(ExecutionStage::from_config(
self.evm_config,
self.consensus,
self.stages_config.execution,
self.stages_config.execution_external_clean_threshold(),
));
}
} else {
// Use traditional stages when snap sync is disabled
builder = builder
.add_stage(SenderRecoveryStage::new(self.stages_config.sender_recovery))
.add_stage(ExecutionStage::from_config(
self.evm_config,
self.consensus,
self.stages_config.execution,
self.stages_config.execution_external_clean_threshold(),
));
}

builder
}
}

Expand Down
Loading