Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 52 additions & 38 deletions nativelink-store/src/filesystem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use nativelink_util::store_trait::{
};
use tokio::io::{AsyncReadExt, AsyncWriteExt, Take};
use tokio_stream::wrappers::ReadDirStream;
use tracing::{debug, error, warn};
use tracing::{debug, error, info, warn};

use crate::callback_utils::RemoveItemCallbackHolder;
use crate::cas_utils::is_zero_digest;
Expand Down Expand Up @@ -129,7 +129,8 @@ impl Drop for EncodedFilePath {
.fetch_add(1, Ordering::Relaxed)
+ 1;
debug!(
?current_active_drop_spawns,
%current_active_drop_spawns,
?file_path,
"Spawned a filesystem_delete_file"
);
background_spawn!("filesystem_delete_file", async move {
Expand All @@ -148,6 +149,7 @@ impl Drop for EncodedFilePath {
- 1;
debug!(
?current_active_drop_spawns,
?file_path,
"Dropped a filesystem_delete_file"
);
});
Expand Down Expand Up @@ -220,6 +222,7 @@ pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
pub struct FileEntryImpl {
    // Size of the entry's data (presumably in bytes — confirm against the writers).
    data_size: u64,
    // NOTE(review): looks like the filesystem block size used for on-disk size
    // accounting — confirm against where it is set.
    block_size: u64,
    // We lock around this as it gets rewritten when we move between temp and content types.
    // For example, `unref()` takes the write lock, renames the file into the temp directory
    // and then updates `path_type` and `key` in place.
    encoded_file_path: RwLock<EncodedFilePath>,
}

Expand Down Expand Up @@ -362,37 +365,38 @@ impl LenEntry for FileEntryImpl {
// target file location to the new temp file. `unref()` should only ever be called once.
#[inline]
async fn unref(&self) {
Copy link
Member Author

@palfrey palfrey Nov 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of this diff is whitespace changes, because the body was wrapped in a redundant double-nested `{`/`}` block for some reason.

{
let mut encoded_file_path = self.encoded_file_path.write().await;
if encoded_file_path.path_type == PathType::Temp {
// We are already a temp file that is now marked for deletion on drop.
// This is very rare, but most likely the rename into the content path failed.
return;
}
let from_path = encoded_file_path.get_file_path();
let new_key = make_temp_key(&encoded_file_path.key);

let to_path =
to_full_path_from_key(&encoded_file_path.shared_context.temp_path, &new_key);

if let Err(err) = fs::rename(&from_path, &to_path).await {
warn!(
key = ?encoded_file_path.key,
?from_path,
?to_path,
?err,
"Failed to rename file",
);
} else {
debug!(
key = ?encoded_file_path.key,
?from_path,
?to_path,
"Renamed file",
);
encoded_file_path.path_type = PathType::Temp;
encoded_file_path.key = new_key;
}
let mut encoded_file_path = self.encoded_file_path.write().await;
if encoded_file_path.path_type == PathType::Temp {
// We are already a temp file that is now marked for deletion on drop.
// This is very rare, but most likely the rename into the content path failed.
warn!(
key = ?encoded_file_path.key,
"File is already a temp file",
);
return;
}
let from_path = encoded_file_path.get_file_path();
let new_key = make_temp_key(&encoded_file_path.key);

let to_path = to_full_path_from_key(&encoded_file_path.shared_context.temp_path, &new_key);

if let Err(err) = fs::rename(&from_path, &to_path).await {
warn!(
key = ?encoded_file_path.key,
?from_path,
?to_path,
?err,
"Failed to rename file",
);
} else {
debug!(
key = ?encoded_file_path.key,
?from_path,
?to_path,
"Renamed file (unref)",
);
encoded_file_path.path_type = PathType::Temp;
encoded_file_path.key = new_key;
}
}
}
Expand Down Expand Up @@ -531,7 +535,7 @@ async fn add_files_to_cache<Fe: FileEntry>(
if let Err(err) = rename_fn(&from_file, &to_file) {
warn!(?from_file, ?to_file, ?err, "Failed to rename file",);
} else {
debug!(?from_file, ?to_file, "Renamed file",);
debug!(?from_file, ?to_file, "Renamed file (old cache)",);
}
}
Ok(())
Expand Down Expand Up @@ -751,6 +755,7 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
.await
.err_tip(|| "Failed to sync_data in filesystem store")?;

debug!(?temp_file, "Dropping file to update_file");
drop(temp_file);

*entry.data_size_mut() = data_size;
Expand Down Expand Up @@ -781,17 +786,25 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
// We need to guarantee that this will get to the end even if the parent future is dropped.
// See: https://github.com/TraceMachina/nativelink/issues/495
background_spawn!("filesystem_store_emplace_file", async move {
evicting_map
.insert(key.borrow().into_owned().into(), entry.clone())
.await;

// The insert might have resulted in an eviction/unref so we need to check
// it still exists in there. But first, get the lock...
let mut encoded_file_path = entry.get_encoded_file_path().write().await;
// Then check it's still in there...
if evicting_map.get(&key).await.is_none() {
info!(%key, "Got eviction while emplacing, dropping");
return Ok(());
}

let final_path = get_file_path_raw(
&PathType::Content,
encoded_file_path.shared_context.as_ref(),
&key,
);

evicting_map
.insert(key.borrow().into_owned().into(), entry.clone())
.await;

let from_path = encoded_file_path.get_file_path();
// Internally tokio spawns fs commands onto a blocking thread anyways.
// Since we are already on a blocking thread, we just need the `fs` wrapper to manage
Expand Down Expand Up @@ -941,6 +954,7 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
);
// We are done with the file, if we hold a reference to the file here, it could
// result in a deadlock if `emplace_file()` also needs file descriptors.
debug!(?file, "Dropping file to to update_with_whole_file");
drop(file);
self.emplace_file(key.into_owned(), Arc::new(entry))
.await
Expand Down
75 changes: 69 additions & 6 deletions nativelink-store/tests/filesystem_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use bytes::Bytes;
use futures::executor::block_on;
use futures::task::Poll;
use futures::{Future, FutureExt, poll};
use nativelink_config::stores::FilesystemSpec;
use nativelink_config::stores::{EvictionPolicy, FilesystemSpec};
use nativelink_error::{Code, Error, ResultExt, make_err};
use nativelink_macro::nativelink_test;
use nativelink_store::filesystem_store::{
Expand All @@ -41,7 +41,8 @@ use nativelink_util::{background_spawn, spawn};
use opentelemetry::context::{Context, FutureExt as OtelFutureExt};
use parking_lot::Mutex;
use pretty_assertions::assert_eq;
use rand::Rng;
use rand::rngs::SmallRng;
use rand::{Rng, SeedableRng};
use sha2::{Digest, Sha256};
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, Take};
use tokio::sync::{Barrier, Semaphore};
Expand All @@ -50,6 +51,15 @@ use tokio_stream::StreamExt;
use tokio_stream::wrappers::ReadDirStream;
use tracing::Instrument;

const VALID_HASH: &str = "0123456789abcdef000000000000000000010000000000000123456789abcdef";

/// Returns `sz` bytes of pseudo-random data.
///
/// The generator is always seeded with the same value, so repeated calls with the
/// same `sz` yield the same bytes.
fn make_random_data(sz: usize) -> Vec<u8> {
    let mut generator = SmallRng::seed_from_u64(1);
    let mut buffer = vec![0u8; sz];
    generator.fill(buffer.as_mut_slice());
    buffer
}

trait FileEntryHooks {
fn on_make_and_open(
_encoded_file_path: &EncodedFilePath,
Expand Down Expand Up @@ -331,7 +341,7 @@ async fn temp_files_get_deleted_on_replace_test() -> Result<(), Error> {
FilesystemStore::<TestFileEntry<LocalHooks>>::new(&FilesystemSpec {
content_path: content_path.clone(),
temp_path: temp_path.clone(),
eviction_policy: Some(nativelink_config::stores::EvictionPolicy {
eviction_policy: Some(EvictionPolicy {
max_count: 3,
..Default::default()
}),
Expand Down Expand Up @@ -404,7 +414,7 @@ async fn file_continues_to_stream_on_content_replace_test() -> Result<(), Error>
FilesystemStore::<TestFileEntry<LocalHooks>>::new(&FilesystemSpec {
content_path: content_path.clone(),
temp_path: temp_path.clone(),
eviction_policy: Some(nativelink_config::stores::EvictionPolicy {
eviction_policy: Some(EvictionPolicy {
max_count: 3,
..Default::default()
}),
Expand Down Expand Up @@ -512,7 +522,7 @@ async fn file_gets_cleans_up_on_cache_eviction() -> Result<(), Error> {
FilesystemStore::<TestFileEntry<LocalHooks>>::new(&FilesystemSpec {
content_path: content_path.clone(),
temp_path: temp_path.clone(),
eviction_policy: Some(nativelink_config::stores::EvictionPolicy {
eviction_policy: Some(EvictionPolicy {
max_count: 1,
..Default::default()
}),
Expand Down Expand Up @@ -658,7 +668,7 @@ async fn eviction_on_insert_calls_unref_once() -> Result<(), Error> {
FilesystemStore::<TestFileEntry<LocalHooks>>::new(&FilesystemSpec {
content_path: make_temp_path("content_path"),
temp_path: make_temp_path("temp_path"),
eviction_policy: Some(nativelink_config::stores::EvictionPolicy {
eviction_policy: Some(EvictionPolicy {
max_bytes: 5,
..Default::default()
}),
Expand Down Expand Up @@ -1400,3 +1410,56 @@ async fn file_slot_taken_when_ready() -> Result<(), Error> {
.map_err(|_| make_err!(Code::Internal, "Deadlock detected"))?;
res_1.merge(res_2).merge(res_3).merge(res_4)
}

// If we insert a file larger than the max_bytes eviction policy, it should be safely
// evicted, without deadlocking.
#[nativelink_test]
async fn safe_small_safe_eviction() -> Result<(), Error> {
let store_spec = FilesystemSpec {
content_path: "/tmp/nativelink/safe_fs".into(),
temp_path: "/tmp/nativelink/safe_fs_temp".into(),
eviction_policy: Some(EvictionPolicy {
max_bytes: 1,
..Default::default()
}),
..Default::default()
};
let store = Store::new(<FilesystemStore>::new(&store_spec).await?);

// > than the max_bytes
let bytes = 2;

let data = make_random_data(bytes);
let digest = DigestInfo::try_new(VALID_HASH, data.len()).unwrap();

assert_eq!(
store.has(digest).await,
Ok(None),
"Expected data to not exist in store"
);

store.update_oneshot(digest, data.clone().into()).await?;

assert_eq!(
store.has(digest).await,
Ok(None),
"Expected data to not exist in store, because eviction"
);

let (tx, mut rx) = make_buf_channel_pair();

assert_eq!(
store.get(digest, tx).await,
Err(Error {
code: Code::NotFound,
messages: vec![format!(
"{VALID_HASH}-{bytes} not found in filesystem store here"
)],
}),
"Expected data to not exist in store, because eviction"
);

assert!(rx.recv().await.is_err());

Ok(())
}
6 changes: 5 additions & 1 deletion nativelink-util/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use rlimit::increase_nofile_limit;
pub use tokio::fs::DirEntry;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncWrite, ReadBuf, SeekFrom, Take};
use tokio::sync::{Semaphore, SemaphorePermit};
use tracing::{error, info, warn};
use tracing::{error, info, trace, warn};

use crate::spawn_blocking;

Expand Down Expand Up @@ -121,6 +121,10 @@ pub static OPEN_FILE_SEMAPHORE: Semaphore = Semaphore::const_new(DEFAULT_OPEN_FI
/// Try to acquire a permit from the open file semaphore.
#[inline]
pub async fn get_permit() -> Result<SemaphorePermit<'static>, Error> {
trace!(
available_permits = OPEN_FILE_SEMAPHORE.available_permits(),
"getting FS permit"
);
OPEN_FILE_SEMAPHORE
.acquire()
.await
Expand Down
Loading