From 0d6f24aac2d26a7299cde3076c8c6a033dd0cbe4 Mon Sep 17 00:00:00 2001
From: Tom Parker-Shemilt
Date: Fri, 7 Nov 2025 18:27:16 +0000
Subject: [PATCH] No need to store zero-length filesystem files

---
 nativelink-store/src/filesystem_store.rs   |  20 +++
 .../tests/filesystem_store_test.rs         | 127 +++++++++++++-----
 .../src/running_actions_manager.rs         |  65 +++++----
 3 files changed, 147 insertions(+), 65 deletions(-)

diff --git a/nativelink-store/src/filesystem_store.rs b/nativelink-store/src/filesystem_store.rs
index 54850d747..2ded8bd4c 100644
--- a/nativelink-store/src/filesystem_store.rs
+++ b/nativelink-store/src/filesystem_store.rs
@@ -704,6 +704,17 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
     }
 
     pub async fn get_file_entry_for_digest(&self, digest: &DigestInfo) -> Result<Arc<Fe>, Error> {
+        if is_zero_digest(digest) {
+            return Ok(Arc::new(Fe::create(
+                0,
+                0,
+                RwLock::new(EncodedFilePath {
+                    shared_context: self.shared_context.clone(),
+                    path_type: PathType::Content,
+                    key: digest.into(),
+                }),
+            )));
+        }
         self.evicting_map
             .get(&digest.into())
             .await
@@ -860,6 +871,11 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
         mut reader: DropCloserReadHalf,
         _upload_size: UploadSizeInfo,
     ) -> Result<(), Error> {
+        if is_zero_digest(key.borrow()) {
+            // No need to store anything; zero-length files are assumed to always exist.
+            return Ok(());
+        }
+
         let temp_key = make_temp_key(&key);
 
         // There's a possibility of deadlock here where we take all of the
@@ -910,6 +926,10 @@
                 .err_tip(|| format!("While reading metadata for {}", path.display()))?
                 .len(),
         };
+        if file_size == 0 {
+            // No need to store anything; zero-length files are assumed to always exist.
+            return Ok(None);
+        }
         let entry = Fe::create(
             file_size,
             self.block_size,
diff --git a/nativelink-store/tests/filesystem_store_test.rs b/nativelink-store/tests/filesystem_store_test.rs
index 2adb02c69..6f2d1b6b3 100644
--- a/nativelink-store/tests/filesystem_store_test.rs
+++ b/nativelink-store/tests/filesystem_store_test.rs
@@ -226,9 +226,9 @@ async fn wait_for_no_open_files() -> Result<(), Error> {
     Ok(())
 }
 
-/// Helper function to ensure there are no temporary files left.
-async fn check_temp_empty(temp_path: &str) -> Result<(), Error> {
-    let (_permit, temp_dir_handle) = fs::read_dir(format!("{temp_path}/{DIGEST_FOLDER}"))
+/// Helper function to ensure there are no temporary or content files left.
+async fn check_storage_dir_empty(storage_path: &str) -> Result<(), Error> {
+    let (_permit, temp_dir_handle) = fs::read_dir(format!("{storage_path}/{DIGEST_FOLDER}"))
         .await
         .err_tip(|| "Failed opening temp directory")?
         .into_inner();
@@ -243,7 +243,7 @@
         );
     }
 
-    let (_permit, temp_dir_handle) = fs::read_dir(format!("{temp_path}/{STR_FOLDER}"))
+    let (_permit, temp_dir_handle) = fs::read_dir(format!("{storage_path}/{STR_FOLDER}"))
         .await
         .err_tip(|| "Failed opening temp directory")?
         .into_inner();
@@ -380,7 +380,7 @@ async fn temp_files_get_deleted_on_replace_test() -> Result<(), Error> {
         "Dropped a filesystem_delete_file current_active_drop_spawns=0"
     ));
 
-    check_temp_empty(&temp_path).await
+    check_storage_dir_empty(&temp_path).await
 }
 
 // This test ensures that if a file is overridden and an open stream to the file already
@@ -487,7 +487,7 @@ async fn file_continues_to_stream_on_content_replace_test() -> Result<(), Error>
     }
 
     // Now ensure our temp file was cleaned up.
-    check_temp_empty(&temp_path).await
+    check_storage_dir_empty(&temp_path).await
 }
 
 // Eviction has a different code path than a file replacement, so we check that if a
@@ -583,7 +583,7 @@ async fn file_gets_cleans_up_on_cache_eviction() -> Result<(), Error> {
     }
 
     // Now ensure our temp file was cleaned up.
-    check_temp_empty(&temp_path).await
+    check_storage_dir_empty(&temp_path).await
 }
 
 // Test to ensure that if we are holding a reference to `FileEntry` and the contents are
