diff --git a/.codex/settings/kiroCodex-settings.json b/.codex/settings/kiroCodex-settings.json new file mode 100644 index 0000000..d169a18 --- /dev/null +++ b/.codex/settings/kiroCodex-settings.json @@ -0,0 +1,6 @@ +{ + "paths": { + "specs": ".codex/specs", + "steering": ".codex/steering" + } +} \ No newline at end of file diff --git a/migrations/20260325000000_webhook_endpoints.sql b/migrations/20260325000000_webhook_endpoints.sql new file mode 100644 index 0000000..cd098de --- /dev/null +++ b/migrations/20260325000000_webhook_endpoints.sql @@ -0,0 +1,35 @@ +-- Migration: Create webhook_endpoints and webhook_deliveries tables +-- Supports outgoing webhook notifications for transaction state transitions + +CREATE TABLE IF NOT EXISTS webhook_endpoints ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + url TEXT NOT NULL, + secret TEXT NOT NULL, -- HMAC-SHA256 signing secret + event_types TEXT[] NOT NULL DEFAULT '{}', -- e.g. ARRAY['transaction.completed','transaction.failed'] + enabled BOOLEAN NOT NULL DEFAULT TRUE, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX idx_webhook_endpoints_enabled ON webhook_endpoints(enabled); + +CREATE TABLE IF NOT EXISTS webhook_deliveries ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + endpoint_id UUID NOT NULL REFERENCES webhook_endpoints(id) ON DELETE CASCADE, + transaction_id UUID NOT NULL, + event_type TEXT NOT NULL, + payload JSONB NOT NULL, + attempt_count INTEGER NOT NULL DEFAULT 0, + last_attempt_at TIMESTAMPTZ, + next_attempt_at TIMESTAMPTZ, + status TEXT NOT NULL DEFAULT 'pending', -- pending | delivered | failed + response_status INTEGER, + response_body TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX idx_webhook_deliveries_endpoint_id ON webhook_deliveries(endpoint_id); +CREATE INDEX idx_webhook_deliveries_transaction_id ON webhook_deliveries(transaction_id); +CREATE INDEX idx_webhook_deliveries_status ON webhook_deliveries(status); +CREATE INDEX idx_webhook_deliveries_next_attempt ON webhook_deliveries(next_attempt_at) + WHERE status = 'pending'; diff --git a/src/main.rs b/src/main.rs index e1c49d8..e70b587 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,7 +15,7 @@ use synapse_core::{ metrics, middleware, middleware::idempotency::IdempotencyService, schemas, - services::{FeatureFlagService, SettlementService}, + services::{FeatureFlagService, SettlementService, WebhookDispatcher}, stellar::HorizonClient, ApiState, AppState, ReadinessState, }; @@ -164,6 +164,20 @@ async fn serve(config: config::Config) -> anyhow::Result<()> { } }); + // Start background webhook delivery worker (runs every 30 seconds) + let webhook_pool = pool.clone(); + tokio::spawn(async move { + let dispatcher = WebhookDispatcher::new(webhook_pool); + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(30)); + loop { + interval.tick().await; + if let Err(e) = dispatcher.process_pending().await { + tracing::error!("Webhook dispatcher error: {e}"); + } + } + }); + tracing::info!("Webhook dispatcher background worker started"); + // Initialize metrics let _metrics_handle = metrics::init_metrics() .map_err(|e| anyhow::anyhow!("Failed to initialize metrics: {}", e))?; diff --git a/src/services/mod.rs b/src/services/mod.rs index 3bc8b5a..fb97791 100644 --- a/src/services/mod.rs +++ b/src/services/mod.rs @@ -5,6 +5,7 @@ pub mod scheduler; pub mod settlement; pub mod transaction_processor; pub mod transaction_processor_job; +pub mod webhook_dispatcher; pub use backup::BackupService; pub use feature_flags::FeatureFlagService; @@ -12,3 +13,4 @@ pub use scheduler::{Job, JobScheduler, JobStatus}; pub use settlement::SettlementService; pub use transaction_processor::TransactionProcessor; pub use transaction_processor_job::TransactionProcessorJob; +pub use webhook_dispatcher::WebhookDispatcher; diff --git a/src/services/transaction_processor.rs b/src/services/transaction_processor.rs index 90dc20a..02b5498 100644 --- a/src/services/transaction_processor.rs +++ b/src/services/transaction_processor.rs @@ -1,13 +1,24 @@ +use crate::services::webhook_dispatcher::WebhookDispatcher; use sqlx::PgPool; #[derive(Clone)] pub struct TransactionProcessor { pool: PgPool, + webhook_dispatcher: Option, } impl TransactionProcessor { pub fn new(pool: PgPool) -> Self { - Self { pool } + Self { + pool, + webhook_dispatcher: None, + } + } + + /// Attach a WebhookDispatcher so state transitions trigger outgoing webhooks. + pub fn with_webhook_dispatcher(mut self, dispatcher: WebhookDispatcher) -> Self { + self.webhook_dispatcher = Some(dispatcher); + self } pub async fn process_transaction(&self, tx_id: uuid::Uuid) -> anyhow::Result<()> { @@ -17,6 +28,44 @@ impl TransactionProcessor { .bind(tx_id) .execute(&self.pool) .await?; + + // Notify external systems that the transaction completed. + if let Some(dispatcher) = &self.webhook_dispatcher { + let data = serde_json::json!({ "transaction_id": tx_id }); + if let Err(e) = dispatcher + .enqueue(tx_id, "transaction.completed", data) + .await + { + tracing::error!( + transaction_id = %tx_id, + "Failed to enqueue webhook for transaction.completed: {e}" + ); + } + } + + Ok(()) + } + + pub async fn fail_transaction(&self, tx_id: uuid::Uuid, reason: &str) -> anyhow::Result<()> { + sqlx::query("UPDATE transactions SET status = 'failed', updated_at = NOW() WHERE id = $1") + .bind(tx_id) + .execute(&self.pool) + .await?; + + // Notify external systems that the transaction failed. + if let Some(dispatcher) = &self.webhook_dispatcher { + let data = serde_json::json!({ + "transaction_id": tx_id, + "reason": reason, + }); + if let Err(e) = dispatcher.enqueue(tx_id, "transaction.failed", data).await { + tracing::error!( + transaction_id = %tx_id, + "Failed to enqueue webhook for transaction.failed: {e}" + ); + } + } + Ok(()) } diff --git a/src/services/webhook_dispatcher.rs b/src/services/webhook_dispatcher.rs new file mode 100644 index 0000000..62765eb --- /dev/null +++ b/src/services/webhook_dispatcher.rs @@ -0,0 +1,293 @@ +//! Outgoing webhook dispatcher. +//! +//! Delivers signed HMAC-SHA256 payloads to registered endpoints when +//! transactions reach terminal states. Retries with exponential backoff +//! up to MAX_ATTEMPTS times and records every attempt in webhook_deliveries. + +use chrono::Utc; +use hmac::{Hmac, Mac}; +use reqwest::Client; +use serde::{Deserialize, Serialize}; +use sha2::Sha256; +use sqlx::PgPool; +use uuid::Uuid; + +const MAX_ATTEMPTS: i32 = 5; +/// Base delay in seconds for exponential backoff (2^attempt * BASE_DELAY_SECS) +const BASE_DELAY_SECS: i64 = 10; + +// ── Domain types ───────────────────────────────────────────────────────────── + +#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] +pub struct WebhookEndpoint { + pub id: Uuid, + pub url: String, + pub secret: String, + pub event_types: Vec, + pub enabled: bool, + pub created_at: chrono::DateTime, + pub updated_at: chrono::DateTime, +} + +#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] +pub struct WebhookDelivery { + pub id: Uuid, + pub endpoint_id: Uuid, + pub transaction_id: Uuid, + pub event_type: String, + pub payload: serde_json::Value, + pub attempt_count: i32, + pub last_attempt_at: Option>, + pub next_attempt_at: Option>, + pub status: String, + pub response_status: Option, + pub response_body: Option, + pub created_at: chrono::DateTime, +} + +/// Payload sent to external endpoints. +#[derive(Debug, Serialize)] +pub struct OutgoingPayload { + pub event_type: String, + pub transaction_id: String, + pub timestamp: chrono::DateTime, + pub data: serde_json::Value, +} + +// ── Service ─────────────────────────────────────────────────────────────────── + +#[derive(Clone)] +pub struct WebhookDispatcher { + pool: PgPool, + http: Client, +} + +impl WebhookDispatcher { + pub fn new(pool: PgPool) -> Self { + Self { + pool, + http: Client::builder() + .timeout(std::time::Duration::from_secs(10)) + .build() + .expect("failed to build reqwest client"), + } + } + + /// Enqueue deliveries for all enabled endpoints subscribed to `event_type`. + /// Call this from TransactionProcessor on every terminal state transition. + pub async fn enqueue( + &self, + transaction_id: Uuid, + event_type: &str, + data: serde_json::Value, + ) -> anyhow::Result<()> { + let endpoints = self.endpoints_for_event(event_type).await?; + if endpoints.is_empty() { + return Ok(()); + } + + let payload = serde_json::to_value(OutgoingPayload { + event_type: event_type.to_string(), + transaction_id: transaction_id.to_string(), + timestamp: Utc::now(), + data, + })?; + + for ep in endpoints { + sqlx::query( + r#" + INSERT INTO webhook_deliveries + (endpoint_id, transaction_id, event_type, payload, status, next_attempt_at) + VALUES ($1, $2, $3, $4, 'pending', NOW()) + "#, + ) + .bind(ep.id) + .bind(transaction_id) + .bind(event_type) + .bind(&payload) + .execute(&self.pool) + .await?; + } + + Ok(()) + } + + /// Process all pending deliveries that are due. Intended to be called from + /// a background task on a short interval (e.g. every 30 seconds). + pub async fn process_pending(&self) -> anyhow::Result<()> { + let deliveries: Vec = sqlx::query_as( + r#" + SELECT * FROM webhook_deliveries + WHERE status = 'pending' + AND (next_attempt_at IS NULL OR next_attempt_at <= NOW()) + ORDER BY created_at + LIMIT 100 + "#, + ) + .fetch_all(&self.pool) + .await?; + + for delivery in deliveries { + if let Err(e) = self.attempt_delivery(&delivery).await { + tracing::error!( + delivery_id = %delivery.id, + "Webhook delivery attempt error: {e}" + ); + } + } + + Ok(()) + } + + // ── Internal helpers ────────────────────────────────────────────────────── + + async fn attempt_delivery(&self, delivery: &WebhookDelivery) -> anyhow::Result<()> { + let endpoint: WebhookEndpoint = + sqlx::query_as("SELECT * FROM webhook_endpoints WHERE id = $1") + .bind(delivery.endpoint_id) + .fetch_one(&self.pool) + .await?; + + let body = serde_json::to_string(&delivery.payload)?; + let signature = sign_payload(&endpoint.secret, &body); + + let response = self + .http + .post(&endpoint.url) + .header("Content-Type", "application/json") + .header("X-Webhook-Signature", format!("sha256={signature}")) + .header("X-Webhook-Event", &delivery.event_type) + .body(body) + .send() + .await; + + let new_attempt_count = delivery.attempt_count + 1; + let now = Utc::now(); + + match response { + Ok(resp) => { + let status_code = resp.status().as_u16() as i32; + let resp_body = resp.text().await.unwrap_or_default(); + let success = (200..300).contains(&(status_code as u16)); + + if success { + sqlx::query( + r#" + UPDATE webhook_deliveries + SET status = 'delivered', + attempt_count = $1, + last_attempt_at = $2, + response_status = $3, + response_body = $4 + WHERE id = $5 + "#, + ) + .bind(new_attempt_count) + .bind(now) + .bind(status_code) + .bind(&resp_body) + .bind(delivery.id) + .execute(&self.pool) + .await?; + + tracing::info!( + delivery_id = %delivery.id, + endpoint = %endpoint.url, + "Webhook delivered successfully" + ); + } else { + self.handle_failure( + delivery, + new_attempt_count, + now, + Some(status_code), + Some(resp_body), + ) + .await?; + } + } + Err(e) => { + self.handle_failure(delivery, new_attempt_count, now, None, Some(e.to_string())) + .await?; + } + } + + Ok(()) + } + + async fn handle_failure( + &self, + delivery: &WebhookDelivery, + attempt_count: i32, + now: chrono::DateTime, + response_status: Option, + response_body: Option, + ) -> anyhow::Result<()> { + let (new_status, next_attempt_at) = if attempt_count >= MAX_ATTEMPTS { + tracing::warn!( + delivery_id = %delivery.id, + "Webhook delivery permanently failed after {} attempts", + attempt_count + ); + ("failed", None) + } else { + // Exponential backoff: 10s, 20s, 40s, 80s, 160s + let delay = BASE_DELAY_SECS * (1_i64 << attempt_count); + let next = now + chrono::Duration::seconds(delay); + tracing::warn!( + delivery_id = %delivery.id, + attempt = attempt_count, + next_retry_in_secs = delay, + "Webhook delivery failed, scheduling retry" + ); + ("pending", Some(next)) + }; + + sqlx::query( + r#" + UPDATE webhook_deliveries + SET status = $1, + attempt_count = $2, + last_attempt_at = $3, + next_attempt_at = $4, + response_status = $5, + response_body = $6 + WHERE id = $7 + "#, + ) + .bind(new_status) + .bind(attempt_count) + .bind(now) + .bind(next_attempt_at) + .bind(response_status) + .bind(response_body) + .bind(delivery.id) + .execute(&self.pool) + .await?; + + Ok(()) + } + + async fn endpoints_for_event(&self, event_type: &str) -> anyhow::Result> { + let endpoints: Vec = sqlx::query_as( + r#" + SELECT * FROM webhook_endpoints + WHERE enabled = TRUE + AND $1 = ANY(event_types) + "#, + ) + .bind(event_type) + .fetch_all(&self.pool) + .await?; + + Ok(endpoints) + } +} + +/// Compute HMAC-SHA256 hex signature for a payload. +fn sign_payload(secret: &str, body: &str) -> String { + let mut mac = + Hmac::::new_from_slice(secret.as_bytes()).expect("HMAC accepts any key length"); + mac.update(body.as_bytes()); + hex::encode(mac.finalize().into_bytes()) +}