Skip to content

Commit 6963980

Browse files
authored
refactor: builder uses stream and requires WS (#135)
1 parent d33a4c1 commit 6963980

File tree

6 files changed

+49
-33
lines changed

6 files changed

+49
-33
lines changed

bin/builder.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,14 @@ async fn main() -> eyre::Result<()> {
2121
let config = BuilderConfig::from_env()?.clone();
2222
let constants = SignetSystemConstants::pecorino();
2323

24+
// We connect the WS greedily, so we can fail early if the connection is
25+
// invalid.
26+
let ru_provider = config.connect_ru_provider().await?;
27+
2428
// Spawn the EnvTask
2529
let env_task = config.env_task();
26-
let (block_env, env_jh) = env_task.spawn();
30+
let (block_env, env_jh) =
31+
env_task.await.expect("ws validity checked in connect_ru_provider above").spawn();
2732

2833
// Spawn the cache system
2934
let cache_tasks = CacheTasks::new(config.clone(), block_env.clone());
@@ -32,7 +37,6 @@ async fn main() -> eyre::Result<()> {
3237
// Prep providers and contracts
3338
let (host_provider, quincey) =
3439
tokio::try_join!(config.connect_host_provider(), config.connect_quincey())?;
35-
let ru_provider = config.connect_ru_provider();
3640
let zenith = config.connect_zenith(host_provider.clone());
3741

3842
// Set up the metrics task

src/config.rs

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use alloy::{
1212
SimpleNonceManager, WalletFiller,
1313
},
1414
},
15+
rpc::client::BuiltInConnectionString,
1516
};
1617
use eyre::Result;
1718
use init4_bin_base::{
@@ -63,11 +64,19 @@ pub struct BuilderConfig {
6364
pub ru_chain_id: u64,
6465

6566
/// URL for Host RPC node.
66-
#[from_env(var = "HOST_RPC_URL", desc = "URL for Host RPC node", infallible)]
67+
#[from_env(
68+
var = "HOST_RPC_URL",
69+
desc = "URL for Host RPC node. This MUST be a valid HTTP or WS URL, starting with http://, https://, ws:// or wss://",
70+
infallible
71+
)]
6772
pub host_rpc_url: Cow<'static, str>,
6873

6974
/// URL for the Rollup RPC node.
70-
#[from_env(var = "ROLLUP_RPC_URL", desc = "URL for Rollup RPC node", infallible)]
75+
#[from_env(
76+
var = "ROLLUP_RPC_URL",
77+
desc = "URL for Rollup RPC node. This MUST be a valid WS url starting with ws:// or wss://. Http providers are not supported.",
78+
infallible
79+
)]
7180
pub ru_rpc_url: Cow<'static, str>,
7281

7382
/// URL of the tx pool to poll for incoming transactions.
@@ -176,14 +185,25 @@ impl BuilderConfig {
176185
}
177186

