Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions src/services/lock_examples.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use crate::services::{LockManager, TransactionProcessor};
use std::time::Duration;
use tracing::{info, warn};
use uuid::Uuid;

/// Example: Process transaction with distributed lock
pub async fn process_transaction_with_lock(
lock_manager: &LockManager,
processor: &TransactionProcessor,
tx_id: Uuid,
) -> anyhow::Result<bool> {
let resource = format!("transaction:{}", tx_id);
let timeout = Duration::from_secs(5);

// Try to acquire lock
let lock = match lock_manager.acquire(&resource, timeout).await? {
Some(lock) => lock,
None => {
warn!("Could not acquire lock for transaction {}", tx_id);
return Ok(false);
}
};

info!("Processing transaction {} with lock", tx_id);

// Process transaction
let result = processor.process_transaction(tx_id).await;

// Release lock
lock.release().await?;

result.map(|_| true)
}

/// Example: Long-running operation with auto-renewal
pub async fn long_running_with_lock(
lock_manager: &LockManager,
resource: &str,
) -> anyhow::Result<()> {
let timeout = Duration::from_secs(5);

let lock = match lock_manager.acquire(resource, timeout).await? {
Some(lock) => lock,
None => {
return Err(anyhow::anyhow!("Could not acquire lock"));
}
};

// Spawn auto-renewal task
let renewal_lock = lock.clone();
tokio::spawn(async move {
renewal_lock.auto_renew_task().await;
});

// Do long-running work
tokio::time::sleep(Duration::from_secs(60)).await;

// Lock will be released on drop
Ok(())
}

/// Example: Using with_lock helper
pub async fn process_with_helper(
lock_manager: &LockManager,
processor: &TransactionProcessor,
tx_id: Uuid,
) -> anyhow::Result<Option<()>> {
let resource = format!("transaction:{}", tx_id);
let timeout = Duration::from_secs(5);

lock_manager
.with_lock(&resource, timeout, || {
Box::pin(async move {
processor
.process_transaction(tx_id)
.await
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
})
})
.await
.map_err(|e| anyhow::anyhow!("Lock error: {}", e))
}
253 changes: 253 additions & 0 deletions src/services/lock_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
use redis::{AsyncCommands, Client, Script};
use std::time::Duration;
use tokio::time::sleep;
use tracing::{debug, warn};
use uuid::Uuid;

pub struct LockManager {
redis_client: Client,
default_ttl: Duration,
}

#[derive(Clone)]
pub struct Lock {
key: String,
token: String,
redis_client: Client,
ttl: Duration,
}

impl LockManager {
pub fn new(redis_url: &str, default_ttl_secs: u64) -> Result<Self, redis::RedisError> {
let redis_client = Client::open(redis_url)?;
Ok(Self {
redis_client,
default_ttl: Duration::from_secs(default_ttl_secs),
})
}

pub async fn acquire(
&self,
resource: &str,
timeout_duration: Duration,
) -> Result<Option<Lock>, redis::RedisError> {
let key = format!("lock:{}", resource);
let token = Uuid::new_v4().to_string();
let ttl = self.default_ttl;

let start = tokio::time::Instant::now();

loop {
if let Some(lock) = self.try_acquire(&key, &token, ttl).await? {
debug!("Acquired lock for {}", resource);
return Ok(Some(lock));
}

if start.elapsed() >= timeout_duration {
debug!("Lock acquisition timeout for {}", resource);
return Ok(None);
}

sleep(Duration::from_millis(50)).await;
}
}

async fn try_acquire(
&self,
key: &str,
token: &str,
ttl: Duration,
) -> Result<Option<Lock>, redis::RedisError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;

// SET key token NX EX ttl_seconds
let result: Option<String> = conn
.set_options(
key,
token,
redis::SetOptions::default()
.conditional_set(redis::ExistenceCheck::NX)
.with_expiration(redis::SetExpiry::EX(ttl.as_secs() as usize)),
)
.await?;

if result.is_some() {
Ok(Some(Lock {
key: key.to_string(),
token: token.to_string(),
redis_client: self.redis_client.clone(),
ttl,
}))
} else {
Ok(None)
}
}

