diff --git a/core/src/lib.rs b/core/src/lib.rs index a36a823..bb2d94b 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -1,2 +1,3 @@ pub mod parser; +pub mod rpc_provider; pub mod simulation; diff --git a/core/src/main.rs b/core/src/main.rs index ae707a0..42f79e2 100644 --- a/core/src/main.rs +++ b/core/src/main.rs @@ -2,9 +2,12 @@ mod auth; mod benchmarks; mod errors; mod parser; +pub mod rpc_provider; mod simulation; use crate::errors::AppError; +use crate::rpc_provider::{ProviderRegistry, RpcProvider}; +use crate::network_config::NetworkConfig; use crate::simulation::{SimulationCache, SimulationEngine, SimulationResult}; use axum::{ extract::{Json, State}, @@ -30,6 +33,8 @@ use utoipa_swagger_ui::SwaggerUi; struct AppConfig { server_port: u16, rust_log: String, + /// Primary RPC URL — used as a single-provider fallback when + /// `RPC_PROVIDERS` is not set. soroban_rpc_url: String, jwt_secret: String, network_passphrase: String, @@ -37,6 +42,23 @@ struct AppConfig { /// Unused in the MVP in-memory implementation — present so the config /// surface is stable when Redis is wired in. redis_url: String, + /// JSON-encoded array of RPC provider objects. Example: + /// ```json + /// [ + /// {"name":"stellar-testnet","url":"https://soroban-testnet.stellar.org"}, + /// {"name":"blockdaemon","url":"https://soroban.blockdaemon.com","auth_header":"X-API-Key","auth_value":"KEY"} + /// ] + /// ``` + /// When empty or absent the engine falls back to `soroban_rpc_url`. + #[serde(default)] + rpc_providers: String, + /// Health-check interval in seconds (default 30). + #[serde(default = "default_health_check_interval")] + health_check_interval_secs: u64, +} + +fn default_health_check_interval() -> u64 { + 30 } fn load_config() -> Result { @@ -50,11 +72,45 @@ fn load_config() -> Result { .set_default("jwt_secret", "dev-secret-change-in-production")? .set_default("network_passphrase", "Test SDF Network ; September 2015")? .set_default("redis_url", "redis://127.0.0.1:6379")? + .set_default("rpc_providers", "")? + .set_default("health_check_interval_secs", 30)? .build()?; settings.try_deserialize() } +/// Parse the `RPC_PROVIDERS` env var (JSON array) or fall back to wrapping the +/// single `SOROBAN_RPC_URL` into a one-element provider list. +fn build_providers(config: &AppConfig) -> Vec { + if !config.rpc_providers.is_empty() { + match serde_json::from_str::>(&config.rpc_providers) { + Ok(providers) if !providers.is_empty() => { + tracing::info!( + count = providers.len(), + "Loaded RPC providers from RPC_PROVIDERS" + ); + return providers; + } + Ok(_) => { + tracing::warn!("RPC_PROVIDERS is empty array, falling back to SOROBAN_RPC_URL"); + } + Err(e) => { + tracing::warn!( + error = %e, + "Failed to parse RPC_PROVIDERS, falling back to SOROBAN_RPC_URL" + ); + } + } + } + + vec![RpcProvider { + name: "default".to_string(), + url: config.soroban_rpc_url.clone(), + auth_header: None, + auth_value: None, + }] +} + /// Shared application state injected into every Axum handler via [`State`]. struct AppState { #[allow(dead_code)] // will be used when RPC simulation is wired into analyze handler @@ -264,8 +320,23 @@ async fn main() { "SEP-10 server account: {}", auth_state.server_stellar_address() ); + // ── Multi-node RPC setup ──────────────────────────────────────────── + let providers = build_providers(&config); + let provider_names: Vec<&str> = providers.iter().map(|p| p.name.as_str()).collect(); + tracing::info!(providers = ?provider_names, "RPC provider pool"); + + let registry = ProviderRegistry::new(providers); + + // Spawn background health checker. + let health_interval = std::time::Duration::from_secs(config.health_check_interval_secs); + let _health_handle = registry.spawn_health_checker(health_interval); + tracing::info!( + interval_secs = config.health_check_interval_secs, + "Background RPC health checker started" + ); + let app_state = Arc::new(AppState { - engine: SimulationEngine::new(config.soroban_rpc_url.clone()), + engine: SimulationEngine::with_registry(Arc::clone(®istry)), cache: SimulationCache::new(), }); diff --git a/core/src/rpc_provider.rs b/core/src/rpc_provider.rs new file mode 100644 index 0000000..06fbc0d --- /dev/null +++ b/core/src/rpc_provider.rs @@ -0,0 +1,360 @@ +use reqwest::Client; +use serde::Deserialize; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::sync::RwLock; + +// ── Configuration constants ─────────────────────────────────────────────────── + +/// Number of consecutive health-check failures before a provider is tripped. +const CIRCUIT_BREAKER_THRESHOLD: u64 = 3; + +/// How long a tripped provider is excluded from the pool. +const CIRCUIT_BREAKER_COOLDOWN: Duration = Duration::from_secs(5 * 60); // 5 minutes + +/// Timeout for the lightweight `getLatestLedger` health probe. +const HEALTH_CHECK_TIMEOUT: Duration = Duration::from_secs(10); + +// ── Types ───────────────────────────────────────────────────────────────────── + +/// A single Soroban RPC endpoint with optional authentication. +#[derive(Debug, Clone, Deserialize)] +pub struct RpcProvider { + /// Human-readable label (e.g. "stellar-testnet", "blockdaemon-mainnet"). + pub name: String, + /// Full JSON-RPC URL. + pub url: String, + /// Optional authentication header name (e.g. "Authorization", "X-API-Key"). + #[serde(default)] + pub auth_header: Option, + /// Optional authentication header value (e.g. "Bearer ", ""). + #[serde(default)] + pub auth_value: Option, +} + +/// Runtime health state for a single provider. +#[derive(Debug)] +struct ProviderState { + provider: RpcProvider, + /// Rolling count of consecutive failures (reset on success). + consecutive_failures: AtomicU64, + /// When the circuit breaker was tripped (None = healthy). + tripped_at: RwLock>, + /// Latest ledger number returned by the last successful health check. + latest_ledger: AtomicU64, +} + +/// Thread-safe registry that tracks provider health and drives failover. +pub struct ProviderRegistry { + states: Vec>, + client: Client, +} + +impl ProviderRegistry { + /// Build a registry from a prioritized list of providers. + /// + /// The order matters: the first provider is preferred when healthy. + pub fn new(providers: Vec) -> Arc { + let states = providers + .into_iter() + .map(|p| { + Arc::new(ProviderState { + provider: p, + consecutive_failures: AtomicU64::new(0), + tripped_at: RwLock::new(None), + latest_ledger: AtomicU64::new(0), + }) + }) + .collect(); + + Arc::new(Self { + states, + client: Client::new(), + }) + } + + /// Return the list of providers that are currently available for requests, + /// in priority order (skipping tripped providers whose cooldown hasn't elapsed). + pub async fn healthy_providers(&self) -> Vec<&RpcProvider> { + let mut available = Vec::new(); + for state in &self.states { + if self.is_available(state).await { + available.push(&state.provider); + } + } + available + } + + /// Report a successful request to `url`. Resets the failure counter and + /// clears any active trip. + pub async fn report_success(&self, url: &str) { + if let Some(state) = self.find_by_url(url) { + state.consecutive_failures.store(0, Ordering::Relaxed); + let mut tripped = state.tripped_at.write().await; + *tripped = None; + } + } + + /// Report a failed request to `url`. Increments the failure counter and + /// trips the circuit breaker when the threshold is reached. + pub async fn report_failure(&self, url: &str) { + if let Some(state) = self.find_by_url(url) { + let prev = state.consecutive_failures.fetch_add(1, Ordering::Relaxed); + if prev + 1 >= CIRCUIT_BREAKER_THRESHOLD { + let mut tripped = state.tripped_at.write().await; + if tripped.is_none() { + tracing::warn!( + provider = %state.provider.name, + url = %state.provider.url, + failures = prev + 1, + "Circuit breaker TRIPPED — provider excluded for {:?}", + CIRCUIT_BREAKER_COOLDOWN + ); + } + *tripped = Some(Instant::now()); + } + } + } + + /// Determine whether a request to `url` should be retried on the next + /// provider. Returns `true` for timeouts, HTTP 429, and 5xx status codes. + pub fn is_retryable_status(status: u16) -> bool { + status == 429 || status >= 500 + } + + // ── Background health checker ───────────────────────────────────────── + + /// Spawn a background Tokio task that periodically probes every provider + /// with `getLatestLedger`. + pub fn spawn_health_checker( + self: &Arc, + interval: Duration, + ) -> tokio::task::JoinHandle<()> { + let registry = Arc::clone(self); + tokio::spawn(async move { + let mut ticker = tokio::time::interval(interval); + loop { + ticker.tick().await; + registry.run_health_checks().await; + } + }) + } + + /// Execute a single round of health checks against all providers. + async fn run_health_checks(&self) { + for state in &self.states { + let result = self.probe_provider(state).await; + match result { + Ok(ledger) => { + state.latest_ledger.store(ledger, Ordering::Relaxed); + state.consecutive_failures.store(0, Ordering::Relaxed); + let mut tripped = state.tripped_at.write().await; + *tripped = None; + tracing::debug!( + provider = %state.provider.name, + latest_ledger = ledger, + "Health check OK" + ); + } + Err(e) => { + let prev = state.consecutive_failures.fetch_add(1, Ordering::Relaxed); + tracing::warn!( + provider = %state.provider.name, + consecutive_failures = prev + 1, + error = %e, + "Health check FAILED" + ); + if prev + 1 >= CIRCUIT_BREAKER_THRESHOLD { + let mut tripped = state.tripped_at.write().await; + if tripped.is_none() { + tracing::warn!( + provider = %state.provider.name, + "Circuit breaker TRIPPED by health checker" + ); + } + *tripped = Some(Instant::now()); + } + } + } + } + } + + /// Call `getLatestLedger` on a single provider. Returns the ledger + /// sequence number on success. + async fn probe_provider(&self, state: &ProviderState) -> Result { + let body = serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "getLatestLedger", + "params": null + }); + + let mut req = self.client.post(&state.provider.url).json(&body); + + // Attach provider-specific auth header if configured. + if let (Some(header), Some(value)) = + (&state.provider.auth_header, &state.provider.auth_value) + { + req = req.header(header.as_str(), value.as_str()); + } + + let response = tokio::time::timeout(HEALTH_CHECK_TIMEOUT, req.send()) + .await + .map_err(|_| "timeout".to_string())? + .map_err(|e| format!("request error: {e}"))?; + + if !response.status().is_success() { + return Err(format!("HTTP {}", response.status().as_u16())); + } + + let json: serde_json::Value = response + .json() + .await + .map_err(|e| format!("parse error: {e}"))?; + + json["result"]["sequence"] + .as_u64() + .ok_or_else(|| "missing sequence in response".to_string()) + } + + // ── Internal helpers ────────────────────────────────────────────────── + + fn find_by_url(&self, url: &str) -> Option<&Arc> { + self.states.iter().find(|s| s.provider.url == url) + } + + async fn is_available(&self, state: &ProviderState) -> bool { + let tripped = state.tripped_at.read().await; + match *tripped { + None => true, + Some(when) => when.elapsed() >= CIRCUIT_BREAKER_COOLDOWN, + } + } +} + +// ── Tests ───────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + fn make_provider(name: &str, url: &str) -> RpcProvider { + RpcProvider { + name: name.to_string(), + url: url.to_string(), + auth_header: None, + auth_value: None, + } + } + + fn make_provider_with_auth(name: &str, url: &str) -> RpcProvider { + RpcProvider { + name: name.to_string(), + url: url.to_string(), + auth_header: Some("X-API-Key".to_string()), + auth_value: Some("secret-key-123".to_string()), + } + } + + #[tokio::test] + async fn test_all_providers_healthy_initially() { + let registry = ProviderRegistry::new(vec![ + make_provider("a", "http://a.test"), + make_provider("b", "http://b.test"), + ]); + let healthy = registry.healthy_providers().await; + assert_eq!(healthy.len(), 2); + assert_eq!(healthy[0].url, "http://a.test"); + assert_eq!(healthy[1].url, "http://b.test"); + } + + #[tokio::test] + async fn test_circuit_breaker_trips_after_threshold() { + let registry = ProviderRegistry::new(vec![ + make_provider("a", "http://a.test"), + make_provider("b", "http://b.test"), + ]); + + // Simulate 3 consecutive failures on provider "a" + for _ in 0..CIRCUIT_BREAKER_THRESHOLD { + registry.report_failure("http://a.test").await; + } + + let healthy = registry.healthy_providers().await; + assert_eq!(healthy.len(), 1); + assert_eq!(healthy[0].url, "http://b.test"); + } + + #[tokio::test] + async fn test_success_resets_failure_counter() { + let registry = ProviderRegistry::new(vec![make_provider("a", "http://a.test")]); + + // Two failures, then a success + registry.report_failure("http://a.test").await; + registry.report_failure("http://a.test").await; + registry.report_success("http://a.test").await; + + // Should still be healthy (counter reset before threshold) + let healthy = registry.healthy_providers().await; + assert_eq!(healthy.len(), 1); + } + + #[tokio::test] + async fn test_success_clears_tripped_state() { + let registry = ProviderRegistry::new(vec![make_provider("a", "http://a.test")]); + + // Trip the breaker + for _ in 0..CIRCUIT_BREAKER_THRESHOLD { + registry.report_failure("http://a.test").await; + } + assert_eq!(registry.healthy_providers().await.len(), 0); + + // Report success (simulating health check recovery) + registry.report_success("http://a.test").await; + assert_eq!(registry.healthy_providers().await.len(), 1); + } + + #[test] + fn test_is_retryable_status() { + assert!(ProviderRegistry::is_retryable_status(429)); + assert!(ProviderRegistry::is_retryable_status(500)); + assert!(ProviderRegistry::is_retryable_status(502)); + assert!(ProviderRegistry::is_retryable_status(503)); + assert!(!ProviderRegistry::is_retryable_status(200)); + assert!(!ProviderRegistry::is_retryable_status(400)); + assert!(!ProviderRegistry::is_retryable_status(404)); + } + + #[tokio::test] + async fn test_report_failure_unknown_url_is_noop() { + let registry = ProviderRegistry::new(vec![make_provider("a", "http://a.test")]); + registry.report_failure("http://unknown.test").await; + assert_eq!(registry.healthy_providers().await.len(), 1); + } + + #[tokio::test] + async fn test_provider_with_auth_headers() { + let provider = make_provider_with_auth("authed", "http://authed.test"); + assert_eq!(provider.auth_header.as_deref(), Some("X-API-Key")); + assert_eq!(provider.auth_value.as_deref(), Some("secret-key-123")); + + let registry = ProviderRegistry::new(vec![provider]); + let healthy = registry.healthy_providers().await; + assert_eq!(healthy.len(), 1); + assert_eq!(healthy[0].auth_header.as_deref(), Some("X-API-Key")); + } + + #[tokio::test] + async fn test_priority_order_preserved() { + let registry = ProviderRegistry::new(vec![ + make_provider("primary", "http://primary.test"), + make_provider("secondary", "http://secondary.test"), + make_provider("tertiary", "http://tertiary.test"), + ]); + let healthy = registry.healthy_providers().await; + assert_eq!(healthy[0].name, "primary"); + assert_eq!(healthy[1].name, "secondary"); + assert_eq!(healthy[2].name, "tertiary"); + } +} diff --git a/core/src/simulation.rs b/core/src/simulation.rs index 502ab93..c76448f 100644 --- a/core/src/simulation.rs +++ b/core/src/simulation.rs @@ -1,4 +1,5 @@ use crate::parser::ArgParser; +use crate::rpc_provider::ProviderRegistry; use base64::{engine::general_purpose::STANDARD as BASE64, Engine}; use reqwest::Client; use serde::{Deserialize, Serialize}; @@ -144,17 +145,33 @@ struct ResourceCost { } pub struct SimulationEngine { + /// Kept for single-provider backward compatibility; empty when using registry. rpc_url: String, client: Client, request_timeout: std::time::Duration, + /// When set, the engine will iterate healthy providers and failover automatically. + registry: Option>, } impl SimulationEngine { + /// Create an engine backed by a single RPC URL (backward-compatible). + #[allow(dead_code)] pub fn new(rpc_url: String) -> Self { Self { rpc_url, client: Client::new(), request_timeout: std::time::Duration::from_secs(30), + registry: None, + } + } + + /// Create an engine backed by a `ProviderRegistry` for multi-node failover. + pub fn with_registry(registry: Arc) -> Self { + Self { + rpc_url: String::new(), + client: Client::new(), + request_timeout: std::time::Duration::from_secs(30), + registry: Some(registry), } } @@ -192,9 +209,113 @@ impl SimulationEngine { self.simulate_transaction(&transaction_xdr).await } + /// Top-level simulate dispatcher: uses the provider registry when available, + /// otherwise falls back to the single `rpc_url`. async fn simulate_transaction( &self, transaction_xdr: &str, + ) -> Result { + match &self.registry { + Some(registry) => { + self.simulate_transaction_with_failover(registry, transaction_xdr) + .await + } + None => { + self.simulate_transaction_single(&self.rpc_url, None, None, transaction_xdr) + .await + } + } + } + + /// Try each healthy provider in priority order until one succeeds or all + /// are exhausted. + async fn simulate_transaction_with_failover( + &self, + registry: &Arc, + transaction_xdr: &str, + ) -> Result { + let providers = registry.healthy_providers().await; + + if providers.is_empty() { + return Err(SimulationError::RpcRequestFailed( + "All RPC providers are unavailable (circuit breaker tripped)".to_string(), + )); + } + + let mut last_error: Option = None; + + for provider in &providers { + tracing::debug!( + provider = %provider.name, + url = %provider.url, + "Attempting simulation request" + ); + + let auth = provider + .auth_header + .as_deref() + .zip(provider.auth_value.as_deref()); + + match self + .simulate_transaction_single( + &provider.url, + auth.map(|(h, _)| h), + auth.map(|(_, v)| v), + transaction_xdr, + ) + .await + { + Ok(result) => { + registry.report_success(&provider.url).await; + return Ok(result); + } + Err(e) => { + let should_retry = match &e { + SimulationError::NodeTimeout | SimulationError::NetworkError(_) => true, + SimulationError::RpcRequestFailed(msg) + if msg.starts_with("HTTP error:") => + { + // Extract status code from "HTTP error: " + msg.split_whitespace() + .last() + .and_then(|s| s.parse::().ok()) + .map(ProviderRegistry::is_retryable_status) + .unwrap_or(false) + } + _ => false, + }; + + registry.report_failure(&provider.url).await; + + if should_retry { + tracing::warn!( + provider = %provider.name, + error = %e, + "Provider failed with retryable error, trying next" + ); + last_error = Some(e); + continue; + } + + // Non-retryable error (e.g. bad request) — don't bother + // trying other providers; the request itself is bad. + return Err(e); + } + } + } + + Err(last_error.unwrap_or_else(|| { + SimulationError::RpcRequestFailed("All providers exhausted".to_string()) + })) + } + + /// Send a `simulateTransaction` JSON-RPC call to a single endpoint. + async fn simulate_transaction_single( + &self, + url: &str, + auth_header: Option<&str>, + auth_value: Option<&str>, + transaction_xdr: &str, ) -> Result { let request = SimulateTransactionRequest { jsonrpc: "2.0".to_string(), @@ -205,23 +326,27 @@ impl SimulationEngine { }, }; - tracing::debug!("Sending simulateTransaction request to {}", self.rpc_url); - - let response = tokio::time::timeout( - self.request_timeout, - self.client.post(&self.rpc_url).json(&request).send(), - ) - .await - .map_err(|_| SimulationError::NodeTimeout)? - .map_err(|e| { - if e.is_timeout() { - SimulationError::NodeTimeout - } else if e.is_connect() { - SimulationError::NetworkError(e) - } else { - SimulationError::RpcRequestFailed(format!("Network error: {}", e)) - } - })?; + tracing::debug!("Sending simulateTransaction request to {}", url); + + let mut req_builder = self.client.post(url).json(&request); + + // Attach provider-specific auth header if present. + if let (Some(header), Some(value)) = (auth_header, auth_value) { + req_builder = req_builder.header(header, value); + } + + let response = tokio::time::timeout(self.request_timeout, req_builder.send()) + .await + .map_err(|_| SimulationError::NodeTimeout)? + .map_err(|e| { + if e.is_timeout() { + SimulationError::NodeTimeout + } else if e.is_connect() { + SimulationError::NetworkError(e) + } else { + SimulationError::RpcRequestFailed(format!("Network error: {}", e)) + } + })?; if !response.status().is_success() { return Err(SimulationError::RpcRequestFailed(format!(