Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ async-trait = "0.1"
hmac = "0.12"
sha2 = "0.10"
hex = "0.4"
opentelemetry = { version = "0.21", features = ["trace"] }
opentelemetry_sdk = { version = "0.21", features = ["rt-tokio", "trace"] }
opentelemetry-otlp = { version = "0.14", features = ["tonic", "trace"] }
opentelemetry-semantic-conventions = "0.13"
tracing-opentelemetry = "0.22"

[dev-dependencies]
mockito = "1"
Expand Down
2 changes: 2 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub struct Config {
pub allowed_ips: AllowedIps,
pub backup_dir: String,
pub backup_encryption_key: Option<String>,
pub otlp_endpoint: Option<String>,
}

pub mod assets;
Expand Down Expand Up @@ -85,6 +86,7 @@ impl Config {
allowed_ips,
backup_dir: env::var("BACKUP_DIR").unwrap_or_else(|_| "./backups".to_string()),
backup_encryption_key: env::var("BACKUP_ENCRYPTION_KEY").ok(),
otlp_endpoint: env::var("OTLP_ENDPOINT").ok(),
})
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/handlers/webhook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use axum::{
use serde::{Deserialize, Serialize};
use sqlx::types::BigDecimal;
use std::str::FromStr;
use tracing::instrument;
use utoipa::ToSchema;
use uuid::Uuid;

Expand Down Expand Up @@ -117,6 +118,7 @@ fn validate_webhook_payload(
})
}

