Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::{BlobCacher, BlobFetcher, BlobFetcherConfig};
use init4_bin_base::utils::calc::SlotCalculator;
use reth::transaction_pool::TransactionPool;
use url::Url;

Expand Down Expand Up @@ -35,7 +34,6 @@ pub struct BlobFetcherBuilder<Pool> {
client: Option<reqwest::Client>,
cl_url: Option<String>,
pylon_url: Option<String>,
slot_calculator: Option<SlotCalculator>,
}

impl<Pool> BlobFetcherBuilder<Pool> {
Expand All @@ -47,7 +45,6 @@ impl<Pool> BlobFetcherBuilder<Pool> {
client: self.client,
cl_url: self.cl_url,
pylon_url: self.pylon_url,
slot_calculator: self.slot_calculator,
}
}

Expand Down Expand Up @@ -107,22 +104,6 @@ impl<Pool> BlobFetcherBuilder<Pool> {
self.pylon_url = Some(pylon_url.to_string());
Ok(self)
}

/// Set the slot calculator to use for the extractor.
pub const fn with_slot_calculator(
mut self,
slot_calculator: SlotCalculator,
) -> BlobFetcherBuilder<Pool> {
self.slot_calculator = Some(slot_calculator);
self
}

/// Set the slot calculator to use for the extractor, using the Pecornino
/// host configuration.
pub const fn with_pecornino_slots(mut self) -> BlobFetcherBuilder<Pool> {
self.slot_calculator = Some(SlotCalculator::pecorino_host());
self
}
}

