From 4a5d0edcdaf5f128fac4b199b4a5941ab251326c Mon Sep 17 00:00:00 2001 From: luhrhenz Date: Wed, 25 Mar 2026 17:39:25 +0100 Subject: [PATCH] feat: distributed tracing with OpenTelemetry (issue #22) - Add opentelemetry, opentelemetry_sdk, opentelemetry-otlp, tracing-opentelemetry deps - Add src/telemetry.rs: init_tracer() sets up OTLP batch exporter (or no-op if OTLP_ENDPOINT unset), installs W3C TraceContext propagator - Wire OTel layer into tracing-subscriber registry in main() - Add OTLP_ENDPOINT to Config (optional env var) - Instrument HorizonClient.get_account with #[instrument] + W3C traceparent header injection - Instrument webhook handlers: callback, handle_webhook, get_transaction, transaction_callback - Instrument TransactionProcessor: process_transaction, requeue_dlq - Flush tracer on clean server shutdown --- Cargo.toml | 5 ++ src/config.rs | 2 + src/handlers/webhook.rs | 5 ++ src/lib.rs | 1 + src/main.rs | 21 +++++++- src/services/transaction_processor.rs | 3 ++ src/stellar/client.rs | 20 +++++++- src/telemetry.rs | 73 +++++++++++++++++++++++++++ 8 files changed, 127 insertions(+), 3 deletions(-) create mode 100644 src/telemetry.rs diff --git a/Cargo.toml b/Cargo.toml index 199022f..8c85c35 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,6 +56,11 @@ async-trait = "0.1" hmac = "0.12" sha2 = "0.10" hex = "0.4" +opentelemetry = { version = "0.21", features = ["trace"] } +opentelemetry_sdk = { version = "0.21", features = ["rt-tokio", "trace"] } +opentelemetry-otlp = { version = "0.14", features = ["tonic", "trace"] } +opentelemetry-semantic-conventions = "0.13" +tracing-opentelemetry = "0.22" [dev-dependencies] mockito = "1" diff --git a/src/config.rs b/src/config.rs index f862835..ce7b57d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -31,6 +31,7 @@ pub struct Config { pub allowed_ips: AllowedIps, pub backup_dir: String, pub backup_encryption_key: Option, + pub otlp_endpoint: Option, } pub mod assets; @@ -85,6 +86,7 @@ impl Config { allowed_ips, backup_dir: env::var("BACKUP_DIR").unwrap_or_else(|_| "./backups".to_string()), backup_encryption_key: env::var("BACKUP_ENCRYPTION_KEY").ok(), + otlp_endpoint: env::var("OTLP_ENDPOINT").ok(), }) } } diff --git a/src/handlers/webhook.rs b/src/handlers/webhook.rs index 5d78c34..60fbefd 100644 --- a/src/handlers/webhook.rs +++ b/src/handlers/webhook.rs @@ -17,6 +17,7 @@ use axum::{ use serde::{Deserialize, Serialize}; use sqlx::types::BigDecimal; use std::str::FromStr; +use tracing::instrument; use utoipa::ToSchema; use uuid::Uuid; @@ -117,6 +118,7 @@ fn validate_webhook_payload( }) } +#[instrument(name = "webhook.transaction_callback", skip(state, payload))] pub async fn transaction_callback( State(state): State, Json(payload): Json, @@ -309,6 +311,7 @@ fn validate_memo_type(memo_type: &Option) -> Result<(), AppError> { ), tag = "Webhooks" )] +#[instrument(name = "webhook.callback", skip(state, payload))] pub async fn callback( State(state): State, Json(payload): Json, @@ -348,6 +351,7 @@ pub async fn callback( ), tag = "Webhooks" )] +#[instrument(name = "webhook.handle_webhook", skip(payload))] pub async fn handle_webhook( State(_state): State, Json(payload): Json, @@ -378,6 +382,7 @@ pub async fn handle_webhook( ), tag = "Transactions" )] +#[instrument(name = "webhook.get_transaction", skip(state), fields(transaction.id = %id))] pub async fn get_transaction( State(state): State, Path(id): Path, diff --git a/src/lib.rs b/src/lib.rs index 2669fc4..1cf5d64 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,6 +12,7 @@ pub mod secrets; pub mod services; pub mod startup; pub mod stellar; +pub mod telemetry; pub mod utils; pub mod validation; diff --git a/src/main.rs b/src/main.rs index e1c49d8..ac20276 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,9 +17,11 @@ use synapse_core::{ schemas, services::{FeatureFlagService, SettlementService}, stellar::HorizonClient, + telemetry, ApiState, AppState, ReadinessState, }; use tokio::sync::broadcast; +use tracing_opentelemetry::OpenTelemetryLayer; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use utoipa::OpenApi; mod cli; @@ -69,20 +71,34 @@ async fn main() -> anyhow::Result<()> { let cli = Cli::parse(); let config = config::Config::load().await?; - // Setup logging + // Setup logging + OpenTelemetry tracing layer let env_filter = tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into()); + + // Init OTel tracer early so the tracing layer can reference it. + let tracer_provider = telemetry::init_tracer( + "synapse-core", + config.otlp_endpoint.as_deref(), + ) + .expect("failed to initialise OpenTelemetry tracer"); + + let otel_layer = OpenTelemetryLayer::new( + tracer_provider.tracer("synapse-core"), + ); + match config.log_format { config::LogFormat::Json => { tracing_subscriber::registry() .with(env_filter) .with(tracing_subscriber::fmt::layer().json()) + .with(otel_layer) .init(); } config::LogFormat::Text => { tracing_subscriber::registry() .with(env_filter) .with(tracing_subscriber::fmt::layer()) + .with(otel_layer) .init(); } } @@ -270,6 +286,9 @@ async fn serve(config: config::Config) -> anyhow::Result<()> { .serve(app.into_make_service_with_connect_info::()) .await?; + // Flush and shut down the OTel exporter on clean exit. + opentelemetry::global::shutdown_tracer_provider(); + Ok(()) } diff --git a/src/services/transaction_processor.rs b/src/services/transaction_processor.rs index 90dc20a..57866a0 100644 --- a/src/services/transaction_processor.rs +++ b/src/services/transaction_processor.rs @@ -1,4 +1,5 @@ use sqlx::PgPool; +use tracing::instrument; #[derive(Clone)] pub struct TransactionProcessor { @@ -10,6 +11,7 @@ impl TransactionProcessor { Self { pool } } + #[instrument(name = "processor.process_transaction", skip(self), fields(transaction.id = %tx_id))] pub async fn process_transaction(&self, tx_id: uuid::Uuid) -> anyhow::Result<()> { sqlx::query( "UPDATE transactions SET status = 'completed', updated_at = NOW() WHERE id = $1", @@ -20,6 +22,7 @@ impl TransactionProcessor { Ok(()) } + #[instrument(name = "processor.requeue_dlq", skip(self), fields(dlq.id = %dlq_id))] pub async fn requeue_dlq(&self, dlq_id: uuid::Uuid) -> anyhow::Result<()> { let tx_id: uuid::Uuid = sqlx::query_scalar("SELECT transaction_id FROM transaction_dlq WHERE id = $1") diff --git a/src/stellar/client.rs b/src/stellar/client.rs index 403bbd6..3bc657c 100644 --- a/src/stellar/client.rs +++ b/src/stellar/client.rs @@ -1,9 +1,13 @@ use failsafe::futures::CircuitBreaker as FuturesCircuitBreaker; use failsafe::{backoff, failure_policy, Config, Error as FailsafeError, StateMachine}; +use opentelemetry::propagation::TextMapPropagator; +use opentelemetry_sdk::propagation::TraceContextPropagator; use reqwest::Client; use serde::{Deserialize, Serialize}; use std::time::Duration; use thiserror::Error; +use tracing::instrument; +use tracing_opentelemetry::OpenTelemetrySpanExt; #[derive(Error, Debug)] pub enum HorizonError { @@ -100,7 +104,9 @@ impl HorizonClient { } } - /// Fetches account details from the Horizon API + /// Fetches account details from the Horizon API. + /// The current trace context is propagated via W3C `traceparent` headers. + #[instrument(name = "horizon.get_account", skip(self), fields(stellar.account = %address))] pub async fn get_account(&self, address: &str) -> Result { let url = format!( "{}/accounts/{}", @@ -110,10 +116,20 @@ impl HorizonClient { let client = self.client.clone(); let addr = address.to_string(); + // Inject W3C traceparent / tracestate into outgoing request headers. + let mut headers = std::collections::HashMap::new(); + let propagator = TraceContextPropagator::new(); + let cx = tracing::Span::current().context(); + propagator.inject_context(&cx, &mut headers); + let result = self .circuit_breaker .call(async move { - let response = client.get(&url).send().await?; + let mut req = client.get(&url); + for (k, v) in &headers { + req = req.header(k.as_str(), v.as_str()); + } + let response = req.send().await?; if response.status() == 404 { return Err(HorizonError::AccountNotFound(addr)); diff --git a/src/telemetry.rs b/src/telemetry.rs new file mode 100644 index 0000000..ea5068f --- /dev/null +++ b/src/telemetry.rs @@ -0,0 +1,73 @@ +//! OpenTelemetry initialisation. +//! +//! Call `init_tracer` once at startup. It returns a `TracerProvider` that +//! must be kept alive for the duration of the process (dropping it flushes +//! and shuts down the exporter). When no OTLP endpoint is configured the +//! function installs a no-op provider so the rest of the code compiles and +//! runs unchanged. + +use opentelemetry::trace::TracerProvider as _; +use opentelemetry_otlp::WithExportConfig; +use opentelemetry_sdk::{ + propagation::TraceContextPropagator, + runtime, + trace::{self as sdktrace, TracerProvider}, + Resource, +}; +use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_VERSION}; + +/// Initialise the global tracer and return the provider so the caller can +/// shut it down cleanly on exit. +pub fn init_tracer( + service_name: &str, + otlp_endpoint: Option<&str>, +) -> anyhow::Result { + // W3C TraceContext propagation (traceparent / tracestate headers) + opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new()); + + let resource = Resource::new(vec![ + opentelemetry::KeyValue::new(SERVICE_NAME, service_name.to_string()), + opentelemetry::KeyValue::new(SERVICE_VERSION, env!("CARGO_PKG_VERSION")), + ]); + + let provider = match otlp_endpoint { + Some(endpoint) => { + let exporter = opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint(endpoint) + .build_span_exporter()?; + + let provider = sdktrace::TracerProvider::builder() + .with_resource(resource) + .with_batch_exporter(exporter, runtime::Tokio) + .build(); + + tracing::info!("OpenTelemetry OTLP exporter configured → {endpoint}"); + provider + } + None => { + // No endpoint configured — use a no-op provider (traces are dropped). + let provider = sdktrace::TracerProvider::builder() + .with_resource(resource) + .build(); + + tracing::info!( + "No OTLP_ENDPOINT set — OpenTelemetry running in no-op mode" + ); + provider + } + }; + + // Register as the global provider so `opentelemetry::global::tracer()` + // works anywhere in the codebase. + opentelemetry::global::set_tracer_provider(provider.clone()); + + Ok(provider) +} + +/// Shut down the tracer provider, flushing any buffered spans. +pub fn shutdown_tracer(provider: TracerProvider) { + if let Err(e) = provider.shutdown() { + tracing::error!("OpenTelemetry shutdown error: {e}"); + } +}