178 changes: 12 additions & 166 deletions src/lib.rs
@@ -5,7 +5,9 @@ pub mod error;
pub mod events;
pub mod metrics;
pub mod pg_utils;
pub mod processing;
pub mod provider;
pub mod tasks;
pub mod transaction;

pub mod test_utils;
@@ -22,13 +24,7 @@ pub const STAKING_CONTRACT_ADDRESS: Address =
alloy::primitives::address!("0000000000000000000000000000000000001000");

use std::ops::Range;

use eyre::Result;
use log::{error, info};
use sqlx::PgPool;
use tokio::sync::mpsc;

use crate::events::u256_to_bigdecimal;
use tokio::sync::oneshot;

pub fn chunk_range(range: Range<u64>, chunk_size: u64) -> Vec<Range<u64>> {
let mut chunks = Vec::with_capacity(((range.end - range.start) / chunk_size) as usize);
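Note: the diff collapses the rest of `chunk_range` here. As a reading aid, a minimal sketch of what a body with this signature and capacity hint usually looks like (an assumption, not the PR's actual collapsed code):

```rust
use std::ops::Range;

// Hypothetical completion: step through the range in chunk_size strides,
// clamping the last chunk to range.end so a partial tail is still emitted.
pub fn chunk_range(range: Range<u64>, chunk_size: u64) -> Vec<Range<u64>> {
    let mut chunks = Vec::with_capacity(((range.end - range.start) / chunk_size) as usize);
    let mut start = range.start;
    while start < range.end {
        let end = (start + chunk_size).min(range.end);
        chunks.push(start..end);
        start = end;
    }
    chunks
}

// chunk_range(0..10, 4) -> [0..4, 4..8, 8..10]
```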
@@ -119,171 +115,21 @@ pub enum BackfillWork {
BlockGap(Range<u64>),
TransactionFetch(TransactionFetchRequest),
BlockTipsFetch(BlockTipsFetchRequest),
NoBlockGaps(Range<u64>),
}

pub struct BlockGapsResponse {
pub gaps: Vec<Range<u64>>,
pub checkpoint: u64,
pub max_block: u64,
}
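With the new oneshot-carrying `DbRequest` variants at the end of this hunk, the DB task answers a gap query by sending this struct back directly instead of pushing work onto the backfill channel. A sketch of the responding side, with a hypothetical handler name (the real handling presumably moved into the new `processing`/`tasks` modules):

```rust
use tokio::sync::oneshot;

// Hypothetical handler for DbRequest::GetBlockGaps(reply); the gaps,
// checkpoint, and max_block would come from the db::repository queries
// that the removed process_db_requests below used to run inline.
fn answer_block_gaps(
    reply: oneshot::Sender<Option<BlockGapsResponse>>,
    gaps: Vec<std::ops::Range<u64>>,
    checkpoint: u64,
    max_block: u64,
) {
    // A oneshot send only fails if the requester dropped its receiver,
    // in which case there is nobody left to inform.
    let _ = reply.send(Some(BlockGapsResponse { gaps, checkpoint, max_block }));
}
```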

pub enum DbRequest {
InsertCompleteBlocks(Box<BlockBatch>),
InsertTransactions(Vec<transaction::EventTxData>),
InsertBlockTip((u64, U256)),
GetBlockGaps,
GetTransactionGaps,
GetBlockTipsGaps,
}

pub async fn process_db_requests(
pool: PgPool,
mut rx: mpsc::UnboundedReceiver<DbRequest>,
backfill_tx: mpsc::UnboundedSender<BackfillWork>,
validator_ids: Vec<u64>,
metrics_tx: mpsc::UnboundedSender<metrics::Metric>,
db_operation_timeout_secs: u64,
) -> Result<()> {
let timeout = tokio::time::Duration::from_secs(db_operation_timeout_secs);
while let Some(req) = rx.recv().await {
match req {
DbRequest::GetBlockGaps => {
let checkpoint = match db::repository::get_block_sync_checkpoint(&pool).await {
Ok(cp) => cp,
Err(e) => {
error!("Failed to get block sync checkpoint: {}", e);
continue;
}
};

let max_block = match db::repository::get_max_block_number(&pool).await {
Ok(Some(max)) => max,
Ok(None) => {
info!("No blocks in database yet");
continue;
}
Err(e) => {
error!("Failed to get max block number: {}", e);
continue;
}
};

match db::repository::get_block_gaps(&pool, checkpoint).await {
Ok(gaps) => {
if gaps.is_empty() {
info!("No block gaps detected");
if max_block > checkpoint {
match db::repository::update_block_sync_checkpoint(&pool, max_block)
.await
{
Ok(_) => {
info!("Updated block sync checkpoint to {}", max_block)
}
Err(e) => {
error!("Failed to update block sync checkpoint: {}", e)
}
}
}
backfill_tx.send(BackfillWork::NoBlockGaps(checkpoint..max_block))?;
} else {
info!("Detected {} block gap(s)", gaps.len());
for range in gaps {
info!("Queueing block gap for backfill: {:?}", range);
backfill_tx.send(BackfillWork::BlockGap(range))?;
}
}
}
Err(e) => {
error!("Failed to check for gaps: {}", e);
}
};
}
DbRequest::GetTransactionGaps => {
match db::repository::get_missing_transaction_hashes(&pool, &validator_ids).await {
Ok(missing_hashes) => {
if missing_hashes.is_empty() {
info!("No transaction gaps detected");
} else {
info!("Detected {} missing transactions", missing_hashes.len());
let _ = metrics_tx.send(metrics::Metric::TransactionGapsFound(
missing_hashes.len() as u64,
));
for (transaction_hash, block_number, event_type) in missing_hashes {
backfill_tx.send(BackfillWork::TransactionFetch(
TransactionFetchRequest {
transaction_hash,
block_number,
event_type,
},
))?;
}
}
}
Err(e) => {
error!("Failed to check for transaction gaps: {}", e);
}
}
}
DbRequest::GetBlockTipsGaps => {
match db::repository::get_missing_block_tips(&pool, &validator_ids).await {
Ok(missing_blocks) => {
if missing_blocks.is_empty() {
info!("No block tips gaps detected");
} else {
info!("Detected {} missing block tips", missing_blocks.len());
for block_number in missing_blocks {
backfill_tx.send(BackfillWork::BlockTipsFetch(
BlockTipsFetchRequest { block_number },
))?;
}
}
}
Err(e) => {
error!("Failed to check for block tips gaps: {}", e);
}
}
}
DbRequest::InsertCompleteBlocks(blocks) => {
info!("Inserting {} blocks", blocks.block_meta.len(),);

match db::insert_blocks(&pool, &blocks, timeout).await {
Ok(event_counts) => {
let total_inserted: u64 =
event_counts.values().map(|(inserted, _)| inserted).sum();
info!("Successfully inserted {} events", total_inserted);
let _ = metrics_tx.send(metrics::Metric::InsertedEvents(event_counts));
}
Err(db::repository::DbError::Sqlx(sqlx::Error::PoolTimedOut)) => {
error!("Insert operation timed out");
let _ = metrics_tx.send(metrics::Metric::InsertTimeout);
}
Err(e) => {
error!("Failed to insert blocks: {:?}", e);
let _ = metrics_tx.send(metrics::Metric::FailedToInsert);
}
}
}
DbRequest::InsertTransactions(tx_data) => {
info!("Inserting {} transactions", tx_data.len());
match db::insert_transactions(&pool, &tx_data).await {
Ok(inserted) => {
info!("Successfully inserted {} transactions", inserted);
let _ = metrics_tx.send(metrics::Metric::TransactionsFetched(inserted));
}
Err(e) => {
error!("Failed to insert transactions: {:?}", e);
}
}
}
DbRequest::InsertBlockTip((block_height, tip)) => {
info!("Inserting block tip at block {block_height}");
match db::set_block_tip(&pool, block_height, u256_to_bigdecimal(tip)).await {
Ok(()) => {
info!("Successfully inserted block tip");
}
Err(e) => {
error!("Failed to insert block tip: {:?}", e);
}
}
}
}
}
Ok(())
GetBlockGaps(oneshot::Sender<Option<BlockGapsResponse>>),
GetTransactionGaps(oneshot::Sender<Vec<TransactionFetchRequest>>),
GetBlockTipsGaps(oneshot::Sender<Vec<BlockTipsFetchRequest>>),
}
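Each variant now carries a `oneshot::Sender`, so callers get a direct reply instead of the old fire-and-forget flow through `backfill_tx`. A minimal caller-side sketch, assuming the DB task still listens on an unbounded mpsc channel of `DbRequest` (the channel wiring is an assumption; it likely lives in the new `processing`/`tasks` modules):

```rust
use tokio::sync::{mpsc, oneshot};

// Hypothetical helper: ask the DB task for block gaps and await its reply.
async fn fetch_block_gaps(
    db_tx: &mpsc::UnboundedSender<DbRequest>,
) -> Option<BlockGapsResponse> {
    let (tx, rx) = oneshot::channel();
    // If the DB task has shut down, there is nothing to report.
    if db_tx.send(DbRequest::GetBlockGaps(tx)).is_err() {
        return None;
    }
    // rx.await fails only if the DB task dropped the sender without replying.
    rx.await.ok().flatten()
}
```

This turns gap detection into request-response: the caller, rather than the DB loop, now decides whether a reply becomes `BackfillWork`.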

#[cfg(test)]