Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .codex/settings/kiroCodex-settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"paths": {
"specs": ".codex/specs",
"steering": ".codex/steering"
}
}
35 changes: 35 additions & 0 deletions migrations/20260325000000_webhook_endpoints.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
-- Migration: Create webhook_endpoints and webhook_deliveries tables
-- Supports outgoing webhook notifications for transaction state transitions

CREATE TABLE IF NOT EXISTS webhook_endpoints (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
url TEXT NOT NULL,
secret TEXT NOT NULL, -- HMAC-SHA256 signing secret
event_types TEXT[] NOT NULL DEFAULT '{}', -- e.g. ARRAY['transaction.completed','transaction.failed']
enabled BOOLEAN NOT NULL DEFAULT TRUE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_webhook_endpoints_enabled ON webhook_endpoints(enabled);

CREATE TABLE IF NOT EXISTS webhook_deliveries (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
endpoint_id UUID NOT NULL REFERENCES webhook_endpoints(id) ON DELETE CASCADE,
transaction_id UUID NOT NULL,
event_type TEXT NOT NULL,
payload JSONB NOT NULL,
attempt_count INTEGER NOT NULL DEFAULT 0,
last_attempt_at TIMESTAMPTZ,
next_attempt_at TIMESTAMPTZ,
status TEXT NOT NULL DEFAULT 'pending', -- pending | delivered | failed
response_status INTEGER,
response_body TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_webhook_deliveries_endpoint_id ON webhook_deliveries(endpoint_id);
CREATE INDEX idx_webhook_deliveries_transaction_id ON webhook_deliveries(transaction_id);
CREATE INDEX idx_webhook_deliveries_status ON webhook_deliveries(status);
CREATE INDEX idx_webhook_deliveries_next_attempt ON webhook_deliveries(next_attempt_at)
WHERE status = 'pending';
16 changes: 15 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use synapse_core::{
metrics, middleware,
middleware::idempotency::IdempotencyService,
schemas,
services::{FeatureFlagService, SettlementService},
services::{FeatureFlagService, SettlementService, WebhookDispatcher},
stellar::HorizonClient,
ApiState, AppState, ReadinessState,
};
Expand Down Expand Up @@ -164,6 +164,20 @@ async fn serve(config: config::Config) -> anyhow::Result<()> {
}
});

// Start background webhook delivery worker (runs every 30 seconds)
let webhook_pool = pool.clone();
tokio::spawn(async move {
let dispatcher = WebhookDispatcher::new(webhook_pool);
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
loop {
interval.tick().await;
if let Err(e) = dispatcher.process_pending().await {
tracing::error!("Webhook dispatcher error: {e}");
}
}
});
tracing::info!("Webhook dispatcher background worker started");

// Initialize metrics
let _metrics_handle = metrics::init_metrics()
.map_err(|e| anyhow::anyhow!("Failed to initialize metrics: {}", e))?;
Expand Down
2 changes: 2 additions & 0 deletions src/services/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ pub mod scheduler;
pub mod settlement;
pub mod transaction_processor;
pub mod transaction_processor_job;
pub mod webhook_dispatcher;

pub use backup::BackupService;
pub use feature_flags::FeatureFlagService;
pub use scheduler::{Job, JobScheduler, JobStatus};
pub use settlement::SettlementService;
pub use transaction_processor::TransactionProcessor;
pub use transaction_processor_job::TransactionProcessorJob;
pub use webhook_dispatcher::WebhookDispatcher;
51 changes: 50 additions & 1 deletion src/services/transaction_processor.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
use crate::services::webhook_dispatcher::WebhookDispatcher;
use sqlx::PgPool;

#[derive(Clone)]
pub struct TransactionProcessor {
pool: PgPool,
webhook_dispatcher: Option<WebhookDispatcher>,
}

impl TransactionProcessor {
pub fn new(pool: PgPool) -> Self {
Self { pool }
Self {
pool,
webhook_dispatcher: None,
}
}

/// Attach a WebhookDispatcher so state transitions trigger outgoing webhooks.
pub fn with_webhook_dispatcher(mut self, dispatcher: WebhookDispatcher) -> Self {
self.webhook_dispatcher = Some(dispatcher);
self
}

pub async fn process_transaction(&self, tx_id: uuid::Uuid) -> anyhow::Result<()> {
Expand All @@ -17,6 +28,44 @@ impl TransactionProcessor {
.bind(tx_id)
.execute(&self.pool)
.await?;

// Notify external systems that the transaction completed.
if let Some(dispatcher) = &self.webhook_dispatcher {
let data = serde_json::json!({ "transaction_id": tx_id });
if let Err(e) = dispatcher
.enqueue(tx_id, "transaction.completed", data)
.await
{
tracing::error!(
transaction_id = %tx_id,
"Failed to enqueue webhook for transaction.completed: {e}"
);
}
}

Ok(())
}

pub async fn fail_transaction(&self, tx_id: uuid::Uuid, reason: &str) -> anyhow::Result<()> {
sqlx::query("UPDATE transactions SET status = 'failed', updated_at = NOW() WHERE id = $1")
.bind(tx_id)
.execute(&self.pool)
.await?;

// Notify external systems that the transaction failed.
if let Some(dispatcher) = &self.webhook_dispatcher {
let data = serde_json::json!({
"transaction_id": tx_id,
"reason": reason,
});
if let Err(e) = dispatcher.enqueue(tx_id, "transaction.failed", data).await {
tracing::error!(
transaction_id = %tx_id,
"Failed to enqueue webhook for transaction.failed: {e}"
);
}
}

Ok(())
}

Expand Down
Loading
Loading