From 0f68eab1e4f6c9fd8c96677a60e24fe918626374 Mon Sep 17 00:00:00 2001 From: Tom Parker-Shemilt Date: Tue, 21 Oct 2025 10:43:21 +0100 Subject: [PATCH] New filesystem test for eviction breaking --- nativelink-store/src/filesystem_store.rs | 90 +++++++++++-------- .../tests/filesystem_store_test.rs | 75 ++++++++++++++-- nativelink-util/src/fs.rs | 6 +- 3 files changed, 126 insertions(+), 45 deletions(-) diff --git a/nativelink-store/src/filesystem_store.rs b/nativelink-store/src/filesystem_store.rs index 2ded8bd4c..ed94bfa75 100644 --- a/nativelink-store/src/filesystem_store.rs +++ b/nativelink-store/src/filesystem_store.rs @@ -40,7 +40,7 @@ use nativelink_util::store_trait::{ }; use tokio::io::{AsyncReadExt, AsyncWriteExt, Take}; use tokio_stream::wrappers::ReadDirStream; -use tracing::{debug, error, warn}; +use tracing::{debug, error, info, warn}; use crate::callback_utils::RemoveItemCallbackHolder; use crate::cas_utils::is_zero_digest; @@ -129,7 +129,8 @@ impl Drop for EncodedFilePath { .fetch_add(1, Ordering::Relaxed) + 1; debug!( - ?current_active_drop_spawns, + %current_active_drop_spawns, + ?file_path, "Spawned a filesystem_delete_file" ); background_spawn!("filesystem_delete_file", async move { @@ -148,6 +149,7 @@ impl Drop for EncodedFilePath { - 1; debug!( ?current_active_drop_spawns, + ?file_path, "Dropped a filesystem_delete_file" ); }); @@ -220,6 +222,7 @@ pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static { pub struct FileEntryImpl { data_size: u64, block_size: u64, + // We lock around this as it gets rewritten when we move between temp and content types encoded_file_path: RwLock, } @@ -362,37 +365,38 @@ impl LenEntry for FileEntryImpl { // target file location to the new temp file. `unref()` should only ever be called once. #[inline] async fn unref(&self) { - { - let mut encoded_file_path = self.encoded_file_path.write().await; - if encoded_file_path.path_type == PathType::Temp { - // We are already a temp file that is now marked for deletion on drop. - // This is very rare, but most likely the rename into the content path failed. - return; - } - let from_path = encoded_file_path.get_file_path(); - let new_key = make_temp_key(&encoded_file_path.key); - - let to_path = - to_full_path_from_key(&encoded_file_path.shared_context.temp_path, &new_key); - - if let Err(err) = fs::rename(&from_path, &to_path).await { - warn!( - key = ?encoded_file_path.key, - ?from_path, - ?to_path, - ?err, - "Failed to rename file", - ); - } else { - debug!( - key = ?encoded_file_path.key, - ?from_path, - ?to_path, - "Renamed file", - ); - encoded_file_path.path_type = PathType::Temp; - encoded_file_path.key = new_key; - } + let mut encoded_file_path = self.encoded_file_path.write().await; + if encoded_file_path.path_type == PathType::Temp { + // We are already a temp file that is now marked for deletion on drop. + // This is very rare, but most likely the rename into the content path failed. + warn!( + key = ?encoded_file_path.key, + "File is already a temp file", + ); + return; + } + let from_path = encoded_file_path.get_file_path(); + let new_key = make_temp_key(&encoded_file_path.key); + + let to_path = to_full_path_from_key(&encoded_file_path.shared_context.temp_path, &new_key); + + if let Err(err) = fs::rename(&from_path, &to_path).await { + warn!( + key = ?encoded_file_path.key, + ?from_path, + ?to_path, + ?err, + "Failed to rename file", + ); + } else { + debug!( + key = ?encoded_file_path.key, + ?from_path, + ?to_path, + "Renamed file (unref)", + ); + encoded_file_path.path_type = PathType::Temp; + encoded_file_path.key = new_key; } } } @@ -531,7 +535,7 @@ async fn add_files_to_cache( if let Err(err) = rename_fn(&from_file, &to_file) { warn!(?from_file, ?to_file, ?err, "Failed to rename file",); } else { - debug!(?from_file, ?to_file, "Renamed file",); + debug!(?from_file, ?to_file, "Renamed file (old cache)",); } } Ok(()) @@ -751,6 +755,7 @@ impl FilesystemStore { .await .err_tip(|| "Failed to sync_data in filesystem store")?; + debug!(?temp_file, "Dropping file to update_file"); drop(temp_file); *entry.data_size_mut() = data_size; @@ -781,17 +786,25 @@ impl FilesystemStore { // We need to guarantee that this will get to the end even if the parent future is dropped. // See: https://github.com/TraceMachina/nativelink/issues/495 background_spawn!("filesystem_store_emplace_file", async move { + evicting_map + .insert(key.borrow().into_owned().into(), entry.clone()) + .await; + + // The insert might have resulted in an eviction/unref so we need to check + // it still exists in there. But first, get the lock... let mut encoded_file_path = entry.get_encoded_file_path().write().await; + // Then check it's still in there... + if evicting_map.get(&key).await.is_none() { + info!(%key, "Got eviction while emplacing, dropping"); + return Ok(()); + } + let final_path = get_file_path_raw( &PathType::Content, encoded_file_path.shared_context.as_ref(), &key, ); - evicting_map - .insert(key.borrow().into_owned().into(), entry.clone()) - .await; - let from_path = encoded_file_path.get_file_path(); // Internally tokio spawns fs commands onto a blocking thread anyways. // Since we are already on a blocking thread, we just need the `fs` wrapper to manage @@ -941,6 +954,7 @@ impl StoreDriver for FilesystemStore { ); // We are done with the file, if we hold a reference to the file here, it could // result in a deadlock if `emplace_file()` also needs file descriptors. + debug!(?file, "Dropping file to to update_with_whole_file"); drop(file); self.emplace_file(key.into_owned(), Arc::new(entry)) .await diff --git a/nativelink-store/tests/filesystem_store_test.rs b/nativelink-store/tests/filesystem_store_test.rs index 6f2d1b6b3..bceb95d5e 100644 --- a/nativelink-store/tests/filesystem_store_test.rs +++ b/nativelink-store/tests/filesystem_store_test.rs @@ -26,7 +26,7 @@ use bytes::Bytes; use futures::executor::block_on; use futures::task::Poll; use futures::{Future, FutureExt, poll}; -use nativelink_config::stores::FilesystemSpec; +use nativelink_config::stores::{EvictionPolicy, FilesystemSpec}; use nativelink_error::{Code, Error, ResultExt, make_err}; use nativelink_macro::nativelink_test; use nativelink_store::filesystem_store::{ @@ -41,7 +41,8 @@ use nativelink_util::{background_spawn, spawn}; use opentelemetry::context::{Context, FutureExt as OtelFutureExt}; use parking_lot::Mutex; use pretty_assertions::assert_eq; -use rand::Rng; +use rand::rngs::SmallRng; +use rand::{Rng, SeedableRng}; use sha2::{Digest, Sha256}; use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, Take}; use tokio::sync::{Barrier, Semaphore}; @@ -50,6 +51,15 @@ use tokio_stream::StreamExt; use tokio_stream::wrappers::ReadDirStream; use tracing::Instrument; +const VALID_HASH: &str = "0123456789abcdef000000000000000000010000000000000123456789abcdef"; + +fn make_random_data(sz: usize) -> Vec { + let mut value = vec![0u8; sz]; + let mut rng = SmallRng::seed_from_u64(1); + rng.fill(&mut value[..]); + value +} + trait FileEntryHooks { fn on_make_and_open( _encoded_file_path: &EncodedFilePath, @@ -331,7 +341,7 @@ async fn temp_files_get_deleted_on_replace_test() -> Result<(), Error> { FilesystemStore::>::new(&FilesystemSpec { content_path: content_path.clone(), temp_path: temp_path.clone(), - eviction_policy: Some(nativelink_config::stores::EvictionPolicy { + eviction_policy: Some(EvictionPolicy { max_count: 3, ..Default::default() }), @@ -404,7 +414,7 @@ async fn file_continues_to_stream_on_content_replace_test() -> Result<(), Error> FilesystemStore::>::new(&FilesystemSpec { content_path: content_path.clone(), temp_path: temp_path.clone(), - eviction_policy: Some(nativelink_config::stores::EvictionPolicy { + eviction_policy: Some(EvictionPolicy { max_count: 3, ..Default::default() }), @@ -512,7 +522,7 @@ async fn file_gets_cleans_up_on_cache_eviction() -> Result<(), Error> { FilesystemStore::>::new(&FilesystemSpec { content_path: content_path.clone(), temp_path: temp_path.clone(), - eviction_policy: Some(nativelink_config::stores::EvictionPolicy { + eviction_policy: Some(EvictionPolicy { max_count: 1, ..Default::default() }), @@ -658,7 +668,7 @@ async fn eviction_on_insert_calls_unref_once() -> Result<(), Error> { FilesystemStore::>::new(&FilesystemSpec { content_path: make_temp_path("content_path"), temp_path: make_temp_path("temp_path"), - eviction_policy: Some(nativelink_config::stores::EvictionPolicy { + eviction_policy: Some(EvictionPolicy { max_bytes: 5, ..Default::default() }), @@ -1400,3 +1410,56 @@ async fn file_slot_taken_when_ready() -> Result<(), Error> { .map_err(|_| make_err!(Code::Internal, "Deadlock detected"))?; res_1.merge(res_2).merge(res_3).merge(res_4) } + +// If we insert a file larger than the max_bytes eviction policy, it should be safely +// evicted, without deadlocking. +#[nativelink_test] +async fn safe_small_safe_eviction() -> Result<(), Error> { + let store_spec = FilesystemSpec { + content_path: "/tmp/nativelink/safe_fs".into(), + temp_path: "/tmp/nativelink/safe_fs_temp".into(), + eviction_policy: Some(EvictionPolicy { + max_bytes: 1, + ..Default::default() + }), + ..Default::default() + }; + let store = Store::new(::new(&store_spec).await?); + + // > than the max_bytes + let bytes = 2; + + let data = make_random_data(bytes); + let digest = DigestInfo::try_new(VALID_HASH, data.len()).unwrap(); + + assert_eq!( + store.has(digest).await, + Ok(None), + "Expected data to not exist in store" + ); + + store.update_oneshot(digest, data.clone().into()).await?; + + assert_eq!( + store.has(digest).await, + Ok(None), + "Expected data to not exist in store, because eviction" + ); + + let (tx, mut rx) = make_buf_channel_pair(); + + assert_eq!( + store.get(digest, tx).await, + Err(Error { + code: Code::NotFound, + messages: vec![format!( + "{VALID_HASH}-{bytes} not found in filesystem store here" + )], + }), + "Expected data to not exist in store, because eviction" + ); + + assert!(rx.recv().await.is_err()); + + Ok(()) +} diff --git a/nativelink-util/src/fs.rs b/nativelink-util/src/fs.rs index d22b9bba2..d29eaaef8 100644 --- a/nativelink-util/src/fs.rs +++ b/nativelink-util/src/fs.rs @@ -27,7 +27,7 @@ use rlimit::increase_nofile_limit; pub use tokio::fs::DirEntry; use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncWrite, ReadBuf, SeekFrom, Take}; use tokio::sync::{Semaphore, SemaphorePermit}; -use tracing::{error, info, warn}; +use tracing::{error, info, trace, warn}; use crate::spawn_blocking; @@ -121,6 +121,10 @@ pub static OPEN_FILE_SEMAPHORE: Semaphore = Semaphore::const_new(DEFAULT_OPEN_FI /// Try to acquire a permit from the open file semaphore. #[inline] pub async fn get_permit() -> Result, Error> { + trace!( + available_permits = OPEN_FILE_SEMAPHORE.available_permits(), + "getting FS permit" + ); OPEN_FILE_SEMAPHORE .acquire() .await