Skip to content

feat: add timeouts to prevent indexer hang during sync operations #89

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/slots_processor/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ pub enum SlotProcessingError {
ClientError(#[from] crate::clients::common::ClientError),
#[error(transparent)]
Provider(#[from] alloy::transports::TransportError),
#[error("Operation timed out: {operation} for slot {slot}")]
OperationTimeout {
operation: String,
slot: u32,
},
#[error(transparent)]
Other(#[from] anyhow::Error),
}
Expand Down
69 changes: 59 additions & 10 deletions src/slots_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
primitives::B256, rpc::types::BlockTransactionsKind, transports::http::ReqwestTransport,
};
use anyhow::{anyhow, Context as AnyhowContext, Result};
use futures::future::timeout;
use std::time::Duration;

use crate::clients::beacon::types::BlockHeader;
use tracing::{debug, info, Instrument};
use tracing::{debug, info, Instrument, warn};

use crate::{
clients::{
Expand Down Expand Up @@ -42,6 +44,8 @@
pub struct SlotsProcessor<T> {
context: Box<dyn CommonContext<T>>,
pub last_processed_block: Option<BlockHeader>,
// Timeout for network operations in seconds
request_timeout: u64,
}

impl SlotsProcessor<ReqwestTransport> {
Expand All @@ -52,6 +56,19 @@
Self {
context,
last_processed_block,
request_timeout: 30,
}
}

pub fn with_timeout(
context: Box<dyn CommonContext<ReqwestTransport>>,
last_processed_block: Option<BlockHeader>,
timeout_secs: u64,
) -> SlotsProcessor<ReqwestTransport> {
SlotsProcessor {
context,
last_processed_block,
request_timeout: timeout_secs,
}
}

Expand All @@ -70,11 +87,16 @@
let mut last_processed_block = self.last_processed_block.clone();

for current_slot in slots {
let block_header = match self
.context
.beacon_client()
.get_block_header(current_slot.into())
.await?
let block_header_result = timeout(
Duration::from_secs(self.request_timeout),
self.context.beacon_client().get_block_header(current_slot.into())
).await
.map_err(|_| SlotProcessingError::OperationTimeout {
operation: "get_block_header".to_string(),
slot: current_slot,
})??;

let block_header = match block_header_result
{
Some(header) => header,
None => {
Expand All @@ -97,9 +119,18 @@
"Reorg detected!",
);

self.process_reorg(&prev_block_header, &block_header)
.await
.map_err(|error| SlotsProcessorError::FailedReorgProcessing {
timeout(
Duration::from_secs(self.request_timeout * 2),
self.process_reorg(&prev_block_header, &block_header)
).await
.map_err(|_| SlotsProcessorError::FailedReorgProcessing {
old_slot: prev_block_header.slot,
new_slot: block_header.slot,
new_head_block_root: block_header.root,
old_head_block_root: prev_block_header.root,
error: anyhow!("Operation timed out: process_reorg"),
})??
.map_err(|error| SlotsProcessorError::FailedReorgProcessing {
old_slot: prev_block_header.slot,
new_slot: block_header.slot,
new_head_block_root: block_header.root,
Expand All @@ -110,7 +141,25 @@
}
}

if let Err(error) = self.process_block(&block_header).await {
let process_block_result = timeout(
Duration::from_secs(self.request_timeout * 2),
self.process_block(&block_header)
).await;

if let Err(timeout_error) = process_block_result {
warn!(slot = current_slot, "Process block operation timed out");
return Err(SlotsProcessorError::FailedSlotsProcessing {
initial_slot,
final_slot,
failed_slot: current_slot,
error: SlotProcessingError::OperationTimeout {
operation: "process_block".to_string(),
slot: current_slot,
},
});
}

if let Err(error) = process_block_result.unwrap() {
return Err(SlotsProcessorError::FailedSlotsProcessing {
initial_slot,
final_slot,
Expand Down Expand Up @@ -267,7 +316,7 @@
let mut current_old_slot = old_head_header.slot;
let mut reorg_depth = 0;

let mut rewinded_blocks: Vec<B256> = vec![];

Check warning on line 319 in src/slots_processor/mod.rs

View workflow job for this annotation

GitHub Actions / Run tests

useless use of `format!`

warning: useless use of `format!` --> src/slots_processor/mod.rs:319:50 | 319 | ... .with_context(|| format!("Failed to sync forwarded block"))?; | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: consider using `.to_string()`: `"Failed to sync forwarded block".to_string()` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#useless_format = note: `#[warn(clippy::useless_format)]` on by default

while reorg_depth <= MAX_ALLOWED_REORG_DEPTH && current_old_slot > 0 {
// We iterate over blocks by slot and not block root as blobscan blocks don't
Expand Down
5 changes: 5 additions & 0 deletions src/synchronizer/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ pub enum SynchronizerError {
slot: u32,
error: crate::clients::common::ClientError,
},
#[error("Operation timed out: {operation} for block_id {block_id}")]
OperationTimeout {
operation: String,
block_id: String,
},
#[error(transparent)]
Other(#[from] anyhow::Error),
}
Expand Down
105 changes: 77 additions & 28 deletions src/synchronizer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use std::fmt::Debug;
use std::time::Duration;

use alloy::transports::http::ReqwestTransport;
use anyhow::anyhow;
use async_trait::async_trait;
use futures::future::join_all;
use futures::future::{join_all, timeout};
use tokio::task::JoinHandle;
use tracing::{debug, info, Instrument};
use tracing::{debug, info, warn, Instrument};

#[cfg(test)]
use mockall::automock;
Expand Down Expand Up @@ -52,6 +53,8 @@ pub struct Synchronizer<T> {
slots_checkpoint: u32,
checkpoint_type: CheckpointType,
last_synced_block: Option<BlockHeader>,
// Timeout for network operations in seconds
request_timeout: u64,
}

#[derive(Clone, Copy, Debug, PartialEq)]
Expand All @@ -69,6 +72,7 @@ impl Default for SynchronizerBuilder {
slots_checkpoint: 1000,
checkpoint_type: CheckpointType::Upper,
last_synced_block: None,
request_timeout: 30,
}
}
}
Expand Down Expand Up @@ -101,6 +105,11 @@ impl SynchronizerBuilder {
self
}

pub fn with_request_timeout(&mut self, timeout_secs: u64) -> &mut Self {
self.request_timeout = timeout_secs;
self
}

pub fn build(
&self,
context: Box<dyn CommonContext<ReqwestTransport>>,
Expand All @@ -112,6 +121,7 @@ impl SynchronizerBuilder {
slots_checkpoint: self.slots_checkpoint,
checkpoint_type: self.checkpoint_type,
last_synced_block: self.last_synced_block.clone(),
request_timeout: self.request_timeout,
}
}
}
Expand Down Expand Up @@ -251,9 +261,14 @@ impl Synchronizer<ReqwestTransport> {
checkpoint_final_slot = final_chunk_slot
);

self.process_slots(initial_chunk_slot, final_chunk_slot)
.instrument(sync_slots_chunk_span)
.await?;
timeout(
Duration::from_secs(self.request_timeout * 3),
self.process_slots(initial_chunk_slot, final_chunk_slot).instrument(sync_slots_chunk_span)
).await
.map_err(|_| SynchronizerError::OperationTimeout {
operation: "process_slots".to_string(),
block_id: format!("from {} to {}", initial_chunk_slot, final_chunk_slot),
})??;

let last_slot = Some(if is_reverse_sync {
final_chunk_slot + 1
Expand All @@ -277,17 +292,24 @@ impl Synchronizer<ReqwestTransport> {
self.last_synced_block.as_ref().map(|block| block.slot);
}

if let Err(error) = self
.context
.blobscan_client()
.update_sync_state(BlockchainSyncState {
last_finalized_block: None,
last_lower_synced_slot,
last_upper_synced_slot,
last_upper_synced_block_root,
last_upper_synced_block_slot,
})
.await
let update_result = timeout(
Duration::from_secs(self.request_timeout),
self.context
.blobscan_client()
.update_sync_state(BlockchainSyncState {
last_finalized_block: None,
last_lower_synced_slot,
last_upper_synced_slot,
last_upper_synced_block_root,
last_upper_synced_block_slot,
})
).await
.map_err(|_| SynchronizerError::OperationTimeout {
operation: "update_sync_state".to_string(),
block_id: format!("slot {}", last_lower_synced_slot.or(last_upper_synced_slot).unwrap_or(0)),
})?;

if let Err(error) = update_result
{
let new_synced_slot = match last_lower_synced_slot.or(last_upper_synced_slot) {
Some(slot) => slot,
Expand Down Expand Up @@ -339,12 +361,26 @@ impl CommonSynchronizer for Synchronizer<ReqwestTransport> {
}

async fn sync_block(&mut self, block_id: BlockId) -> Result<(), SynchronizerError> {
let final_slot = block_id
.resolve_to_slot(self.context.beacon_client())
.await?;

self.process_slots_by_checkpoints(final_slot, final_slot + 1)
.await?;
let slot_resolution = timeout(
Duration::from_secs(self.request_timeout),
block_id.resolve_to_slot(self.context.beacon_client())
).await
.map_err(|_| SynchronizerError::OperationTimeout {
operation: "resolve_to_slot".to_string(),
block_id: format!("{:?}", block_id),
})??
;

let final_slot = slot_resolution;

timeout(
Duration::from_secs(self.request_timeout * 2),
self.process_slots_by_checkpoints(final_slot, final_slot + 1)
).await
.map_err(|_| SynchronizerError::OperationTimeout {
operation: "process_slots_by_checkpoints".to_string(),
block_id: format!("{}", final_slot),
})??;

Ok(())
}
Expand All @@ -354,12 +390,25 @@ impl CommonSynchronizer for Synchronizer<ReqwestTransport> {
initial_block_id: BlockId,
final_block_id: BlockId,
) -> Result<(), SynchronizerError> {
let initial_slot = initial_block_id
.resolve_to_slot(self.context.beacon_client())
.await?;
let mut final_slot = final_block_id
.resolve_to_slot(self.context.beacon_client())
.await?;
let initial_slot = timeout(
Duration::from_secs(self.request_timeout),
initial_block_id.resolve_to_slot(self.context.beacon_client())
).await
.map_err(|_| SynchronizerError::OperationTimeout {
operation: "resolve_to_slot (initial)".to_string(),
block_id: format!("{:?}", initial_block_id),
})??
;

let mut final_slot = timeout(
Duration::from_secs(self.request_timeout),
final_block_id.resolve_to_slot(self.context.beacon_client())
).await
.map_err(|_| SynchronizerError::OperationTimeout {
operation: "resolve_to_slot (final)".to_string(),
block_id: format!("{:?}", final_block_id),
})??
;

if initial_slot == final_slot {
return Ok(());
Expand Down
Loading