Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions backend/indexer/.env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# ── RPC Providers ────────────────────────────────────────────────────────────
# Primary Soroban RPC endpoint
RPC_URL=https://soroban-testnet.stellar.org

# Comma-separated ordered fallback URLs (tried in order when primary is unhealthy)
# RPC_FALLBACK_URLS=https://rpc.ankr.com/stellar_testnet,https://horizon-testnet.stellar.org

# Seconds a failed provider stays in cool-down before being re-enabled (default: 60)
# RPC_COOLDOWN_SECS=60

# ── Contract ──────────────────────────────────────────────────────────────────
# Single contract address
CONTRACT_ID=CXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
# Or comma-separated list for multi-deployment indexing
# CONTRACT_IDS=CXXX...,CYYY...

# ── Database ──────────────────────────────────────────────────────────────────
DATABASE_URL=sqlite:./pifp_events.db

# ── Server ────────────────────────────────────────────────────────────────────
API_PORT=3001
METRICS_PORT=9090

# ── Indexer tuning ────────────────────────────────────────────────────────────
POLL_INTERVAL_SECS=5
EVENTS_PER_PAGE=100
START_LEDGER=0

# Optional backfill overrides
# BACKFILL_START_LEDGER=
# BACKFILL_CURSOR=

# ── Cache (optional) ──────────────────────────────────────────────────────────
# REDIS_URL=redis://localhost:6379
CACHE_TTL_TOP_PROJECTS_SECS=30
CACHE_TTL_ACTIVE_PROJECTS_COUNT_SECS=15
18 changes: 17 additions & 1 deletion backend/indexer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,13 @@ use crate::errors::{IndexerError, Result};