178187
/// Connect to the Rollup rpc provider.
179-
pub fn connect_ru_provider(&self) -> RootProvider<Ethereum> {
180-
static ONCE: std::sync::OnceLock<RootProvider<Ethereum>> = std::sync::OnceLock::new();
188+
pub async fn connect_ru_provider(&self) -> eyre::Result<RootProvider<Ethereum>> {
189+
static ONCE: tokio::sync::OnceCell<RootProvider<Ethereum>> =
190+
tokio::sync::OnceCell::const_new();
181191

182-
ONCE.get_or_init(|| {
183-
let url = url::Url::parse(&self.ru_rpc_url).expect("failed to parse URL");
184-
RootProvider::new_http(url)
192+
ONCE.get_or_try_init(|| async {
193+
let url = url::Url::parse(&self.ru_rpc_url)?;
194+
195+
let scheme = url.scheme();
196+
eyre::ensure!(
197+
scheme == "ws" || scheme == "wss",
198+
"Invalid Rollup RPC URL scheme: {scheme}. Expected ws:// or wss://"
199+
);
200+
201+
RootProvider::connect_with(BuiltInConnectionString::Ws(url, None))
202+
.await
203+
.map_err(Into::into)
185204
})
186-
.clone()
205+
.await
206+
.cloned()
187207
}
188208

189209
/// Connect to the Host rpc provider.
@@ -245,9 +265,9 @@ impl BuilderConfig {
245265
}
246266

247267
/// Create an [`EnvTask`] using this config.
248-
pub fn env_task(&self) -> EnvTask {
249-
let ru_provider = self.connect_ru_provider();
250-
EnvTask::new(self.clone(), ru_provider)
268+
pub async fn env_task(&self) -> eyre::Result<EnvTask> {
269+
let ru_provider = self.connect_ru_provider().await?;
270+
Ok(EnvTask::new(self.clone(), ru_provider))
251271
}
252272

253273
/// Create a [`SignetCfgEnv`] using this config.

src/tasks/env.rs

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use alloy::{
66
providers::Provider,
77
};
88
use init4_bin_base::deps::tracing::{self, Instrument, debug, error, info_span};
9-
use std::time::Duration;
109
use tokio::{sync::watch, task::JoinHandle};
1110
use tokio_stream::StreamExt;
1211
use trevm::revm::{context::BlockEnv, context_interface::block::BlobExcessGasAndPrice};
@@ -58,39 +57,32 @@ impl EnvTask {
5857
/// Returns a sender that sends [`SimEnv`] for communicating the next block environment.
5958
async fn task_fut(self, sender: watch::Sender<Option<SimEnv>>) {
6059
let span = info_span!("EnvTask::task_fut::init");
61-
let mut poller = match self.ru_provider.watch_blocks().instrument(span.clone()).await {
60+
61+
let mut blocks = match self.ru_provider.subscribe_blocks().await {
6262
Ok(poller) => poller,
6363
Err(err) => {
6464
let _span = span.enter();
65-
error!(%err, "Failed to watch blocks");
65+
error!(%err, "Failed to subscribe to blocks");
6666
return;
6767
}
68-
};
69-
70-
poller.set_poll_interval(Duration::from_millis(250));
71-
72-
let mut blocks = poller.into_stream();
68+
}
69+
.into_stream();
7370

74-
while let Some(blocks) =
71+
while let Some(block) =
7572
blocks.next().instrument(info_span!("EnvTask::task_fut::stream")).await
7673
{
77-
let Some(block_hash) = blocks.last() else {
78-
// This case occurs when there are no changes to the block,
79-
// so we do nothing.
80-
continue;
81-
};
8274
let span =
83-
info_span!("EnvTask::task_fut::loop", %block_hash, number = tracing::field::Empty);
75+
info_span!("EnvTask::task_fut::loop", %block.hash, number = tracing::field::Empty);
8476

8577
// Get the rollup header for rollup block simulation environment configuration
8678
let rollup_header = match self
87-
.get_latest_rollup_header(&sender, block_hash, &span)
79+
.get_latest_rollup_header(&sender, &block.hash, &span)
8880
.await
8981
{
9082
Some(value) => value,
9183
None => {
9284
// If we failed to get the rollup header, we skip this iteration.
93-
debug!(%block_hash, "failed to get rollup header - continuing to next block");
85+
debug!(%block.hash, "failed to get rollup header - continuing to next block");
9486
continue;
9587
}
9688
};

tests/block_builder_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ async fn test_handle_build() {
4242
// Create a rollup provider
4343
let ru_provider = RootProvider::<Ethereum>::new_http(anvil_instance.endpoint_url());
4444

45-
let block_env = config.env_task().spawn().0;
45+
let block_env = config.env_task().await.unwrap().spawn().0;
4646

4747
let block_builder = Simulator::new(&config, ru_provider.clone(), block_env);
4848

tests/cache.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ async fn test_bundle_poller_roundtrip() -> eyre::Result<()> {
1212

1313
let config = setup_test_config().unwrap();
1414

15-
let (block_env, _jh) = config.env_task().spawn();
15+
let (block_env, _jh) = config.env_task().await.unwrap().spawn();
1616
let cache_tasks = CacheTasks::new(config.clone(), block_env);
1717
let cache_system = cache_tasks.spawn();
1818

tests/env.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ async fn test_bundle_poller_roundtrip() {
77

88
let config = setup_test_config().unwrap();
99
let env_task = config.env_task();
10-
let (mut env_watcher, _jh) = env_task.spawn();
10+
let (mut env_watcher, _jh) = env_task.await.unwrap().spawn();
1111

1212
env_watcher.changed().await.unwrap();
1313
let env = env_watcher.borrow_and_update();

0 commit comments

Comments
 (0)