@@ -805,7 +805,7 @@ async fn rename_on_insert_fails_due_to_filesystem_error_proper_cleanup_happens()
 
     // Now it should have cleaned up its temp files.
     {
-        check_temp_empty(&temp_path).await?;
+        check_storage_dir_empty(&temp_path).await?;
     }
 
     // Finally ensure that our entry is not in the store.
@@ -907,32 +907,6 @@ async fn get_part_is_zero_digest() -> Result<(), Error> {
 
 #[nativelink_test]
 async fn has_with_results_on_zero_digests() -> Result<(), Error> {
-    async fn wait_for_empty_content_file<
-        Fut: Future<Output = Result<(), Error>>,
-        F: Fn() -> Fut,
-    >(
-        content_path: &str,
-        digest: DigestInfo,
-        yield_fn: F,
-    ) -> Result<(), Error> {
-        loop {
-            yield_fn().await?;
-
-            let empty_digest_file_name =
-                OsString::from(format!("{content_path}/{DIGEST_FOLDER}/{digest}"));
-
-            let file_metadata = fs::metadata(empty_digest_file_name)
-                .await
-                .err_tip(|| "Failed to open content file")?;
-
-            // Test that the empty digest file is created and contains an empty length.
-            if file_metadata.is_file() && file_metadata.len() == 0 {
-                return Ok(());
-            }
-        }
-        // Unreachable.
-    }
-
     let digest = DigestInfo::new(Sha256::new().finalize().into(), 0);
     let content_path = make_temp_path("content_path");
     let temp_path = make_temp_path("temp_path");
@@ -960,12 +934,93 @@ async fn has_with_results_on_zero_digests() -> Result<(), Error> {
     );
     assert_eq!(results, vec![Some(0)]);
 
-    wait_for_empty_content_file(&content_path, digest, || async move {
-        tokio::task::yield_now().await;
+    check_storage_dir_empty(&content_path).await?;
+
+    Ok(())
+}
+
+async fn wrap_update_zero_digest<F>(updater: F) -> Result<(), Error>
+where
+    F: AsyncFnOnce(DigestInfo, Arc<FilesystemStore<FileEntryImpl>>) -> Result<(), Error>,
+{
+    let digest = DigestInfo::new(Sha256::new().finalize().into(), 0);
+    let content_path = make_temp_path("content_path");
+    let temp_path = make_temp_path("temp_path");
+
+    let store = FilesystemStore::<FileEntryImpl>::new_with_timeout_and_rename_fn(
+        &FilesystemSpec {
+            content_path: content_path.clone(),
+            temp_path: temp_path.clone(),
+            read_buffer_size: 1,
+            ..Default::default()
+        },
+        |from, to| std::fs::rename(from, to),
+    )
+    .await?;
+    updater(digest, store).await?;
+    check_storage_dir_empty(&content_path).await?;
+    check_storage_dir_empty(&temp_path).await?;
+    Ok(())
+}
+
+#[nativelink_test]
+async fn update_whole_file_with_zero_digest() -> Result<(), Error> {
+    wrap_update_zero_digest(async |digest, store| {
+        let temp_file_dir = make_temp_path("update_with_zero_digest");
+        std::fs::create_dir_all(&temp_file_dir)?;
+        let temp_file_path = Path::new(&temp_file_dir).join("zero-length-file");
+        std::fs::write(&temp_file_path, b"")
+            .err_tip(|| format!("Writing to {temp_file_path:?}"))?;
+        let file_slot = fs::open_file(&temp_file_path, 0, 0).await?.into_inner();
+        store
+            .update_with_whole_file(
+                digest,
+                temp_file_path.into(),
+                file_slot,
+                UploadSizeInfo::ExactSize(0),
+            )
+            .await?;
         Ok(())
     })
+    .await
+}
+
+#[nativelink_test]
+async fn update_oneshot_with_zero_digest() -> Result<(), Error> {
+    wrap_update_zero_digest(async |digest, store| store.update_oneshot(digest, Bytes::new()).await)
+        .await
+}
+
+#[nativelink_test]
+async fn update_with_zero_digest() -> Result<(), Error> {
+    wrap_update_zero_digest(async |digest, store| {
+        let (_writer, reader) = make_buf_channel_pair();
+        store
+            .update(digest, reader, UploadSizeInfo::ExactSize(0))
+            .await
+    })
+    .await
+}
+
+#[nativelink_test]
+async fn get_file_entry_for_zero_digest() -> Result<(), Error> {
+    let digest = DigestInfo::new(Sha256::new().finalize().into(), 0);
+    let content_path = make_temp_path("content_path");
+    let temp_path = make_temp_path("temp_path");
+
+    let store = FilesystemStore::<FileEntryImpl>::new_with_timeout_and_rename_fn(
+        &FilesystemSpec {
+            content_path: content_path.clone(),
+            temp_path: temp_path.clone(),
+            read_buffer_size: 1,
+            ..Default::default()
+        },
+        |from, to| std::fs::rename(from, to),
+    )
     .await?;
+    let file_entry = store.get_file_entry_for_digest(&digest).await?;
+    assert!(file_entry.is_empty());
 
     Ok(())
 }
diff --git a/nativelink-worker/src/running_actions_manager.rs b/nativelink-worker/src/running_actions_manager.rs
index 0923ac48f..a9190c6f9 100644
--- a/nativelink-worker/src/running_actions_manager.rs
+++ b/nativelink-worker/src/running_actions_manager.rs
@@ -54,6 +54,7 @@ use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::
 use nativelink_store::ac_utils::{
     ESTIMATED_DIGEST_SIZE, compute_buf_digest, get_and_decode_digest, serialize_and_upload_message,
 };
+use nativelink_store::cas_utils::is_zero_digest;
 use nativelink_store::fast_slow_store::FastSlowStore;
 use nativelink_store::filesystem_store::{FileEntry, FilesystemStore};
 use nativelink_store::grpc_store::GrpcStore;
@@ -71,7 +72,7 @@ use prost::Message;
 use relative_path::RelativePath;
 use scopeguard::{ScopeGuard, guard};
 use serde::Deserialize;
-use tokio::io::{AsyncReadExt, AsyncSeekExt};
+use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
 use tokio::process;
 use tokio::sync::{Notify, oneshot, watch};
 use tokio::time::Instant;
@@ -150,34 +151,40 @@ pub fn download_to_directory<'a>(
         cas_store
             .populate_fast_store(digest.into())
             .and_then(move |()| async move {
-                let file_entry = filesystem_store
-                    .get_file_entry_for_digest(&digest)
-                    .await
-                    .err_tip(|| "During hard link")?;
-                // TODO: add a test for #2051: deadlock with large number of files
-                let src_path = file_entry.get_file_path_locked(|src| async move { Ok(PathBuf::from(src)) }).await?;
-                fs::hard_link(&src_path, &dest)
-                    .await
-                    .map_err(|e| {
-                        if e.code == Code::NotFound {
-                            make_err!(
-                                Code::Internal,
-                                "Could not make hardlink, file was likely evicted from cache. {e:?} : {dest}\n\
-                                This error often occurs when the filesystem store's max_bytes is too small for your workload.\n\
-                                To fix this issue:\n\
-                                1. Increase the 'max_bytes' value in your filesystem store configuration\n\
-                                2. Example: Change 'max_bytes: 10000000000' to 'max_bytes: 50000000000' (or higher)\n\
-                                3. The setting is typically found in your nativelink.json config under:\n\
-                                stores -> [your_filesystem_store] -> filesystem -> eviction_policy -> max_bytes\n\
-                                4. Restart NativeLink after making the change\n\n\
-                                If this error persists after increasing max_bytes several times, please report at:\n\
-                                https://github.com/TraceMachina/nativelink/issues\n\
-                                Include your config file and both server and client logs to help us assist you."
-                            )
-                        } else {
-                            make_err!(Code::Internal, "Could not make hardlink, {e:?} : {dest}")
-                        }
-                    })?;
+                if is_zero_digest(digest) {
+                    let mut file_slot = fs::create_file(&dest).await?;
+                    file_slot.write_all(&[]).await?;
+                }
+                else {
+                    let file_entry = filesystem_store
+                        .get_file_entry_for_digest(&digest)
+                        .await
+                        .err_tip(|| "During hard link")?;
+                    // TODO: add a test for #2051: deadlock with large number of files
+                    let src_path = file_entry.get_file_path_locked(|src| async move { Ok(PathBuf::from(src)) }).await?;
+                    fs::hard_link(&src_path, &dest)
+                        .await
+                        .map_err(|e| {
+                            if e.code == Code::NotFound {
+                                make_err!(
+                                    Code::Internal,
+                                    "Could not make hardlink, file was likely evicted from cache. {e:?} : {dest}\n\
+                                    This error often occurs when the filesystem store's max_bytes is too small for your workload.\n\
+                                    To fix this issue:\n\
+                                    1. Increase the 'max_bytes' value in your filesystem store configuration\n\
+                                    2. Example: Change 'max_bytes: 10000000000' to 'max_bytes: 50000000000' (or higher)\n\
+                                    3. The setting is typically found in your nativelink.json config under:\n\
+                                    stores -> [your_filesystem_store] -> filesystem -> eviction_policy -> max_bytes\n\
+                                    4. Restart NativeLink after making the change\n\n\
+                                    If this error persists after increasing max_bytes several times, please report at:\n\
+                                    https://github.com/TraceMachina/nativelink/issues\n\
+                                    Include your config file and both server and client logs to help us assist you."
+                                )
+                            } else {
+                                make_err!(Code::Internal, "Could not make hardlink, {e:?} : {dest}")
+                            }
+                        })?;
+                }
                 #[cfg(target_family = "unix")]
                 if let Some(unix_mode) = unix_mode {
                     fs::set_permissions(&dest, Permissions::from_mode(unix_mode))