Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions nativelink-store/src/filesystem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,17 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
}

pub async fn get_file_entry_for_digest(&self, digest: &DigestInfo) -> Result<Arc<Fe>, Error> {
if is_zero_digest(digest) {
return Ok(Arc::new(Fe::create(
0,
0,
RwLock::new(EncodedFilePath {
shared_context: self.shared_context.clone(),
path_type: PathType::Content,
key: digest.into(),
}),
)));
}
self.evicting_map
.get(&digest.into())
.await
Expand Down Expand Up @@ -860,6 +871,11 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
mut reader: DropCloserReadHalf,
_upload_size: UploadSizeInfo,
) -> Result<(), Error> {
if is_zero_digest(key.borrow()) {
// No need to add an entry: zero-length files are assumed to always exist.
return Ok(());
}

let temp_key = make_temp_key(&key);

// There's a possibility of deadlock here where we take all of the
Expand Down Expand Up @@ -910,6 +926,10 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
.err_tip(|| format!("While reading metadata for {}", path.display()))?
.len(),
};
if file_size == 0 {
// No need to add an entry: zero-length files are assumed to always exist.
return Ok(None);
}
let entry = Fe::create(
file_size,
self.block_size,
Expand Down
127 changes: 91 additions & 36 deletions nativelink-store/tests/filesystem_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,9 @@ async fn wait_for_no_open_files() -> Result<(), Error> {
Ok(())
}

/// Helper function to ensure there are no temporary files left.
async fn check_temp_empty(temp_path: &str) -> Result<(), Error> {
let (_permit, temp_dir_handle) = fs::read_dir(format!("{temp_path}/{DIGEST_FOLDER}"))
/// Helper function to ensure there are no temporary or content files left.
async fn check_storage_dir_empty(storage_path: &str) -> Result<(), Error> {
let (_permit, temp_dir_handle) = fs::read_dir(format!("{storage_path}/{DIGEST_FOLDER}"))
.await
.err_tip(|| "Failed opening temp directory")?
.into_inner();
Expand All @@ -243,7 +243,7 @@ async fn check_temp_empty(temp_path: &str) -> Result<(), Error> {
);
}

let (_permit, temp_dir_handle) = fs::read_dir(format!("{temp_path}/{STR_FOLDER}"))
let (_permit, temp_dir_handle) = fs::read_dir(format!("{storage_path}/{STR_FOLDER}"))
.await
.err_tip(|| "Failed opening temp directory")?
.into_inner();
Expand Down Expand Up @@ -380,7 +380,7 @@ async fn temp_files_get_deleted_on_replace_test() -> Result<(), Error> {
"Dropped a filesystem_delete_file current_active_drop_spawns=0"
));

check_temp_empty(&temp_path).await
check_storage_dir_empty(&temp_path).await
}

// This test ensures that if a file is overridden and an open stream to the file already
Expand Down Expand Up @@ -487,7 +487,7 @@ async fn file_continues_to_stream_on_content_replace_test() -> Result<(), Error>
}

// Now ensure our temp file was cleaned up.
check_temp_empty(&temp_path).await
check_storage_dir_empty(&temp_path).await
}

// Eviction has a different code path than a file replacement, so we check that if a
Expand Down Expand Up @@ -583,7 +583,7 @@ async fn file_gets_cleans_up_on_cache_eviction() -> Result<(), Error> {
}

// Now ensure our temp file was cleaned up.
check_temp_empty(&temp_path).await
check_storage_dir_empty(&temp_path).await
}

// Test to ensure that if we are holding a reference to `FileEntry` and the contents are
Expand Down Expand Up @@ -805,7 +805,7 @@ async fn rename_on_insert_fails_due_to_filesystem_error_proper_cleanup_happens()

// Now it should have cleaned up its temp files.
{
check_temp_empty(&temp_path).await?;
check_storage_dir_empty(&temp_path).await?;
}

