diff --git a/Cargo.lock b/Cargo.lock
index 8d90331..a3cf7e4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2804,6 +2804,7 @@ dependencies = [
  "log",
  "parking_lot",
  "prometheus",
+ "rand 0.9.2",
  "reqwest",
  "serde",
  "serde_json",
diff --git a/Cargo.toml b/Cargo.toml
index bdae2c6..85266a8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -34,6 +34,7 @@ parking_lot = { version = "0.12.4", features = ["arc_lock", "send_guard"] }
 async-trait = "0.1.77"
 futures-util = "0.3.31"
 walkdir = "2.5.0"
+rand = "0.9.2"

 [dev-dependencies]
 tempfile = "3.15.0"
diff --git a/scripts/stress/README.md b/scripts/stress/README.md
new file mode 100644
index 0000000..46f789e
--- /dev/null
+++ b/scripts/stress/README.md
@@ -0,0 +1,35 @@
+# Stress Test
+
+Stress testing tools for s3dedup.
+
+## Setup
+
+```bash
+# Start infrastructure
+docker compose up -d
+
+# Start s3dedup with postgres config
+cargo run -- server -c scripts/stress/config.postgres.json
+```
+
+## Run
+
+```bash
+python3 scripts/stress/stress_test.py
+```
+
+### Options
+
+| Option | Default | Description |
+|--------|---------|-------------|
+| `--url` | http://localhost:8080 | Server URL |
+| `--files` | 100 | Number of test files |
+| `--parallel` | 10 | Concurrent workers |
+| `--size` | 1024 | File size (bytes) |
+| `--skip-cleanup` | false | Don't delete files after test |
+
+### Example
+
+```bash
+python3 scripts/stress/stress_test.py --files 500 --parallel 20 --size 4096
+```
diff --git a/scripts/stress/config.postgres.json b/scripts/stress/config.postgres.json
new file mode 100644
index 0000000..9b59e34
--- /dev/null
+++ b/scripts/stress/config.postgres.json
@@ -0,0 +1,34 @@
+{
+  "logging": {
+    "level": "info",
+    "json": false
+  },
+  "kvstorage_type": "postgres",
+  "postgres": {
+    "host": "localhost",
+    "port": 5432,
+    "user": "postgres",
+    "password": "postgres",
+    "dbname": "s3dedup",
+    "pool_size": 20
+  },
+  "locks_type": "postgres",
+  "bucket": {
+    "name": "stress-test",
+    "address": "0.0.0.0",
+    "port": 8080,
+    "s3storage_type": "minio",
+    "minio": {
+      "endpoint": "http://localhost:9000",
+      "access_key": "minioadmin",
+      "secret_key": "minioadmin",
+      "force_path_style": true
+    },
+    "cleaner": {
+      "enabled": false,
+      "interval_seconds": 3600,
+      "batch_size": 1000,
+      "max_deletes_per_run": 10000
+    }
+  }
+}
diff --git a/scripts/stress/stress_test.py b/scripts/stress/stress_test.py
new file mode 100755
index 0000000..66a40c8
--- /dev/null
+++ b/scripts/stress/stress_test.py
@@ -0,0 +1,231 @@
+#!/usr/bin/env python3
+"""
+Stress test for s3dedup PUT/GET/DELETE operations
+"""
+
+import argparse
+import os
+import random
+import sys
+import time
+import urllib.parse
+import urllib.request
+import urllib.error
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from datetime import datetime, timezone
+
+
+def get_timestamp():
+    """Generate RFC 2822 timestamp (UTC, so the hardcoded +0000 offset is accurate)"""
+    return datetime.now(timezone.utc).strftime('%a, %d %b %Y %H:%M:%S +0000')
+
+
+def encode_timestamp(ts):
+    """URL encode timestamp"""
+    return urllib.parse.quote(ts, safe='')
+
+
+def put_file(base_url, path, content):
+    """PUT a file to s3dedup"""
+    timestamp = encode_timestamp(get_timestamp())
+    url = f"{base_url}/ft/files{path}?last_modified={timestamp}"
+
+    req = urllib.request.Request(url, data=content, method='PUT')
+    req.add_header('Content-Type', 'application/octet-stream')
+
+    try:
+        response = urllib.request.urlopen(req, timeout=30)
+        return response.status == 200
+    except urllib.error.HTTPError as e:
+        print(f"PUT {path} failed: HTTP {e.code}")
+        return False
+    except Exception as e:
+        print(f"PUT {path} error: {e}")
+        return False
+
+
+def get_file(base_url, path):
+    """GET a file from s3dedup"""
+    url = f"{base_url}/ft/files{path}"
+
+    try:
+        response = urllib.request.urlopen(url, timeout=30)
+        _ = response.read()  # consume body
+        return response.status == 200
+    except urllib.error.HTTPError as e:
+        if e.code != 404:
+            print(f"GET {path} failed: HTTP {e.code}")
+        return False
+    except Exception as e:
+        print(f"GET {path} error: {e}")
+        return False
+
+
+def delete_file(base_url, path):
+    """DELETE a file from s3dedup"""
+    timestamp = encode_timestamp(get_timestamp())
+    url = f"{base_url}/ft/files{path}?last_modified={timestamp}"
+
+    req = urllib.request.Request(url, method='DELETE')
+
+    try:
+        response = urllib.request.urlopen(req, timeout=30)
+        return response.status == 200
+    except urllib.error.HTTPError as e:
+        if e.code != 404:  # 404 is OK for cleanup
+            print(f"DELETE {path} failed: HTTP {e.code}")
+        return e.code == 404  # 404 is acceptable
+    except Exception as e:
+        print(f"DELETE {path} error: {e}")
+        return False
+
+
+def check_health(base_url):
+    """Check if server is healthy"""
+    try:
+        response = urllib.request.urlopen(f"{base_url}/health", timeout=5)
+        return response.status == 200
+    except Exception:
+        return False
+
+
+def generate_test_files(num_files, file_size):
+    """Generate test file contents - 50% unique, 50% duplicates"""
+    files = {}
+    patterns = {}
+
+    for i in range(1, num_files + 1):
+        if i % 2 == 0:
+            # Unique content
+            files[i] = os.urandom(file_size)
+        else:
+            # Duplicate content based on pattern; max() guards sizes under the prefix length
+            pattern_id = i % 10
+            if pattern_id not in patterns:
+                patterns[pattern_id] = f"pattern_{pattern_id}_".encode() + os.urandom(max(0, file_size - 20))
+            files[i] = patterns[pattern_id]
+
+    return files
+
+
+def run_phase(name, tasks, executor, show_progress=True):
+    """Run a phase of operations and report results"""
+    print(f"\n=== {name} ===")
+    start = time.time()
+
+    futures = {executor.submit(task): idx for idx, task in enumerate(tasks)}
+    success = 0
+    errors = 0
+    total = len(futures)
+    completed = 0
+
+    for future in as_completed(futures):
+        completed += 1
+        if future.result():
+            success += 1
+        else:
+            errors += 1
+
+        if show_progress and completed % max(1, total // 10) == 0:
+            print(f"  Progress: {completed}/{total}")
+
+    elapsed = time.time() - start
+    rate = total / elapsed if elapsed > 0 else 0
+    print(f"Completed: {total} ops in {elapsed:.2f}s ({rate:.2f} ops/sec), success: {success}, errors: {errors}")
+
+    return success, errors
+
+
+def main():
+    parser = argparse.ArgumentParser(description='s3dedup stress test')
+    parser.add_argument('--url', default='http://localhost:8080', help='Base URL')
+    parser.add_argument('--files', type=int, default=100, help='Number of files')
+    parser.add_argument('--parallel', type=int, default=10, help='Parallel workers')
+    parser.add_argument('--size', type=int, default=1024, help='File size in bytes')
+    parser.add_argument('--skip-cleanup', action='store_true', help='Skip cleanup phase')
+    args = parser.parse_args()
+
+    print("=== s3dedup Stress Test ===")
+    print(f"Base URL: {args.url}")
+    print(f"Number of files: {args.files}")
+    print(f"Parallel workers: {args.parallel}")
+    print(f"File size: {args.size} bytes")
+
+    # Check server health
+    print("\nChecking server health...")
+    if not check_health(args.url):
+        print(f"ERROR: Server not reachable at {args.url}")
+        print("Start s3dedup with: cargo run -- server -c config.postgres.json")
+        sys.exit(1)
+    print("Server is healthy")
+
+    # Generate test files
print(f"\nGenerating {args.files} test files...") + files = generate_test_files(args.files, args.size) + print("Done") + + executor = ThreadPoolExecutor(max_workers=args.parallel) + + try: + # Phase 1: PUT files + tasks = [ + lambda i=i, content=content: put_file(args.url, f"/stress/file_{i}.bin", content) + for i, content in files.items() + ] + run_phase(f"Phase 1: PUT {args.files} files", tasks, executor) + + # Phase 2: GET files + tasks = [ + lambda i=i: get_file(args.url, f"/stress/file_{i}.bin") + for i in files.keys() + ] + run_phase(f"Phase 2: GET {args.files} files", tasks, executor) + + # Phase 3: Mixed workload + mixed_ops = args.files * 2 + def mixed_op(idx): + op = random.randint(0, 3) + file_num = random.randint(1, args.files) + if op <= 1: # 50% GET + return get_file(args.url, f"/stress/file_{file_num}.bin") + elif op == 2: # 25% PUT update + return put_file(args.url, f"/stress/file_{file_num}.bin", files[file_num]) + else: # 25% new PUT + return put_file(args.url, f"/stress/new_{idx}.bin", os.urandom(args.size)) + + tasks = [lambda idx=i: mixed_op(idx) for i in range(mixed_ops)] + run_phase(f"Phase 3: Mixed workload ({mixed_ops} ops)", tasks, executor) + + # Phase 4: Concurrent PUT/DELETE race test + race_files = 50 + def race_op(idx): + path = f"/stress/race_{idx}.bin" + content = f"race_content_{idx}".encode() + for _ in range(5): + put_file(args.url, path, content) + delete_file(args.url, path) + return True + + tasks = [lambda idx=i: race_op(idx) for i in range(race_files)] + run_phase(f"Phase 4: Race test ({race_files} files x 10 ops)", tasks, executor) + + # Phase 5: DELETE files (cleanup) + if not args.skip_cleanup: + tasks = [ + lambda i=i: delete_file(args.url, f"/stress/file_{i}.bin") + for i in files.keys() + ] + run_phase(f"Phase 5: DELETE {args.files} files", tasks, executor) + + # Clean new files from mixed workload + for i in range(mixed_ops): + delete_file(args.url, f"/stress/new_{i}.bin") + + finally: + executor.shutdown(wait=True) + + print("\n=== Stress Test Complete ===") + + +if __name__ == '__main__': + main() diff --git a/src/cleaner/mod.rs b/src/cleaner/mod.rs index e94552f..52073cc 100644 --- a/src/cleaner/mod.rs +++ b/src/cleaner/mod.rs @@ -3,7 +3,6 @@ use crate::locks::{self, LocksStorage}; use crate::s3storage::S3Storage; use anyhow::Result; use serde::Deserialize; -use std::collections::HashSet; use std::sync::Arc; use tokio::sync::Mutex; use tracing::{debug, error, info, warn}; @@ -182,6 +181,48 @@ impl Cleaner { path, hash ); + // Acquire file lock before deleting ref_file to prevent race with PUT + let lock_key = locks::file_lock(&self.bucket_name, path); + let lock = self.locks.prepare_lock(lock_key).await; + let guard = match lock.acquire_exclusive().await { + Ok(g) => g, + Err(e) => { + warn!("Failed to acquire file lock for cleaner {}: {}", path, e); + continue; // Skip this file, try next + } + }; + + // Re-check refcount after acquiring lock (double-check pattern) + let refcount_after_lock = self + .kvstorage + .lock() + .await + .get_ref_count(&self.bucket_name, hash) + .await; + + let refcount_after_lock = match refcount_after_lock { + Ok(r) => r, + Err(e) => { + error!("Failed to re-check refcount for {}: {}", hash, e); + if let Err(e) = guard.release().await { + warn!("Failed to release file lock: {}", e); + } + continue; + } + }; + + if refcount_after_lock != 0 { + // Refcount changed while we were acquiring lock, skip + debug!( + "Refcount changed for {} (now {}), skipping", + hash, refcount_after_lock + ); + if let Err(e) = 
guard.release().await { + warn!("Failed to release file lock: {}", e); + } + continue; + } + // Delete ref_file and modified entries if let Err(e) = self .kvstorage @@ -191,6 +232,9 @@ impl Cleaner { .await { error!("Failed to delete ref_file {}: {}", path, e); + if let Err(e) = guard.release().await { + warn!("Failed to release file lock: {}", e); + } continue; } @@ -204,6 +248,10 @@ impl Cleaner { error!("Failed to delete modified entry for {}: {}", path, e); } + if let Err(e) = guard.release().await { + warn!("Failed to release file lock: {}", e); + } + deleted_count += 1; if deleted_count >= self.config.max_deletes_per_run { @@ -219,36 +267,12 @@ impl Cleaner { } /// Clean refcounts that have no corresponding ref_files + /// Uses reverse lookup (database query per hash) instead of loading all hashes into memory async fn clean_unreferenced_refcounts(&self) -> Result { let mut deleted_count = 0; let mut offset = 0; - // Build a set of all hashes referenced by ref_files - let mut referenced_hashes = HashSet::new(); - let mut ref_offset = 0; - - loop { - let ref_files = self - .kvstorage - .lock() - .await - .list_ref_files_batch(&self.bucket_name, self.config.batch_size, ref_offset) - .await?; - - if ref_files.is_empty() { - break; - } - - for (_path, hash) in ref_files { - referenced_hashes.insert(hash); - } - - ref_offset += self.config.batch_size; - } - - debug!("Found {} referenced hashes", referenced_hashes.len()); - - // Now check refcounts against this set + // Process refcounts in batches, checking each hash against ref_files table loop { let refcounts = self .kvstorage @@ -262,7 +286,15 @@ impl Cleaner { } for (hash, count) in refcounts { - if !referenced_hashes.contains(&hash) { + // Check if hash is referenced by any ref_file (database lookup) + let is_referenced = self + .kvstorage + .lock() + .await + .hash_is_referenced(&self.bucket_name, &hash) + .await?; + + if !is_referenced { debug!( "Found unreferenced refcount: hash={}, count={} (no ref_files point to it)", hash, count @@ -324,32 +356,48 @@ impl Cleaner { } }; - let refcount = self + let refcount = match self .kvstorage .lock() .await .get_ref_count(&self.bucket_name, &key) - .await?; + .await + { + Ok(v) => v, + Err(e) => { + let _ = hash_guard.release().await; + return Err(e); + } + }; if refcount == 0 { debug!("Found unused S3 object: key={} (refcount=0)", key); // Delete the S3 object if let Err(e) = self.s3storage.lock().await.delete_object(&key).await { - error!("Failed to delete S3 object {}: {}", key, e); - let _ = hash_guard.release().await; + error!( + "Failed to delete S3 object (bucket={}, key={}): {}", + self.bucket_name, key, e + ); + if let Err(e) = hash_guard.release().await { + warn!("Failed to release hash lock: {}", e); + } continue; } deleted_count += 1; if deleted_count >= self.config.max_deletes_per_run { - let _ = hash_guard.release().await; + if let Err(e) = hash_guard.release().await { + warn!("Failed to release hash lock: {}", e); + } return Ok(deleted_count); } } - let _ = hash_guard.release().await; + if let Err(e) = hash_guard.release().await { + warn!("Failed to release hash lock: {}", e); + } } continuation_token = next_token; diff --git a/src/kvstorage/mod.rs b/src/kvstorage/mod.rs index e2c6ff2..f29c5f3 100644 --- a/src/kvstorage/mod.rs +++ b/src/kvstorage/mod.rs @@ -22,45 +22,15 @@ pub(crate) trait KVStorageTrait { async fn setup(&mut self) -> Result<()>; async fn get_ref_count(&mut self, bucket: &str, hash: &str) -> Result; - async fn set_ref_count(&mut self, bucket: &str, hash: &str, 
ref_cnt: i32) -> Result<()>; + /// Atomically increment the reference count (database-level atomic operation) - /// Prefer this over get_ref_count + set_ref_count to avoid race conditions - async fn atomic_increment_ref_count(&mut self, bucket: &str, hash: &str) -> Result { - // Default implementation: non-atomic. Database implementations should override. - let cnt = self.get_ref_count(bucket, hash).await?; - self.set_ref_count(bucket, hash, cnt + 1).await?; - Ok(cnt + 1) - } + /// Returns the new reference count after incrementing. + async fn atomic_increment_ref_count(&mut self, bucket: &str, hash: &str) -> Result; /// Atomically decrement the reference count (database-level atomic operation) - /// Prefer this over get_ref_count + set_ref_count to avoid race conditions /// If the reference count is already 0, do nothing and return 0. /// Returns the new reference count after decrementing. - async fn atomic_decrement_ref_count(&mut self, bucket: &str, hash: &str) -> Result { - // Default implementation: non-atomic. Database implementations should override. - let cnt = self.get_ref_count(bucket, hash).await?; - if cnt == 0 { - return Ok(0); - } - let new_count = cnt - 1; - self.set_ref_count(bucket, hash, new_count).await?; - Ok(new_count) - } - - async fn increment_ref_count(&mut self, bucket: &str, hash: &str) -> Result<()> { - let cnt = self.get_ref_count(bucket, hash).await?; - self.set_ref_count(bucket, hash, cnt + 1).await - } - - async fn decrement_ref_count(&mut self, bucket: &str, hash: &str) -> Result { - let cnt = self.get_ref_count(bucket, hash).await?; - if cnt == 0 { - return Ok(0); - } - let new_count = cnt - 1; - self.set_ref_count(bucket, hash, new_count).await?; - Ok(new_count as i64) - } + async fn atomic_decrement_ref_count(&mut self, bucket: &str, hash: &str) -> Result; async fn get_modified(&mut self, bucket: &str, path: &str) -> Result; async fn set_modified(&mut self, bucket: &str, path: &str, modified: i64) -> Result<()>; @@ -115,6 +85,10 @@ pub(crate) trait KVStorageTrait { /// Delete a logical_size entry async fn delete_logical_size(&mut self, bucket: &str, hash: &str) -> Result<()>; + /// Check if a hash is referenced by any ref_file entry + /// Used by cleaner to check if a refcount entry is orphaned + async fn hash_is_referenced(&mut self, bucket: &str, hash: &str) -> Result; + // Aggregate statistics methods for metrics /// Get total number of files (count of file_modified entries) async fn get_total_files(&mut self, bucket: &str) -> Result; @@ -190,24 +164,9 @@ impl KVStorage { } } - /** - * Set the reference count for a hash. - */ - pub async fn set_ref_count(&mut self, bucket: &str, hash: &str, ref_cnt: i32) -> Result<()> { - debug!( - "Setting ref count for bucket: {}, hash: {} to {}", - bucket, hash, ref_cnt - ); - match self { - KVStorage::Postgres(storage) => storage.set_ref_count(bucket, hash, ref_cnt).await, - KVStorage::SQLite(storage) => storage.set_ref_count(bucket, hash, ref_cnt).await, - } - } - /** * Atomically increment the reference count (database-level atomic operation). * Returns the new reference count after incrementing. - * Prefer this over increment_ref_count to avoid race conditions. */ pub async fn atomic_increment_ref_count(&mut self, bucket: &str, hash: &str) -> Result { debug!( @@ -224,7 +183,6 @@ impl KVStorage { * Atomically decrement the reference count (database-level atomic operation). * If the reference count is already 0, do nothing and return 0. * Returns the new reference count after decrementing. 
- * Prefer this over decrement_ref_count to avoid race conditions. */ pub async fn atomic_decrement_ref_count(&mut self, bucket: &str, hash: &str) -> Result { debug!( @@ -237,36 +195,6 @@ impl KVStorage { } } - /** - * Increment the reference count for a hash. - */ - pub async fn increment_ref_count(&mut self, bucket: &str, hash: &str) -> Result<()> { - debug!( - "Incrementing ref count for bucket: {}, hash: {}", - bucket, hash - ); - match self { - KVStorage::Postgres(storage) => storage.increment_ref_count(bucket, hash).await, - KVStorage::SQLite(storage) => storage.increment_ref_count(bucket, hash).await, - } - } - - /** - * Decrement the reference count for a hash. - * If the reference count is already 0, do nothing. - * Returns the new reference count after decrementing. - */ - pub async fn decrement_ref_count(&mut self, bucket: &str, hash: &str) -> Result { - debug!( - "Decrementing ref count for bucket: {}, hash: {}", - bucket, hash - ); - match self { - KVStorage::Postgres(storage) => storage.decrement_ref_count(bucket, hash).await, - KVStorage::SQLite(storage) => storage.decrement_ref_count(bucket, hash).await, - } - } - /** * Get the modified time for a path. * If the path does not exist, return 0. @@ -486,6 +414,14 @@ impl KVStorage { } } + /// Check if a hash is referenced by any ref_file entry + pub async fn hash_is_referenced(&mut self, bucket: &str, hash: &str) -> Result { + match self { + KVStorage::Postgres(storage) => storage.hash_is_referenced(bucket, hash).await, + KVStorage::SQLite(storage) => storage.hash_is_referenced(bucket, hash).await, + } + } + pub async fn get_total_files(&mut self, bucket: &str) -> Result { match self { KVStorage::Postgres(storage) => storage.get_total_files(bucket).await, diff --git a/src/kvstorage/postgres.rs b/src/kvstorage/postgres.rs index 834c468..dd59acd 100644 --- a/src/kvstorage/postgres.rs +++ b/src/kvstorage/postgres.rs @@ -125,6 +125,7 @@ impl KVStorageTrait for Postgres { let version_table = self.table_name("version"); sqlx::query(&format!( "CREATE TABLE IF NOT EXISTS {} ( + id INTEGER PRIMARY KEY DEFAULT 1 CHECK (id = 1), version VARCHAR(255) NOT NULL )", version_table @@ -132,6 +133,15 @@ impl KVStorageTrait for Postgres { .execute(&self.pool) .await?; + // Create index on ref_file(bucket, hash) for efficient hash lookups by cleaner + sqlx::query(&format!( + "CREATE INDEX IF NOT EXISTS idx_{}_hash ON {}(bucket, hash)", + ref_file_table.replace('.', "_"), + ref_file_table + )) + .execute(&self.pool) + .await?; + Ok(()) } @@ -153,30 +163,15 @@ impl KVStorageTrait for Postgres { } } - async fn set_ref_count(&mut self, bucket: &str, hash: &str, ref_cnt: i32) -> Result<()> { - let table = self.table_name("refcount"); - let query = format!( - "INSERT INTO {} (bucket, hash, refcount) VALUES ($1, $2, $3) - ON CONFLICT (bucket, hash) DO UPDATE SET refcount = $3", - table - ); - sqlx::query(&query) - .bind(bucket) - .bind(hash) - .bind(ref_cnt) - .execute(&self.pool) - .await?; - Ok(()) - } - async fn atomic_increment_ref_count(&mut self, bucket: &str, hash: &str) -> Result { let table = self.table_name("refcount"); // PostgreSQL: atomic increment using INSERT...ON CONFLICT...DO UPDATE...RETURNING + // Must qualify refcount column with table name to avoid ambiguity let query = format!( - "INSERT INTO {} (bucket, hash, refcount) VALUES ($1, $2, 1) - ON CONFLICT (bucket, hash) DO UPDATE SET refcount = refcount + 1 + "INSERT INTO {table} (bucket, hash, refcount) VALUES ($1, $2, 1) + ON CONFLICT (bucket, hash) DO UPDATE SET refcount = 
{table}.refcount + 1 RETURNING refcount", - table + table = table ); let (count,): (i32,) = sqlx::query_as(&query) .bind(bucket) @@ -438,6 +433,20 @@ impl KVStorageTrait for Postgres { Ok(()) } + async fn hash_is_referenced(&mut self, bucket: &str, hash: &str) -> Result { + let table = self.table_name("ref_file"); + let query = format!( + "SELECT 1 FROM {} WHERE bucket = $1 AND hash = $2 LIMIT 1", + table + ); + let result: Option<(i32,)> = sqlx::query_as(&query) + .bind(bucket) + .bind(hash) + .fetch_optional(&self.pool) + .await?; + Ok(result.is_some()) + } + async fn get_compressed_size(&mut self, bucket: &str, hash: &str) -> Result { let table = self.table_name("logical_size"); let query = format!( @@ -498,10 +507,15 @@ impl KVStorageTrait for Postgres { } async fn get_total_storage_bytes(&mut self, bucket: &str) -> Result { - let table = self.table_name("logical_size"); + // Only count storage for blobs that are actually referenced (refcount > 0) + let refcount_table = self.table_name("refcount"); + let logical_size_table = self.table_name("logical_size"); let query = format!( - "SELECT COALESCE(SUM(compressed_size), 0)::BIGINT FROM {} WHERE bucket = $1", - table + "SELECT COALESCE(SUM(l.compressed_size), 0)::BIGINT + FROM {} l + INNER JOIN {} r ON l.bucket = r.bucket AND l.hash = r.hash + WHERE l.bucket = $1 AND r.refcount > 0", + logical_size_table, refcount_table ); let (total,): (i64,) = sqlx::query_as(&query) .bind(bucket) @@ -553,18 +567,20 @@ impl KVStorageTrait for Postgres { async fn get_version(&mut self) -> Result> { let table = self.table_name("version"); - let query = format!("SELECT version FROM {}", table); + let query = format!("SELECT version FROM {} WHERE id = 1", table); let result: Option<(String,)> = sqlx::query_as(&query).fetch_optional(&self.pool).await?; Ok(result.map(|r| r.0)) } async fn set_version(&mut self, version: &str) -> Result<()> { let table = self.table_name("version"); - // Delete existing row (if any) and insert new one - sqlx::query(&format!("DELETE FROM {}", table)) - .execute(&self.pool) - .await?; - sqlx::query(&format!("INSERT INTO {} (version) VALUES ($1)", table)) + // Use upsert to ensure only one row exists + let query = format!( + "INSERT INTO {} (id, version) VALUES (1, $1) + ON CONFLICT (id) DO UPDATE SET version = $1", + table + ); + sqlx::query(&query) .bind(version) .execute(&self.pool) .await?; diff --git a/src/kvstorage/sqlite.rs b/src/kvstorage/sqlite.rs index 8b4c3ab..55ca185 100644 --- a/src/kvstorage/sqlite.rs +++ b/src/kvstorage/sqlite.rs @@ -91,7 +91,8 @@ impl KVStorageTrait for SQLite { CREATE TABLE IF NOT EXISTS version ( bucket TEXT NOT NULL PRIMARY KEY, version TEXT NOT NULL - );", + ); + CREATE INDEX IF NOT EXISTS idx_ref_file_hash ON ref_file(bucket, hash);", ) .execute(&self.pool) .await?; @@ -112,69 +113,39 @@ impl KVStorageTrait for SQLite { } } - async fn set_ref_count(&mut self, bucket: &str, hash: &str, ref_cnt: i32) -> Result<()> { - sqlx::query("INSERT OR REPLACE INTO refcount (bucket, hash, refcount) VALUES (?1, ?2, ?3)") - .bind(bucket) - .bind(hash) - .bind(ref_cnt) - .execute(&self.pool) - .await?; - Ok(()) - } - async fn atomic_increment_ref_count(&mut self, bucket: &str, hash: &str) -> Result { - // SQLite atomic increment using UPDATE...RETURNING (SQLite 3.35+) - // First try INSERT if not exists, then UPDATE to increment - let result: Result<(i32,), sqlx::Error> = sqlx::query_as( - "INSERT OR IGNORE INTO refcount (bucket, hash, refcount) VALUES (?1, ?2, 1); - UPDATE refcount SET refcount = refcount + 1 
WHERE bucket = ?1 AND hash = ?2; - SELECT refcount FROM refcount WHERE bucket = ?1 AND hash = ?2", + // SQLite atomic increment using INSERT...ON CONFLICT...RETURNING (SQLite 3.35+) + // - New hash: INSERT with refcount=1, return 1 + // - Existing hash: UPDATE refcount = refcount + 1, return new value + let (count,): (i32,) = sqlx::query_as( + "INSERT INTO refcount (bucket, hash, refcount) VALUES (?1, ?2, 1) + ON CONFLICT (bucket, hash) DO UPDATE SET refcount = refcount + 1 + RETURNING refcount", ) .bind(bucket) .bind(hash) - .bind(bucket) - .bind(hash) - .bind(bucket) - .bind(hash) .fetch_one(&self.pool) - .await; - - match result { - Ok((count,)) => Ok(count), - Err(_) => { - // Fallback to explicit increment if UPDATE...RETURNING not supported - let cnt = self.get_ref_count(bucket, hash).await?; - self.set_ref_count(bucket, hash, cnt + 1).await?; - Ok(cnt + 1) - } - } + .await?; + Ok(count) } async fn atomic_decrement_ref_count(&mut self, bucket: &str, hash: &str) -> Result { // SQLite atomic decrement using UPDATE...RETURNING (SQLite 3.35+) - let result: Result<(i32,), sqlx::Error> = sqlx::query_as( - "UPDATE refcount SET refcount = MAX(0, refcount - 1) WHERE bucket = ?1 AND hash = ?2; - SELECT refcount FROM refcount WHERE bucket = ?1 AND hash = ?2", + // Use fetch_optional to return 0 if row doesn't exist (matches PostgreSQL behavior) + let result = sqlx::query_as::<_, (i32,)>( + "UPDATE refcount SET refcount = MAX(0, refcount - 1) + WHERE bucket = ?1 AND hash = ?2 + RETURNING refcount", ) .bind(bucket) .bind(hash) - .bind(bucket) - .bind(hash) - .fetch_one(&self.pool) + .fetch_optional(&self.pool) .await; match result { - Ok((count,)) => Ok(count), - Err(_) => { - // Fallback to explicit decrement if UPDATE...RETURNING not supported - let cnt = self.get_ref_count(bucket, hash).await?; - if cnt == 0 { - return Ok(0); - } - let new_count = cnt - 1; - self.set_ref_count(bucket, hash, new_count).await?; - Ok(new_count) - } + Ok(Some((count,))) => Ok(count), + Ok(None) => Ok(0), // Row not found, return 0 + Err(e) => Err(e.into()), } } @@ -362,6 +333,16 @@ impl KVStorageTrait for SQLite { Ok(()) } + async fn hash_is_referenced(&mut self, bucket: &str, hash: &str) -> Result { + let result: Option<(i32,)> = + sqlx::query_as("SELECT 1 FROM ref_file WHERE bucket = ?1 AND hash = ?2 LIMIT 1") + .bind(bucket) + .bind(hash) + .fetch_optional(&self.pool) + .await?; + Ok(result.is_some()) + } + async fn get_compressed_size(&mut self, bucket: &str, hash: &str) -> Result { let result: Result<(Option,), sqlx::Error> = sqlx::query_as( "SELECT compressed_size FROM logical_size WHERE bucket = ?1 AND hash = ?2", @@ -410,11 +391,16 @@ impl KVStorageTrait for SQLite { } async fn get_total_storage_bytes(&mut self, bucket: &str) -> Result { - let (total,): (Option,) = - sqlx::query_as("SELECT SUM(compressed_size) FROM logical_size WHERE bucket = ?1") - .bind(bucket) - .fetch_one(&self.pool) - .await?; + // Only count storage for blobs that are actually referenced (refcount > 0) + let (total,): (Option,) = sqlx::query_as( + "SELECT SUM(l.compressed_size) + FROM logical_size l + INNER JOIN refcount r ON l.bucket = r.bucket AND l.hash = r.hash + WHERE l.bucket = ?1 AND r.refcount > 0", + ) + .bind(bucket) + .fetch_one(&self.pool) + .await?; Ok(total.unwrap_or(0)) } diff --git a/src/lib.rs b/src/lib.rs index 1fefa25..78b7da3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -110,8 +110,11 @@ impl AppState { } if total_logical_bytes > 0 { - let savings_ratio = - (total_logical_bytes - total_storage_bytes) as f64 / 
total_logical_bytes as f64; + // Clamp to 0 minimum - negative "savings" can happen when compression + // overhead exceeds gains (tiny files), but we report 0% not negative + let savings_ratio = ((total_logical_bytes - total_storage_bytes) as f64 + / total_logical_bytes as f64) + .max(0.0); metrics::STORAGE_SAVINGS_RATIO.set(savings_ratio); } diff --git a/src/locks/postgres.rs b/src/locks/postgres.rs index c7549f8..a28d268 100644 --- a/src/locks/postgres.rs +++ b/src/locks/postgres.rs @@ -2,6 +2,7 @@ use crate::config::Config; use crate::locks::{ExclusiveLockGuard, Lock, LockStorage, SharedLockGuard}; use anyhow::{Context, Result}; use async_trait::async_trait; +use rand::Rng; use serde::Deserialize; use sqlx::{PgPool, postgres::PgPoolOptions}; use std::sync::Arc; @@ -18,10 +19,15 @@ pub struct PostgresConfig { pub pool_size: u32, } -/// PostgreSQL-based distributed locks using advisory locks +/// PostgreSQL-based distributed locks using advisory locks. +/// Uses separate connection pools for file locks and hash locks to prevent +/// deadlock when a request needs both (file lock held, waiting for hash lock). #[derive(Clone)] pub struct PostgresLocks { - pool: Arc, + /// Pool for file locks (keys starting with "file:") + file_pool: Arc, + /// Pool for hash locks (keys starting with "hash:") + hash_pool: Arc, } impl PostgresLocks { @@ -37,7 +43,10 @@ impl PostgresLocks { } impl PostgresLocks { - /// Create a new PostgreSQL locks instance with configuration + /// Create a new PostgreSQL locks instance with configuration. + /// Creates two separate connection pools to prevent deadlock: + /// - file_pool: for file locks (path-based) + /// - hash_pool: for hash locks (content-based) pub async fn new_with_config(config: &Config) -> Result> { let pg_config = config.postgres.as_ref().ok_or_else(|| { anyhow::anyhow!( @@ -55,27 +64,57 @@ impl PostgresLocks { pg_config.user, pg_config.host, pg_config.port, pg_config.dbname ); - let pool = PgPoolOptions::new() + // Create file locks pool + let file_pool = PgPoolOptions::new() + .max_connections(pg_config.pool_size) + .acquire_timeout(Duration::from_secs(30)) + .idle_timeout(Some(Duration::from_secs(600))) + .max_lifetime(Some(Duration::from_secs(1800))) + .connect(&db_url) + .await + .context("Failed to connect to PostgreSQL for file locks pool")?; + + // Validate file pool connection + sqlx::query("SELECT 1") + .execute(&file_pool) + .await + .context("PostgreSQL file locks pool validation failed")?; + + debug!("Successfully validated PostgreSQL file locks pool"); + + // Create hash locks pool (separate pool to prevent deadlock) + let hash_pool = PgPoolOptions::new() .max_connections(pg_config.pool_size) .acquire_timeout(Duration::from_secs(30)) .idle_timeout(Some(Duration::from_secs(600))) .max_lifetime(Some(Duration::from_secs(1800))) .connect(&db_url) .await - .context("Failed to connect to PostgreSQL for locks")?; + .context("Failed to connect to PostgreSQL for hash locks pool")?; - // Validate connection works + // Validate hash pool connection sqlx::query("SELECT 1") - .execute(&pool) + .execute(&hash_pool) .await - .context("PostgreSQL locks connection validation failed")?; + .context("PostgreSQL hash locks pool validation failed")?; - debug!("Successfully validated PostgreSQL locks connection"); + debug!("Successfully validated PostgreSQL hash locks pool"); Ok(Box::new(PostgresLocks { - pool: Arc::new(pool), + file_pool: Arc::new(file_pool), + hash_pool: Arc::new(hash_pool), })) } + + /// Select the appropriate pool based on lock key prefix + 
fn select_pool(&self, key: &str) -> Arc { + if key.starts_with("hash:") { + self.hash_pool.clone() + } else { + // Default to file pool for "file:" and any other keys + self.file_pool.clone() + } + } } #[async_trait] @@ -86,8 +125,9 @@ impl LockStorage for PostgresLocks { async fn prepare_lock<'a>(&'a self, key: String) -> Box { let key_hash = Self::hash_key(&key); + let pool = self.select_pool(&key); Box::new(PostgresLock { - pool: self.pool.clone(), + pool, key, key_hash, }) @@ -100,62 +140,127 @@ struct PostgresLock { key_hash: i64, } +/// Maximum number of retry attempts for lock acquisition +const MAX_LOCK_RETRIES: u32 = 100; +/// Initial backoff delay in milliseconds +const INITIAL_BACKOFF_MS: u64 = 10; +/// Maximum backoff delay in milliseconds +const MAX_BACKOFF_MS: u64 = 1000; + #[async_trait] impl Lock for PostgresLock { async fn acquire_shared<'a>(&'a self) -> Result + Send + 'a>> { - // Get connection from pool - let mut conn = self - .pool - .acquire() - .await - .context("Failed to acquire connection for shared lock")?; - - // Acquire shared advisory lock - // WARNING: Advisory locks are SESSION-SCOPED and persist when connections return to the pool! - // We MUST explicitly unlock using pg_advisory_unlock_shared before the connection returns. - sqlx::query("SELECT pg_advisory_lock_shared($1)") - .bind(self.key_hash) - .execute(&mut *conn) - .await - .context("Failed to acquire shared lock")?; + // Use non-blocking try_lock with retry to prevent connection pool exhaustion deadlock. + // Each lock holds a connection, so blocking waits would deadlock when requests need + // multiple locks (file + hash) but pool is exhausted. + let mut backoff_ms = INITIAL_BACKOFF_MS; + + for attempt in 0..MAX_LOCK_RETRIES { + // Get connection from pool + let mut conn = self + .pool + .acquire() + .await + .context("Failed to acquire connection for shared lock")?; + + // Try to acquire shared advisory lock (non-blocking) + let result: (bool,) = sqlx::query_as("SELECT pg_try_advisory_lock_shared($1)") + .bind(self.key_hash) + .fetch_one(&mut *conn) + .await + .context("Failed to try shared lock")?; + + if result.0 { + // Lock acquired successfully + debug!( + "Acquired shared lock for key: {} (attempt {})", + self.key, + attempt + 1 + ); + return Ok(Box::new(PostgresSharedLockGuard { + key: self.key.clone(), + key_hash: self.key_hash, + conn: Some(conn), + })); + } - debug!("Acquired shared lock for key: {}", self.key); + // Lock not available - drop connection and retry with backoff + drop(conn); + + if attempt < MAX_LOCK_RETRIES - 1 { + tokio::time::sleep(Duration::from_millis(backoff_ms)).await; + // Exponential backoff with jitter + backoff_ms = std::cmp::min(backoff_ms * 2, MAX_BACKOFF_MS); + // Add some jitter (±25%) + let jitter = + (backoff_ms as f64 * 0.25 * (rand::rng().random::() - 0.5)) as i64; + backoff_ms = (backoff_ms as i64 + jitter).max(1) as u64; + } + } - // Return a guard that requires explicit async release - Ok(Box::new(PostgresSharedLockGuard { - key: self.key.clone(), - key_hash: self.key_hash, - conn: Some(conn), - })) + anyhow::bail!( + "Failed to acquire shared lock for key '{}' after {} attempts", + self.key, + MAX_LOCK_RETRIES + ) } async fn acquire_exclusive<'a>( &'a self, ) -> Result + Send + 'a>> { - // Get connection from pool - let mut conn = self - .pool - .acquire() - .await - .context("Failed to acquire connection for exclusive lock")?; - - // Acquire exclusive advisory lock - // WARNING: Advisory locks are SESSION-SCOPED and persist when connections return to 
the pool! - // We MUST explicitly unlock using pg_advisory_unlock before the connection returns. - sqlx::query("SELECT pg_advisory_lock($1)") - .bind(self.key_hash) - .execute(&mut *conn) - .await - .context("Failed to acquire exclusive lock")?; + // Use non-blocking try_lock with retry to prevent connection pool exhaustion deadlock. + // Each lock holds a connection, so blocking waits would deadlock when requests need + // multiple locks (file + hash) but pool is exhausted. + let mut backoff_ms = INITIAL_BACKOFF_MS; + + for attempt in 0..MAX_LOCK_RETRIES { + // Get connection from pool + let mut conn = self + .pool + .acquire() + .await + .context("Failed to acquire connection for exclusive lock")?; + + // Try to acquire exclusive advisory lock (non-blocking) + let result: (bool,) = sqlx::query_as("SELECT pg_try_advisory_lock($1)") + .bind(self.key_hash) + .fetch_one(&mut *conn) + .await + .context("Failed to try exclusive lock")?; + + if result.0 { + // Lock acquired successfully + debug!( + "Acquired exclusive lock for key: {} (attempt {})", + self.key, + attempt + 1 + ); + return Ok(Box::new(PostgresExclusiveLockGuard { + key: self.key.clone(), + key_hash: self.key_hash, + conn: Some(conn), + })); + } - debug!("Acquired exclusive lock for key: {}", self.key); + // Lock not available - drop connection and retry with backoff + drop(conn); + + if attempt < MAX_LOCK_RETRIES - 1 { + tokio::time::sleep(Duration::from_millis(backoff_ms)).await; + // Exponential backoff with jitter + backoff_ms = std::cmp::min(backoff_ms * 2, MAX_BACKOFF_MS); + // Add some jitter (±25%) + let jitter = + (backoff_ms as f64 * 0.25 * (rand::rng().random::() - 0.5)) as i64; + backoff_ms = (backoff_ms as i64 + jitter).max(1) as u64; + } + } - // Return a guard that requires explicit async release - Ok(Box::new(PostgresExclusiveLockGuard { - key: self.key.clone(), - key_hash: self.key_hash, - conn: Some(conn), - })) + anyhow::bail!( + "Failed to acquire exclusive lock for key '{}' after {} attempts", + self.key, + MAX_LOCK_RETRIES + ) } } @@ -186,7 +291,21 @@ impl<'a> SharedLockGuard<'a> for PostgresSharedLockGuard { impl Drop for PostgresSharedLockGuard { fn drop(&mut self) { - // Drop is called after release() removes the connection, so this is OK + // Best-effort cleanup: if connection wasn't released explicitly, spawn a task to unlock + if let Some(mut conn) = self.conn.take() { + let key_hash = self.key_hash; + let key = self.key.clone(); + tokio::spawn(async move { + let _ = sqlx::query("SELECT pg_advisory_unlock_shared($1)") + .bind(key_hash) + .execute(&mut *conn) + .await; + tracing::warn!( + "PostgreSQL shared lock guard dropped without explicit release for key: {}", + key + ); + }); + } } } @@ -217,7 +336,21 @@ impl<'a> ExclusiveLockGuard<'a> for PostgresExclusiveLockGuard { impl Drop for PostgresExclusiveLockGuard { fn drop(&mut self) { - // Drop is called after release() removes the connection, so this is OK + // Best-effort cleanup: if connection wasn't released explicitly, spawn a task to unlock + if let Some(mut conn) = self.conn.take() { + let key_hash = self.key_hash; + let key = self.key.clone(); + tokio::spawn(async move { + let _ = sqlx::query("SELECT pg_advisory_unlock($1)") + .bind(key_hash) + .execute(&mut *conn) + .await; + tracing::warn!( + "PostgreSQL exclusive lock guard dropped without explicit release for key: {}", + key + ); + }); + } } } diff --git a/src/migration/mod.rs b/src/migration/mod.rs index 27ac7f0..b7b05e6 100644 --- a/src/migration/mod.rs +++ b/src/migration/mod.rs @@ 
-341,6 +341,7 @@ pub async fn migrate_single_file_from_metadata( // Always compress for storage let compressed_data = storage_helpers::compress_gzip(&uncompressed_data)?; + let compressed_size = compressed_data.len(); // Acquire file lock let lock_key = crate::locks::file_lock(&app_state.bucket_name, path); @@ -352,89 +353,156 @@ pub async fn migrate_single_file_from_metadata( .context("Failed to acquire exclusive lock for migration")?; // Recheck if file was already migrated after acquiring lock (race condition protection) - let current_modified_after_lock = app_state + let current_modified_after_lock = match app_state .kvstorage .lock() .await .get_modified(&app_state.bucket_name, path) - .await?; + .await + { + Ok(v) => v, + Err(e) => { + let _ = guard.release().await; + return Err(e); + } + }; if current_modified_after_lock >= file_metadata.last_modified { // File was migrated by another concurrent task, skip - let _ = guard.release().await; + if let Err(e) = guard.release().await { + warn!("Failed to release file lock: {}", e); + } return Ok(()); } // Acquire hash lock for S3/refcount operations let hash_lock_key = crate::locks::hash_lock(&app_state.bucket_name, &digest); let hash_lock = locks.prepare_lock(hash_lock_key).await; - let hash_guard = hash_lock - .acquire_exclusive() - .await - .context("Failed to acquire hash lock for migration")?; + let hash_guard = match hash_lock.acquire_exclusive().await { + Ok(g) => g, + Err(e) => { + let _ = guard.release().await; + return Err(e.context("Failed to acquire hash lock for migration")); + } + }; // Check if blob already exists in S3 - let blob_exists = app_state + let blob_exists = match app_state .s3storage .lock() .await .object_exists(&digest) - .await?; + .await + { + Ok(v) => v, + Err(e) => { + let _ = hash_guard.release().await; + let _ = guard.release().await; + return Err(e); + } + }; // Store blob if it doesn't exist - if !blob_exists { - app_state + if !blob_exists + && let Err(e) = app_state .s3storage .lock() .await .put_object(&digest, compressed_data) - .await?; + .await + { + let _ = hash_guard.release().await; + let _ = guard.release().await; + return Err(e); } // Store logical size metadata - app_state + if let Err(e) = app_state .kvstorage .lock() .await .set_logical_size(&app_state.bucket_name, &digest, logical_size) - .await?; + .await + { + let _ = hash_guard.release().await; + let _ = guard.release().await; + return Err(e); + } - // Increment reference count - app_state + // Store compressed size metadata + if let Err(e) = app_state .kvstorage .lock() .await - .increment_ref_count(&app_state.bucket_name, &digest) - .await?; + .set_compressed_size(&app_state.bucket_name, &digest, compressed_size) + .await + { + let _ = hash_guard.release().await; + let _ = guard.release().await; + return Err(e); + } + + // Increment reference count atomically + if let Err(e) = app_state + .kvstorage + .lock() + .await + .atomic_increment_ref_count(&app_state.bucket_name, &digest) + .await + { + let _ = hash_guard.release().await; + let _ = guard.release().await; + return Err(e); + } // Release new hash lock - let _ = hash_guard.release().await; + if let Err(e) = hash_guard.release().await { + warn!("Failed to release hash lock: {}", e); + } // Handle overwriting existing file if current_modified > 0 { - let old_hash = app_state + let old_hash = match app_state .kvstorage .lock() .await .get_ref_file(&app_state.bucket_name, path) - .await?; + .await + { + Ok(v) => v, + Err(e) => { + let _ = guard.release().await; + return Err(e); + } 
+ }; if !old_hash.is_empty() && old_hash != digest { // Acquire lock on old hash before decrement let old_hash_lock_key = crate::locks::hash_lock(&app_state.bucket_name, &old_hash); let old_hash_lock = locks.prepare_lock(old_hash_lock_key).await; - let old_hash_guard = old_hash_lock - .acquire_exclusive() - .await - .context("Failed to acquire old hash lock for migration")?; + let old_hash_guard = match old_hash_lock.acquire_exclusive().await { + Ok(g) => g, + Err(e) => { + let _ = guard.release().await; + return Err(e.context("Failed to acquire old hash lock for migration")); + } + }; // Decrement old reference count atomically and get new count - let old_ref_count = app_state + let old_ref_count = match app_state .kvstorage .lock() .await - .decrement_ref_count(&app_state.bucket_name, &old_hash) - .await?; + .atomic_decrement_ref_count(&app_state.bucket_name, &old_hash) + .await + { + Ok(v) => v, + Err(e) => { + let _ = old_hash_guard.release().await; + let _ = guard.release().await; + return Err(e); + } + }; // Delete blob if no longer referenced if old_ref_count <= 0 @@ -446,32 +514,44 @@ pub async fn migrate_single_file_from_metadata( .await { warn!( - "Failed to delete orphaned S3 object {} during migration: {}", - old_hash, e + "Failed to delete orphaned S3 object (bucket={}, key={}) during migration: {}", + app_state.bucket_name, old_hash, e ); } // Release old hash lock - let _ = old_hash_guard.release().await; + if let Err(e) = old_hash_guard.release().await { + warn!("Failed to release old hash lock: {}", e); + } } } // Update file metadata - app_state + if let Err(e) = app_state .kvstorage .lock() .await .set_ref_file(&app_state.bucket_name, path, &digest) - .await?; + .await + { + let _ = guard.release().await; + return Err(e); + } - app_state + if let Err(e) = app_state .kvstorage .lock() .await .set_modified(&app_state.bucket_name, path, file_metadata.last_modified) - .await?; + .await + { + let _ = guard.release().await; + return Err(e); + } - let _ = guard.release().await; + if let Err(e) = guard.release().await { + warn!("Failed to release file lock: {}", e); + } Ok(()) } @@ -510,6 +590,7 @@ async fn migrate_single_file( // Always compress for storage let compressed_data = storage_helpers::compress_gzip(&uncompressed_data)?; + let compressed_size = compressed_data.len(); // Acquire file lock let lock_key = crate::locks::file_lock(&app_state.bucket_name, path); @@ -521,89 +602,156 @@ async fn migrate_single_file( .context("Failed to acquire exclusive lock for migration")?; // Recheck if file was already migrated after acquiring lock (race condition protection) - let current_modified_after_lock = app_state + let current_modified_after_lock = match app_state .kvstorage .lock() .await .get_modified(&app_state.bucket_name, path) - .await?; + .await + { + Ok(v) => v, + Err(e) => { + let _ = guard.release().await; + return Err(e); + } + }; if current_modified_after_lock >= file_metadata.last_modified { // File was migrated by another concurrent task, skip - let _ = guard.release().await; + if let Err(e) = guard.release().await { + warn!("Failed to release file lock: {}", e); + } return Ok(false); } // Acquire hash lock for S3/refcount operations let hash_lock_key = crate::locks::hash_lock(&app_state.bucket_name, &digest); let hash_lock = locks_storage.prepare_lock(hash_lock_key).await; - let hash_guard = hash_lock - .acquire_exclusive() - .await - .context("Failed to acquire hash lock for migration")?; + let hash_guard = match hash_lock.acquire_exclusive().await { + Ok(g) => g, 
+ Err(e) => { + let _ = guard.release().await; + return Err(e.context("Failed to acquire hash lock for migration")); + } + }; // Check if blob already exists in S3 - let blob_exists = app_state + let blob_exists = match app_state .s3storage .lock() .await .object_exists(&digest) - .await?; + .await + { + Ok(v) => v, + Err(e) => { + let _ = hash_guard.release().await; + let _ = guard.release().await; + return Err(e); + } + }; // Store blob if it doesn't exist - if !blob_exists { - app_state + if !blob_exists + && let Err(e) = app_state .s3storage .lock() .await .put_object(&digest, compressed_data) - .await?; + .await + { + let _ = hash_guard.release().await; + let _ = guard.release().await; + return Err(e); } // Store logical size metadata - app_state + if let Err(e) = app_state .kvstorage .lock() .await .set_logical_size(&app_state.bucket_name, &digest, logical_size) - .await?; + .await + { + let _ = hash_guard.release().await; + let _ = guard.release().await; + return Err(e); + } - // Increment reference count - app_state + // Store compressed size metadata + if let Err(e) = app_state .kvstorage .lock() .await - .increment_ref_count(&app_state.bucket_name, &digest) - .await?; + .set_compressed_size(&app_state.bucket_name, &digest, compressed_size) + .await + { + let _ = hash_guard.release().await; + let _ = guard.release().await; + return Err(e); + } + + // Increment reference count atomically + if let Err(e) = app_state + .kvstorage + .lock() + .await + .atomic_increment_ref_count(&app_state.bucket_name, &digest) + .await + { + let _ = hash_guard.release().await; + let _ = guard.release().await; + return Err(e); + } // Release new hash lock - let _ = hash_guard.release().await; + if let Err(e) = hash_guard.release().await { + warn!("Failed to release hash lock: {}", e); + } // Handle overwriting existing file if current_modified > 0 { - let old_hash = app_state + let old_hash = match app_state .kvstorage .lock() .await .get_ref_file(&app_state.bucket_name, path) - .await?; + .await + { + Ok(v) => v, + Err(e) => { + let _ = guard.release().await; + return Err(e); + } + }; if !old_hash.is_empty() && old_hash != digest { // Acquire lock on old hash before decrement let old_hash_lock_key = crate::locks::hash_lock(&app_state.bucket_name, &old_hash); let old_hash_lock = locks_storage.prepare_lock(old_hash_lock_key).await; - let old_hash_guard = old_hash_lock - .acquire_exclusive() - .await - .context("Failed to acquire old hash lock for migration")?; + let old_hash_guard = match old_hash_lock.acquire_exclusive().await { + Ok(g) => g, + Err(e) => { + let _ = guard.release().await; + return Err(e.context("Failed to acquire old hash lock for migration")); + } + }; // Decrement old reference count atomically and get new count - let old_ref_count = app_state + let old_ref_count = match app_state .kvstorage .lock() .await - .decrement_ref_count(&app_state.bucket_name, &old_hash) - .await?; + .atomic_decrement_ref_count(&app_state.bucket_name, &old_hash) + .await + { + Ok(v) => v, + Err(e) => { + let _ = old_hash_guard.release().await; + let _ = guard.release().await; + return Err(e); + } + }; // Delete blob if no longer referenced if old_ref_count <= 0 @@ -615,32 +763,44 @@ async fn migrate_single_file( .await { warn!( - "Failed to delete orphaned S3 object {} during migration: {}", - old_hash, e + "Failed to delete orphaned S3 object (bucket={}, key={}) during migration: {}", + app_state.bucket_name, old_hash, e ); } // Release old hash lock - let _ = old_hash_guard.release().await; + if let 
Err(e) = old_hash_guard.release().await { + warn!("Failed to release old hash lock: {}", e); + } } } // Update file metadata - app_state + if let Err(e) = app_state .kvstorage .lock() .await .set_ref_file(&app_state.bucket_name, path, &digest) - .await?; + .await + { + let _ = guard.release().await; + return Err(e); + } - app_state + if let Err(e) = app_state .kvstorage .lock() .await .set_modified(&app_state.bucket_name, path, file_metadata.last_modified) - .await?; + .await + { + let _ = guard.release().await; + return Err(e); + } - let _ = guard.release().await; + if let Err(e) = guard.release().await { + warn!("Failed to release file lock: {}", e); + } Ok(true) } @@ -954,97 +1114,156 @@ async fn migrate_single_file_from_v1_fs( .context("Failed to acquire exclusive lock for migration")?; // Recheck if file was already migrated after acquiring lock (race condition protection) - let current_modified_after_lock = app_state + let current_modified_after_lock = match app_state .kvstorage .lock() .await .get_modified(&app_state.bucket_name, path) - .await?; + .await + { + Ok(v) => v, + Err(e) => { + let _ = guard.release().await; + return Err(e); + } + }; if current_modified_after_lock >= file_info.last_modified { // File was migrated by another concurrent task, skip - let _ = guard.release().await; + if let Err(e) = guard.release().await { + warn!("Failed to release file lock: {}", e); + } return Ok(false); } // Acquire hash lock for S3/refcount operations let hash_lock_key = crate::locks::hash_lock(&app_state.bucket_name, &digest); let hash_lock = locks_storage.prepare_lock(hash_lock_key).await; - let hash_guard = hash_lock - .acquire_exclusive() - .await - .context("Failed to acquire hash lock for migration")?; + let hash_guard = match hash_lock.acquire_exclusive().await { + Ok(g) => g, + Err(e) => { + let _ = guard.release().await; + return Err(e.context("Failed to acquire hash lock for migration")); + } + }; // Check if blob already exists in S3 - let blob_exists = app_state + let blob_exists = match app_state .s3storage .lock() .await .object_exists(&digest) - .await?; + .await + { + Ok(v) => v, + Err(e) => { + let _ = hash_guard.release().await; + let _ = guard.release().await; + return Err(e); + } + }; // Store blob if it doesn't exist - if !blob_exists { - app_state + if !blob_exists + && let Err(e) = app_state .s3storage .lock() .await .put_object(&digest, compressed_data.clone()) - .await?; + .await + { + let _ = hash_guard.release().await; + let _ = guard.release().await; + return Err(e); } // Store logical size metadata - app_state + if let Err(e) = app_state .kvstorage .lock() .await .set_logical_size(&app_state.bucket_name, &digest, logical_size) - .await?; + .await + { + let _ = hash_guard.release().await; + let _ = guard.release().await; + return Err(e); + } // Store compressed size metadata - app_state + if let Err(e) = app_state .kvstorage .lock() .await .set_compressed_size(&app_state.bucket_name, &digest, compressed_data.len()) - .await?; + .await + { + let _ = hash_guard.release().await; + let _ = guard.release().await; + return Err(e); + } - // Increment reference count - app_state + // Increment reference count atomically + if let Err(e) = app_state .kvstorage .lock() .await - .increment_ref_count(&app_state.bucket_name, &digest) - .await?; + .atomic_increment_ref_count(&app_state.bucket_name, &digest) + .await + { + let _ = hash_guard.release().await; + let _ = guard.release().await; + return Err(e); + } // Release new hash lock - let _ = hash_guard.release().await; + 
if let Err(e) = hash_guard.release().await { + warn!("Failed to release hash lock: {}", e); + } // Handle overwriting existing file if current_modified > 0 { - let old_hash = app_state + let old_hash = match app_state .kvstorage .lock() .await .get_ref_file(&app_state.bucket_name, path) - .await?; + .await + { + Ok(v) => v, + Err(e) => { + let _ = guard.release().await; + return Err(e); + } + }; if !old_hash.is_empty() && old_hash != digest { // Acquire lock on old hash before decrement let old_hash_lock_key = crate::locks::hash_lock(&app_state.bucket_name, &old_hash); let old_hash_lock = locks_storage.prepare_lock(old_hash_lock_key).await; - let old_hash_guard = old_hash_lock - .acquire_exclusive() - .await - .context("Failed to acquire old hash lock for migration")?; + let old_hash_guard = match old_hash_lock.acquire_exclusive().await { + Ok(g) => g, + Err(e) => { + let _ = guard.release().await; + return Err(e.context("Failed to acquire old hash lock for migration")); + } + }; // Decrement old reference count atomically and get new count - let old_ref_count = app_state + let old_ref_count = match app_state .kvstorage .lock() .await - .decrement_ref_count(&app_state.bucket_name, &old_hash) - .await?; + .atomic_decrement_ref_count(&app_state.bucket_name, &old_hash) + .await + { + Ok(v) => v, + Err(e) => { + let _ = old_hash_guard.release().await; + let _ = guard.release().await; + return Err(e); + } + }; // Delete blob if no longer referenced if old_ref_count <= 0 @@ -1056,31 +1275,43 @@ async fn migrate_single_file_from_v1_fs( .await { warn!( - "Failed to delete orphaned S3 object {} during migration: {}", - old_hash, e + "Failed to delete orphaned S3 object (bucket={}, key={}) during migration: {}", + app_state.bucket_name, old_hash, e ); } // Release old hash lock - let _ = old_hash_guard.release().await; + if let Err(e) = old_hash_guard.release().await { + warn!("Failed to release old hash lock: {}", e); + } } } // Update file metadata - app_state + if let Err(e) = app_state .kvstorage .lock() .await .set_ref_file(&app_state.bucket_name, path, &digest) - .await?; + .await + { + let _ = guard.release().await; + return Err(e); + } - app_state + if let Err(e) = app_state .kvstorage .lock() .await .set_modified(&app_state.bucket_name, path, file_info.last_modified) - .await?; + .await + { + let _ = guard.release().await; + return Err(e); + } - let _ = guard.release().await; + if let Err(e) = guard.release().await { + warn!("Failed to release file lock: {}", e); + } Ok(true) } diff --git a/src/routes/ft/delete_file.rs b/src/routes/ft/delete_file.rs index ebd9e51..6f2aa2d 100644 --- a/src/routes/ft/delete_file.rs +++ b/src/routes/ft/delete_file.rs @@ -5,7 +5,7 @@ use axum::http::{Response, StatusCode}; use axum::response::IntoResponse; use std::sync::Arc; use std::time::Instant; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, warn}; pub async fn ft_delete_file( State(state): State>, @@ -73,7 +73,9 @@ pub async fn ft_delete_file( if current_modified.is_err() { error!("Failed to get current modified"); record_metrics("500"); - let _ = guard.release().await; + if let Err(e) = guard.release().await { + warn!("Failed to release file lock: {}", e); + } return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body("Failed to get current modified".to_string()) @@ -85,7 +87,9 @@ pub async fn ft_delete_file( if current_modified == 0 { debug!("File {} not found", path); record_metrics("404"); - let _ = guard.release().await; + if let Err(e) = guard.release().await 
{ + warn!("Failed to release file lock: {}", e); + } return Response::builder() .status(StatusCode::NOT_FOUND) .body("File not found".to_string()) @@ -99,7 +103,9 @@ pub async fn ft_delete_file( path, timestamp, current_modified ); record_metrics("200"); - let _ = guard.release().await; + if let Err(e) = guard.release().await { + warn!("Failed to release file lock: {}", e); + } return Response::builder() .status(StatusCode::OK) .body("".to_string()) @@ -116,7 +122,9 @@ pub async fn ft_delete_file( if hash.is_err() { error!("Failed to get ref file"); record_metrics("500"); - let _ = guard.release().await; + if let Err(e) = guard.release().await { + warn!("Failed to release file lock: {}", e); + } return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body("Failed to get ref file".to_string()) @@ -127,7 +135,9 @@ pub async fn ft_delete_file( if hash.is_empty() { error!("File {} has no hash reference", path); record_metrics("404"); - let _ = guard.release().await; + if let Err(e) = guard.release().await { + warn!("Failed to release file lock: {}", e); + } return Response::builder() .status(StatusCode::NOT_FOUND) .body("File has no hash reference".to_string()) @@ -142,7 +152,9 @@ pub async fn ft_delete_file( Err(e) => { error!("Failed to acquire hash lock: {}", e); record_metrics("500"); - let _ = guard.release().await; + if let Err(e) = guard.release().await { + warn!("Failed to release file lock: {}", e); + } return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body("Failed to acquire hash lock".to_string()) @@ -155,15 +167,19 @@ pub async fn ft_delete_file( .kvstorage .lock() .await - .decrement_ref_count(&state.bucket_name, &hash) + .atomic_decrement_ref_count(&state.bucket_name, &hash) .await { Ok(count) => count, Err(e) => { error!("Failed to decrement ref count: {}", e); record_metrics("500"); - let _ = hash_guard.release().await; - let _ = guard.release().await; + if let Err(e) = hash_guard.release().await { + warn!("Failed to release hash lock: {}", e); + } + if let Err(e) = guard.release().await { + warn!("Failed to release file lock: {}", e); + } return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body("Failed to decrement ref count".to_string()) @@ -176,7 +192,10 @@ pub async fn ft_delete_file( debug!("Deleting blob with hash: {}", hash); // Delete blob from S3 if let Err(e) = state.s3storage.lock().await.delete_object(&hash).await { - error!("Failed to delete object from S3: {}", e); + error!( + "Failed to delete object from S3 (bucket={}, key={}): {}", + state.bucket_name, hash, e + ); // Continue anyway - metadata cleanup is more important } @@ -190,7 +209,9 @@ pub async fn ft_delete_file( } // Release hash lock - done with refcount/S3 operations - let _ = hash_guard.release().await; + if let Err(e) = hash_guard.release().await { + warn!("Failed to release hash lock: {}", e); + } // 9. 
diff --git a/src/routes/ft/get_file.rs b/src/routes/ft/get_file.rs
index 3cb2bff..39e7fcb 100644
--- a/src/routes/ft/get_file.rs
+++ b/src/routes/ft/get_file.rs
@@ -5,7 +5,7 @@
 use axum::extract::{Path, State};
 use axum::http::{Response, StatusCode};
 use axum::response::IntoResponse;
 use std::sync::Arc;
-use tracing::{debug, error};
+use tracing::{debug, error, warn};
 
 pub async fn ft_get_file(
     State(state): State>,
@@ -55,7 +55,9 @@ pub async fn ft_get_file(
         .with_label_values(&["GET", "/ft/files"])
         .observe(start.elapsed().as_secs_f64());
 
-        let _ = guard.release().await;
+        if let Err(e) = guard.release().await {
+            warn!("Failed to release file lock: {}", e);
+        }
         return Response::builder()
             .status(StatusCode::INTERNAL_SERVER_ERROR)
            .body(Body::empty())
@@ -80,7 +82,9 @@ pub async fn ft_get_file(
 
         // Release the shared lock before migration to avoid deadlock
         // (migration needs exclusive lock on the same key)
-        let _ = guard.release().await;
+        if let Err(e) = guard.release().await {
+            warn!("Failed to release file lock: {}", e);
+        }
 
         // Migrate the file on-the-fly using migration logic
         let result = crate::migration::migrate_single_file_from_metadata(
@@ -169,7 +173,9 @@ pub async fn ft_get_file(
         .with_label_values(&["GET", "/ft/files"])
         .observe(start.elapsed().as_secs_f64());
 
-        let _ = guard.release().await;
+        if let Err(e) = guard.release().await {
+            warn!("Failed to release file lock: {}", e);
+        }
         return Response::builder()
             .status(StatusCode::INTERNAL_SERVER_ERROR)
             .body(Body::empty())
@@ -186,7 +192,9 @@ pub async fn ft_get_file(
         .with_label_values(&["GET", "/ft/files"])
         .observe(start.elapsed().as_secs_f64());
 
-        let _ = guard.release().await;
+        if let Err(e) = guard.release().await {
+            warn!("Failed to release file lock: {}", e);
+        }
         return Response::builder()
             .status(StatusCode::NOT_FOUND)
             .body(Body::empty())
@@ -209,7 +217,9 @@ pub async fn ft_get_file(
         .with_label_values(&["GET", "/ft/files"])
         .observe(start.elapsed().as_secs_f64());
 
-        let _ = guard.release().await;
+        if let Err(e) = guard.release().await {
+            warn!("Failed to release file lock: {}", e);
+        }
         return Response::builder()
             .status(StatusCode::INTERNAL_SERVER_ERROR)
             .body(Body::empty())
@@ -220,7 +230,12 @@ pub async fn ft_get_file(
     // 5. Fetch the blob from S3
     let blob_data = state.s3storage.lock().await.get_object(&hash).await;
     if blob_data.is_err() {
-        error!("Failed to get object from S3: {}", blob_data.err().unwrap());
+        error!(
+            "Failed to get object from S3 (bucket={}, key={}): {}",
+            state.bucket_name,
+            hash,
+            blob_data.err().unwrap()
+        );
         metrics::HTTP_REQUESTS_TOTAL
             .with_label_values(&["GET", "/ft/files", "500"])
             .inc();
@@ -228,7 +243,9 @@ pub async fn ft_get_file(
             .with_label_values(&["GET", "/ft/files"])
             .observe(start.elapsed().as_secs_f64());
 
-        let _ = guard.release().await;
+        if let Err(e) = guard.release().await {
+            warn!("Failed to release file lock: {}", e);
+        }
         return Response::builder()
             .status(StatusCode::INTERNAL_SERVER_ERROR)
             .body(Body::empty())
     }
 
     let blob_data = blob_data.unwrap();
-    let _ = guard.release().await;
+    if let Err(e) = guard.release().await {
+        warn!("Failed to release file lock: {}", e);
+    }
 
     // 6. Record metrics
     metrics::HTTP_REQUESTS_TOTAL
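The hunk at @@ -80,7 +82,9 @@ keeps the pre-existing comment about releasing the shared lock before on-the-fly migration: the migration path re-acquires the same key exclusively, so holding the shared guard across that call would self-deadlock. A minimal illustration of the same ordering rule using tokio's in-process `RwLock` in place of the crate's distributed lock:

```rust
use std::sync::Arc;
use tokio::sync::RwLock;

// If this task kept `read` alive while awaiting `write()`, the write
// could never be granted and the task would wait forever. Dropping the
// shared guard first (as the hunk above does with `guard.release()`)
// lets the exclusive acquisition proceed.
#[tokio::main]
async fn main() {
    let lock = Arc::new(RwLock::new(0u32));

    let read = lock.read().await;
    let needs_migration = *read == 0;
    drop(read); // release shared access *before* asking for exclusive

    if needs_migration {
        let mut write = lock.write().await; // safe: no guard still held
        *write += 1;
    }
}
```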
diff --git a/src/routes/ft/put_file.rs b/src/routes/ft/put_file.rs
index 9e386da..1c22866 100644
--- a/src/routes/ft/put_file.rs
+++ b/src/routes/ft/put_file.rs
@@ -6,7 +6,7 @@
 use axum::http::{HeaderMap, Response, StatusCode};
 use axum::response::IntoResponse;
 use std::sync::Arc;
 use std::time::Instant;
-use tracing::{debug, error, info};
+use tracing::{debug, error, info, warn};
 
 pub async fn ft_put_file(
     State(state): State>,
@@ -116,7 +116,9 @@ pub async fn ft_put_file(
     if current_modified.is_err() {
         error!("Failed to get current modified");
         record_metrics("500");
-        let _ = guard.release().await;
+        if let Err(e) = guard.release().await {
+            warn!("Failed to release file lock: {}", e);
+        }
         return Response::builder()
             .status(StatusCode::INTERNAL_SERVER_ERROR)
             .body("Failed to get current modified".to_string())
@@ -131,7 +133,9 @@ pub async fn ft_put_file(
             path, timestamp, current_modified
         );
         record_metrics("200");
-        let _ = guard.release().await;
+        if let Err(e) = guard.release().await {
+            warn!("Failed to release file lock: {}", e);
+        }
         return Response::builder()
             .status(StatusCode::OK)
             .header("Content-Type", "text/plain")
@@ -157,7 +161,9 @@ pub async fn ft_put_file(
         Err(e) => {
             error!("Failed to decompress gzip data: {}", e);
             record_metrics("400");
-            let _ = guard.release().await;
+            if let Err(e) = guard.release().await {
+                warn!("Failed to release file lock: {}", e);
+            }
             return Response::builder()
                 .status(StatusCode::BAD_REQUEST)
                 .body("Failed to decompress gzip data".to_string())
@@ -180,7 +186,9 @@ pub async fn ft_put_file(
         Err(e) => {
             error!("Failed to compress data: {}", e);
             record_metrics("500");
-            let _ = guard.release().await;
+            if let Err(e) = guard.release().await {
+                warn!("Failed to release file lock: {}", e);
+            }
             return Response::builder()
                 .status(StatusCode::INTERNAL_SERVER_ERROR)
                 .body("Failed to compress data".to_string())
@@ -200,7 +208,9 @@ pub async fn ft_put_file(
         Err(e) => {
             error!("Failed to acquire hash lock: {}", e);
             record_metrics("500");
-            let _ = guard.release().await;
+            if let Err(e) = guard.release().await {
+                warn!("Failed to release file lock: {}", e);
+            }
             return Response::builder()
                 .status(StatusCode::INTERNAL_SERVER_ERROR)
                 .body("Failed to acquire hash lock".to_string())
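The hunks at @@ -157 and @@ -180 sit around the handler's gzip decompress/recompress steps. For reference, a minimal sketch of both directions with the `flate2` crate — a common choice, though this diff does not show which compression library the crate actually uses:

```rust
use std::io::{Read, Write};

use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use flate2::Compression;

// Decompress a gzip payload into its original bytes.
fn gunzip(data: &[u8]) -> std::io::Result<Vec<u8>> {
    let mut out = Vec::new();
    GzDecoder::new(data).read_to_end(&mut out)?;
    Ok(out)
}

// Compress bytes with gzip at the default level.
fn gzip(data: &[u8]) -> std::io::Result<Vec<u8>> {
    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
    encoder.write_all(data)?;
    encoder.finish()
}
```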
key={}): {}", + path, state.bucket_name, s3_key, e + ); record_metrics("500"); - let _ = hash_guard.release().await; - let _ = guard.release().await; + if let Err(e) = hash_guard.release().await { + warn!("Failed to release hash lock: {}", e); + } + if let Err(e) = guard.release().await { + warn!("Failed to release file lock: {}", e); + } return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body("Failed to check object existence".to_string()) @@ -249,33 +266,44 @@ pub async fn ft_put_file( .put_object(s3_key, final_data.clone()) .await { - error!("Failed to store object in S3: {}", e); + error!( + "Failed to store object in S3 for path '{}' (bucket={}, key={}): {}", + path, state.bucket_name, s3_key, e + ); record_metrics("500"); - let _ = hash_guard.release().await; - let _ = guard.release().await; + if let Err(e) = hash_guard.release().await { + warn!("Failed to release hash lock: {}", e); + } + if let Err(e) = guard.release().await { + warn!("Failed to release file lock: {}", e); + } return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body("Failed to store object".to_string()) .unwrap(); } + } - // Store compressed size metadata (actual bytes in S3) - only when we upload - if let Err(e) = state - .kvstorage - .lock() - .await - .set_compressed_size(&state.bucket_name, &digest, final_data.len()) - .await - { - error!("Failed to store compressed size: {}", e); - record_metrics("500"); - let _ = hash_guard.release().await; - let _ = guard.release().await; - return Response::builder() - .status(StatusCode::INTERNAL_SERVER_ERROR) - .body("Failed to store compressed size".to_string()) - .unwrap(); + // Store compressed size metadata (always, in case KV metadata was lost but S3 blob still exists) + if let Err(e) = state + .kvstorage + .lock() + .await + .set_compressed_size(&state.bucket_name, &digest, final_data.len()) + .await + { + error!("Failed to store compressed size: {}", e); + record_metrics("500"); + if let Err(e) = hash_guard.release().await { + warn!("Failed to release hash lock: {}", e); + } + if let Err(e) = guard.release().await { + warn!("Failed to release file lock: {}", e); } + return Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body("Failed to store compressed size".to_string()) + .unwrap(); } // Store logical size metadata (always, in case KV metadata was lost but S3 blob still exists) @@ -288,8 +316,12 @@ pub async fn ft_put_file( { error!("Failed to store logical size: {}", e); record_metrics("500"); - let _ = hash_guard.release().await; - let _ = guard.release().await; + if let Err(e) = hash_guard.release().await { + warn!("Failed to release hash lock: {}", e); + } + if let Err(e) = guard.release().await { + warn!("Failed to release file lock: {}", e); + } return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body("Failed to store logical size".to_string()) @@ -323,13 +355,17 @@ pub async fn ft_put_file( .kvstorage .lock() .await - .increment_ref_count(&state.bucket_name, &digest) + .atomic_increment_ref_count(&state.bucket_name, &digest) .await { error!("Failed to increment ref count: {}", e); record_metrics("500"); - let _ = hash_guard.release().await; - let _ = guard.release().await; + if let Err(e) = hash_guard.release().await { + warn!("Failed to release hash lock: {}", e); + } + if let Err(e) = guard.release().await { + warn!("Failed to release file lock: {}", e); + } return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body("Failed to increment ref count".to_string()) @@ 
@@ -337,7 +373,9 @@ pub async fn ft_put_file(
     }
 
     // Release new hash lock - we're done with S3/refcount operations for new hash
-    let _ = hash_guard.release().await;
+    if let Err(e) = hash_guard.release().await {
+        warn!("Failed to release hash lock: {}", e);
+    }
 
     // If overwriting with different content, decrement old blob reference
     if let Some(old_hash) = old_hash
@@ -357,7 +395,9 @@ pub async fn ft_put_file(
             Err(e) => {
                 error!("Failed to acquire old hash lock: {}", e);
                 record_metrics("500");
-                let _ = guard.release().await;
+                if let Err(e) = guard.release().await {
+                    warn!("Failed to release file lock: {}", e);
+                }
                 return Response::builder()
                     .status(StatusCode::INTERNAL_SERVER_ERROR)
                     .body("Failed to acquire old hash lock".to_string())
@@ -370,7 +410,7 @@ pub async fn ft_put_file(
             .kvstorage
             .lock()
             .await
-            .decrement_ref_count(&state.bucket_name, &old_hash)
+            .atomic_decrement_ref_count(&state.bucket_name, &old_hash)
             .await;
 
         // Delete old blob if no longer referenced
@@ -382,7 +422,9 @@ pub async fn ft_put_file(
         }
 
         // Release old hash lock
-        let _ = old_hash_guard.release().await;
+        if let Err(e) = old_hash_guard.release().await {
+            warn!("Failed to release old hash lock: {}", e);
+        }
     }
 
     // 9. Update file metadata (path -> hash mapping and timestamp)
@@ -395,7 +437,9 @@ pub async fn ft_put_file(
     {
         error!("Failed to set ref file: {}", e);
         record_metrics("500");
-        let _ = guard.release().await;
+        if let Err(e) = guard.release().await {
+            warn!("Failed to release file lock: {}", e);
+        }
         return Response::builder()
             .status(StatusCode::INTERNAL_SERVER_ERROR)
             .body("Failed to set ref file".to_string())
@@ -411,7 +455,9 @@ pub async fn ft_put_file(
     {
         error!("Failed to set modified time: {}", e);
         record_metrics("500");
-        let _ = guard.release().await;
+        if let Err(e) = guard.release().await {
+            warn!("Failed to release file lock: {}", e);
+        }
         return Response::builder()
             .status(StatusCode::INTERNAL_SERVER_ERROR)
             .body("Failed to set modified time".to_string())
@@ -421,7 +467,9 @@ pub async fn ft_put_file(
     debug!("Created link {}.", path);
 
     // Release lock early since all critical metadata operations are complete
-    let _ = guard.release().await;
+    if let Err(e) = guard.release().await {
+        warn!("Failed to release file lock: {}", e);
+    }
 
     // 9. Dual-write to filetracker if in live migration mode
     if let Some(filetracker_client) = &state.filetracker_client {
diff --git a/tests/cleaner_test.rs b/tests/cleaner_test.rs
index e785297..3eca08d 100644
--- a/tests/cleaner_test.rs
+++ b/tests/cleaner_test.rs
@@ -203,13 +203,19 @@ async fn test_clean_unreferenced_refcounts() {
     kvstorage
         .lock()
         .await
-        .set_ref_count(&bucket_name, "hash1", 1)
+        .atomic_increment_ref_count(&bucket_name, "hash1")
         .await
         .unwrap();
     kvstorage
         .lock()
         .await
-        .set_ref_count(&bucket_name, "hash2", 2)
+        .atomic_increment_ref_count(&bucket_name, "hash2")
+        .await
+        .unwrap();
+    kvstorage
+        .lock()
+        .await
+        .atomic_increment_ref_count(&bucket_name, "hash2")
         .await
         .unwrap();
 
@@ -223,7 +229,7 @@ async fn test_clean_unreferenced_refcounts() {
     kvstorage
         .lock()
         .await
-        .set_ref_count(&bucket_name, "hash3", 1)
+        .atomic_increment_ref_count(&bucket_name, "hash3")
         .await
         .unwrap();
 
@@ -315,7 +321,7 @@ async fn test_clean_unused_s3_objects() {
     kvstorage
         .lock()
         .await
-        .set_ref_count(&bucket_name, "hash3", 1)
+        .atomic_increment_ref_count(&bucket_name, "hash3")
         .await
         .unwrap();
 
@@ -387,7 +393,7 @@ async fn test_clean_orphaned_logical_sizes() {
     kvstorage
         .lock()
         .await
-        .set_ref_count(&bucket_name, "hash3", 1)
+        .atomic_increment_ref_count(&bucket_name, "hash3")
         .await
         .unwrap();
 
@@ -594,7 +600,7 @@ async fn test_full_cleanup_cycle() {
     kvstorage
         .lock()
         .await
-        .set_ref_count(&bucket_name, "crash_hash", 1)
+        .atomic_increment_ref_count(&bucket_name, "crash_hash")
         .await
         .unwrap();
 
@@ -619,7 +625,7 @@ async fn test_full_cleanup_cycle() {
     kvstorage
         .lock()
         .await
-        .set_ref_count(&bucket_name, "good_hash", 1)
+        .atomic_increment_ref_count(&bucket_name, "good_hash")
         .await
         .unwrap();
     kvstorage