#[derive(Debug, Clone)]
pub struct Config {
/// Soroban/Horizon RPC endpoint (e.g. https://soroban-testnet.stellar.org)
/// Primary Soroban/Horizon RPC endpoint (e.g. https://soroban-testnet.stellar.org)
pub rpc_url: String,
/// Ordered list of fallback RPC URLs tried when the primary is unhealthy.
/// Comma-separated via `RPC_FALLBACK_URLS`.
pub rpc_fallback_urls: Vec<String>,
/// Seconds a failed provider stays in cool-down before being re-enabled.
pub rpc_cooldown_secs: u64,
/// PIFP contract addresses (Strkey format). Supports multi-deployment indexing.
pub contract_ids: Vec<String>,
/// Path to the SQLite database file
Expand Down Expand Up @@ -39,6 +44,17 @@ impl Config {
Ok(Config {
rpc_url: env_var("RPC_URL")
.unwrap_or_else(|_| "https://soroban-testnet.stellar.org".to_string()),
rpc_fallback_urls: std::env::var("RPC_FALLBACK_URLS")
.unwrap_or_default()
.split(',')
.map(str::trim)
.filter(|v| !v.is_empty())
.map(ToString::to_string)
.collect(),
rpc_cooldown_secs: env_var("RPC_COOLDOWN_SECS")
.unwrap_or_else(|_| "60".to_string())
.parse()
.map_err(|_| IndexerError::Config("Invalid RPC_COOLDOWN_SECS".to_string()))?,
contract_ids,
database_url: env_var("DATABASE_URL")
.unwrap_or_else(|_| "sqlite:./pifp_events.db".to_string()),
Expand Down
8 changes: 5 additions & 3 deletions backend/indexer/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ use crate::cache::Cache;
use crate::config::Config;
use crate::db;
use crate::metrics;
use crate::rpc;
use crate::rpc::{self, ProviderManager};
use crate::webhook;

/// Shared, immutable state handed to the background indexer task
/// (wrapped in an `Arc` by the caller — see `run`/`spawn`).
pub struct IndexerState {
/// Connection pool for the SQLite events database.
pub pool: SqlitePool,
/// Parsed runtime configuration (RPC endpoints, contract IDs, tuning knobs).
pub config: Config,
/// Reusable HTTP client for RPC requests.
pub client: Client,
/// Optional cache layer; `None` when caching is disabled.
pub cache: Option<Cache>,
/// Prioritised RPC provider list with automatic fail-over and cool-down.
pub providers: ProviderManager,
}

/// Spawn the indexer loop as a background [`tokio`] task.
Expand Down Expand Up @@ -61,6 +62,7 @@ pub async fn run(state: Arc<IndexerState>) {
&state.pool,
&state.client,
&state.config,
&state.providers,
state.cache.as_ref(),
current_ledger,
cursor.as_deref(),
Expand All @@ -87,14 +89,14 @@ async fn poll_once(
pool: &SqlitePool,
client: &Client,
config: &Config,
providers: &ProviderManager,
cache: Option<&Cache>,
start_ledger: u32,
cursor: Option<&str>,
) -> crate::errors::Result<(u32, Option<String>)> {
// Fetch events from the Stellar RPC — latency is recorded inside fetch_events.
let (raw_events, next_cursor, latest_ledger) = rpc::fetch_events(
client,
&config.rpc_url,
providers,
&config.contract_ids,
start_ledger,
cursor,
Expand Down
7 changes: 7 additions & 0 deletions backend/indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use tracing_subscriber::EnvFilter;
use cache::Cache;
use config::Config;
use indexer::IndexerState;
use rpc::ProviderManager;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
Expand Down Expand Up @@ -73,11 +74,17 @@ async fn main() -> anyhow::Result<()> {
});

// ─── Background indexer ───────────────────────────────
let providers = ProviderManager::new(
config.rpc_url.clone(),
config.rpc_fallback_urls.clone(),
config.rpc_cooldown_secs,
);
let indexer_state = Arc::new(IndexerState {
pool: pool.clone(),
config: config.clone(),
client,
cache: cache.clone(),
providers,
});
tokio::spawn(indexer::run(indexer_state));

Expand Down
122 changes: 100 additions & 22 deletions backend/indexer/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@
//!
//! ## Resilience
//!
//! * Exponential back-off is applied when the RPC returns an error or rate-limit
//! response, up to [`MAX_BACKOFF_SECS`] seconds.
//! * Transient network errors (connection reset, timeout) are retried silently.
//! * [`ProviderManager`] holds a prioritised list of RPC URLs. On a 5xx,
//! 429, or connection error the current provider is marked unhealthy and
//! the next one is tried immediately (zero extra latency on the happy path).
//! * A failed provider re-enters the rotation after `cooldown_secs` (default 60 s).
//! * When **all** providers are unhealthy the call falls back to exponential
//! back-off and logs a critical error.

use std::time::Duration;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use reqwest::Client;
use serde::Deserialize;
use serde_json::{json, Value};
use tracing::{debug, warn};
use tracing::{debug, error, warn};

use crate::errors::{IndexerError, Result};
use crate::events::{EventKind, PifpEvent};
Expand Down Expand Up @@ -79,11 +83,79 @@ pub struct RawEvent {
pub paging_token: Option<String>,
}

// ─────────────────────────────────────────────────────────
// Provider health tracking
// ─────────────────────────────────────────────────────────

/// Health record for a single RPC endpoint tracked by [`ProviderManager`].
#[derive(Debug)]
struct ProviderState {
/// The RPC endpoint URL this entry represents.
url: String,
/// `Some(instant)` means the provider failed at that time and is in cool-down.
failed_at: Option<Instant>,
}

/// Thread-safe, prioritised list of RPC providers with automatic cool-down.
///
/// Clone is cheap — the inner state is behind an `Arc`.
#[derive(Debug, Clone)]
pub struct ProviderManager {
/// Providers in priority order (primary first, then fallbacks), shared
/// across clones and tasks behind a read-write lock.
providers: Arc<RwLock<Vec<ProviderState>>>,
/// How long a failed provider stays out of rotation before being retried.
cooldown: Duration,
}

impl ProviderManager {
    /// Build from a primary URL plus any number of fallbacks.
    ///
    /// The primary is always first in the rotation; duplicate URLs are
    /// dropped, keeping only the earliest (highest-priority) occurrence.
    pub fn new(primary: String, fallbacks: Vec<String>, cooldown_secs: u64) -> Self {
        // Dedupe while preserving order: `insert` returns false on repeats.
        let mut seen = std::collections::HashSet::new();
        let providers: Vec<ProviderState> = std::iter::once(primary)
            .chain(fallbacks)
            .filter(|url| seen.insert(url.clone()))
            .map(|url| ProviderState { url, failed_at: None })
            .collect();

        Self {
            providers: Arc::new(RwLock::new(providers)),
            cooldown: Duration::from_secs(cooldown_secs),
        }
    }

    /// Return the URL of the first healthy provider, or `None` if all are in cool-down.
    ///
    /// A provider whose cool-down has elapsed is re-enabled as a side effect,
    /// which is why this takes the write lock.
    pub fn healthy_url(&self) -> Option<String> {
        let now = Instant::now();
        let mut guard = self.providers.write().unwrap();
        guard.iter_mut().find_map(|provider| match provider.failed_at {
            None => Some(provider.url.clone()),
            Some(when) if now.duration_since(when) >= self.cooldown => {
                // Cool-down expired — put this provider back into rotation.
                provider.failed_at = None;
                Some(provider.url.clone())
            }
            Some(_) => None,
        })
    }

    /// Mark the provider with the given URL as unhealthy.
    ///
    /// Idempotent: a provider already in cool-down keeps its original
    /// failure timestamp and is not logged again.
    pub fn mark_failed(&self, url: &str) {
        let mut guard = self.providers.write().unwrap();
        for provider in guard.iter_mut() {
            if provider.url == url && provider.failed_at.is_none() {
                warn!("RPC provider marked unhealthy: {url}");
                provider.failed_at = Some(Instant::now());
            }
        }
    }
}

// ─────────────────────────────────────────────────────────
// Public API
// ─────────────────────────────────────────────────────────

/// Fetch a page of events from the RPC.
/// Fetch a page of events from the RPC, automatically failing over to the
/// next healthy provider on 5xx / 429 / connection errors.
///
/// * `start_ledger` — the ledger sequence to scan from (inclusive).
/// * `cursor` — optional opaque pagination cursor from a previous response.
Expand All @@ -92,7 +164,7 @@ pub struct RawEvent {
/// Returns `(events, next_cursor, latest_ledger)`.
pub async fn fetch_events(
client: &Client,
rpc_url: &str,
providers: &ProviderManager,
contract_ids: &[String],
start_ledger: u32,
cursor: Option<&str>,
Expand All @@ -102,6 +174,17 @@ pub async fn fetch_events(
let mut use_topic_filter = true;

loop {
// ── Provider selection ────────────────────────────────────────────────
let rpc_url = match providers.healthy_url() {
Some(url) => url,
None => {
error!("All RPC providers are unhealthy — retrying in {backoff}s");
tokio::time::sleep(Duration::from_secs(backoff)).await;
backoff = (backoff * 2).min(MAX_BACKOFF_SECS);
continue;
}
};

let params = build_params(
contract_ids,
start_ledger,
Expand All @@ -114,12 +197,10 @@ pub async fn fetch_events(
},
);

// Start the latency timer before the request; it records on drop,
// so it covers both the network send and the response body parse.
let rpc_timer = metrics::RPC_LATENCY.start_timer();

let send_result = client
.post(rpc_url)
.post(&rpc_url)
.json(&json!({
"jsonrpc": "2.0",
"id": 1,
Expand All @@ -133,23 +214,22 @@ pub async fn fetch_events(
Err(e) => {
rpc_timer.stop_and_record();
metrics::RPC_ERRORS_TOTAL.inc();
warn!("RPC request failed (will retry in {backoff}s): {e}");
tokio::time::sleep(Duration::from_secs(backoff)).await;
backoff = (backoff * 2).min(MAX_BACKOFF_SECS);
// Connection / timeout errors — mark provider unhealthy and retry immediately.
providers.mark_failed(&rpc_url);
warn!("RPC request failed ({rpc_url}): {e}");
continue;
}
Ok(resp) => {
let status = resp.status();
if status == reqwest::StatusCode::TOO_MANY_REQUESTS {

if status == reqwest::StatusCode::TOO_MANY_REQUESTS || status.is_server_error() {
rpc_timer.stop_and_record();
metrics::RPC_ERRORS_TOTAL.inc();
warn!("Rate-limited by RPC (will retry in {backoff}s)");
tokio::time::sleep(Duration::from_secs(backoff)).await;
backoff = (backoff * 2).min(MAX_BACKOFF_SECS);
providers.mark_failed(&rpc_url);
warn!("RPC provider {rpc_url} returned {status} — switching provider");
continue;
}

// Parse the body — timer is still live, covering the full round-trip.
let body: RpcResponse = match resp.json().await {
Ok(b) => b,
Err(e) => {
Expand All @@ -161,8 +241,6 @@ pub async fn fetch_events(
rpc_timer.stop_and_record();

if let Some(err) = body.error {
// Some RPC nodes may reject `topics` filter formats.
// Retry once with contract-only filtering.
if err.code == -32602 && use_topic_filter {
warn!(
"RPC rejected topic filter (code {}), retrying with contract-only filter",
Expand All @@ -171,7 +249,6 @@ pub async fn fetch_events(
use_topic_filter = false;
continue;
}
// Code -32600 / -32601 are hard failures; everything else we retry.
metrics::RPC_ERRORS_TOTAL.inc();
if err.code == -32600 || err.code == -32601 {
return Err(IndexerError::EventParse(format!(
Expand All @@ -193,8 +270,9 @@ pub async fn fetch_events(
})?;

debug!(
"Fetched {} events (latest_ledger={:?})",
"Fetched {} events via {} (latest_ledger={:?})",
result.events.len(),
rpc_url,
result.latest_ledger
);

Expand Down