// Finally ensure that our entry is not in the store.
Expand Down Expand Up @@ -907,32 +907,6 @@ async fn get_part_is_zero_digest() -> Result<(), Error> {

#[nativelink_test]
async fn has_with_results_on_zero_digests() -> Result<(), Error> {
async fn wait_for_empty_content_file<
Fut: Future<Output = Result<(), Error>>,
F: Fn() -> Fut,
>(
content_path: &str,
digest: DigestInfo,
yield_fn: F,
) -> Result<(), Error> {
loop {
yield_fn().await?;

let empty_digest_file_name =
OsString::from(format!("{content_path}/{DIGEST_FOLDER}/{digest}"));

let file_metadata = fs::metadata(empty_digest_file_name)
.await
.err_tip(|| "Failed to open content file")?;

// Test that the empty digest file is created and contains an empty length.
if file_metadata.is_file() && file_metadata.len() == 0 {
return Ok(());
}
}
// Unreachable.
}

let digest = DigestInfo::new(Sha256::new().finalize().into(), 0);
let content_path = make_temp_path("content_path");
let temp_path = make_temp_path("temp_path");
Expand Down Expand Up @@ -960,12 +934,93 @@ async fn has_with_results_on_zero_digests() -> Result<(), Error> {
);
assert_eq!(results, vec![Some(0)]);

wait_for_empty_content_file(&content_path, digest, || async move {
tokio::task::yield_now().await;
check_storage_dir_empty(&content_path).await?;

Ok(())
}

async fn wrap_update_zero_digest<F>(updater: F) -> Result<(), Error>
where
F: AsyncFnOnce(DigestInfo, Arc<FilesystemStore>) -> Result<(), Error>,
{
let digest = DigestInfo::new(Sha256::new().finalize().into(), 0);
let content_path = make_temp_path("content_path");
let temp_path = make_temp_path("temp_path");

let store = FilesystemStore::<FileEntryImpl>::new_with_timeout_and_rename_fn(
&FilesystemSpec {
content_path: content_path.clone(),
temp_path: temp_path.clone(),
read_buffer_size: 1,
..Default::default()
},
|from, to| std::fs::rename(from, to),
)
.await?;
updater(digest, store).await?;
check_storage_dir_empty(&content_path).await?;
check_storage_dir_empty(&temp_path).await?;
Ok(())
}

#[nativelink_test]
async fn update_whole_file_with_zero_digest() -> Result<(), Error> {
wrap_update_zero_digest(async |digest, store| {
let temp_file_dir = make_temp_path("update_with_zero_digest");
std::fs::create_dir_all(&temp_file_dir)?;
let temp_file_path = Path::new(&temp_file_dir).join("zero-length-file");
std::fs::write(&temp_file_path, b"")
.err_tip(|| format!("Writing to {temp_file_path:?}"))?;
let file_slot = fs::open_file(&temp_file_path, 0, 0).await?.into_inner();
store
.update_with_whole_file(
digest,
temp_file_path.into(),
file_slot,
UploadSizeInfo::ExactSize(0),
)
.await?;
Ok(())
})
.await
}

#[nativelink_test]
async fn update_oneshot_with_zero_digest() -> Result<(), Error> {
    // A one-shot upload of an empty payload under the zero digest must
    // succeed without leaving any files behind.
    wrap_update_zero_digest(async |digest, store| {
        let empty_payload = Bytes::new();
        store.update_oneshot(digest, empty_payload).await
    })
    .await
}

#[nativelink_test]
async fn update_with_zero_digest() -> Result<(), Error> {
    wrap_update_zero_digest(async |digest, store| {
        // Keep the writer half alive but unused: the zero-digest fast path
        // is expected to return without ever pulling data from the reader.
        let (_writer, reader) = make_buf_channel_pair();
        store
            .update(digest, reader, UploadSizeInfo::ExactSize(0))
            .await
    })
    .await
}

#[nativelink_test]
async fn get_file_entry_for_zero_digest() -> Result<(), Error> {
    let zero_digest = DigestInfo::new(Sha256::new().finalize().into(), 0);
    let content_dir = make_temp_path("content_path");
    let temp_dir = make_temp_path("temp_path");

    let spec = FilesystemSpec {
        content_path: content_dir.clone(),
        temp_path: temp_dir.clone(),
        read_buffer_size: 1,
        ..Default::default()
    };
    let store = FilesystemStore::<FileEntryImpl>::new_with_timeout_and_rename_fn(
        &spec,
        |from, to| std::fs::rename(from, to),
    )
    .await?;

    // The zero digest is synthesized in memory, so the lookup must succeed
    // and the returned entry must report itself as empty.
    let entry = store.get_file_entry_for_digest(&zero_digest).await?;
    assert!(entry.is_empty());
    Ok(())
}

Expand Down
65 changes: 36 additions & 29 deletions nativelink-worker/src/running_actions_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::
use nativelink_store::ac_utils::{
ESTIMATED_DIGEST_SIZE, compute_buf_digest, get_and_decode_digest, serialize_and_upload_message,
};
use nativelink_store::cas_utils::is_zero_digest;
use nativelink_store::fast_slow_store::FastSlowStore;
use nativelink_store::filesystem_store::{FileEntry, FilesystemStore};
use nativelink_store::grpc_store::GrpcStore;
Expand All @@ -71,7 +72,7 @@ use prost::Message;
use relative_path::RelativePath;
use scopeguard::{ScopeGuard, guard};
use serde::Deserialize;
use tokio::io::{AsyncReadExt, AsyncSeekExt};
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use tokio::process;
use tokio::sync::{Notify, oneshot, watch};
use tokio::time::Instant;
Expand Down Expand Up @@ -150,34 +151,40 @@ pub fn download_to_directory<'a>(
cas_store
.populate_fast_store(digest.into())
.and_then(move |()| async move {
let file_entry = filesystem_store
.get_file_entry_for_digest(&digest)
.await
.err_tip(|| "During hard link")?;
// TODO: add a test for #2051: deadlock with large number of files
let src_path = file_entry.get_file_path_locked(|src| async move { Ok(PathBuf::from(src)) }).await?;
fs::hard_link(&src_path, &dest)
.await
.map_err(|e| {
if e.code == Code::NotFound {
make_err!(
Code::Internal,
"Could not make hardlink, file was likely evicted from cache. {e:?} : {dest}\n\
This error often occurs when the filesystem store's max_bytes is too small for your workload.\n\
To fix this issue:\n\
1. Increase the 'max_bytes' value in your filesystem store configuration\n\
2. Example: Change 'max_bytes: 10000000000' to 'max_bytes: 50000000000' (or higher)\n\
3. The setting is typically found in your nativelink.json config under:\n\
stores -> [your_filesystem_store] -> filesystem -> eviction_policy -> max_bytes\n\
4. Restart NativeLink after making the change\n\n\
If this error persists after increasing max_bytes several times, please report at:\n\
https://github.com/TraceMachina/nativelink/issues\n\
Include your config file and both server and client logs to help us assist you."
)
} else {
make_err!(Code::Internal, "Could not make hardlink, {e:?} : {dest}")
}
})?;
if is_zero_digest(digest) {
let mut file_slot = fs::create_file(&dest).await?;
file_slot.write_all(&[]).await?;
}
else {
let file_entry = filesystem_store
.get_file_entry_for_digest(&digest)
.await
.err_tip(|| "During hard link")?;
// TODO: add a test for #2051: deadlock with large number of files
let src_path = file_entry.get_file_path_locked(|src| async move { Ok(PathBuf::from(src)) }).await?;
fs::hard_link(&src_path, &dest)
.await
.map_err(|e| {
if e.code == Code::NotFound {
make_err!(
Code::Internal,
"Could not make hardlink, file was likely evicted from cache. {e:?} : {dest}\n\
This error often occurs when the filesystem store's max_bytes is too small for your workload.\n\
To fix this issue:\n\
1. Increase the 'max_bytes' value in your filesystem store configuration\n\
2. Example: Change 'max_bytes: 10000000000' to 'max_bytes: 50000000000' (or higher)\n\
3. The setting is typically found in your nativelink.json config under:\n\
stores -> [your_filesystem_store] -> filesystem -> eviction_policy -> max_bytes\n\
4. Restart NativeLink after making the change\n\n\
If this error persists after increasing max_bytes several times, please report at:\n\
https://github.com/TraceMachina/nativelink/issues\n\
Include your config file and both server and client logs to help us assist you."
)
} else {
make_err!(Code::Internal, "Could not make hardlink, {e:?} : {dest}")
}
})?;
}
#[cfg(target_family = "unix")]
if let Some(unix_mode) = unix_mode {
fs::set_permissions(&dest, Permissions::from_mode(unix_mode))
Expand Down
Loading