diff --git a/crates/worker/src/executor/parameter_server.rs b/crates/worker/src/executor/parameter_server.rs index d2ff1ea0..b0d5cfc9 100644 --- a/crates/worker/src/executor/parameter_server.rs +++ b/crates/worker/src/executor/parameter_server.rs @@ -161,23 +161,35 @@ impl JobExecutor for ParameterServerExecutor { } let file_path = peer_dir.join(format!("{}.pt", Uuid::new_v4())); let mut reader = item.reader; - match fs::File::create(&file_path).await { - Ok(mut f) => { - match io::copy(&mut reader, &mut f).await { - Ok(n) => { - let _ = f.sync_all().await; - let _ = fs::set_permissions(&file_path, Permissions::from_mode(0o600)).await; - tracing::debug!(peer_id = %peer, size = n, file = %file_path.display(), "Received update"); } + let mut file = match fs::File::create(&file_path).await { + Ok(f) => f, + Err(err) => { + tracing::error!(error = %err, file = %file_path.display(), "Failed to create staging file"); + return ; + } + }; + let size= match io::copy(&mut reader, &mut file).await { + Ok(n) => n, + Err(err) => { + tracing::error!(error = %err, file = %file_path.display(), "Failed to copy resource"); + match fs::remove_file(&file_path).await{ + Ok(_) => {}, Err(err) => { - tracing::error!(error = %err, file = %file_path.display(), "Failed to write received update"); + tracing::error!(error = %err, file = %file_path.display(), "Failed to remove corrupted file"); } } + return ; } - Err(err) => { - tracing::error!(error = %err, file = %file_path.display(), "Failed to create staging file"); - } + }; + if let Err(err) = file.sync_all().await { + tracing::error!(error = %err, file = %file_path.display(), "Failed to sync file"); + return ; } - + if let Err(err) = fs::set_permissions(&file_path, Permissions::from_mode(0o600)).await { + tracing::error!(error = %err, file = %file_path.display(), "Failed to set permissions"); + return ; + } + tracing::debug!(peer_id = %peer, size = size, file = %file_path.display(), "Received update"); { let mut store = updates_store.lock().await; let pid = peer.parse().unwrap_or_else(|_| PeerId::random());