#[instrument(name = "webhook.transaction_callback", skip(state, payload))]
pub async fn transaction_callback(
State(state): State<AppState>,
Json(payload): Json<WebhookTransactionRequest>,
Expand Down Expand Up @@ -309,6 +311,7 @@ fn validate_memo_type(memo_type: &Option<String>) -> Result<(), AppError> {
),
tag = "Webhooks"
)]
#[instrument(name = "webhook.callback", skip(state, payload))]
pub async fn callback(
State(state): State<ApiState>,
Json(payload): Json<CallbackPayload>,
Expand Down Expand Up @@ -348,6 +351,7 @@ pub async fn callback(
),
tag = "Webhooks"
)]
#[instrument(name = "webhook.handle_webhook", skip(payload))]
pub async fn handle_webhook(
State(_state): State<ApiState>,
Json(payload): Json<WebhookPayload>,
Expand Down Expand Up @@ -378,6 +382,7 @@ pub async fn handle_webhook(
),
tag = "Transactions"
)]
#[instrument(name = "webhook.get_transaction", skip(state), fields(transaction.id = %id))]
pub async fn get_transaction(
State(state): State<ApiState>,
Path(id): Path<Uuid>,
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub mod secrets;
pub mod services;
pub mod startup;
pub mod stellar;
pub mod telemetry;
pub mod utils;
pub mod validation;

Expand Down
21 changes: 20 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ use synapse_core::{
schemas,
services::{FeatureFlagService, SettlementService},
stellar::HorizonClient,
telemetry,
ApiState, AppState, ReadinessState,
};
use tokio::sync::broadcast;
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use utoipa::OpenApi;
mod cli;
Expand Down Expand Up @@ -69,20 +71,34 @@ async fn main() -> anyhow::Result<()> {
let cli = Cli::parse();
let config = config::Config::load().await?;

// Setup logging
// Setup logging + OpenTelemetry tracing layer
let env_filter =
tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into());

// Init OTel tracer early so the tracing layer can reference it.
let tracer_provider = telemetry::init_tracer(
"synapse-core",
config.otlp_endpoint.as_deref(),
)
.expect("failed to initialise OpenTelemetry tracer");

let otel_layer = OpenTelemetryLayer::new(
tracer_provider.tracer("synapse-core"),
);

match config.log_format {
config::LogFormat::Json => {
tracing_subscriber::registry()
.with(env_filter)
.with(tracing_subscriber::fmt::layer().json())
.with(otel_layer)
.init();
}
config::LogFormat::Text => {
tracing_subscriber::registry()
.with(env_filter)
.with(tracing_subscriber::fmt::layer())
.with(otel_layer)
.init();
}
}
Expand Down Expand Up @@ -270,6 +286,9 @@ async fn serve(config: config::Config) -> anyhow::Result<()> {
.serve(app.into_make_service_with_connect_info::<SocketAddr>())
.await?;

// Flush and shut down the OTel exporter on clean exit.
opentelemetry::global::shutdown_tracer_provider();

Ok(())
}

Expand Down
3 changes: 3 additions & 0 deletions src/services/transaction_processor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use sqlx::PgPool;
use tracing::instrument;

#[derive(Clone)]
pub struct TransactionProcessor {
Expand All @@ -10,6 +11,7 @@ impl TransactionProcessor {
Self { pool }
}

#[instrument(name = "processor.process_transaction", skip(self), fields(transaction.id = %tx_id))]
pub async fn process_transaction(&self, tx_id: uuid::Uuid) -> anyhow::Result<()> {
sqlx::query(
"UPDATE transactions SET status = 'completed', updated_at = NOW() WHERE id = $1",
Expand All @@ -20,6 +22,7 @@ impl TransactionProcessor {
Ok(())
}

#[instrument(name = "processor.requeue_dlq", skip(self), fields(dlq.id = %dlq_id))]
pub async fn requeue_dlq(&self, dlq_id: uuid::Uuid) -> anyhow::Result<()> {
let tx_id: uuid::Uuid =
sqlx::query_scalar("SELECT transaction_id FROM transaction_dlq WHERE id = $1")
Expand Down
20 changes: 18 additions & 2 deletions src/stellar/client.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use failsafe::futures::CircuitBreaker as FuturesCircuitBreaker;
use failsafe::{backoff, failure_policy, Config, Error as FailsafeError, StateMachine};
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use thiserror::Error;
use tracing::instrument;
use tracing_opentelemetry::OpenTelemetrySpanExt;

#[derive(Error, Debug)]
pub enum HorizonError {
Expand Down Expand Up @@ -100,7 +104,9 @@ impl HorizonClient {
}
}

/// Fetches account details from the Horizon API
/// Fetches account details from the Horizon API.
/// The current trace context is propagated via W3C `traceparent` headers.
#[instrument(name = "horizon.get_account", skip(self), fields(stellar.account = %address))]
pub async fn get_account(&self, address: &str) -> Result<AccountResponse, HorizonError> {
let url = format!(
"{}/accounts/{}",
Expand All @@ -110,10 +116,20 @@ impl HorizonClient {
let client = self.client.clone();
let addr = address.to_string();

// Inject W3C traceparent / tracestate into outgoing request headers.
let mut headers = std::collections::HashMap::new();
let propagator = TraceContextPropagator::new();
let cx = tracing::Span::current().context();
propagator.inject_context(&cx, &mut headers);

let result = self
.circuit_breaker
.call(async move {
let response = client.get(&url).send().await?;
let mut req = client.get(&url);
for (k, v) in &headers {
req = req.header(k.as_str(), v.as_str());
}
let response = req.send().await?;

if response.status() == 404 {
return Err(HorizonError::AccountNotFound(addr));
Expand Down
73 changes: 73 additions & 0 deletions src/telemetry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
//! OpenTelemetry initialisation.
//!
//! Call `init_tracer` once at startup. It returns a `TracerProvider` that
//! must be kept alive for the duration of the process (dropping it flushes
//! and shuts down the exporter). When no OTLP endpoint is configured the
//! function installs a no-op provider so the rest of the code compiles and
//! runs unchanged.

use opentelemetry::trace::TracerProvider as _;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::{
propagation::TraceContextPropagator,
runtime,
trace::{self as sdktrace, TracerProvider},
Resource,
};
use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_VERSION};

/// Initialise the global tracer and return the provider so the caller can
/// shut it down cleanly on exit.
pub fn init_tracer(
service_name: &str,
otlp_endpoint: Option<&str>,
) -> anyhow::Result<TracerProvider> {
// W3C TraceContext propagation (traceparent / tracestate headers)
opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());

let resource = Resource::new(vec![
opentelemetry::KeyValue::new(SERVICE_NAME, service_name.to_string()),
opentelemetry::KeyValue::new(SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
]);

let provider = match otlp_endpoint {
Some(endpoint) => {
let exporter = opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(endpoint)
.build_span_exporter()?;

let provider = sdktrace::TracerProvider::builder()
.with_resource(resource)
.with_batch_exporter(exporter, runtime::Tokio)
.build();

tracing::info!("OpenTelemetry OTLP exporter configured → {endpoint}");
provider
}
None => {
// No endpoint configured — use a no-op provider (traces are dropped).
let provider = sdktrace::TracerProvider::builder()
.with_resource(resource)
.build();

tracing::info!(
"No OTLP_ENDPOINT set — OpenTelemetry running in no-op mode"
);
provider
}
};

// Register as the global provider so `opentelemetry::global::tracer()`
// works anywhere in the codebase.
opentelemetry::global::set_tracer_provider(provider.clone());

Ok(provider)
}

/// Shut down the tracer provider, flushing any buffered spans.
pub fn shutdown_tracer(provider: TracerProvider) {
if let Err(e) = provider.shutdown() {
tracing::error!("OpenTelemetry shutdown error: {e}");
}
}
Loading