pub async fn with_lock<F, T>(
&self,
resource: &str,
timeout_duration: Duration,
f: F,
) -> Result<Option<T>, Box<dyn std::error::Error + Send + Sync>>
where
F: FnOnce() -> std::pin::Pin<
Box<
dyn std::future::Future<
Output = Result<T, Box<dyn std::error::Error + Send + Sync>>,
> + Send,
>,
>,
{
let lock = match self.acquire(resource, timeout_duration).await? {
Some(lock) => lock,
None => return Ok(None),
};

let result = f().await;

lock.release().await?;

result.map(Some)
}
}

impl Lock {
pub async fn release(self) -> Result<(), redis::RedisError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;

// Lua script to ensure we only delete if token matches
let script = Script::new(
r#"
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"#,
);

let _: i32 = script
.key(&self.key)
.arg(&self.token)
.invoke_async(&mut conn)
.await?;

debug!("Released lock for {}", self.key);
Ok(())
}

pub async fn renew(&mut self) -> Result<bool, redis::RedisError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;

// Lua script to renew only if token matches
let script = Script::new(
r#"
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("expire", KEYS[1], ARGV[2])
else
return 0
end
"#,
);

let result: i32 = script
.key(&self.key)
.arg(&self.token)
.arg(self.ttl.as_secs() as i32)
.invoke_async(&mut conn)
.await?;

Ok(result == 1)
}

pub async fn auto_renew_task(mut self) {
let renew_interval = self.ttl / 2;

loop {
sleep(renew_interval).await;

match self.renew().await {
Ok(true) => debug!("Renewed lock for {}", self.key),
Ok(false) => {
warn!("Failed to renew lock for {} - token mismatch", self.key);
break;
}
Err(e) => {
warn!("Error renewing lock for {}: {}", self.key, e);
break;
}
}
}
}
}

impl Drop for Lock {
fn drop(&mut self) {
// Best effort release on drop
let key = self.key.clone();
let token = self.token.clone();
let client = self.redis_client.clone();

tokio::spawn(async move {
if let Ok(mut conn) = client.get_multiplexed_async_connection().await {
let script = Script::new(
r#"
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"#,
);

let _ = script
.key(&key)
.arg(&token)
.invoke_async::<_, i32>(&mut conn)
.await;
}
});
}
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn test_lock_acquire_release() {
let manager = LockManager::new("redis://localhost:6379", 30).unwrap();

let lock = manager
.acquire("test_resource", Duration::from_secs(5))
.await
.unwrap();

assert!(lock.is_some());

let lock = lock.unwrap();
lock.release().await.unwrap();
}

#[tokio::test]
async fn test_lock_prevents_duplicate() {
let manager = LockManager::new("redis://localhost:6379", 30).unwrap();

let lock1 = manager
.acquire("test_resource_2", Duration::from_secs(5))
.await
.unwrap();

assert!(lock1.is_some());

// Try to acquire same lock
let lock2 = manager
.acquire("test_resource_2", Duration::from_millis(100))
.await
.unwrap();

assert!(lock2.is_none());

lock1.unwrap().release().await.unwrap();
}
}
2 changes: 2 additions & 0 deletions src/services/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod backup;
pub mod feature_flags;
pub mod lock_manager;
pub mod processor;
pub mod scheduler;
pub mod settlement;
Expand All @@ -8,6 +9,7 @@ pub mod transaction_processor_job;

pub use backup::BackupService;
pub use feature_flags::FeatureFlagService;
pub use lock_manager::{Lock, LockManager};
pub use scheduler::{Job, JobScheduler, JobStatus};
pub use settlement::SettlementService;
pub use transaction_processor::TransactionProcessor;
Expand Down