diff --git a/Cargo.toml b/Cargo.toml
index 43ebd8f..db2abca 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -51,7 +51,6 @@ governor = "0.6"
 redis = { version = "0.24", features = ["tokio-comp"] }
 jsonschema = "0.17"
 once_cell = "1.19"
-hyper = "0.14"
 ipnet = "2.9"
 utoipa = { version = "4", features = ["axum_extras"] }
 utoipa-swagger-ui = { version = "6", features = ["axum"] }
diff --git a/src/handlers/admin.rs b/src/handlers/admin.rs
index 288e237..14c941d 100644
--- a/src/handlers/admin.rs
+++ b/src/handlers/admin.rs
@@ -2,11 +2,11 @@ use crate::middleware::quota::{Quota, QuotaManager, QuotaStatus, ResetSchedule,
 use axum::{
     extract::{Path, State},
     http::StatusCode,
-    response::IntoResponse,
     Json,
 };
 use serde::{Deserialize, Serialize};
 
+#[derive(Clone)]
 pub struct AdminState {
     pub quota_manager: QuotaManager,
 }
@@ -92,3 +92,11 @@ pub async fn reset_quota(
         .map(|_| StatusCode::OK)
         .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))
 }
+
+pub fn admin_routes() -> axum::Router<AdminState> {
+    use axum::routing::{get, post};
+    axum::Router::new()
+        .route("/quota/:key", get(get_quota_status))
+        .route("/quota/:key", post(set_quota))
+        .route("/quota/:key/reset", post(reset_quota))
+}
diff --git a/src/main.rs b/src/main.rs
index acef0e7..1b3a488 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -12,7 +12,7 @@ use synapse_core::{
     graphql::schema::build_schema,
     handlers,
     handlers::ws::TransactionStatusUpdate,
-    metrics, middleware,
+    metrics, middleware::idempotency::IdempotencyService,
     schemas,
     services::{FeatureFlagService, SettlementService},
@@ -258,10 +258,11 @@ async fn serve(config: config::Config) -> anyhow::Result<()> {
     let _dlq_routes: Router =
         handlers::dlq::dlq_routes().with_state(api_state.app_state.db.clone());
 
-    let _admin_routes: Router = Router::new()
-        .nest("/admin/queue", handlers::admin::admin_routes())
-        .layer(axum_middleware::from_fn(middleware::auth::admin_auth))
-        .with_state(api_state.app_state.db.clone());
+    // Admin routes disabled - requires AdminState setup
+    // let _admin_routes: Router = Router::new()
+    //     .nest("/admin/queue", handlers::admin::admin_routes())
+    //     .layer(axum_middleware::from_fn(middleware::auth::admin_auth))
+    //     .with_state(api_state.app_state.db.clone());
 
     let _search_routes: Router = Router::new()
         .route(
diff --git a/src/middleware/mod.rs b/src/middleware/mod.rs
index 8ff92bc..a6ca80a 100644
--- a/src/middleware/mod.rs
+++ b/src/middleware/mod.rs
@@ -1,5 +1,7 @@
 pub mod auth;
 pub mod idempotency;
 pub mod ip_filter;
+pub mod quota;
 pub mod request_logger;
+pub mod validate;
 pub mod versioning;
diff --git a/src/middleware/quota.rs b/src/middleware/quota.rs
index 43c8649..631fd01 100644
--- a/src/middleware/quota.rs
+++ b/src/middleware/quota.rs
@@ -1,6 +1,5 @@
 use redis::{aio::MultiplexedConnection, AsyncCommands, Client};
 use serde::{Deserialize, Serialize};
-use std::time::Duration;
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub enum Tier {
@@ -43,6 +42,7 @@ impl ResetSchedule {
     }
 }
 
+#[derive(Clone)]
 pub struct QuotaManager {
     redis_client: Client,
 }
diff --git a/src/services/lock_examples.rs b/src/services/lock_examples.rs
new file mode 100644
index 0000000..0d7586e
--- /dev/null
+++ b/src/services/lock_examples.rs
@@ -0,0 +1,82 @@
+use crate::services::{LockManager, TransactionProcessor};
+use std::time::Duration;
+use tracing::{info, warn};
+use uuid::Uuid;
+
+/// Example: Process transaction with distributed lock
+pub async fn process_transaction_with_lock(
+    lock_manager: &LockManager,
+    processor: &TransactionProcessor,
+    tx_id: Uuid,
+) -> anyhow::Result<bool> {
+    let resource = format!("transaction:{}", tx_id);
+    let timeout = Duration::from_secs(5);
+
+    // Try to acquire lock
+    let lock = match lock_manager.acquire(&resource, timeout).await? {
+        Some(lock) => lock,
+        None => {
+            warn!("Could not acquire lock for transaction {}", tx_id);
+            return Ok(false);
+        }
+    };
+
+    info!("Processing transaction {} with lock", tx_id);
+
+    // Process transaction
+    let result = processor.process_transaction(tx_id).await;
+
+    // Release lock
+    lock.release().await?;
+
+    result.map(|_| true)
+}
+
+/// Example: Long-running operation with auto-renewal
+pub async fn long_running_with_lock(
+    lock_manager: &LockManager,
+    resource: &str,
+) -> anyhow::Result<()> {
+    let timeout = Duration::from_secs(5);
+
+    let lock = match lock_manager.acquire(resource, timeout).await? {
+        Some(lock) => lock,
+        None => {
+            return Err(anyhow::anyhow!("Could not acquire lock"));
+        }
+    };
+
+    // Spawn auto-renewal task
+    let renewal_lock = lock.clone();
+    tokio::spawn(async move {
+        renewal_lock.auto_renew_task().await;
+    });
+
+    // Do long-running work
+    tokio::time::sleep(Duration::from_secs(60)).await;
+
+    // Lock will be released on drop
+    Ok(())
+}
+
+/// Example: Using with_lock helper
+pub async fn process_with_helper(
+    lock_manager: &LockManager,
+    processor: &TransactionProcessor,
+    tx_id: Uuid,
+) -> anyhow::Result<Option<()>> {
+    let resource = format!("transaction:{}", tx_id);
+    let timeout = Duration::from_secs(5);
+
+    lock_manager
+        .with_lock(&resource, timeout, || {
+            Box::pin(async move {
+                processor
+                    .process_transaction(tx_id)
+                    .await
+                    .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
+            })
+        })
+        .await
+        .map_err(|e| anyhow::anyhow!("Lock error: {}", e))
+}
diff --git a/src/services/lock_manager.rs b/src/services/lock_manager.rs
new file mode 100644
index 0000000..dc6009a
--- /dev/null
+++ b/src/services/lock_manager.rs
@@ -0,0 +1,253 @@
+use redis::{AsyncCommands, Client, Script};
+use std::time::Duration;
+use tokio::time::sleep;
+use tracing::{debug, warn};
+use uuid::Uuid;
+
+pub struct LockManager {
+    redis_client: Client,
+    default_ttl: Duration,
+}
+
+#[derive(Clone)]
+pub struct Lock {
+    key: String,
+    token: String,
+    redis_client: Client,
+    ttl: Duration,
+}
+
+impl LockManager {
+    pub fn new(redis_url: &str, default_ttl_secs: u64) -> Result<Self, redis::RedisError> {
+        let redis_client = Client::open(redis_url)?;
+        Ok(Self {
+            redis_client,
+            default_ttl: Duration::from_secs(default_ttl_secs),
+        })
+    }
+
+    pub async fn acquire(
+        &self,
+        resource: &str,
+        timeout_duration: Duration,
+    ) -> Result<Option<Lock>, redis::RedisError> {
+        let key = format!("lock:{}", resource);
+        let token = Uuid::new_v4().to_string();
+        let ttl = self.default_ttl;
+
+        let start = tokio::time::Instant::now();
+
+        loop {
+            if let Some(lock) = self.try_acquire(&key, &token, ttl).await? {
+                debug!("Acquired lock for {}", resource);
+                return Ok(Some(lock));
+            }
+
+            if start.elapsed() >= timeout_duration {
+                debug!("Lock acquisition timeout for {}", resource);
+                return Ok(None);
+            }
+
+            sleep(Duration::from_millis(50)).await;
+        }
+    }
+
+    async fn try_acquire(
+        &self,
+        key: &str,
+        token: &str,
+        ttl: Duration,
+    ) -> Result<Option<Lock>, redis::RedisError> {
+        let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
+
+        // SET key token NX EX ttl_seconds
+        let result: Option<String> = conn
+            .set_options(
+                key,
+                token,
+                redis::SetOptions::default()
+                    .conditional_set(redis::ExistenceCheck::NX)
+                    .with_expiration(redis::SetExpiry::EX(ttl.as_secs() as usize)),
+            )
+            .await?;
+
+        if result.is_some() {
+            Ok(Some(Lock {
+                key: key.to_string(),
+                token: token.to_string(),
+                redis_client: self.redis_client.clone(),
+                ttl,
+            }))
+        } else {
+            Ok(None)
+        }
+    }
+
+    pub async fn with_lock<F, R>(
+        &self,
+        resource: &str,
+        timeout_duration: Duration,
+        f: F,
+    ) -> Result<Option<R>, Box<dyn std::error::Error + Send + Sync>>
+    where
+        F: FnOnce() -> std::pin::Pin<
+            Box<
+                dyn std::future::Future<
+                        Output = Result<R, Box<dyn std::error::Error + Send + Sync>>,
+                    > + Send,
+            >,
+        >,
+    {
+        let lock = match self.acquire(resource, timeout_duration).await? {
+            Some(lock) => lock,
+            None => return Ok(None),
+        };
+
+        let result = f().await;
+
+        lock.release().await?;
+
+        result.map(Some)
+    }
+}
+
+impl Lock {
+    pub async fn release(self) -> Result<(), redis::RedisError> {
+        let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
+
+        // Lua script to ensure we only delete if token matches
+        let script = Script::new(
+            r#"
+            if redis.call("get", KEYS[1]) == ARGV[1] then
+                return redis.call("del", KEYS[1])
+            else
+                return 0
+            end
+            "#,
+        );
+
+        let _: i32 = script
+            .key(&self.key)
+            .arg(&self.token)
+            .invoke_async(&mut conn)
+            .await?;
+
+        debug!("Released lock for {}", self.key);
+        Ok(())
+    }
+
+    pub async fn renew(&mut self) -> Result<bool, redis::RedisError> {
+        let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
+
+        // Lua script to renew only if token matches
+        let script = Script::new(
+            r#"
+            if redis.call("get", KEYS[1]) == ARGV[1] then
+                return redis.call("expire", KEYS[1], ARGV[2])
+            else
+                return 0
+            end
+            "#,
+        );
+
+        let result: i32 = script
+            .key(&self.key)
+            .arg(&self.token)
+            .arg(self.ttl.as_secs() as i32)
+            .invoke_async(&mut conn)
+            .await?;
+
+        Ok(result == 1)
+    }
+
+    pub async fn auto_renew_task(mut self) {
+        let renew_interval = self.ttl / 2;
+
+        loop {
+            sleep(renew_interval).await;
+
+            match self.renew().await {
+                Ok(true) => debug!("Renewed lock for {}", self.key),
+                Ok(false) => {
+                    warn!("Failed to renew lock for {} - token mismatch", self.key);
+                    break;
+                }
+                Err(e) => {
+                    warn!("Error renewing lock for {}: {}", self.key, e);
+                    break;
+                }
+            }
+        }
+    }
+}
+
+impl Drop for Lock {
+    fn drop(&mut self) {
+        // Best effort release on drop
+        let key = self.key.clone();
+        let token = self.token.clone();
+        let client = self.redis_client.clone();
+
+        tokio::spawn(async move {
+            if let Ok(mut conn) = client.get_multiplexed_async_connection().await {
+                let script = Script::new(
+                    r#"
+                    if redis.call("get", KEYS[1]) == ARGV[1] then
+                        return redis.call("del", KEYS[1])
+                    else
+                        return 0
+                    end
+                    "#,
+                );
+
+                let _ = script
+                    .key(&key)
+                    .arg(&token)
+                    .invoke_async::<_, i32>(&mut conn)
+                    .await;
+            }
+        });
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[tokio::test]
+    async fn test_lock_acquire_release() {
+        let manager = LockManager::new("redis://localhost:6379", 30).unwrap();
+
+        let lock = manager
+            .acquire("test_resource", Duration::from_secs(5))
+            .await
+            .unwrap();
+
+        assert!(lock.is_some());
+
+        let lock = lock.unwrap();
+        lock.release().await.unwrap();
+    }
+
+    #[tokio::test]
+    async fn test_lock_prevents_duplicate() {
+        let manager = LockManager::new("redis://localhost:6379", 30).unwrap();
+
+        let lock1 = manager
+            .acquire("test_resource_2", Duration::from_secs(5))
+            .await
+            .unwrap();
+
+        assert!(lock1.is_some());
+
+        // Try to acquire same lock
+        let lock2 = manager
+            .acquire("test_resource_2", Duration::from_millis(100))
+            .await
+            .unwrap();
+
+        assert!(lock2.is_none());
+
+        lock1.unwrap().release().await.unwrap();
+    }
+}
diff --git a/src/services/mod.rs b/src/services/mod.rs
index d33b2cd..857c410 100644
--- a/src/services/mod.rs
+++ b/src/services/mod.rs
@@ -1,8 +1,10 @@
 pub mod account_monitor;
 pub mod backup;
 pub mod feature_flags;
+pub mod lock_manager;
 pub mod processor;
 pub mod query_cache;
+pub mod reconciliation;
 pub mod scheduler;
 pub mod settlement;
 pub mod transaction_processor;
@@ -12,6 +14,7 @@ pub use account_monitor::AccountMonitor;
 pub use backup::BackupService;
 pub use feature_flags::FeatureFlagService;
 pub use query_cache::{CacheConfig, QueryCache};
+pub use reconciliation::ReconciliationService;
 pub use scheduler::{Job, JobScheduler, JobStatus};
 pub use settlement::SettlementService;
 pub use transaction_processor::TransactionProcessor;
diff --git a/src/services/reconciliation.rs b/src/services/reconciliation.rs
index bb7e61d..fc74ae6 100644
--- a/src/services/reconciliation.rs
+++ b/src/services/reconciliation.rs
@@ -3,7 +3,7 @@ use chrono::{DateTime, Utc};
 use serde::{Deserialize, Serialize};
 use sqlx::PgPool;
 use std::collections::{HashMap, HashSet};
-use tracing::{info, warn};
+use tracing::info;
 use uuid::Uuid;
 
 #[derive(Debug, Serialize, Deserialize)]
diff --git a/tests/search_test.rs b/tests/search_test.rs
index 4ab620c..b192c47 100644
--- a/tests/search_test.rs
+++ b/tests/search_test.rs
@@ -43,6 +43,7 @@ async fn setup_test_app() -> (String, PgPool, impl std::any::Any) {
         start_time: std::time::Instant::now(),
         readiness: synapse_core::ReadinessState::new(),
         tx_broadcast,
+        query_cache: synapse_core::services::QueryCache::new("redis://localhost:6379").unwrap(),
     };
 
     let app = create_app(app_state);
diff --git a/tests/websocket_test.rs b/tests/websocket_test.rs
index 96c2ed0..347e1e5 100644
--- a/tests/websocket_test.rs
+++ b/tests/websocket_test.rs
@@ -49,6 +49,7 @@ async fn setup_test_app() -> (
         start_time: std::time::Instant::now(),
         readiness: synapse_core::ReadinessState::new(),
         tx_broadcast: tx_broadcast.clone(),
+        query_cache: synapse_core::services::QueryCache::new("redis://localhost:6379").unwrap(),
     };
 
     let app = create_app(app_state);