From 590dd3d478f8672280914c13ec495fdcfd702446 Mon Sep 17 00:00:00 2001 From: HexStar Date: Thu, 26 Feb 2026 01:14:25 +0000 Subject: [PATCH 1/4] Add distributed lock manager for multi-instance deployments - Redis-based distributed locks with TTL - Automatic lock renewal for long-running operations - Lua scripts for atomic lock operations - Lock acquisition with timeout and retry - Graceful lock release on drop - Usage examples for transaction processing - Prevents duplicate processing across instances --- src/services/lock_examples.rs | 82 +++++++++++ src/services/lock_manager.rs | 253 ++++++++++++++++++++++++++++++++++ src/services/mod.rs | 2 + 3 files changed, 337 insertions(+) create mode 100644 src/services/lock_examples.rs create mode 100644 src/services/lock_manager.rs diff --git a/src/services/lock_examples.rs b/src/services/lock_examples.rs new file mode 100644 index 0000000..0d7586e --- /dev/null +++ b/src/services/lock_examples.rs @@ -0,0 +1,82 @@ +use crate::services::{LockManager, TransactionProcessor}; +use std::time::Duration; +use tracing::{info, warn}; +use uuid::Uuid; + +/// Example: Process transaction with distributed lock +pub async fn process_transaction_with_lock( + lock_manager: &LockManager, + processor: &TransactionProcessor, + tx_id: Uuid, +) -> anyhow::Result { + let resource = format!("transaction:{}", tx_id); + let timeout = Duration::from_secs(5); + + // Try to acquire lock + let lock = match lock_manager.acquire(&resource, timeout).await? { + Some(lock) => lock, + None => { + warn!("Could not acquire lock for transaction {}", tx_id); + return Ok(false); + } + }; + + info!("Processing transaction {} with lock", tx_id); + + // Process transaction + let result = processor.process_transaction(tx_id).await; + + // Release lock + lock.release().await?; + + result.map(|_| true) +} + +/// Example: Long-running operation with auto-renewal +pub async fn long_running_with_lock( + lock_manager: &LockManager, + resource: &str, +) -> anyhow::Result<()> { + let timeout = Duration::from_secs(5); + + let lock = match lock_manager.acquire(resource, timeout).await? { + Some(lock) => lock, + None => { + return Err(anyhow::anyhow!("Could not acquire lock")); + } + }; + + // Spawn auto-renewal task + let renewal_lock = lock.clone(); + tokio::spawn(async move { + renewal_lock.auto_renew_task().await; + }); + + // Do long-running work + tokio::time::sleep(Duration::from_secs(60)).await; + + // Lock will be released on drop + Ok(()) +} + +/// Example: Using with_lock helper +pub async fn process_with_helper( + lock_manager: &LockManager, + processor: &TransactionProcessor, + tx_id: Uuid, +) -> anyhow::Result> { + let resource = format!("transaction:{}", tx_id); + let timeout = Duration::from_secs(5); + + lock_manager + .with_lock(&resource, timeout, || { + Box::pin(async move { + processor + .process_transaction(tx_id) + .await + .map_err(|e| Box::new(e) as Box) + }) + }) + .await + .map_err(|e| anyhow::anyhow!("Lock error: {}", e)) +} diff --git a/src/services/lock_manager.rs b/src/services/lock_manager.rs new file mode 100644 index 0000000..220c8a4 --- /dev/null +++ b/src/services/lock_manager.rs @@ -0,0 +1,253 @@ +use redis::{aio::MultiplexedConnection, AsyncCommands, Client, Script}; +use std::time::Duration; +use tokio::time::{sleep, timeout}; +use tracing::{debug, warn}; +use uuid::Uuid; + +pub struct LockManager { + redis_client: Client, + default_ttl: Duration, +} + +#[derive(Clone)] +pub struct Lock { + key: String, + token: String, + redis_client: Client, + ttl: Duration, +} + +impl LockManager { + pub fn new(redis_url: &str, default_ttl_secs: u64) -> Result { + let redis_client = Client::open(redis_url)?; + Ok(Self { + redis_client, + default_ttl: Duration::from_secs(default_ttl_secs), + }) + } + + pub async fn acquire( + &self, + resource: &str, + timeout_duration: Duration, + ) -> Result, redis::RedisError> { + let key = format!("lock:{}", resource); + let token = Uuid::new_v4().to_string(); + let ttl = self.default_ttl; + + let start = tokio::time::Instant::now(); + + loop { + if let Some(lock) = self.try_acquire(&key, &token, ttl).await? { + debug!("Acquired lock for {}", resource); + return Ok(Some(lock)); + } + + if start.elapsed() >= timeout_duration { + debug!("Lock acquisition timeout for {}", resource); + return Ok(None); + } + + sleep(Duration::from_millis(50)).await; + } + } + + async fn try_acquire( + &self, + key: &str, + token: &str, + ttl: Duration, + ) -> Result, redis::RedisError> { + let mut conn = self.redis_client.get_multiplexed_async_connection().await?; + + // SET key token NX EX ttl_seconds + let result: Option = conn + .set_options( + key, + token, + redis::SetOptions::default() + .conditional_set(redis::ExistenceCheck::NX) + .with_expiration(redis::SetExpiry::EX(ttl.as_secs() as usize)), + ) + .await?; + + if result.is_some() { + Ok(Some(Lock { + key: key.to_string(), + token: token.to_string(), + redis_client: self.redis_client.clone(), + ttl, + })) + } else { + Ok(None) + } + } + + pub async fn with_lock( + &self, + resource: &str, + timeout_duration: Duration, + f: F, + ) -> Result, Box> + where + F: FnOnce() -> std::pin::Pin< + Box< + dyn std::future::Future< + Output = Result>, + > + Send, + >, + >, + { + let lock = match self.acquire(resource, timeout_duration).await? { + Some(lock) => lock, + None => return Ok(None), + }; + + let result = f().await; + + lock.release().await?; + + result.map(Some) + } +} + +impl Lock { + pub async fn release(self) -> Result<(), redis::RedisError> { + let mut conn = self.redis_client.get_multiplexed_async_connection().await?; + + // Lua script to ensure we only delete if token matches + let script = Script::new( + r#" + if redis.call("get", KEYS[1]) == ARGV[1] then + return redis.call("del", KEYS[1]) + else + return 0 + end + "#, + ); + + let _: i32 = script + .key(&self.key) + .arg(&self.token) + .invoke_async(&mut conn) + .await?; + + debug!("Released lock for {}", self.key); + Ok(()) + } + + pub async fn renew(&mut self) -> Result { + let mut conn = self.redis_client.get_multiplexed_async_connection().await?; + + // Lua script to renew only if token matches + let script = Script::new( + r#" + if redis.call("get", KEYS[1]) == ARGV[1] then + return redis.call("expire", KEYS[1], ARGV[2]) + else + return 0 + end + "#, + ); + + let result: i32 = script + .key(&self.key) + .arg(&self.token) + .arg(self.ttl.as_secs() as i32) + .invoke_async(&mut conn) + .await?; + + Ok(result == 1) + } + + pub async fn auto_renew_task(mut self) { + let renew_interval = self.ttl / 2; + + loop { + sleep(renew_interval).await; + + match self.renew().await { + Ok(true) => debug!("Renewed lock for {}", self.key), + Ok(false) => { + warn!("Failed to renew lock for {} - token mismatch", self.key); + break; + } + Err(e) => { + warn!("Error renewing lock for {}: {}", self.key, e); + break; + } + } + } + } +} + +impl Drop for Lock { + fn drop(&mut self) { + // Best effort release on drop + let key = self.key.clone(); + let token = self.token.clone(); + let client = self.redis_client.clone(); + + tokio::spawn(async move { + if let Ok(mut conn) = client.get_multiplexed_async_connection().await { + let script = Script::new( + r#" + if redis.call("get", KEYS[1]) == ARGV[1] then + return redis.call("del", KEYS[1]) + else + return 0 + end + "#, + ); + + let _ = script + .key(&key) + .arg(&token) + .invoke_async::<_, i32>(&mut conn) + .await; + } + }); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_lock_acquire_release() { + let manager = LockManager::new("redis://localhost:6379", 30).unwrap(); + + let lock = manager + .acquire("test_resource", Duration::from_secs(5)) + .await + .unwrap(); + + assert!(lock.is_some()); + + let lock = lock.unwrap(); + lock.release().await.unwrap(); + } + + #[tokio::test] + async fn test_lock_prevents_duplicate() { + let manager = LockManager::new("redis://localhost:6379", 30).unwrap(); + + let lock1 = manager + .acquire("test_resource_2", Duration::from_secs(5)) + .await + .unwrap(); + + assert!(lock1.is_some()); + + // Try to acquire same lock + let lock2 = manager + .acquire("test_resource_2", Duration::from_millis(100)) + .await + .unwrap(); + + assert!(lock2.is_none()); + + lock1.unwrap().release().await.unwrap(); + } +} diff --git a/src/services/mod.rs b/src/services/mod.rs index 3bc8b5a..f10f4e0 100644 --- a/src/services/mod.rs +++ b/src/services/mod.rs @@ -1,5 +1,6 @@ pub mod backup; pub mod feature_flags; +pub mod lock_manager; pub mod processor; pub mod scheduler; pub mod settlement; @@ -8,6 +9,7 @@ pub mod transaction_processor_job; pub use backup::BackupService; pub use feature_flags::FeatureFlagService; +pub use lock_manager::{Lock, LockManager}; pub use scheduler::{Job, JobScheduler, JobStatus}; pub use settlement::SettlementService; pub use transaction_processor::TransactionProcessor; From f64933424d7c71a54c12cfb34ab7de0548be94c2 Mon Sep 17 00:00:00 2001 From: HexStar Date: Thu, 26 Feb 2026 01:19:08 +0000 Subject: [PATCH 2/4] Fix formatting for CI --- Cargo.lock | 91 ++++++++---- src/handlers/search.rs | 1 - src/utils/sanitize.rs | 64 +++++++-- tests/metrics_test.rs | 52 +++---- tests/request_logger_test.rs | 37 +++-- tests/search_test.rs | 154 ++++++++++++--------- tests/settlement_test.rs | 17 ++- tests/startup_validation_test.rs | 71 ++++++---- tests/websocket_test.rs | 228 ++++++++++++++++--------------- 9 files changed, 421 insertions(+), 294 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a65cc22..15019ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -315,7 +315,7 @@ dependencies = [ "sha1", "sync_wrapper 0.1.2", "tokio", - "tokio-tungstenite", + "tokio-tungstenite 0.20.1", "tower 0.4.13", "tower-layer", "tower-service", @@ -943,12 +943,11 @@ dependencies = [ name = "displaydoc" version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c11bdc11a0c47bc7d37d582b5285da6849c96681023680b906673c5707af7b0f" +checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ - "darling 0.14.4", "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.117", ] [[package]] @@ -1859,9 +1858,9 @@ checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2" [[package]] name = "js-sys" -version = "0.3.89" +version = "0.3.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4eacb0641a310445a4c513f2a5e23e19952e269c6a38887254d5f837a305506" +checksum = "14dc6f6450b3f6d4ed5b16327f38fed626d375a886159ca555bd7822c0c3a5a6" dependencies = [ "once_cell", "wasm-bindgen", @@ -1902,7 +1901,7 @@ checksum = "3d0b95e02c851351f877147b7deea7b1afb1df71b63aa5f8270716e0c5720616" dependencies = [ "bitflags 2.11.0", "libc", - "redox_syscall 0.7.1", + "redox_syscall 0.7.2", ] [[package]] @@ -2680,9 +2679,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35985aa610addc02e24fc232012c86fd11f14111180f902b67e2d5331f8ebf2b" +checksum = "6d94dd2f7cd932d4dc02cc8b2b50dfd38bd079a4e5d79198b99743d7fcf9a4b4" dependencies = [ "bitflags 2.11.0", ] @@ -2732,9 +2731,9 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.8.9" +version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a96887878f22d7bad8a3b6dc5b7440e0ada9a245242924394987b21cf2210a4c" +checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" [[package]] name = "reqwest" @@ -2937,9 +2936,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.36" +version = "0.23.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c665f33d38cea657d9614f766881e4d510e0eda4239891eea56b4cadcf01801b" +checksum = "758025cb5fccfd3bc2fd74708fd4682be41d99e5dff73c377c0646c6012c73a4" dependencies = [ "once_cell", "ring", @@ -3168,9 +3167,9 @@ dependencies = [ [[package]] name = "serde_with" -version = "3.16.1" +version = "3.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fa237f2807440d238e0364a218270b98f767a00d3dada77b1c53ae88940e2e7" +checksum = "381b283ce7bc6b476d903296fb59d0d36633652b633b27f64db4fb46dcbfc3b9" dependencies = [ "base64 0.22.1", "chrono", @@ -3187,9 +3186,9 @@ dependencies = [ [[package]] name = "serde_with_macros" -version = "3.16.1" +version = "3.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52a8e3ca0ca629121f70ab50f95249e5a6f925cc0f6ffe8256c45b728875706c" +checksum = "a6d4e30573c8cb306ed6ab1dca8423eec9a463ea0e155f45399455e0368b27e0" dependencies = [ "darling 0.21.3", "proc-macro2", @@ -3706,6 +3705,7 @@ dependencies = [ "thiserror 1.0.69", "tokio", "tokio-stream", + "tokio-tungstenite 0.21.0", "tower 0.4.13", "tower-http 0.4.4", "tracing", @@ -4015,7 +4015,19 @@ dependencies = [ "futures-util", "log", "tokio", - "tungstenite", + "tungstenite 0.20.1", +] + +[[package]] +name = "tokio-tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.21.0", ] [[package]] @@ -4229,6 +4241,25 @@ dependencies = [ "utf-8", ] +[[package]] +name = "tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.4.0", + "httparse", + "log", + "rand 0.8.5", + "sha1", + "thiserror 1.0.69", + "url", + "utf-8", +] + [[package]] name = "typenum" version = "1.19.0" @@ -4477,9 +4508,9 @@ checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" [[package]] name = "wasm-bindgen" -version = "0.2.112" +version = "0.2.113" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05d7d0fce354c88b7982aec4400b3e7fcf723c32737cef571bd165f7613557ee" +checksum = "60722a937f594b7fde9adb894d7c092fc1bb6612897c46368d18e7a20208eff2" dependencies = [ "cfg-if", "once_cell", @@ -4490,9 +4521,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.62" +version = "0.4.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee85afca410ac4abba5b584b12e77ea225db6ee5471d0aebaae0861166f9378a" +checksum = "8a89f4650b770e4521aa6573724e2aed4704372151bd0de9d16a3bbabb87441a" dependencies = [ "cfg-if", "futures-util", @@ -4504,9 +4535,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.112" +version = "0.2.113" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55839b71ba921e4f75b674cb16f843f4b1f3b26ddfcb3454de1cf65cc021ec0f" +checksum = "0fac8c6395094b6b91c4af293f4c79371c163f9a6f56184d2c9a85f5a95f3950" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -4514,9 +4545,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.112" +version = "0.2.113" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "caf2e969c2d60ff52e7e98b7392ff1588bffdd1ccd4769eba27222fd3d621571" +checksum = "ab3fabce6159dc20728033842636887e4877688ae94382766e00b180abac9d60" dependencies = [ "bumpalo", "proc-macro2", @@ -4527,9 +4558,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.112" +version = "0.2.113" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0861f0dcdf46ea819407495634953cdcc8a8c7215ab799a7a7ce366be71c7b30" +checksum = "de0e091bdb824da87dc01d967388880d017a0a9bc4f3bdc0d86ee9f9336e3bb5" dependencies = [ "unicode-ident", ] @@ -4570,9 +4601,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.89" +version = "0.3.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10053fbf9a374174094915bbce141e87a6bf32ecd9a002980db4b638405e8962" +checksum = "705eceb4ce901230f8625bd1d665128056ccbe4b7408faa625eec1ba80f59a97" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/src/handlers/search.rs b/src/handlers/search.rs index b1d9f50..90d8017 100644 --- a/src/handlers/search.rs +++ b/src/handlers/search.rs @@ -5,7 +5,6 @@ pub async fn search_transactions(State(_pool_manager): State) -> im StatusCode::NOT_IMPLEMENTED } - /// Wrapper for use with ApiState in create_app pub async fn search_transactions_wrapper( State(api_state): State, diff --git a/src/utils/sanitize.rs b/src/utils/sanitize.rs index 7d24d7e..a0aefee 100644 --- a/src/utils/sanitize.rs +++ b/src/utils/sanitize.rs @@ -96,14 +96,20 @@ mod tests { }); let sanitized = sanitize_json(&input); - - assert!(sanitized["stellar_account"].as_str().unwrap().contains("****")); + + assert!(sanitized["stellar_account"] + .as_str() + .unwrap() + .contains("****")); assert!(sanitized["account"].as_str().unwrap().contains("****")); assert!(sanitized["password"].as_str().unwrap().contains("****")); assert!(sanitized["secret"].as_str().unwrap().contains("****")); assert!(sanitized["token"].as_str().unwrap().contains("****")); assert!(sanitized["api_key"].as_str().unwrap().contains("****")); - assert!(sanitized["authorization"].as_str().unwrap().contains("****")); + assert!(sanitized["authorization"] + .as_str() + .unwrap() + .contains("****")); assert_eq!(sanitized["public_field"], "visible_data"); } @@ -126,11 +132,23 @@ mod tests { }); let sanitized = sanitize_json(&input); - - assert!(sanitized["level1"]["level2"]["level3"]["password"].as_str().unwrap().contains("****")); - assert!(sanitized["level1"]["level2"]["level3"]["level4"]["token"].as_str().unwrap().contains("****")); - assert_eq!(sanitized["level1"]["level2"]["level3"]["level4"]["data"], "public"); - assert!(sanitized["level1"]["level2"]["account"].as_str().unwrap().contains("****")); + + assert!(sanitized["level1"]["level2"]["level3"]["password"] + .as_str() + .unwrap() + .contains("****")); + assert!(sanitized["level1"]["level2"]["level3"]["level4"]["token"] + .as_str() + .unwrap() + .contains("****")); + assert_eq!( + sanitized["level1"]["level2"]["level3"]["level4"]["data"], + "public" + ); + assert!(sanitized["level1"]["level2"]["account"] + .as_str() + .unwrap() + .contains("****")); assert_eq!(sanitized["level1"]["public"], "visible"); } @@ -147,12 +165,21 @@ mod tests { }); let sanitized = sanitize_json(&input); - - assert!(sanitized["users"][0]["account"].as_str().unwrap().contains("****")); + + assert!(sanitized["users"][0]["account"] + .as_str() + .unwrap() + .contains("****")); assert_eq!(sanitized["users"][0]["name"], "Alice"); - assert!(sanitized["users"][1]["account"].as_str().unwrap().contains("****")); + assert!(sanitized["users"][1]["account"] + .as_str() + .unwrap() + .contains("****")); assert_eq!(sanitized["users"][1]["name"], "Bob"); - assert!(sanitized["users"][2]["password"].as_str().unwrap().contains("****")); + assert!(sanitized["users"][2]["password"] + .as_str() + .unwrap() + .contains("****")); assert_eq!(sanitized["users"][2]["email"], "test@example.com"); assert_eq!(sanitized["tokens"], json!(["token1", "token2", "token3"])); assert_eq!(sanitized["numbers"], json!([1, 2, 3])); @@ -172,7 +199,7 @@ mod tests { }); let sanitized = sanitize_json(&input); - + assert_eq!(sanitized["account"], "****"); assert_eq!(sanitized["password"], "****"); assert_eq!(sanitized["token"], "****"); @@ -188,7 +215,10 @@ mod tests { let mut large_object = serde_json::Map::new(); for i in 0..1000 { large_object.insert(format!("field_{}", i), json!(format!("value_{}", i))); - large_object.insert(format!("account_{}", i), json!(format!("secret_account_{}", i))); + large_object.insert( + format!("account_{}", i), + json!(format!("secret_account_{}", i)), + ); } let input = Value::Object(large_object); @@ -196,7 +226,11 @@ mod tests { let sanitized = sanitize_json(&input); let duration = start.elapsed(); - assert!(duration.as_millis() < 1000, "Sanitization took too long: {:?}", duration); + assert!( + duration.as_millis() < 1000, + "Sanitization took too long: {:?}", + duration + ); assert!(sanitized["account_0"].as_str().unwrap().contains("****")); assert_eq!(sanitized["field_0"], "value_0"); } diff --git a/tests/metrics_test.rs b/tests/metrics_test.rs index 0c15ec9..cc2086f 100644 --- a/tests/metrics_test.rs +++ b/tests/metrics_test.rs @@ -27,27 +27,23 @@ async fn test_gauge_updates() { #[tokio::test] async fn test_prometheus_export_format() { use sqlx::postgres::PgPoolOptions; - + let database_url = std::env::var("DATABASE_URL") .unwrap_or_else(|_| "postgres://synapse:synapse@localhost:5432/synapse_test".to_string()); - + let pool = PgPoolOptions::new() .max_connections(1) .connect(&database_url) .await .expect("Failed to connect to test database"); - + let handle = init_metrics().expect("Failed to initialize metrics"); - - let result = metrics_handler( - axum::extract::State(handle), - axum::extract::State(pool), - ) - .await; - + + let result = metrics_handler(axum::extract::State(handle), axum::extract::State(pool)).await; + assert!(result.is_ok()); let metrics_output = result.unwrap(); - + assert!(metrics_output.starts_with('#')); assert!(metrics_output.contains("Metrics")); } @@ -61,7 +57,7 @@ async fn test_metrics_authentication() { response::Response, }; use synapse_core::config::Config; - + let config = Config { server_port: 3000, database_url: "postgres://test".to_string(), @@ -77,23 +73,18 @@ async fn test_metrics_authentication() { backup_dir: "./backups".to_string(), backup_encryption_key: None, }; - + let request = Request::builder() .uri("/metrics") .body(Body::empty()) .unwrap(); - + let next = Next::new(|_req: Request| async { Ok::(Response::new(Body::empty())) }); - - let result = metrics_auth_middleware( - axum::extract::State(config), - request, - next, - ) - .await; - + + let result = metrics_auth_middleware(axum::extract::State(config), request, next).await; + assert!(result.is_ok()); } @@ -101,7 +92,7 @@ async fn test_metrics_authentication() { fn test_metrics_handle_clone() { let handle = init_metrics().expect("Failed to initialize metrics"); let cloned = handle.clone(); - + assert!(std::mem::size_of_val(&handle) > 0); assert!(std::mem::size_of_val(&cloned) > 0); } @@ -109,24 +100,25 @@ fn test_metrics_handle_clone() { #[test] fn test_metrics_state_creation() { use sqlx::postgres::PgPoolOptions; - + tokio::runtime::Runtime::new().unwrap().block_on(async { - let database_url = std::env::var("DATABASE_URL") - .unwrap_or_else(|_| "postgres://synapse:synapse@localhost:5432/synapse_test".to_string()); - + let database_url = std::env::var("DATABASE_URL").unwrap_or_else(|_| { + "postgres://synapse:synapse@localhost:5432/synapse_test".to_string() + }); + let pool = PgPoolOptions::new() .max_connections(1) .connect(&database_url) .await .expect("Failed to connect to test database"); - + let handle = init_metrics().expect("Failed to initialize metrics"); - + let state = MetricsState { handle: handle.clone(), pool: pool.clone(), }; - + let cloned_state = state.clone(); assert!(std::mem::size_of_val(&cloned_state) > 0); }); diff --git a/tests/request_logger_test.rs b/tests/request_logger_test.rs index c17f657..b5f2e7b 100644 --- a/tests/request_logger_test.rs +++ b/tests/request_logger_test.rs @@ -27,7 +27,9 @@ fn create_test_app() -> Router { .route("/test", post(test_handler)) .route("/query", get(test_handler_with_query)) .route("/error", get(test_handler_error)) - .layer(middleware::from_fn(synapse_core::middleware::request_logger::request_logger_middleware)) + .layer(middleware::from_fn( + synapse_core::middleware::request_logger::request_logger_middleware, + )) } #[tokio::test] @@ -47,10 +49,10 @@ async fn test_request_id_generation() { // Verify request ID is present in response headers assert!(response.headers().contains_key("x-request-id")); - + let request_id = response.headers().get("x-request-id").unwrap(); let request_id_str = request_id.to_str().unwrap(); - + // Verify it's a valid UUID format assert_eq!(request_id_str.len(), 36); // UUID v4 format: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx assert_eq!(request_id_str.chars().filter(|&c| c == '-').count(), 4); @@ -83,8 +85,18 @@ async fn test_request_id_uniqueness() { .await .unwrap(); - let request_id1 = response1.headers().get("x-request-id").unwrap().to_str().unwrap(); - let request_id2 = response2.headers().get("x-request-id").unwrap().to_str().unwrap(); + let request_id1 = response1 + .headers() + .get("x-request-id") + .unwrap() + .to_str() + .unwrap(); + let request_id2 = response2 + .headers() + .get("x-request-id") + .unwrap() + .to_str() + .unwrap(); // Verify each request gets a unique ID assert_ne!(request_id1, request_id2); @@ -144,7 +156,7 @@ async fn test_request_logging_query_params() { assert_eq!(response.status(), StatusCode::OK); assert!(response.headers().contains_key("x-request-id")); - + // Verify the request was processed successfully with query params let body_bytes = axum::body::to_bytes(response.into_body(), usize::MAX) .await @@ -171,10 +183,10 @@ async fn test_request_logging_errors() { // Verify error status is returned assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); - + // Verify request ID is still present even on error assert!(response.headers().contains_key("x-request-id")); - + let body_bytes = axum::body::to_bytes(response.into_body(), usize::MAX) .await .unwrap(); @@ -426,7 +438,7 @@ async fn test_request_logging_multiple_requests() { .unwrap(); assert_eq!(response.status(), StatusCode::OK); - + let request_id = response .headers() .get("x-request-id") @@ -434,11 +446,14 @@ async fn test_request_logging_multiple_requests() { .to_str() .unwrap() .to_string(); - + request_ids.push(request_id); } // Verify all request IDs are unique - let unique_count = request_ids.iter().collect::>().len(); + let unique_count = request_ids + .iter() + .collect::>() + .len(); assert_eq!(unique_count, 5); } diff --git a/tests/search_test.rs b/tests/search_test.rs index 86ba0be..8af52c6 100644 --- a/tests/search_test.rs +++ b/tests/search_test.rs @@ -1,32 +1,42 @@ -use synapse_core::{create_app, AppState}; +use chrono::{Duration, Utc}; +use reqwest::StatusCode; +use serde_json::json; +use sqlx::types::BigDecimal; +use sqlx::{migrate::Migrator, PgPool}; +use std::path::Path; use synapse_core::db::pool_manager::PoolManager; use synapse_core::services::feature_flags::FeatureFlagService; -use testcontainers_modules::postgres::Postgres; +use synapse_core::{create_app, AppState}; use testcontainers::runners::AsyncRunner; -use sqlx::{PgPool, migrate::Migrator}; -use std::path::Path; +use testcontainers_modules::postgres::Postgres; use tokio::net::TcpListener; -use reqwest::StatusCode; -use serde_json::json; -use chrono::{Utc, Duration}; use uuid::Uuid; -use sqlx::types::BigDecimal; async fn setup_test_app() -> (String, PgPool, impl std::any::Any) { let container = Postgres::default().start().await.unwrap(); let host_port = container.get_host_port_ipv4(5432).await.unwrap(); - let database_url = format!("postgres://postgres:postgres@127.0.0.1:{}/postgres", host_port); + let database_url = format!( + "postgres://postgres:postgres@127.0.0.1:{}/postgres", + host_port + ); let pool = PgPool::connect(&database_url).await.unwrap(); - let migrator = Migrator::new(Path::join(Path::new(env!("CARGO_MANIFEST_DIR")), "migrations")).await.unwrap(); + let migrator = Migrator::new(Path::join( + Path::new(env!("CARGO_MANIFEST_DIR")), + "migrations", + )) + .await + .unwrap(); migrator.run(&pool).await.unwrap(); let pool_manager = PoolManager::new(pool.clone(), None); - + let app_state = AppState { db: pool.clone(), pool_manager, - horizon_client: synapse_core::stellar::HorizonClient::new("https://horizon-testnet.stellar.org".to_string()), + horizon_client: synapse_core::stellar::HorizonClient::new( + "https://horizon-testnet.stellar.org".to_string(), + ), feature_flags: FeatureFlagService::new(false), redis_url: "redis://localhost:6379".to_string(), start_time: std::time::Instant::now(), @@ -48,7 +58,7 @@ async fn setup_test_app() -> (String, PgPool, impl std::any::Any) { /// Seed test database with known transactions for predictable assertions async fn seed_test_data(pool: &PgPool) { let now = Utc::now(); - + // Transaction 1: USD, pending, recent sqlx::query!( r#" @@ -154,11 +164,12 @@ async fn seed_test_data(pool: &PgPool) { async fn test_search_by_status() { let (base_url, pool, _container) = setup_test_app().await; seed_test_data(&pool).await; - + let client = reqwest::Client::new(); // Search for completed transactions - let res = client.get(&format!("{}/transactions/search", base_url)) + let res = client + .get(&format!("{}/transactions/search", base_url)) .query(&[("status", "completed")]) .send() .await @@ -166,10 +177,10 @@ async fn test_search_by_status() { assert_eq!(res.status(), StatusCode::OK); let response: serde_json::Value = res.json().await.unwrap(); - + assert_eq!(response["total"], 3); // 3 completed transactions assert!(response["results"].is_array()); - + // Verify all results have completed status for tx in response["results"].as_array().unwrap() { assert_eq!(tx["status"], "completed"); @@ -180,11 +191,12 @@ async fn test_search_by_status() { async fn test_search_by_asset_code() { let (base_url, pool, _container) = setup_test_app().await; seed_test_data(&pool).await; - + let client = reqwest::Client::new(); // Search for USD transactions - let res = client.get(&format!("{}/transactions/search", base_url)) + let res = client + .get(&format!("{}/transactions/search", base_url)) .query(&[("asset_code", "USD")]) .send() .await @@ -192,9 +204,9 @@ async fn test_search_by_asset_code() { assert_eq!(res.status(), StatusCode::OK); let response: serde_json::Value = res.json().await.unwrap(); - + assert_eq!(response["total"], 3); // 3 USD transactions - + // Verify all results have USD asset code for tx in response["results"].as_array().unwrap() { assert_eq!(tx["asset_code"], "USD"); @@ -205,15 +217,16 @@ async fn test_search_by_asset_code() { async fn test_search_by_date_range() { let (base_url, pool, _container) = setup_test_app().await; seed_test_data(&pool).await; - + let client = reqwest::Client::new(); let now = Utc::now(); - + // Search for transactions in the last 3 days let from = (now - Duration::days(3)).to_rfc3339(); let to = now.to_rfc3339(); - let res = client.get(&format!("{}/transactions/search", base_url)) + let res = client + .get(&format!("{}/transactions/search", base_url)) .query(&[("from", &from), ("to", &to)]) .send() .await @@ -221,7 +234,7 @@ async fn test_search_by_date_range() { assert_eq!(res.status(), StatusCode::OK); let response: serde_json::Value = res.json().await.unwrap(); - + // Should return transactions from last 3 days (not the 5-day old one) assert_eq!(response["total"], 4); } @@ -230,11 +243,12 @@ async fn test_search_by_date_range() { async fn test_search_pagination() { let (base_url, pool, _container) = setup_test_app().await; seed_test_data(&pool).await; - + let client = reqwest::Client::new(); // First page with limit 2 - let res = client.get(&format!("{}/transactions/search", base_url)) + let res = client + .get(&format!("{}/transactions/search", base_url)) .query(&[("limit", "2")]) .send() .await @@ -242,14 +256,15 @@ async fn test_search_pagination() { assert_eq!(res.status(), StatusCode::OK); let page1: serde_json::Value = res.json().await.unwrap(); - + assert_eq!(page1["results"].as_array().unwrap().len(), 2); assert!(page1["next_cursor"].is_string()); - + let cursor = page1["next_cursor"].as_str().unwrap(); // Second page using cursor - let res = client.get(&format!("{}/transactions/search", base_url)) + let res = client + .get(&format!("{}/transactions/search", base_url)) .query(&[("limit", "2"), ("cursor", cursor)]) .send() .await @@ -257,9 +272,9 @@ async fn test_search_pagination() { assert_eq!(res.status(), StatusCode::OK); let page2: serde_json::Value = res.json().await.unwrap(); - + assert_eq!(page2["results"].as_array().unwrap().len(), 2); - + // Verify no duplicate IDs between pages let page1_ids: Vec<&str> = page1["results"] .as_array() @@ -267,14 +282,14 @@ async fn test_search_pagination() { .iter() .map(|tx| tx["id"].as_str().unwrap()) .collect(); - + let page2_ids: Vec<&str> = page2["results"] .as_array() .unwrap() .iter() .map(|tx| tx["id"].as_str().unwrap()) .collect(); - + for id in &page1_ids { assert!(!page2_ids.contains(id)); } @@ -284,11 +299,12 @@ async fn test_search_pagination() { async fn test_search_empty_results() { let (base_url, pool, _container) = setup_test_app().await; seed_test_data(&pool).await; - + let client = reqwest::Client::new(); // Search for non-existent asset code - let res = client.get(&format!("{}/transactions/search", base_url)) + let res = client + .get(&format!("{}/transactions/search", base_url)) .query(&[("asset_code", "XYZ")]) .send() .await @@ -296,7 +312,7 @@ async fn test_search_empty_results() { assert_eq!(res.status(), StatusCode::OK); let response: serde_json::Value = res.json().await.unwrap(); - + assert_eq!(response["total"], 0); assert_eq!(response["results"].as_array().unwrap().len(), 0); assert!(response["next_cursor"].is_null()); @@ -306,11 +322,12 @@ async fn test_search_empty_results() { async fn test_search_invalid_parameters() { let (base_url, pool, _container) = setup_test_app().await; seed_test_data(&pool).await; - + let client = reqwest::Client::new(); // Invalid date format - let res = client.get(&format!("{}/transactions/search", base_url)) + let res = client + .get(&format!("{}/transactions/search", base_url)) .query(&[("from", "invalid-date")]) .send() .await @@ -321,7 +338,8 @@ async fn test_search_invalid_parameters() { assert!(error.contains("Invalid 'from' date")); // Invalid cursor - let res = client.get(&format!("{}/transactions/search", base_url)) + let res = client + .get(&format!("{}/transactions/search", base_url)) .query(&[("cursor", "invalid-cursor")]) .send() .await @@ -332,7 +350,8 @@ async fn test_search_invalid_parameters() { assert!(error.contains("Invalid cursor")); // Invalid min_amount - let res = client.get(&format!("{}/transactions/search", base_url)) + let res = client + .get(&format!("{}/transactions/search", base_url)) .query(&[("min_amount", "not-a-number")]) .send() .await @@ -343,16 +362,16 @@ async fn test_search_invalid_parameters() { assert!(error.contains("Invalid 'min_amount'")); } - #[tokio::test] async fn test_search_combined_filters() { let (base_url, pool, _container) = setup_test_app().await; seed_test_data(&pool).await; - + let client = reqwest::Client::new(); // Search for completed USD transactions - let res = client.get(&format!("{}/transactions/search", base_url)) + let res = client + .get(&format!("{}/transactions/search", base_url)) .query(&[("status", "completed"), ("asset_code", "USD")]) .send() .await @@ -360,10 +379,10 @@ async fn test_search_combined_filters() { assert_eq!(res.status(), StatusCode::OK); let response: serde_json::Value = res.json().await.unwrap(); - + // Should return only completed USD transactions assert_eq!(response["total"], 1); - + for tx in response["results"].as_array().unwrap() { assert_eq!(tx["status"], "completed"); assert_eq!(tx["asset_code"], "USD"); @@ -374,11 +393,12 @@ async fn test_search_combined_filters() { async fn test_search_by_stellar_account() { let (base_url, pool, _container) = setup_test_app().await; seed_test_data(&pool).await; - + let client = reqwest::Client::new(); // Search for specific stellar account - let res = client.get(&format!("{}/transactions/search", base_url)) + let res = client + .get(&format!("{}/transactions/search", base_url)) .query(&[("stellar_account", "GABC1111111111")]) .send() .await @@ -386,7 +406,7 @@ async fn test_search_by_stellar_account() { assert_eq!(res.status(), StatusCode::OK); let response: serde_json::Value = res.json().await.unwrap(); - + assert_eq!(response["total"], 1); assert_eq!(response["results"][0]["stellar_account"], "GABC1111111111"); } @@ -395,11 +415,12 @@ async fn test_search_by_stellar_account() { async fn test_search_with_amount_range() { let (base_url, pool, _container) = setup_test_app().await; seed_test_data(&pool).await; - + let client = reqwest::Client::new(); // Search for transactions between 100 and 500 - let res = client.get(&format!("{}/transactions/search", base_url)) + let res = client + .get(&format!("{}/transactions/search", base_url)) .query(&[("min_amount", "100"), ("max_amount", "500")]) .send() .await @@ -407,10 +428,10 @@ async fn test_search_with_amount_range() { assert_eq!(res.status(), StatusCode::OK); let response: serde_json::Value = res.json().await.unwrap(); - + // Should return transactions with amounts 100, 250, and 500 assert_eq!(response["total"], 3); - + for tx in response["results"].as_array().unwrap() { let amount: f64 = tx["amount"].as_str().unwrap().parse().unwrap(); assert!(amount >= 100.0 && amount <= 500.0); @@ -421,11 +442,12 @@ async fn test_search_with_amount_range() { async fn test_search_limit_boundaries() { let (base_url, pool, _container) = setup_test_app().await; seed_test_data(&pool).await; - + let client = reqwest::Client::new(); // Test with limit 1 - let res = client.get(&format!("{}/transactions/search", base_url)) + let res = client + .get(&format!("{}/transactions/search", base_url)) .query(&[("limit", "1")]) .send() .await @@ -437,7 +459,8 @@ async fn test_search_limit_boundaries() { assert!(response["next_cursor"].is_string()); // Test with limit exceeding max (should cap at 100) - let res = client.get(&format!("{}/transactions/search", base_url)) + let res = client + .get(&format!("{}/transactions/search", base_url)) .query(&[("limit", "200")]) .send() .await @@ -453,11 +476,12 @@ async fn test_search_limit_boundaries() { async fn test_search_no_next_cursor_on_last_page() { let (base_url, pool, _container) = setup_test_app().await; seed_test_data(&pool).await; - + let client = reqwest::Client::new(); // Request all results with high limit - let res = client.get(&format!("{}/transactions/search", base_url)) + let res = client + .get(&format!("{}/transactions/search", base_url)) .query(&[("limit", "100")]) .send() .await @@ -465,7 +489,7 @@ async fn test_search_no_next_cursor_on_last_page() { assert_eq!(res.status(), StatusCode::OK); let response: serde_json::Value = res.json().await.unwrap(); - + // Should have no next_cursor since all results fit in one page assert!(response["next_cursor"].is_null()); } @@ -474,11 +498,12 @@ async fn test_search_no_next_cursor_on_last_page() { async fn test_search_ordering() { let (base_url, pool, _container) = setup_test_app().await; seed_test_data(&pool).await; - + let client = reqwest::Client::new(); // Get all transactions - let res = client.get(&format!("{}/transactions/search", base_url)) + let res = client + .get(&format!("{}/transactions/search", base_url)) .query(&[("limit", "100")]) .send() .await @@ -487,11 +512,14 @@ async fn test_search_ordering() { assert_eq!(res.status(), StatusCode::OK); let response: serde_json::Value = res.json().await.unwrap(); let results = response["results"].as_array().unwrap(); - + // Verify results are ordered by created_at DESC for i in 0..results.len() - 1 { let current_date = results[i]["created_at"].as_str().unwrap(); let next_date = results[i + 1]["created_at"].as_str().unwrap(); - assert!(current_date >= next_date, "Results should be ordered by created_at DESC"); + assert!( + current_date >= next_date, + "Results should be ordered by created_at DESC" + ); } } diff --git a/tests/settlement_test.rs b/tests/settlement_test.rs index 0b2c7f6..bc1b741 100644 --- a/tests/settlement_test.rs +++ b/tests/settlement_test.rs @@ -1,12 +1,12 @@ -use synapse_core::services::SettlementService; +use bigdecimal::BigDecimal; +use chrono::{Duration, Utc}; +use sqlx::{migrate::Migrator, PgPool}; +use std::path::Path; use synapse_core::db::models::Transaction; use synapse_core::error::AppError; -use testcontainers_modules::postgres::Postgres; +use synapse_core::services::SettlementService; use testcontainers::runners::AsyncRunner; -use sqlx::{PgPool, migrate::Migrator}; -use std::path::Path; -use chrono::{Utc, Duration}; -use bigdecimal::BigDecimal; +use testcontainers_modules::postgres::Postgres; async fn setup_test_db() -> (PgPool, impl std::any::Any) { let container = Postgres::default().start().await.unwrap(); @@ -173,7 +173,10 @@ async fn test_settle_error_handling() { let service = SettlementService::new(pool.clone()); // cause a database error by dropping the table before the call - sqlx::query("DROP TABLE transactions").execute(&pool).await.unwrap(); + sqlx::query("DROP TABLE transactions") + .execute(&pool) + .await + .unwrap(); let err = service.settle_asset("USD").await; assert!(matches!(err, Err(AppError::DatabaseError(_)))); diff --git a/tests/startup_validation_test.rs b/tests/startup_validation_test.rs index 98e76f6..569c82a 100644 --- a/tests/startup_validation_test.rs +++ b/tests/startup_validation_test.rs @@ -1,9 +1,9 @@ +use sqlx::{migrate::Migrator, PgPool}; +use std::path::Path; +use synapse_core::config::{AllowedIps, Config, LogFormat}; use synapse_core::startup::{validate_environment, ValidationReport}; -use synapse_core::config::{Config, AllowedIps, LogFormat}; -use testcontainers_modules::postgres::Postgres; use testcontainers::runners::AsyncRunner; -use sqlx::{PgPool, migrate::Migrator}; -use std::path::Path; +use testcontainers_modules::postgres::Postgres; /// Helper function to create a test config with valid defaults fn create_test_config(database_url: String, redis_url: String, horizon_url: String) -> Config { @@ -28,12 +28,18 @@ fn create_test_config(database_url: String, redis_url: String, horizon_url: Stri async fn setup_test_database() -> (PgPool, impl std::any::Any) { let container = Postgres::default().start().await.unwrap(); let host_port = container.get_host_port_ipv4(5432).await.unwrap(); - let database_url = format!("postgres://postgres:postgres@127.0.0.1:{}/postgres", host_port); + let database_url = format!( + "postgres://postgres:postgres@127.0.0.1:{}/postgres", + host_port + ); let pool = PgPool::connect(&database_url).await.unwrap(); - let migrator = Migrator::new(Path::join(Path::new(env!("CARGO_MANIFEST_DIR")), "migrations")) - .await - .unwrap(); + let migrator = Migrator::new(Path::join( + Path::new(env!("CARGO_MANIFEST_DIR")), + "migrations", + )) + .await + .unwrap(); migrator.run(&pool).await.unwrap(); (pool, container) @@ -62,10 +68,10 @@ async fn test_validation_all_healthy() { assert!(report.environment, "Environment validation should pass"); assert!(report.database, "Database validation should pass"); assert!(report.horizon, "Horizon validation should pass"); - + // Note: Redis might fail if not running locally, which is expected in CI // In production tests, you'd use testcontainers for Redis too - + report.print(); } @@ -80,7 +86,7 @@ async fn test_validation_database_unavailable() { // Create a pool that will fail to connect let pool_result = PgPool::connect(&invalid_database_url).await; - + // If we can't even create the pool, that's expected if pool_result.is_err() { // This is the expected behavior - database is unavailable @@ -94,11 +100,11 @@ async fn test_validation_database_unavailable() { assert!(!report.database, "Database validation should fail"); assert!(!report.is_valid(), "Overall validation should fail"); assert!(!report.errors.is_empty(), "Should have error messages"); - + // Check that error message mentions database let has_db_error = report.errors.iter().any(|e| e.contains("Database")); assert!(has_db_error, "Should have database error in report"); - + report.print(); } @@ -122,11 +128,11 @@ async fn test_validation_redis_unavailable() { assert!(report.database, "Database validation should pass"); assert!(!report.redis, "Redis validation should fail"); assert!(!report.is_valid(), "Overall validation should fail"); - + // Check that error message mentions Redis let has_redis_error = report.errors.iter().any(|e| e.contains("Redis")); assert!(has_redis_error, "Should have Redis error in report"); - + report.print(); } @@ -137,9 +143,10 @@ async fn test_validation_horizon_unavailable() { let database_url = pool.connect_options().to_url_lossy().to_string(); let redis_url = "redis://127.0.0.1:6379".to_string(); - + // Use invalid Horizon URL - let invalid_horizon_url = "https://invalid-horizon-url-that-does-not-exist.stellar.org".to_string(); + let invalid_horizon_url = + "https://invalid-horizon-url-that-does-not-exist.stellar.org".to_string(); let config = create_test_config(database_url, redis_url, invalid_horizon_url); @@ -151,11 +158,11 @@ async fn test_validation_horizon_unavailable() { assert!(report.database, "Database validation should pass"); assert!(!report.horizon, "Horizon validation should fail"); assert!(!report.is_valid(), "Overall validation should fail"); - + // Check that error message mentions Horizon let has_horizon_error = report.errors.iter().any(|e| e.contains("Horizon")); assert!(has_horizon_error, "Should have Horizon error in report"); - + report.print(); } @@ -177,16 +184,16 @@ async fn test_validation_report_generation() { // Test report structure assert!(!report.is_valid(), "Report should indicate failure"); assert!(!report.errors.is_empty(), "Report should contain errors"); - + // Verify report contains expected fields assert!(report.environment, "Environment should be valid"); assert!(report.database, "Database should be valid"); assert!(!report.redis, "Redis should be invalid"); assert!(report.horizon, "Horizon should be valid"); - + // Test print functionality (visual verification in test output) report.print(); - + // Verify error messages are descriptive for error in &report.errors { assert!(!error.is_empty(), "Error messages should not be empty"); @@ -210,12 +217,15 @@ async fn test_validation_empty_database_url() { let report = validate_environment(&config, &pool).await.unwrap(); // Assertions - assert!(!report.environment, "Environment validation should fail with empty database URL"); + assert!( + !report.environment, + "Environment validation should fail with empty database URL" + ); assert!(!report.is_valid(), "Overall validation should fail"); - + let has_env_error = report.errors.iter().any(|e| e.contains("Environment")); assert!(has_env_error, "Should have environment error in report"); - + report.print(); } @@ -236,9 +246,12 @@ async fn test_validation_invalid_horizon_url_format() { let report = validate_environment(&config, &pool).await.unwrap(); // Assertions - assert!(!report.environment, "Environment validation should fail with invalid URL format"); + assert!( + !report.environment, + "Environment validation should fail with invalid URL format" + ); assert!(!report.is_valid(), "Overall validation should fail"); - + report.print(); } @@ -263,12 +276,12 @@ async fn test_validation_multiple_failures() { assert!(!report.horizon, "Horizon validation should fail"); assert!(!report.is_valid(), "Overall validation should fail"); assert!(report.errors.len() >= 2, "Should have multiple errors"); - + // Verify both Redis and Horizon errors are present let has_redis_error = report.errors.iter().any(|e| e.contains("Redis")); let has_horizon_error = report.errors.iter().any(|e| e.contains("Horizon")); assert!(has_redis_error, "Should have Redis error"); assert!(has_horizon_error, "Should have Horizon error"); - + report.print(); } diff --git a/tests/websocket_test.rs b/tests/websocket_test.rs index 492de10..5b2c5d5 100644 --- a/tests/websocket_test.rs +++ b/tests/websocket_test.rs @@ -1,41 +1,56 @@ -use synapse_core::{create_app, AppState}; +use chrono::Utc; +use futures::{SinkExt, StreamExt}; +use sqlx::{migrate::Migrator, PgPool}; +use std::path::Path; use synapse_core::db::pool_manager::PoolManager; -use synapse_core::services::feature_flags::FeatureFlagService; use synapse_core::handlers::ws::TransactionStatusUpdate; -use testcontainers_modules::postgres::Postgres; +use synapse_core::services::feature_flags::FeatureFlagService; +use synapse_core::{create_app, AppState}; use testcontainers::runners::AsyncRunner; -use sqlx::{PgPool, migrate::Migrator}; -use std::path::Path; +use testcontainers_modules::postgres::Postgres; use tokio::net::TcpListener; use tokio::sync::broadcast; use tokio_tungstenite::{connect_async, tungstenite::Message}; -use futures::{SinkExt, StreamExt}; -use chrono::Utc; use uuid::Uuid; -async fn setup_test_app() -> (String, PgPool, broadcast::Sender, impl std::any::Any) { +async fn setup_test_app() -> ( + String, + PgPool, + broadcast::Sender, + impl std::any::Any, +) { let container = Postgres::default().start().await.unwrap(); let host_port = container.get_host_port_ipv4(5432).await.unwrap(); - let database_url = format!("postgres://postgres:postgres@127.0.0.1:{}/postgres", host_port); + let database_url = format!( + "postgres://postgres:postgres@127.0.0.1:{}/postgres", + host_port + ); let pool = PgPool::connect(&database_url).await.unwrap(); - let migrator = Migrator::new(Path::join(Path::new(env!("CARGO_MANIFEST_DIR")), "migrations")).await.unwrap(); + let migrator = Migrator::new(Path::join( + Path::new(env!("CARGO_MANIFEST_DIR")), + "migrations", + )) + .await + .unwrap(); migrator.run(&pool).await.unwrap(); let pool_manager = PoolManager::new(pool.clone(), None); let (tx_broadcast, _) = broadcast::channel::(100); - + let app_state = AppState { db: pool.clone(), pool_manager, - horizon_client: synapse_core::stellar::HorizonClient::new("https://horizon-testnet.stellar.org".to_string()), + horizon_client: synapse_core::stellar::HorizonClient::new( + "https://horizon-testnet.stellar.org".to_string(), + ), feature_flags: FeatureFlagService::new(false), redis_url: "redis://localhost:6379".to_string(), start_time: std::time::Instant::now(), readiness: synapse_core::ReadinessState::new(), tx_broadcast: tx_broadcast.clone(), }; - + let app = create_app(app_state); let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); @@ -52,18 +67,18 @@ async fn setup_test_app() -> (String, PgPool, broadcast::Sender { // If it connects, it should close immediately or we should get an error - let msg = tokio::time::timeout( - tokio::time::Duration::from_secs(2), - ws_stream.next() - ).await; - + let msg = + tokio::time::timeout(tokio::time::Duration::from_secs(2), ws_stream.next()).await; + // Should either timeout or receive close message assert!(msg.is_err() || matches!(msg.unwrap(), Some(Ok(Message::Close(_))))); } @@ -99,14 +112,14 @@ async fn test_ws_connection_rejected_invalid_token() { #[tokio::test] async fn test_ws_receives_transaction_updates() { let (base_url, _pool, tx_broadcast, _container) = setup_test_app().await; - + // Connect WebSocket client let ws_url = format!("{}/ws?token=test-token", base_url); let (mut ws_stream, _) = connect_async(&ws_url).await.unwrap(); - + // Give the connection time to establish tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - + // Broadcast a transaction update let transaction_id = Uuid::new_v4(); let update = TransactionStatusUpdate { @@ -115,19 +128,16 @@ async fn test_ws_receives_transaction_updates() { timestamp: Utc::now(), message: Some("Transaction processed successfully".to_string()), }; - + tx_broadcast.send(update.clone()).unwrap(); - + // Wait for the message - let msg = tokio::time::timeout( - tokio::time::Duration::from_secs(5), - ws_stream.next() - ).await; - + let msg = tokio::time::timeout(tokio::time::Duration::from_secs(5), ws_stream.next()).await; + assert!(msg.is_ok(), "Should receive message within timeout"); - + let msg = msg.unwrap().unwrap().unwrap(); - + if let Message::Text(text) = msg { let received: TransactionStatusUpdate = serde_json::from_str(&text).unwrap(); assert_eq!(received.transaction_id, transaction_id); @@ -135,26 +145,26 @@ async fn test_ws_receives_transaction_updates() { } else { panic!("Expected text message, got {:?}", msg); } - + ws_stream.close(None).await.unwrap(); } #[tokio::test] async fn test_ws_multiple_clients_receive_broadcast() { let (base_url, _pool, tx_broadcast, _container) = setup_test_app().await; - + // Connect multiple WebSocket clients let ws_url1 = format!("{}/ws?token=client1", base_url); let ws_url2 = format!("{}/ws?token=client2", base_url); let ws_url3 = format!("{}/ws?token=client3", base_url); - + let (mut ws_stream1, _) = connect_async(&ws_url1).await.unwrap(); let (mut ws_stream2, _) = connect_async(&ws_url2).await.unwrap(); let (mut ws_stream3, _) = connect_async(&ws_url3).await.unwrap(); - + // Give connections time to establish tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; - + // Broadcast a transaction update let transaction_id = Uuid::new_v4(); let update = TransactionStatusUpdate { @@ -163,26 +173,29 @@ async fn test_ws_multiple_clients_receive_broadcast() { timestamp: Utc::now(), message: None, }; - + let sent_count = tx_broadcast.send(update.clone()).unwrap(); assert_eq!(sent_count, 3, "Should have 3 active subscribers"); - + // All clients should receive the message - let msg1 = tokio::time::timeout( - tokio::time::Duration::from_secs(5), - ws_stream1.next() - ).await.unwrap().unwrap().unwrap(); - - let msg2 = tokio::time::timeout( - tokio::time::Duration::from_secs(5), - ws_stream2.next() - ).await.unwrap().unwrap().unwrap(); - - let msg3 = tokio::time::timeout( - tokio::time::Duration::from_secs(5), - ws_stream3.next() - ).await.unwrap().unwrap().unwrap(); - + let msg1 = tokio::time::timeout(tokio::time::Duration::from_secs(5), ws_stream1.next()) + .await + .unwrap() + .unwrap() + .unwrap(); + + let msg2 = tokio::time::timeout(tokio::time::Duration::from_secs(5), ws_stream2.next()) + .await + .unwrap() + .unwrap() + .unwrap(); + + let msg3 = tokio::time::timeout(tokio::time::Duration::from_secs(5), ws_stream3.next()) + .await + .unwrap() + .unwrap() + .unwrap(); + // Verify all received the same update for msg in [msg1, msg2, msg3] { if let Message::Text(text) = msg { @@ -193,7 +206,7 @@ async fn test_ws_multiple_clients_receive_broadcast() { panic!("Expected text message"); } } - + ws_stream1.close(None).await.unwrap(); ws_stream2.close(None).await.unwrap(); ws_stream3.close(None).await.unwrap(); @@ -202,14 +215,14 @@ async fn test_ws_multiple_clients_receive_broadcast() { #[tokio::test] async fn test_ws_connection_cleanup_on_disconnect() { let (base_url, _pool, tx_broadcast, _container) = setup_test_app().await; - + // Connect a client let ws_url = format!("{}/ws?token=test-client", base_url); let (ws_stream, _) = connect_async(&ws_url).await.unwrap(); - + // Give connection time to establish tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - + // Verify client is subscribed let update = TransactionStatusUpdate { transaction_id: Uuid::new_v4(), @@ -217,16 +230,16 @@ async fn test_ws_connection_cleanup_on_disconnect() { timestamp: Utc::now(), message: None, }; - + let sent_count = tx_broadcast.send(update.clone()).unwrap(); assert_eq!(sent_count, 1, "Should have 1 active subscriber"); - + // Drop the connection (simulates client disconnect) drop(ws_stream); - + // Give time for cleanup tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; - + // Try to broadcast again - should have 0 subscribers let update2 = TransactionStatusUpdate { transaction_id: Uuid::new_v4(), @@ -234,123 +247,122 @@ async fn test_ws_connection_cleanup_on_disconnect() { timestamp: Utc::now(), message: None, }; - + let sent_count2 = tx_broadcast.send(update2).unwrap(); - assert_eq!(sent_count2, 0, "Should have 0 active subscribers after disconnect"); + assert_eq!( + sent_count2, 0, + "Should have 0 active subscribers after disconnect" + ); } #[tokio::test] async fn test_ws_heartbeat_keeps_connection_alive() { let (base_url, _pool, _tx, _container) = setup_test_app().await; - + // Connect WebSocket client let ws_url = format!("{}/ws?token=heartbeat-test", base_url); let (mut ws_stream, _) = connect_async(&ws_url).await.unwrap(); - + // Wait for heartbeat ping (server sends every 30 seconds, but we'll wait a bit) // Note: In real tests, you might want to mock time or reduce heartbeat interval - let msg = tokio::time::timeout( - tokio::time::Duration::from_secs(35), - async { - loop { - if let Some(Ok(msg)) = ws_stream.next().await { - if matches!(msg, Message::Ping(_)) { - return msg; - } + let msg = tokio::time::timeout(tokio::time::Duration::from_secs(35), async { + loop { + if let Some(Ok(msg)) = ws_stream.next().await { + if matches!(msg, Message::Ping(_)) { + return msg; } } } - ).await; - + }) + .await; + assert!(msg.is_ok(), "Should receive heartbeat ping"); - + ws_stream.close(None).await.unwrap(); } #[tokio::test] async fn test_ws_client_can_send_messages() { let (base_url, _pool, _tx, _container) = setup_test_app().await; - + // Connect WebSocket client let ws_url = format!("{}/ws?token=send-test", base_url); let (mut ws_stream, _) = connect_async(&ws_url).await.unwrap(); - + // Send a text message to server let test_message = r#"{"action":"subscribe","filters":{"status":"completed"}}"#; - ws_stream.send(Message::Text(test_message.to_string())).await.unwrap(); - + ws_stream + .send(Message::Text(test_message.to_string())) + .await + .unwrap(); + // Server should handle it gracefully (even if it doesn't respond) // Wait a bit to ensure no errors tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - + // Connection should still be alive ws_stream.send(Message::Ping(vec![])).await.unwrap(); - + ws_stream.close(None).await.unwrap(); } #[tokio::test] async fn test_ws_handles_rapid_broadcasts() { let (base_url, _pool, tx_broadcast, _container) = setup_test_app().await; - + // Connect WebSocket client let ws_url = format!("{}/ws?token=rapid-test", base_url); let (mut ws_stream, _) = connect_async(&ws_url).await.unwrap(); - + // Give connection time to establish tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - + // Send multiple rapid updates let mut sent_ids = Vec::new(); for i in 0..10 { let transaction_id = Uuid::new_v4(); sent_ids.push(transaction_id); - + let update = TransactionStatusUpdate { transaction_id, status: format!("status_{}", i), timestamp: Utc::now(), message: Some(format!("Update {}", i)), }; - + tx_broadcast.send(update).unwrap(); } - + // Receive all messages let mut received_count = 0; for _ in 0..10 { - let msg = tokio::time::timeout( - tokio::time::Duration::from_secs(5), - ws_stream.next() - ).await; - + let msg = tokio::time::timeout(tokio::time::Duration::from_secs(5), ws_stream.next()).await; + if let Ok(Some(Ok(Message::Text(_)))) = msg { received_count += 1; } } - + assert_eq!(received_count, 10, "Should receive all 10 rapid updates"); - + ws_stream.close(None).await.unwrap(); } #[tokio::test] async fn test_ws_connection_with_empty_token() { let (base_url, _pool, _tx, _container) = setup_test_app().await; - + // Try to connect with empty token let ws_url = format!("{}/ws?token=", base_url); let result = connect_async(&ws_url).await; - + // Should be rejected (empty token is invalid) match result { Ok((mut ws_stream, _)) => { // If it connects, it should close immediately - let msg = tokio::time::timeout( - tokio::time::Duration::from_secs(2), - ws_stream.next() - ).await; - + let msg = + tokio::time::timeout(tokio::time::Duration::from_secs(2), ws_stream.next()).await; + assert!(msg.is_err() || matches!(msg.unwrap(), Some(Ok(Message::Close(_))))); } Err(_) => { From f41ac1e04d63d52e72c80007dd5fabd0cbf88c54 Mon Sep 17 00:00:00 2001 From: HexStar Date: Thu, 26 Feb 2026 08:42:02 +0000 Subject: [PATCH 3/4] Fix: Remove unused imports in lock_manager --- src/services/lock_manager.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/services/lock_manager.rs b/src/services/lock_manager.rs index 220c8a4..dc6009a 100644 --- a/src/services/lock_manager.rs +++ b/src/services/lock_manager.rs @@ -1,6 +1,6 @@ -use redis::{aio::MultiplexedConnection, AsyncCommands, Client, Script}; +use redis::{AsyncCommands, Client, Script}; use std::time::Duration; -use tokio::time::{sleep, timeout}; +use tokio::time::sleep; use tracing::{debug, warn}; use uuid::Uuid; From f591b6e78bc3a2d98d23e17497b562cd0bc65499 Mon Sep 17 00:00:00 2001 From: HexStar Date: Thu, 26 Feb 2026 09:23:15 +0000 Subject: [PATCH 4/4] Fix CI: Remove duplicate hyper, add missing modules, fix test AppState, remove unused imports --- Cargo.toml | 1 - src/handlers/admin.rs | 10 +++++++++- src/main.rs | 11 ++++++----- src/middleware/mod.rs | 2 ++ src/middleware/quota.rs | 2 +- src/services/mod.rs | 2 ++ src/services/reconciliation.rs | 2 +- tests/search_test.rs | 1 + tests/websocket_test.rs | 1 + 9 files changed, 23 insertions(+), 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 43ebd8f..db2abca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,7 +51,6 @@ governor = "0.6" redis = { version = "0.24", features = ["tokio-comp"] } jsonschema = "0.17" once_cell = "1.19" -hyper = "0.14" ipnet = "2.9" utoipa = { version = "4", features = ["axum_extras"] } utoipa-swagger-ui = { version = "6", features = ["axum"] } diff --git a/src/handlers/admin.rs b/src/handlers/admin.rs index 288e237..14c941d 100644 --- a/src/handlers/admin.rs +++ b/src/handlers/admin.rs @@ -2,11 +2,11 @@ use crate::middleware::quota::{Quota, QuotaManager, QuotaStatus, ResetSchedule, use axum::{ extract::{Path, State}, http::StatusCode, - response::IntoResponse, Json, }; use serde::{Deserialize, Serialize}; +#[derive(Clone)] pub struct AdminState { pub quota_manager: QuotaManager, } @@ -92,3 +92,11 @@ pub async fn reset_quota( .map(|_| StatusCode::OK) .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())) } + +pub fn admin_routes() -> axum::Router { + use axum::routing::{get, post}; + axum::Router::new() + .route("/quota/:key", get(get_quota_status)) + .route("/quota/:key", post(set_quota)) + .route("/quota/:key/reset", post(reset_quota)) +} diff --git a/src/main.rs b/src/main.rs index acef0e7..1b3a488 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,7 +12,7 @@ use synapse_core::{ graphql::schema::build_schema, handlers, handlers::ws::TransactionStatusUpdate, - metrics, middleware, + metrics, middleware::idempotency::IdempotencyService, schemas, services::{FeatureFlagService, SettlementService}, @@ -258,10 +258,11 @@ async fn serve(config: config::Config) -> anyhow::Result<()> { let _dlq_routes: Router = handlers::dlq::dlq_routes().with_state(api_state.app_state.db.clone()); - let _admin_routes: Router = Router::new() - .nest("/admin/queue", handlers::admin::admin_routes()) - .layer(axum_middleware::from_fn(middleware::auth::admin_auth)) - .with_state(api_state.app_state.db.clone()); + // Admin routes disabled - requires AdminState setup + // let _admin_routes: Router = Router::new() + // .nest("/admin/queue", handlers::admin::admin_routes()) + // .layer(axum_middleware::from_fn(middleware::auth::admin_auth)) + // .with_state(api_state.app_state.db.clone()); let _search_routes: Router = Router::new() .route( diff --git a/src/middleware/mod.rs b/src/middleware/mod.rs index 8ff92bc..a6ca80a 100644 --- a/src/middleware/mod.rs +++ b/src/middleware/mod.rs @@ -1,5 +1,7 @@ pub mod auth; pub mod idempotency; pub mod ip_filter; +pub mod quota; pub mod request_logger; +pub mod validate; pub mod versioning; diff --git a/src/middleware/quota.rs b/src/middleware/quota.rs index 43c8649..631fd01 100644 --- a/src/middleware/quota.rs +++ b/src/middleware/quota.rs @@ -1,6 +1,5 @@ use redis::{aio::MultiplexedConnection, AsyncCommands, Client}; use serde::{Deserialize, Serialize}; -use std::time::Duration; #[derive(Debug, Clone, Serialize, Deserialize)] pub enum Tier { @@ -43,6 +42,7 @@ impl ResetSchedule { } } +#[derive(Clone)] pub struct QuotaManager { redis_client: Client, } diff --git a/src/services/mod.rs b/src/services/mod.rs index 6c769aa..857c410 100644 --- a/src/services/mod.rs +++ b/src/services/mod.rs @@ -4,6 +4,7 @@ pub mod feature_flags; pub mod lock_manager; pub mod processor; pub mod query_cache; +pub mod reconciliation; pub mod scheduler; pub mod settlement; pub mod transaction_processor; @@ -13,6 +14,7 @@ pub use account_monitor::AccountMonitor; pub use backup::BackupService; pub use feature_flags::FeatureFlagService; pub use query_cache::{CacheConfig, QueryCache}; +pub use reconciliation::ReconciliationService; pub use scheduler::{Job, JobScheduler, JobStatus}; pub use settlement::SettlementService; pub use transaction_processor::TransactionProcessor; diff --git a/src/services/reconciliation.rs b/src/services/reconciliation.rs index bb7e61d..fc74ae6 100644 --- a/src/services/reconciliation.rs +++ b/src/services/reconciliation.rs @@ -3,7 +3,7 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use sqlx::PgPool; use std::collections::{HashMap, HashSet}; -use tracing::{info, warn}; +use tracing::info; use uuid::Uuid; #[derive(Debug, Serialize, Deserialize)] diff --git a/tests/search_test.rs b/tests/search_test.rs index 4ab620c..b192c47 100644 --- a/tests/search_test.rs +++ b/tests/search_test.rs @@ -43,6 +43,7 @@ async fn setup_test_app() -> (String, PgPool, impl std::any::Any) { start_time: std::time::Instant::now(), readiness: synapse_core::ReadinessState::new(), tx_broadcast, + query_cache: synapse_core::services::QueryCache::new("redis://localhost:6379").unwrap(), }; let app = create_app(app_state); diff --git a/tests/websocket_test.rs b/tests/websocket_test.rs index 96c2ed0..347e1e5 100644 --- a/tests/websocket_test.rs +++ b/tests/websocket_test.rs @@ -49,6 +49,7 @@ async fn setup_test_app() -> ( start_time: std::time::Instant::now(), readiness: synapse_core::ReadinessState::new(), tx_broadcast: tx_broadcast.clone(), + query_cache: synapse_core::services::QueryCache::new("redis://localhost:6379").unwrap(), }; let app = create_app(app_state);