impl<Pool: TransactionPool> BlobFetcherBuilder<Pool> {
Expand All @@ -141,9 +122,7 @@ impl<Pool: TransactionPool> BlobFetcherBuilder<Pool> {
let explorer =
foundry_blob_explorers::Client::new_with_client(explorer_url, client.clone());

let slot_calculator = self.slot_calculator.ok_or(BuilderError::MissingSlotCalculator)?;

Ok(BlobFetcher::new(pool, explorer, client, cl_url, pylon_url, slot_calculator))
Ok(BlobFetcher::new(pool, explorer, client, cl_url, pylon_url))
}

/// Build a [`BlobCacher`] with the provided parameters.
Expand Down
79 changes: 27 additions & 52 deletions crates/blobber/src/cache.rs → crates/blobber/src/blobs/cache.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use crate::{BlobberError, BlobberResult, Blobs, FetchResult};
use crate::{BlobFetcher, BlobberError, BlobberResult, Blobs, FetchResult};
use alloy::consensus::{SidecarCoder, SimpleCoder, Transaction as _};
use alloy::eips::eip7691::MAX_BLOBS_PER_BLOCK_ELECTRA;
use alloy::eips::merge::EPOCH_SLOTS;
use alloy::primitives::{B256, Bytes, keccak256};
use core::fmt;
use reth::transaction_pool::TransactionPool;
use reth::{network::cache::LruMap, primitives::Receipt};
use signet_extract::ExtractedEvent;
use signet_zenith::Zenith::BlockSubmitted;
use signet_zenith::ZenithBlock;
use std::marker::PhantomData;
use std::{
sync::{Arc, Mutex},
time::Duration,
Expand Down Expand Up @@ -37,11 +39,13 @@ enum CacheInst {

/// Handle for the cache.
#[derive(Debug, Clone)]
pub struct CacheHandle {
pub struct CacheHandle<Coder = SimpleCoder> {
sender: mpsc::Sender<CacheInst>,

_coder: PhantomData<Coder>,
}

impl CacheHandle {
impl<Coder> CacheHandle<Coder> {
/// Sends a cache instruction.
async fn send(&self, inst: CacheInst) {
let _ = self.sender.send(inst).await;
Expand Down Expand Up @@ -71,12 +75,14 @@ impl CacheHandle {

/// Fetch the blobs using [`Self::fetch_blobs`] and decode them to get the
/// Zenith block data using the provided coder.
pub async fn fetch_and_decode_with_coder<C: SidecarCoder>(
pub async fn fetch_and_decode(
&self,
slot: usize,
extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>,
mut coder: C,
) -> BlobberResult<Bytes> {
) -> BlobberResult<Bytes>
where
Coder: SidecarCoder + Default,
{
let tx_hash = extract.tx_hash();
let versioned_hashes = extract
.tx
Expand All @@ -87,23 +93,13 @@ impl CacheHandle {

let blobs = self.fetch_blobs(slot, tx_hash, versioned_hashes.to_owned()).await?;

coder
Coder::default()
.decode_all(blobs.as_ref())
.ok_or_else(BlobberError::blob_decode_error)?
.into_iter()
.find(|data| keccak256(data) == extract.block_data_hash())
.map(Into::into)
.ok_or_else(|| BlobberError::block_data_not_found(tx_hash))
}

/// Fetch the blobs using [`Self::fetch_blobs`] and decode them using
/// [`SimpleCoder`] to get the Zenith block data.
pub async fn fech_and_decode(
&self,
slot: usize,
extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>,
) -> BlobberResult<Bytes> {
self.fetch_and_decode_with_coder(slot, extract, SimpleCoder::default()).await
.ok_or_else(|| BlobberError::block_data_not_found(extract.block_data_hash()))
}

/// Fetch the blobs, decode them using the provided coder, and construct a
Expand All @@ -117,15 +113,17 @@ impl CacheHandle {
/// decoded (e.g., due to a malformatted blob).
/// - `Err(FetchError)` if there was an unrecoverable error fetching the
/// blobs.
pub async fn signet_block_with_coder<C: SidecarCoder>(
pub async fn signet_block(
&self,
host_block_number: u64,
slot: usize,
extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>,
coder: C,
) -> FetchResult<ZenithBlock> {
) -> FetchResult<ZenithBlock>
where
Coder: SidecarCoder + Default,
{
let header = extract.ru_header(host_block_number);
let block_data = match self.fetch_and_decode_with_coder(slot, extract, coder).await {
let block_data = match self.fetch_and_decode(slot, extract).await {
Ok(buf) => buf,
Err(BlobberError::Decode(_)) => {
trace!("Failed to decode block data");
Expand All @@ -135,44 +133,24 @@ impl CacheHandle {
};
Ok(ZenithBlock::from_header_and_data(header, block_data))
}

/// Fetch the blobs, decode them using [`SimpleCoder`], and construct a
/// Zenith block from the header and data.
///
/// # Returns
///
/// - `Ok(ZenithBlock)` if the block was successfully fetched and
/// decoded.
/// - `Ok(ZenithBlock)` with an EMPTY BLOCK if the block_data could not be
/// decoded (e.g., due to a malformatted blob).
/// - `Err(FetchError)` if there was an unrecoverable error fetching the
/// blobs.
pub async fn signet_block(
&self,
host_block_number: u64,
slot: usize,
extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>,
) -> FetchResult<ZenithBlock> {
self.signet_block_with_coder(host_block_number, slot, extract, SimpleCoder::default()).await
}
}

/// Retrieves blobs and stores them in a cache for later use.
pub struct BlobCacher<Pool> {
fetcher: crate::BlobFetcher<Pool>,
fetcher: BlobFetcher<Pool>,

cache: Mutex<LruMap<(usize, B256), Blobs>>,
}

impl<Pool: core::fmt::Debug> core::fmt::Debug for BlobCacher<Pool> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
impl<Pool: fmt::Debug> fmt::Debug for BlobCacher<Pool> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BlobCacher").field("fetcher", &self.fetcher).finish_non_exhaustive()
}
}

impl<Pool: TransactionPool + 'static> BlobCacher<Pool> {
/// Creates a new `BlobCacher` with the provided extractor and cache size.
pub fn new(fetcher: crate::BlobFetcher<Pool>) -> Self {
pub fn new(fetcher: BlobFetcher<Pool>) -> Self {
Self { fetcher, cache: LruMap::new(BLOB_CACHE_SIZE).into() }
}

Expand Down Expand Up @@ -237,10 +215,10 @@ impl<Pool: TransactionPool + 'static> BlobCacher<Pool> {
///
/// # Panics
/// This function will panic if the cache task fails to spawn.
pub fn spawn(self) -> CacheHandle {
pub fn spawn<C: SidecarCoder + Default>(self) -> CacheHandle<C> {
let (sender, inst) = mpsc::channel(CACHE_REQUEST_CHANNEL_SIZE);
tokio::spawn(Arc::new(self).task_future(inst));
CacheHandle { sender }
CacheHandle { sender, _coder: PhantomData }
}
}

Expand All @@ -256,7 +234,6 @@ mod tests {
rlp::encode,
signers::{SignerSync, local::PrivateKeySigner},
};
use init4_bin_base::utils::calc::SlotCalculator;
use reth::primitives::Transaction;
use reth_transaction_pool::{
PoolTransaction, TransactionOrigin,
Expand All @@ -272,7 +249,6 @@ mod tests {
let test = signet_constants::KnownChains::Test;

let constants: SignetSystemConstants = test.try_into().unwrap();
let calc = SlotCalculator::new(0, 0, 12);

let explorer_url = "https://api.holesky.blobscan.com/";
let client = reqwest::Client::builder().use_rustls_tls();
Expand Down Expand Up @@ -308,9 +284,8 @@ mod tests {
.with_explorer_url(explorer_url)
.with_client_builder(client)
.unwrap()
.with_slot_calculator(calc)
.build_cache()?;
let handle = cache.spawn();
let handle = cache.spawn::<SimpleCoder>();

let got = handle
.fetch_blobs(
Expand Down
File renamed without changes.
38 changes: 38 additions & 0 deletions crates/blobber/src/blobs/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use alloy::primitives::B256;
use reth::transaction_pool::BlobStoreError;

/// Result using [`FetchError`] as the default error type.
pub type FetchResult<T> = Result<T, FetchError>;

/// Unrecoverable blob fetching errors. These result in the node shutting
/// down. They occur when the blobstore is down or the sidecar is unretrievable.
#[derive(Debug, thiserror::Error)]
pub enum FetchError {
/// Reqwest error
#[error(transparent)]
Reqwest(#[from] reqwest::Error),
/// Missing sidecar error
#[error("Cannot retrieve sidecar for {0} from any source")]
MissingSidecar(B256),
/// Reth blobstore error.
#[error(transparent)]
BlobStore(BlobStoreError),
/// Url parse error.
#[error(transparent)]
UrlParse(#[from] url::ParseError),
/// Consensus client URL not set error.
#[error("Consensus client URL not set")]
ConsensusClientUrlNotSet,
/// Pylon client URL not set error.
#[error("Pylon client URL not set")]
PylonClientUrlNotSet,
}

impl From<BlobStoreError> for FetchError {
fn from(err: BlobStoreError) -> Self {
match err {
BlobStoreError::MissingSidecar(tx) => FetchError::MissingSidecar(tx),
_ => FetchError::BlobStore(err),
}
}
}
Loading