diff --git a/backend/indexer/.env.example b/backend/indexer/.env.example new file mode 100644 index 0000000..8ccb5c5 --- /dev/null +++ b/backend/indexer/.env.example @@ -0,0 +1,36 @@ +# ── RPC Providers ──────────────────────────────────────────────────────────── +# Primary Soroban RPC endpoint +RPC_URL=https://soroban-testnet.stellar.org + +# Comma-separated ordered fallback URLs (tried in order when primary is unhealthy) +# RPC_FALLBACK_URLS=https://rpc.ankr.com/stellar_testnet,https://horizon-testnet.stellar.org + +# Seconds a failed provider stays in cool-down before being re-enabled (default: 60) +# RPC_COOLDOWN_SECS=60 + +# ── Contract ────────────────────────────────────────────────────────────────── +# Single contract address +CONTRACT_ID=CXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX +# Or comma-separated list for multi-deployment indexing +# CONTRACT_IDS=CXXX...,CYYY... + +# ── Database ────────────────────────────────────────────────────────────────── +DATABASE_URL=sqlite:./pifp_events.db + +# ── Server ──────────────────────────────────────────────────────────────────── +API_PORT=3001 +METRICS_PORT=9090 + +# ── Indexer tuning ──────────────────────────────────────────────────────────── +POLL_INTERVAL_SECS=5 +EVENTS_PER_PAGE=100 +START_LEDGER=0 + +# Optional backfill overrides +# BACKFILL_START_LEDGER= +# BACKFILL_CURSOR= + +# ── Cache (optional) ────────────────────────────────────────────────────────── +# REDIS_URL=redis://localhost:6379 +CACHE_TTL_TOP_PROJECTS_SECS=30 +CACHE_TTL_ACTIVE_PROJECTS_COUNT_SECS=15 diff --git a/backend/indexer/src/config.rs b/backend/indexer/src/config.rs index 3ab9f8d..bf34eeb 100644 --- a/backend/indexer/src/config.rs +++ b/backend/indexer/src/config.rs @@ -4,8 +4,13 @@ use crate::errors::{IndexerError, Result}; #[derive(Debug, Clone)] pub struct Config { - /// Soroban/Horizon RPC endpoint (e.g. https://soroban-testnet.stellar.org) + /// Primary Soroban/Horizon RPC endpoint (e.g. 
https://soroban-testnet.stellar.org) pub rpc_url: String, + /// Ordered list of fallback RPC URLs tried when the primary is unhealthy. + /// Comma-separated via `RPC_FALLBACK_URLS`. + pub rpc_fallback_urls: Vec<String>, + /// Seconds a failed provider stays in cool-down before being re-enabled. + pub rpc_cooldown_secs: u64, /// PIFP contract addresses (Strkey format). Supports multi-deployment indexing. pub contract_ids: Vec<String>, /// Path to the SQLite database file @@ -39,6 +44,17 @@ impl Config { Ok(Config { rpc_url: env_var("RPC_URL") .unwrap_or_else(|_| "https://soroban-testnet.stellar.org".to_string()), + rpc_fallback_urls: std::env::var("RPC_FALLBACK_URLS") + .unwrap_or_default() + .split(',') + .map(str::trim) + .filter(|v| !v.is_empty()) + .map(ToString::to_string) + .collect(), + rpc_cooldown_secs: env_var("RPC_COOLDOWN_SECS") + .unwrap_or_else(|_| "60".to_string()) + .parse() + .map_err(|_| IndexerError::Config("Invalid RPC_COOLDOWN_SECS".to_string()))?, contract_ids, database_url: env_var("DATABASE_URL") .unwrap_or_else(|_| "sqlite:./pifp_events.db".to_string()), diff --git a/backend/indexer/src/indexer.rs b/backend/indexer/src/indexer.rs index 355bc16..e7418cf 100644 --- a/backend/indexer/src/indexer.rs +++ b/backend/indexer/src/indexer.rs @@ -13,7 +13,7 @@ use crate::cache::Cache; use crate::config::Config; use crate::db; use crate::metrics; -use crate::rpc; +use crate::rpc::{self, ProviderManager}; use crate::webhook; pub struct IndexerState { @@ -21,6 +21,7 @@ pub struct IndexerState { pub config: Config, pub client: Client, pub cache: Option<Cache>, + pub providers: ProviderManager, } /// Spawn the indexer loop as a background [`tokio`] task. 
@@ -61,6 +62,7 @@ pub async fn run(state: Arc<IndexerState>) { &state.pool, &state.client, &state.config, + &state.providers, state.cache.as_ref(), current_ledger, cursor.as_deref(), @@ -87,14 +89,14 @@ async fn poll_once( pool: &SqlitePool, client: &Client, config: &Config, + providers: &ProviderManager, cache: Option<&Cache>, start_ledger: u32, cursor: Option<&str>, ) -> crate::errors::Result<(u32, Option<String>)> { - // Fetch events from the Stellar RPC — latency is recorded inside fetch_events. let (raw_events, next_cursor, latest_ledger) = rpc::fetch_events( client, - &config.rpc_url, + providers, &config.contract_ids, start_ledger, cursor, diff --git a/backend/indexer/src/main.rs b/backend/indexer/src/main.rs index 48dccb4..74146d4 100644 --- a/backend/indexer/src/main.rs +++ b/backend/indexer/src/main.rs @@ -36,6 +36,7 @@ use tracing_subscriber::EnvFilter; use cache::Cache; use config::Config; use indexer::IndexerState; +use rpc::ProviderManager; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -73,11 +74,17 @@ }); // ─── Background indexer ─────────────────────────────── + let providers = ProviderManager::new( + config.rpc_url.clone(), + config.rpc_fallback_urls.clone(), + config.rpc_cooldown_secs, + ); let indexer_state = Arc::new(IndexerState { pool: pool.clone(), config: config.clone(), client, cache: cache.clone(), + providers, }); tokio::spawn(indexer::run(indexer_state)); diff --git a/backend/indexer/src/rpc.rs b/backend/indexer/src/rpc.rs index 62a5e76..ea2adac 100644 --- a/backend/indexer/src/rpc.rs +++ b/backend/indexer/src/rpc.rs @@ -2,16 +2,20 @@ //! //! ## Resilience //! -//! * Exponential back-off is applied when the RPC returns an error or rate-limit -//! response, up to [`MAX_BACKOFF_SECS`] seconds. -//! * Transient network errors (connection reset, timeout) are retried silently. +//! * [`ProviderManager`] holds a prioritised list of RPC URLs. On a 5xx, +//! 
429, or connection error the current provider is marked unhealthy and + the next one is tried immediately (zero extra latency on the happy path). + * A failed provider re-enters the rotation after `cooldown_secs` (default 60 s). + * When **all** providers are unhealthy the call falls back to exponential + back-off and logs a critical error. -use std::time::Duration; +use std::sync::{Arc, RwLock}; +use std::time::{Duration, Instant}; use reqwest::Client; use serde::Deserialize; use serde_json::{json, Value}; -use tracing::{debug, warn}; +use tracing::{debug, error, warn}; use crate::errors::{IndexerError, Result}; use crate::events::{EventKind, PifpEvent}; @@ -79,11 +83,79 @@ pub struct RawEvent { pub paging_token: Option<String>, } +// ───────────────────────────────────────────────────────── +// Provider health tracking +// ───────────────────────────────────────────────────────── + +#[derive(Debug)] +struct ProviderState { + url: String, + /// `Some(instant)` means the provider failed at that time and is in cool-down. + failed_at: Option<Instant>, +} + +/// Thread-safe, prioritised list of RPC providers with automatic cool-down. +/// +/// Clone is cheap — the inner state is behind an `Arc`. +#[derive(Debug, Clone)] +pub struct ProviderManager { + providers: Arc<RwLock<Vec<ProviderState>>>, + cooldown: Duration, +} + +impl ProviderManager { + /// Build from a primary URL plus any number of fallbacks. + pub fn new(primary: String, fallbacks: Vec<String>, cooldown_secs: u64) -> Self { + let mut providers: Vec<ProviderState> = std::iter::once(primary) + .chain(fallbacks) + .map(|url| ProviderState { url, failed_at: None }) + .collect(); + // Deduplicate while preserving order. + let mut seen = std::collections::HashSet::new(); + providers.retain(|p| seen.insert(p.url.clone())); + + Self { + providers: Arc::new(RwLock::new(providers)), + cooldown: Duration::from_secs(cooldown_secs), + } + } + + /// Return the URL of the first healthy provider, or `None` if all are in cool-down. 
+ pub fn healthy_url(&self) -> Option<String> { + let mut providers = self.providers.write().unwrap(); + let now = Instant::now(); + for p in providers.iter_mut() { + match p.failed_at { + None => return Some(p.url.clone()), + Some(t) if now.duration_since(t) >= self.cooldown => { + // Cool-down expired — re-enable. + p.failed_at = None; + return Some(p.url.clone()); + } + _ => {} + } + } + None + } + + /// Mark the provider with the given URL as unhealthy. + pub fn mark_failed(&self, url: &str) { + let mut providers = self.providers.write().unwrap(); + if let Some(p) = providers.iter_mut().find(|p| p.url == url) { + if p.failed_at.is_none() { + warn!("RPC provider marked unhealthy: {url}"); + p.failed_at = Some(Instant::now()); + } + } + } +} + // ───────────────────────────────────────────────────────── // Public API // ───────────────────────────────────────────────────────── -/// Fetch a page of events from the RPC. +/// Fetch a page of events from the RPC, automatically failing over to the +/// next healthy provider on 5xx / 429 / connection errors. /// /// * `start_ledger` — the ledger sequence to scan from (inclusive). /// * `cursor` — optional opaque pagination cursor from a previous response. @@ -92,7 +164,7 @@ pub struct RawEvent { /// Returns `(events, next_cursor, latest_ledger)`. 
pub async fn fetch_events( client: &Client, - rpc_url: &str, + providers: &ProviderManager, contract_ids: &[String], start_ledger: u32, cursor: Option<&str>, @@ -102,6 +174,17 @@ pub async fn fetch_events( let mut use_topic_filter = true; loop { + // ── Provider selection ──────────────────────────────────────────────── + let rpc_url = match providers.healthy_url() { + Some(url) => url, + None => { + error!("All RPC providers are unhealthy — retrying in {backoff}s"); + tokio::time::sleep(Duration::from_secs(backoff)).await; + backoff = (backoff * 2).min(MAX_BACKOFF_SECS); + continue; + } + }; + let params = build_params( contract_ids, start_ledger, @@ -114,12 +197,10 @@ pub async fn fetch_events( }, ); - // Start the latency timer before the request; it records on drop, - // so it covers both the network send and the response body parse. let rpc_timer = metrics::RPC_LATENCY.start_timer(); let send_result = client - .post(rpc_url) + .post(&rpc_url) .json(&json!({ "jsonrpc": "2.0", "id": 1, @@ -133,23 +214,22 @@ pub async fn fetch_events( Err(e) => { rpc_timer.stop_and_record(); metrics::RPC_ERRORS_TOTAL.inc(); - warn!("RPC request failed (will retry in {backoff}s): {e}"); - tokio::time::sleep(Duration::from_secs(backoff)).await; - backoff = (backoff * 2).min(MAX_BACKOFF_SECS); + // Connection / timeout errors — mark provider unhealthy and retry immediately. 
+ providers.mark_failed(&rpc_url); + warn!("RPC request failed ({rpc_url}): {e}"); continue; } Ok(resp) => { let status = resp.status(); - if status == reqwest::StatusCode::TOO_MANY_REQUESTS { + + if status == reqwest::StatusCode::TOO_MANY_REQUESTS || status.is_server_error() { rpc_timer.stop_and_record(); metrics::RPC_ERRORS_TOTAL.inc(); - warn!("Rate-limited by RPC (will retry in {backoff}s)"); - tokio::time::sleep(Duration::from_secs(backoff)).await; - backoff = (backoff * 2).min(MAX_BACKOFF_SECS); + providers.mark_failed(&rpc_url); + warn!("RPC provider {rpc_url} returned {status} — switching provider"); continue; } - // Parse the body — timer is still live, covering the full round-trip. let body: RpcResponse = match resp.json().await { Ok(b) => b, Err(e) => { @@ -161,8 +241,6 @@ pub async fn fetch_events( rpc_timer.stop_and_record(); if let Some(err) = body.error { - // Some RPC nodes may reject `topics` filter formats. - // Retry once with contract-only filtering. if err.code == -32602 && use_topic_filter { warn!( "RPC rejected topic filter (code {}), retrying with contract-only filter", @@ -171,7 +249,6 @@ pub async fn fetch_events( use_topic_filter = false; continue; } - // Code -32600 / -32601 are hard failures; everything else we retry. metrics::RPC_ERRORS_TOTAL.inc(); if err.code == -32600 || err.code == -32601 { return Err(IndexerError::EventParse(format!( @@ -193,8 +270,9 @@ pub async fn fetch_events( })?; debug!( - "Fetched {} events (latest_ledger={:?})", + "Fetched {} events via {} (latest_ledger={:?})", result.events.len(), + rpc_url, result.latest_ledger );